已經有很長一段時間沒有寫代碼,為了不讓自己的代碼技能有所下降,所以針對BeetleX擴展了一個MQTT協議來保持自己的代碼設計和編寫能力。接下來簡單介紹一下如何使用BeetleX.MQTT來構建對應的TCP或WebSocket服務。
以下實現是針對MQTT 3.1.1版本,協議的實現也並不複雜,就不介紹了,可以通過關注 https://github.com/beetlex-io/mqtt 了解具體代碼。接下來分享使用BeetleX.MQTT實現TCP和WebSocket服務。
TCP服務
using BeetleX.MQTT.Messages;
using System;
using System.Collections.Generic;
using System.Text;
using System.Linq;
namespace BeetleX.MQTT.Server
{
    /// <summary>
    /// Console host for a sample MQTT server that listens on a plain TCP endpoint
    /// using BeetleX.MQTT.
    /// </summary>
    class Program
    {
        private static ServerBuilder<MQTTApplication, MQTTUser, MQTTPacket> server;

        /// <summary>
        /// Configures the listener on 127.0.0.1:9090, registers one handler per MQTT
        /// packet type, calls <c>Run()</c> and then waits on console input so the
        /// process does not exit.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static void Main(string[] args)
        {
            server = new ServerBuilder<MQTTApplication, MQTTUser, MQTTPacket>();
            server.ConsoleOutputLog = true;

            server
                .SetOptions(option =>
                {
                    option.DefaultListen.Port = 9090;
                    option.DefaultListen.Host = "127.0.0.1";
                    option.LogLevel = EventArgs.LogType.Trace;
                })
                .OnMessageReceive<CONNECT>(e =>
                {
                    // The password is deliberately NOT written to the log: credentials
                    // must never end up in log output.
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.NetSession.RemoteEndPoint} connect name:{e.Message.UserName}");

                    // Remember who this connection belongs to for the later handlers.
                    e.Session.UserName = e.Message.UserName;
                    e.Session.ID = e.Message.ClientID;

                    // NOTE(review): every client is acknowledged; no credential check
                    // is performed in this sample.
                    e.Return(new CONNACK());
                })
                .OnMessageReceive<SUBSCRIBE>(e =>
                {
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.Session.ID} subscribe {e.Message}");

                    // Acknowledge first, echoing the request identifier, then register
                    // the subscription with the application.
                    SUBACK subscribed = new SUBACK();
                    subscribed.Identifier = e.Message.Identifier;
                    subscribed.Status = QoSType.MostOnce;
                    e.Return(subscribed);

                    e.Application.RegisterSubscribe(e.Message, e.Session);
                })
                .OnMessageReceive<UNSUBSCRIBE>(e =>
                {
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.Session.ID} unsubscribe {e.Message}");

                    // NOTE(review): MQTT 3.1.1 expects UNSUBACK to carry the packet
                    // identifier of the UNSUBSCRIBE it answers; no identifier is set
                    // here - confirm whether UNSUBACK needs one assigned.
                    e.Return(new UNSUBACK());

                    e.Application.UnRegisterSubscribe(e.Message, e.Session);
                })
                .OnMessageReceive<PUBLISH>(e =>
                {
                    // Decode the payload segment as UTF-8 purely for the log line.
                    var payload = e.Message.PayLoadData;
                    var data = Encoding.UTF8.GetString(payload.Array, payload.Offset, payload.Count);
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.Session.ID} publish {e.Message.Topic}@ {e.Message.Identifier} data:{data}");

                    // NOTE(review): a PUBACK is returned for every PUBLISH; MQTT 3.1.1
                    // only defines PUBACK as the reply to QoS 1 - confirm whether the
                    // QoS level should be checked here.
                    PUBACK published = new PUBACK();
                    published.Identifier = e.Message.Identifier;
                    e.Return(published);

                    e.Application.Publish(e.Message);
                })
                .OnMessageReceive<PINGREQ>(e =>
                {
                    e.Return(new PINGRESP());
                })
                .OnMessageReceive(e =>
                {
                    // Handler registered without a packet type; intentionally does
                    // nothing (presumably the fallback for other packets - confirm).
                })
                .Run();

            Console.Read();
        }
    }
}
WebSocket服務
using BeetleX.MQTT.Messages;
using System;
using System.Text;

