通信庫:DotNetty
封裝實現:TcpServer、TcpClient、Udp
TCP協議特性:面向連接協議;每個新連接都會創建獨立的ChannelHandler實例;TcpHandler構造函數在每次客戶端連接時觸發
UDP協議特性:無連接協議;整個服務端只創建單例UdpHandler;所有UDP數據包共享同一個處理器實例;UdpHandler構造函數僅在服務啟動時觸發一次
Server
1. 端口複用
2. 定時清理鏈路
TcpClient
1. 指定本地ip及端口
2. 自動重連
其他
1. 上線,離線,數據接收消息通知
2. 接收緩衝數據存儲
using DotNetty.Buffers;
using FusionDrive.DotNetty.Util;
using FusionDrive.DotNetty.Util.decode;
using FusionDrive.DotNetty.Util.socket;
using System.Net;
using System.Text;

namespace FusionDrive.DotNetty
{
    /// <summary>
    /// Console entry point containing manual demos for the server and TCP client wrappers.
    /// </summary>
    internal class Program
    {
        /// <summary>
        /// Runs the client demo and keeps the process alive until a line is read from the console.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            // Call Test_Server() here instead to run the server-side demo.
            Test_Client();
            Console.ReadLine();
        }

        /// <summary>
        /// Server demo: listens on port 8801 for TCP and UDP, logs online/offline events and
        /// feeds received data to an <see cref="OHMDecode"/> instance.
        /// </summary>
        static void Test_Server()
        {
            OHMDecode ohm = new OHMDecode();

            // NOTE(review): presumably starts the periodic link clean-up — confirm in PipeManager.
            PipeManager.CheckLink();

            var server = new CommunicationServer();

            // Handlers are attached BEFORE Listen so that no event raised while the
            // server is starting up can be missed.
            server.OnPipeOnline += async (session) =>
            {
                Logger.Info($"-> {session.RemoteAddress?.ToString()} 上線");
            };

            server.OnPipeOffline += async (session) =>
            {
                Logger.Info($"<- {session.RemoteAddress?.ToString()} 離線");
            };

            server.OnPipeReceived += async (session, buffer) =>
            {
                // DTU mode: the first packet of a connection is a heartbeat whose
                // UTF-8 text is used as the session id.
                if (session.IsFirstConnect)
                {
                    session.IsFirstConnect = false;
                    var sessionId = Encoding.UTF8.GetString(buffer);
                    session.UpdateSessionId(sessionId);

                    // Send the trigger command over TCP.
                    SendData(session.SessionId);

                    // Drop the heartbeat bytes so they are not parsed as protocol data.
                    PipeManager.ClearByteBuffer(sessionId);
                }
                else
                {
                    ohm.ProtocolFormat(session.SessionId, session.ByteBuffer);
                }

                // Send the trigger command over UDP, to the configured target and to the sender.
                SendData(session.SessionId, session.TargetEndPoint);
                SendData(session.SessionId, session.RemoteAddress);
            };

            // TCP-only alternative: server.Listen(8801);
            server.Listen(8801, isTcp: true, isUdp: true, udpTargetEndPoint: new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8802));
        }

        /// <summary>
        /// Client demo: connects to 127.0.0.1:8801 with auto-reconnect enabled, logs
        /// online/offline events and feeds received data to an <see cref="AirDecode"/> instance.
        /// </summary>
        static void Test_Client()
        {
            AirDecode air = new AirDecode();
            var tcpClient = new TcpClient();

            // Handlers are attached BEFORE Connect so the first online/received
            // notification cannot be lost when the connection completes quickly.
            tcpClient.OnPipeOnline += async (session) =>
            {
                Logger.Info($"-> {session.RemoteAddress?.ToString()} 上線");
            };

            tcpClient.OnPipeOffline += async (session) =>
            {
                Logger.Info($"<- {session.RemoteAddress?.ToString()} 離線");
            };

            tcpClient.OnPipeReceived += async (session, buffer) =>
            {
                air.ProtocolFormat(session.SessionId, session.ByteBuffer);
                // To drop the link after the first packet: tcpClient.DisConnect();
            };

            tcpClient.Connect("127.0.0.1", 8801, autoConect: true);
        }

        /// <summary>
        /// Sends the <c>*TRG</c> command (terminated by CR LF, UTF-8 encoded) through the pipe
        /// registered for <paramref name="sessionId"/>.
        /// </summary>
        /// <param name="sessionId">Id of the session to send through.</param>
        /// <param name="endPoint">
        /// End point passed on to the pipe's <c>Send</c>; left <c>null</c> by the TCP call site.
        /// </param>
        static void SendData(string sessionId, EndPoint endPoint = null)
        {
            var pipe = PipeManager.CheckSession(sessionId, out string errmsg);
            if (pipe is null)
            {
                // NOTE(review): assumes CheckSession returns null and fills errmsg when the
                // session is unknown — confirm against PipeManager.
                Logger.Error($"SendData failed, sessionId:{sessionId}:{errmsg}");
                return;
            }

            IByteBuffer message = Unpooled.Buffer();
            message.WriteString("*TRG\r\n", Encoding.UTF8);
            pipe.Send(message, endPoint);
        }
    }
}
using DotNetty.Buffers;
using System.Globalization;
using System.Text;

