1. 概述
本文分兩個章節講解MQTT相關的知識,第一部份主要講解MQTT的原理和相關配置,第二個章節主要講和Spring boot的integration相結合代碼的具體實現,如果想快速實現功能,可直接跳過第一章節查看第二章講。
1.1 MQTT搭建
為了實現MQTT通訊,服務端可以使用市面上常見的軟件進行安裝,推薦EMQX,他有配套的web頁面,并且有跨平臺的客戶端MQTTX方便測試,當然本文主要內容就是講解如何利用Spring boot自己寫一個MQTT的客戶端進行消息發布和消息訂閱。這里所說的客戶端MQTTX只是用來進行MQTT消息發布訂閱測試,并不能和業務代碼結合一起使用,結合業務內容的MQTT客戶端還需自己編寫,下個大章節有介紹。?
1.1.1 MQTT服務端部署
EMQX是一款實現了MQTT協議的,開源的MQTT消息代理軟件。MQTT定義了消息通訊的規則和流程,而EMQX則是遵循這些規則的軟件,使得設備能夠依據MQTT協議進行有效通訊。
官網:https://www.emqx.com/zh
安裝步驟:
1). 下載?
wget https://www.emqx.com/zh/downloads/enterprise/5.8.5/emqx-enterprise-5.8.5-ubuntu24.04-amd64.deb?
2)安裝
sudo apt install ./emqx-enterprise-5.8.5-ubuntu24.04-amd64.deb
3)啟動
sudo systemctl start emqx
4)查看
sudo systemctl status emqx?
常用端口如下:
1883 TCP端口
8083 WebSocket端口
8084 WebSocket Secure端口
8883 SSL/TLS端口
18083 Broker的Dashboard訪問端口
EMQX提供了一個內置的管理控制臺,即EMQX Dashboard 方便用戶通過web進行管理和監控EMQX集群,并配置和使用各項功能。
dashboard網址:localhost:18083? admin/public
1.1.2 MQTT客戶端部署
MQTTX是EMQX開源的一款跨平臺MQTT客戶端工具,包含三種類型工具:
MQTTX Desktop:MQTTX Desktop 是一款跨平臺的MQTT桌面客戶端工具
MQTTX CLI:MQTTX CLI是EMQ開源的一款MQTT5.0命令行客戶端工具
MQTT Web: MQTTX Web是一款基于瀏覽器訪問客戶端工具
官網:https://mqttx.app/zh
后續用Spring boot自己寫客戶端,和1.1.1章節安裝的服務端進行連接通訊測試,最好安裝一下這個客戶端,這樣比較好測試,用這個客戶端作為發布者發數據,用java寫代碼作為訂閱端收數據,反之亦然。
1.2 MQTT報文
剩余長度:剩余長度指示了當前報文剩余部分的字節數,也就是可變報頭和有效載荷這兩部份的長度。
報文的總長度:固定報頭的長度 + 剩余長度
有效載荷:在publish報文中,payload用于承載具體的應用消息內容,這也是publish報文最核心的功能。在subscribe報文中,payload包含了想要訂閱的主題以及對應的訂閱選項,這也是subscribe報文最主要的功能。
1.3?Qos介紹
Qos 三個常見取值的應用場景:
0:可能會丟數據,消息丟失的頻率依賴所處的網絡環境,傳遞效率最高,傳輸一些高頻且不那么重要的數據,比如周期性更新傳感器數據。
即發即棄,不需要等待確認,不需要存儲和重傳,接收端永遠不會接收到重復報文。
1: 保證消息到達不丟失,但可能會導致消息重復,傳輸一些較為重要的數據,比如下達關鍵指令。
如果發送報文失敗或者應答報文失敗,都會導致報文重傳,應答報文失敗會導致重復數據。packetID是本次報文的唯一標識,DUP為是否重傳的標識。
如果客戶端收到 DUP=1 的消息,但第一次接收的消息已經處理完畢并刪除了 Packet ID 記錄,那么 DUP 這個標志本身就沒法幫助客戶端去重。
?
2: 既可以保證消息到達,也可以保證消息不會重復,但傳輸成本最高,在金融、航空等行業場景下使用。通過對packet ID進行標記,保證了消息傳輸不重復。
packet ID需要進行鎖定和釋放放回,而不是一直生成新的:
防止 Packet ID 過快消耗:MQTT 規定 Packet ID 范圍為 1~65535,如果不控制使用,短時間內可能會用盡。
保證 ID 唯一性:如果 ID 生成過快,可能會導致不同的 PUBLISH 消息誤用相同的 ID,從而導致數據錯誤。
確保 PUBACK 邏輯正確:QoS 1 需要等到 PUBACK 確認 后才能釋放 ID,避免 ID 污染。
1.4 主題
1.4.1 主題通配符
MQTT主題通配符包括單層通配符 + 以及多層通配符 #,主要用于客戶端一次訂閱多個主題
單層通配符:加號("+")是用于單個主題層級匹配的通配符,在使用單層通配符時,單層通配符必須占據整個層級。
test/+ : test/1 , test/2, test/any
test/+/topic : test/1/topic , test/2/topic ,test/any/topic
多層通配符: 符號("#")用于匹配主題中任意層級的通配符,多層通配符表示它的父級和任意數量的子層級,在使用多層通配符時,它必須占據整個層級并且必須時主題的最后一個字符。
# : 匹配所有主題,用于搭建MQTT服務端集群用到
test/topic/# : test/topic/1 , test/topic/1/2
1.4.2 系統主題
以$SYS/ 開頭的主題為系統主題,系統主題主要是用于獲取MQTT服務器自身運行狀態,消息統計,客戶端上下線事件等數據。
集群狀態信息
主題 | 說明 |
---|---|
$SYS/brokers | 集群節點列表 |
$SYS/brokers/${node}/version | EMQX 版本 |
$SYS/brokers/${node}/uptime | EMQX 運行時間 |
$SYS/brokers/${node}/datetime | EMQX 系統時間 |
$SYS/brokers/${node}/sysdescr | EMQX 系統信息 |
客戶端上下線事件
$SYS 主題前綴:$SYS/brokers/${node}/clients/?
主題 (Topic) | 說明 |
---|---|
${clientid}/connected | 上線事件。當任意客戶端上線時,EMQX 就會發布該主題的消息 |
${clientid}/disconnected | 下線事件。當任意客戶端下線時,EMQX 就會發布該主題的消息 |
?更多內容請參考官方文檔:
系統主題 | EMQX 5.0 文檔
1.5 會話介紹
????????MQTT客戶端和MQTT服務器之間的連接被稱為會話,每個MQTT客戶端都可以啟動一個或多個會話,通過會話可以實現客戶端和服務器之間的消息傳遞。服務端使用client ID來唯一標識每個會話,如果客戶端想要在連接時復用之前的會話,那么必須使用與此前一致的client ID
1.5.1 clean start?參數配置
clean start :用于指示客戶端在和服務器建立連接的時候應該嘗試恢復之前的會話還是直接創建全新的會話。
等于0: 服務端存在一個關聯此客戶端標識符client ID的會話,服務端必須基于此會話的狀態恢復與客戶端的通信,之前的訂閱信息會再次綁定,并且會接收到客戶端斷開時發布者發布的消息。如果不存在任何關聯此客戶端標識符的會話,服務端必須創建一個新的會話。
等于1: 客戶端和服務端必須丟棄任何一存在的會話,并開始一個新的會話。
1.5.2 session expiry interval?參數配置
session expiry interval: 決定了會話狀態數據在服務端的存儲時長。
沒有指定此屬性或設置為0:?表示會話將在網絡連接斷開時立即結束。
設置為一個大于0的值:則表示會話將在網絡連接斷開的多少秒之后過期。
設置為0xFFFFFFF:session expiry interval 屬性能夠設置的最大值時,表示會話數據永不過期。
1.6 消息詳解
1.6.1 保留消息
普通消息:普通消息在發送之前所對應的主題如果不存在訂閱者,普通消息MQTT服務器會直接將其丟棄。
保留消息:保留消息可以保留MQTT服務器中,任何新的訂閱者訂閱與該保留消息中的主題匹配的主題時,都會立即接收到該消息,即使這個消息是在他們訂閱主題之前發布的。
常用場景:
a. 智能家居設備的狀態只有在變更時才會上報,但是控制端需要在上線后就能獲取到設備的狀態;
b. 傳感器上報數據的間隔時間太長,但是訂閱者需要在訂閱后立即獲取到最新的數據;
c. 傳感器的版本號,序列號等不經常變更的屬性,可在上線后發布一條保留消息告知后續的所有訂閱者。
注意:
a. 發布者發布保留消息,保留消息針對某一個主題,最多只能發布一個保留消息,即最后的一條保留消息。
b. 保留消息存儲在服務端的默認存儲方式是內存存儲,可以設置成磁盤存儲。
c. 通過發送一條空的保留消息進行覆蓋上一條保留消息,達成保留消息刪除的效果
d. 通過dashboard頁面進行刪除按鈕刪除
1.6.2 消息過期時間
?設置消息過期時間后,如果訂閱者在消息過期前連接并訂閱了相關主題,則能夠收到消息;否則,將無法收到消息。當訂閱者收到設置過期時間的消息時,消息會攜帶過期時間信息,根據接收數據的當前時間事實計算而得。
1.6.3 遺囑消息
在MQTT 中,客戶端可以在連接時在服務端注冊一個遺囑消息,當客戶端意外斷開連接,服務端會向其他訂閱了相應主題的客戶端發送此遺囑消息。這些訂閱者可以向用戶發送通知,切換備用設備等。正常關閉斷開不會觸發遺囑消息。
will delay interval 這個屬性決定了服務端與發布者網絡連接斷開后多久發布遺囑消息,單位秒。如果發布者在延遲時間內恢復連接,遺囑消息將不會被發布。這是為了避免發布者連續短暫網絡反復中斷而頻繁發布遺囑消息。
如果會話時間快要過期了,但遺囑消息延遲時間還未到達,此時也會發布遺囑消息。
1.6.4 延遲發布
延遲發布主題格式 $delayed/{DelayInterval}/{TopicName}?
$delayed:使用$delayed作為主題的前綴的消息都將被視為需要延遲發布的消息
DelayInterval:延遲發布的時間間隔,單位秒,允許最大值4294967秒(50天)
TopicName:MQTT消息主題名稱
1.7 訂閱詳解?
1.7.1 訂閱配置
1)Qos
服務端在向訂閱端發送消息時可以使用的最大Qos等級
情況1: 服務端支持的最大Qos < 客戶端訂閱時請求的最大Qos
服務端將無法滿足客戶端的要求,這時服務端就會通過訂閱的響應報文SUBACK告知訂閱端最終授予的最大Qos等級,訂閱端可以自行評估是否接受并繼續通信,此情況訂閱端依然能收到消息,只不過Qos不是訂閱端配置的等級,而是發布端配置的等級,降級處理。
情況2:訂閱時請求的最大Qos < 消息發布時的Qos
為了盡可能地投遞消息,服務端不會忽略這些消息,而是會在轉發時對這些消息對Qos進行降級處理。
2)no local
被用在橋接場景中,橋接的本質是兩個MQTT server建立一個MQTT連接,然后相互訂閱一些主題,server將客戶端的消息轉發給另一個server,而另一個server則可以將消息轉發給它的客戶端。為了避免server相互無限循環轉發的轉發風暴,兩個server均將此選項設置為1即可避免。
等于0(默認): 服務端可以將消息轉發給發布這個消息的客戶端
等于1: 服務端不可以將消息轉發給發布這個消息的客戶端,禁止本地轉發?
3)retain as published (RAP)
為了解決橋接場景下的問題,當server A將保留消息轉發給server B時,由于消息中的retain標識被刪除,server B將不會知道這原本是一條保留消息,自然不會再存儲它,這就導致了保留消息無法跨橋接使用。
等于0(默認):服務端在向此訂閱轉發應用消息時需要清楚消息中的retain標識不變
等于1: 服務端在向此訂閱轉發應用消息時需要保持消息中的retain標識不變
4)retain handling
雖然發布者有配置保留消息,訂閱者可以選擇是否接受這個保留消息,這個配置是被用來向服務端指示當訂閱建立時,是否需要發送保留消息。
等于0(默認):表示只要訂閱建立,就發送保留消息
等于1:表示只有建立全新的訂閱而不是重復訂閱時,才發送保留消息
等于2:表示訂閱建立時不要發送保留消息
1.7.2 共享訂閱
普通訂閱者:發布者每發布一條消息時,所有匹配的訂閱端都會收到該消息的副本,當某個訂閱者的消費速度無法跟上消息的生產速度時,broker沒辦法將其中一部份消息分流到其他訂閱端中來分擔壓力,這使得訂閱端容易成為整個消息系統的性能瓶頸。
共享訂閱者:共享訂閱者可以均衡的分配消息負載,各個客戶端共享一個訂閱,每個匹配該訂閱的消息都會有一個副本投遞給其中一個客戶端。提高了吞吐量,帶來了高可用,即使共享訂閱組中的一個客戶端斷開連接或發生故障,其他客戶端仍然可以繼續處理消息。
帶群組共享訂閱:通過在原始主題前添加 $share/<group-name> 前綴為分組的訂閱者啟用共享訂閱。組名可以是任意字符串。broker同時將消息轉發給不同的組,屬于同一組的訂閱者可以使用負載均衡接收消息。
?不帶群組共享訂閱:以$queue/為前綴的共享訂閱是不帶群組的共享訂閱,它是帶群組共享訂閱的特例,可以理解為所有共享訂閱者都在一個訂閱群組中。
負載均衡算法:
1. 隨機(random):在共享訂閱組內隨機選擇一個會話發送消息(推薦)
2. 輪詢(round robin):在共享訂閱組內按順序選擇一個會話發送消息,循環往復
3. 哈希(hash):基于某個字段的哈希結果來分配
4.粘性(sticky):在共享訂閱組內隨機選擇一個會話發送消息,此后保持這一選擇,直到該會話結束再重復這一過程,有點像是備用機的感覺。
5. 本地優先(local):隨機選擇,但優先選擇與消費的發布者同處于同一節點的會話,如果不存在這樣的會話,則退化為普通的隨機策略。?
1.7.3 排它訂閱
排它訂閱允許對主題進行互斥訂閱,一個主題同一時刻僅被允許存在一個訂閱者,在當前訂閱者未取消訂閱前,其他訂閱者都將無法訂閱對應主題。要進行排它訂閱,需要為主題名稱添加:
$exclusive/ 前綴
注意:
1. broker端需要開啟排它訂閱
2. 針對訂閱者進行設置,發布者正常配置topic即可
3. 即使有訂閱者對topic進行了排它訂閱,其他訂閱者依然可以對此topic進行正常訂閱,排它訂閱僅針對另一個排它訂閱生效排它。
1.7.4?自動訂閱
通過dashboard設置自動訂閱主題,所有客戶端包括發布者都會收到這個主題的消息
2.? Spring boot 實戰
2.1 準備工作
2.1.1 添加依賴
<!-- Spring Integration Core --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId>
<!-- <version>5.5.10</version>--></dependency>
<!-- Spring Integration MQTT -->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.10</version>
</dependency>
注意:這里的spring-integration-core千萬別指定版本號,不然在開發中,會遇到接口加了@MessagingGateway注解,接口旁邊沒有bean的標志,無法被Spring管理和注入使用。
?2.1.2 添加配置
spring:# Mqtt配置mqtt:username:password:url: tcp://192.168.1.214:1883subClientId: wuLang_subClient_01subTopic: worker/alert # 訂閱多個主題 用逗號隔開pubTopic: worker/locationpubClientId: wuLang_pubClient_01
2.2 創建配置類
2.2.1 參數配置類
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;/*** @Author: HarryLin* @Date: 2025/3/20 14:32* @Company: 北京紅山信息科技研究院有限公司* @Email: linyun@***.com.cn**/
@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttPropertiesConfig {private String username;private String password;private String url;private String subClientId;private String subTopic;private String pubClientId;private String pubTopic;
}
2.2.2 創建連接工廠?
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;/*** @Author: HarryLin* @Date: 2025/3/20 14:40* @Company: 北京紅山信息科技研究院有限公司* @Email: linyun@***.com.cn**/
@Configuration
public class MqttConfiguration {@Autowiredprivate MqttPropertiesConfig mqttPropertiesConfig;/** 創建連接工廠 **/@Beanpublic MqttPahoClientFactory mqttPahoClientFactory(){DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true); //設置新會話options.setUserName(mqttPropertiesConfig.getUsername());options.setPassword(mqttPropertiesConfig.getPassword().toCharArray());options.setServerURIs(new String[]{mqttPropertiesConfig.getUrl()});factory.setConnectionOptions(options);return factory;}}
2.2.3 配置入站適配器
import com.wulang.pnt.handler.mqttservice.ReceiverMessageHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** @Author: HarryLin* @Date: 2025/3/20 14:54* @Company: 北京紅山信息科技研究院有限公司* @Email: linyun@***.com.cn**/
@Configuration
public class MqttInboundConfiguration {@Autowiredprivate MqttPropertiesConfig mqttPropertiesConfig;@Autowiredprivate MqttPahoClientFactory mqttPahoClientFactory;@Autowiredprivate ReceiverMessageHandler receiverMessageHandler;//消息通道@Beanpublic MessageChannel messageInboundChannel(){return new DirectChannel();}/*** 配置入站適配器* 作用: 設置訂閱主題,以及指定消息的通道 等相關屬性* */@Beanpublic MessageProducer messageProducer(){MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(mqttPropertiesConfig.getUrl(),mqttPropertiesConfig.getSubClientId(),mqttPahoClientFactory,mqttPropertiesConfig.getSubTopic().split(","));mqttPahoMessageDrivenChannelAdapter.setQos(1);mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel());return mqttPahoMessageDrivenChannelAdapter;}/** 指定處理消息來自哪個通道 */@Bean@ServiceActivator(inputChannel = "messageInboundChannel")public MessageHandler messageHandler(){return receiverMessageHandler;}
}
2.2.4 配置出站適配器
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** @Author: HarryLin* @Date: 2025/3/20 15:46* @Company: 北京紅山信息科技研究院有限公司* @Email: linyun@***.com.cn**/
@Configuration
@Slf4j
public class MqttOutboundConfiguration {@Autowiredprivate MqttPropertiesConfig mqttPropertiesConfig;@Autowiredprivate MqttPahoClientFactory mqttPahoClientFactory;// 消息通道@Beanpublic MessageChannel mqttOutboundChannel(){return new DirectChannel();}/** 配置出站消息處理器 */@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel") // 指定處理器針對哪個通道的消息進行處理public MessageHandler mqttOutboundMessageHandler(){MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(mqttPropertiesConfig.getUrl(),mqttPropertiesConfig.getPubClientId(),mqttPahoClientFactory);mqttPahoMessageHandler.setDefaultQos(1);mqttPahoMessageHandler.setDefaultTopic("worker/location");mqttPahoMessageHandler.setAsync(true);return mqttPahoMessageHandler;}}
?2.2.5 配置出站網關
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;/*** @Author: HarryLin* @Date: 2025/3/20 17:06* @Company: 北京紅山信息科技研究院有限公司* @Email: linyun@***.com.cn**/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload);public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload );
}
2.3 收發數據
2.3.1 訂閱數據
在入站適配器中配置了訂閱主題,MQTT服務端等信息后,并將以下類注入到入站適配器到消息處理方法中(2.2.3 最后一個方法),在這個方法中就能得到訂閱消息。只需用戶在此方法中寫消息處理方法即可。
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;import java.util.Objects;/*** @Author: HarryLin* @Date: 2025/3/20 15:24* @Company: 北京紅山信息科技研究院有限公司* @Email: linyun@***.com.cn**/
@Service
@Slf4j
public class ReceiverMessageHandler implements MessageHandler {@Overridepublic void handleMessage(Message<?> message) throws MessagingException{Object payload = message.getPayload();MessageHeaders headers = message.getHeaders();String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString();String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString();log.info("MQTT payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}",payload,receivedTopic,receivedQos,timestamp);}
}
2.3.2 發布消息
封裝出站網關,調用如下方法即可發送消息
import com.wulang.pnt.config.mqtt.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;/*** @Author: HarryLin* @Date: 2025/3/20 16:16* @Company: 北京紅山信息科技研究院有限公司* @Email: linyun@***.com.cn**/
@Service
public class MqttMessageSender{@Autowiredprivate MqttGateway mqttGateway;public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, String payload) {mqttGateway.sendMsgToMqtt(topic,payload);}public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload) {mqttGateway.sendMsgToMqtt(topic,qos,payload);}
}
@SpringBootTest
@Slf4j
public class MqttClientTest {@Autowiredprivate MqttGateway mqttGateway;@Testpublic void sendMsg(){mqttGateway.sendMsgToMqtt("worker/location","hello mqtt spring boot");log.info("message is send");}