namespace BeetleX.MQTT.WSServer
{
    /// <summary>
    /// Console host for a sample MQTT server that is exposed over WebSocket
    /// using BeetleX.MQTT.
    /// </summary>
    class Program
    {
        private static MQTTWebsocketServer<MQTTApplication, MQTTUser> mServer;

        /// <summary>
        /// Creates the WebSocket server with 8081 as its constructor argument, installs
        /// <c>MQTTFormater</c> as the WebSocket frame serializer, registers one handler
        /// per MQTT packet type, calls <c>Run()</c> and then waits on console input so
        /// the process does not exit.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static void Main(string[] args)
        {
            mServer = new MQTTWebsocketServer<MQTTApplication, MQTTUser>(8081);

            mServer.Setting((service, options) =>
            {
                options.LogLevel = EventArgs.LogType.Trace;
                options.LogToConsole = true;
                options.WebSocketFrameSerializer = new MQTTFormater();
            });

            mServer
                .OnMessageReceive<CONNECT>(e =>
                {
                    // The password is deliberately NOT written to the log: credentials
                    // must never end up in log output.
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.NetSession.RemoteEndPoint} connect name:{e.Message.UserName}");

                    // Remember who this connection belongs to for the later handlers.
                    e.Session.UserName = e.Message.UserName;
                    e.Session.ID = e.Message.ClientID;

                    // NOTE(review): every client is acknowledged; no credential check
                    // is performed in this sample.
                    e.Return(new CONNACK());
                })
                .OnMessageReceive<SUBSCRIBE>(e =>
                {
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.Session.ID} subscribe {e.Message}");

                    // Acknowledge first, echoing the request identifier, then register
                    // the subscription with the application.
                    SUBACK subscribed = new SUBACK();
                    subscribed.Identifier = e.Message.Identifier;
                    subscribed.Status = QoSType.MostOnce;
                    e.Return(subscribed);

                    e.Application.RegisterSubscribe(e.Message, e.Session);
                })
                .OnMessageReceive<UNSUBSCRIBE>(e =>
                {
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.Session.ID} unsubscribe {e.Message}");

                    // NOTE(review): MQTT 3.1.1 expects UNSUBACK to carry the packet
                    // identifier of the UNSUBSCRIBE it answers; no identifier is set
                    // here - confirm whether UNSUBACK needs one assigned.
                    e.Return(new UNSUBACK());

                    e.Application.UnRegisterSubscribe(e.Message, e.Session);
                })
                .OnMessageReceive<PUBLISH>(e =>
                {
                    // Decode the payload segment as UTF-8 purely for the log line.
                    var payload = e.Message.PayLoadData;
                    var data = Encoding.UTF8.GetString(payload.Array, payload.Offset, payload.Count);
                    e.GetLoger(EventArgs.LogType.Info)?.Log(
                        EventArgs.LogType.Info,
                        e.NetSession,
                        $"{e.Session.ID} publish {e.Message.Topic}@ {e.Message.Identifier} data:{data}");

                    // NOTE(review): a PUBACK is returned for every PUBLISH; MQTT 3.1.1
                    // only defines PUBACK as the reply to QoS 1 - confirm whether the
                    // QoS level should be checked here.
                    PUBACK published = new PUBACK();
                    published.Identifier = e.Message.Identifier;
                    e.Return(published);

                    e.Application.Publish(e.Message);
                })
                .OnMessageReceive<PINGREQ>(e =>
                {
                    e.Return(new PINGRESP());
                })
                .OnMessageReceive(e =>
                {
                    // Handler registered without a packet type; intentionally does
                    // nothing (presumably the fallback for other packets - confirm).
                })
                .Run();

            Console.Read();
        }
    }
}
BeetleX
開源跨平臺通訊框架(支持TLS)
提供HTTP,Websocket,MQTT,Redis,RPC和服務網關開源組件
https://beetlex-io.com