namespace FusionDrive.DotNetty.Util.decode
{
    /// <summary>
    /// Decoder for "Air" frames accumulated in a session's receive buffer.
    /// </summary>
    /// <remarks>
    /// Frame layout as parsed here: one head byte (0x02), three ASCII characters giving the
    /// data length, then that many bytes plus two trailing bytes (end-of-frame marker and
    /// check byte). The trailing bytes are read but not validated by this class.
    /// </remarks>
    public class AirDecode : NettyDecode
    {
        /// <summary>Frame head marker (0x02, ASCII STX).</summary>
        private const byte FrameHead = 0x02;

        /// <summary>Number of ASCII characters in the length field.</summary>
        private const int LengthFieldSize = 3;

        /// <summary>Extra bytes following the data: end-of-frame marker and check byte.</summary>
        private const int TrailerSize = 2;

        /// <summary>Offset, inside the bytes read after the length field, of the pressure flag character.</summary>
        private const int PressureFlagOffset = 55;

        /// <summary>Offset of the first pressure digit character.</summary>
        private const int PressureDigitsOffset = 56;

        /// <summary>Number of characters in the pressure digits field.</summary>
        private const int PressureDigitsLength = 10;

        /// <summary>Value reported when the pressure digits are not numeric.</summary>
        private const string PressureUnavailable = "999";

        /// <summary>
        /// Parses every complete frame currently readable in <paramref name="oldBuffer"/> and
        /// logs each frame's bytes and pressure value.
        /// </summary>
        /// <param name="sessionId">Session id, used only in log messages.</param>
        /// <param name="oldBuffer">Receive buffer of the session; its reader index is advanced.</param>
        /// <remarks>
        /// Any exception raised while parsing is caught and logged; nothing propagates to the caller.
        /// </remarks>
        public override void ProtocolFormat(string sessionId, IByteBuffer oldBuffer)
        {
            try
            {
                // Loop instead of recursing so that a buffer holding many sticky packets
                // does not add one stack frame per packet.
                while (true)
                {
                    // NOTE(review): IsHeadLost/IsBagLost are inherited; presumably they report
                    // that fewer bytes than requested are readable — confirm in NettyDecode.
                    if (IsHeadLost(oldBuffer, 1))
                    {
                        return; // incomplete packet
                    }

                    if (!SkipToHead(oldBuffer))
                    {
                        return; // only garbage bytes, no head marker found
                    }

                    // 3 bytes: length as ASCII text.
                    if (IsBagLost(oldBuffer, LengthFieldSize))
                    {
                        return; // incomplete packet
                    }

                    byte[] lengthField = new byte[LengthFieldSize];
                    oldBuffer.ReadBytes(lengthField);

                    // A non-numeric length field throws here and is reported by the catch below.
                    int frameLength = Convert.ToInt32(Encoding.ASCII.GetString(lengthField)) + TrailerSize;

                    // n bytes: data plus trailer.
                    if (IsBagLost(oldBuffer, frameLength))
                    {
                        return; // incomplete packet
                    }

                    byte[] data = new byte[frameLength];
                    oldBuffer.ReadBytes(data);
                    Logger.Info($"AirDecode sessionId:{sessionId},data:{Common.ToHexString(data, data.Length, true)}");

                    if (data.Length < PressureDigitsOffset + PressureDigitsLength)
                    {
                        // Report the malformed frame but keep going, so the frames behind it
                        // are still parsed and the buffer is still cleared when exhausted.
                        Logger.Error($"AirDecode 協議解析錯誤,sessionId:{sessionId}:frame of {data.Length} bytes is too short to contain the pressure field");
                    }
                    else
                    {
                        Logger.Info($"AirDecode 壓力值:{ParsePressure(data)}");
                    }

                    if (!oldBuffer.IsReadable())
                    {
                        oldBuffer.Clear(); // everything consumed, reset the buffer
                        return;
                    }

                    // More bytes remain: another (sticky) packet follows, parse it too.
                }
            }
            catch (Exception ex)
            {
                Logger.Error($"AirDecode 協議解析錯誤,sessionId:{sessionId}:{ex.Message}");
            }
        }

        /// <summary>
        /// Consumes bytes up to and including the first head marker.
        /// </summary>
        /// <param name="buffer">Buffer to scan.</param>
        /// <returns><c>true</c> when a head marker was consumed; <c>false</c> when the buffer ran out first.</returns>
        /// <remarks>
        /// The reader mark is moved past every discarded byte, so on success it is left on the
        /// head marker itself while the reader index is just after it.
        /// </remarks>
        private static bool SkipToHead(IByteBuffer buffer)
        {
            buffer.MarkReaderIndex();
            while (buffer.IsReadable())
            {
                if (buffer.ReadByte() == FrameHead)
                {
                    return true;
                }

                buffer.MarkReaderIndex();
            }

            return false;
        }

        /// <summary>
        /// Extracts the pressure value from a frame: the flag character followed by the
        /// normalised number.
        /// </summary>
        /// <param name="data">Frame bytes; must be long enough to contain the pressure field.</param>
        /// <returns>
        /// Flag plus number, or <c>"999"</c> when the digits are not numeric (the field may
        /// contain dashes instead of a number).
        /// </returns>
        private static string ParsePressure(byte[] data)
        {
            string flag = Encoding.ASCII.GetString(data, PressureFlagOffset, 1);
            string digits = Encoding.ASCII.GetString(data, PressureDigitsOffset, PressureDigitsLength);

            // Device text is culture-independent: parse and format with the invariant culture so
            // '.' is always the decimal separator, whatever the host locale is.
            if (double.TryParse(digits, NumberStyles.Float | NumberStyles.AllowThousands, CultureInfo.InvariantCulture, out double value))
            {
                return flag + value.ToString(CultureInfo.InvariantCulture);
            }

            return PressureUnavailable;
        }
    }
}