目錄
1.總體設計
2.自定義協議設計(簡單版)
3.消息類型(1字節)
4.項目結構
5.核心功能代碼
(1)pom.xml(Maven依賴)
(2)IotServer.java(服務器啟動器)
(3)IotServerInitializer.java(Pipeline初始化)
(4)DeviceChannelManager.java(設備連接管理器)
(5)model/IotMessage.java(消息實體)
(6)IotMessageDecoder.java(自定義解碼器)
(7)IotMessageEncoder.java(自定義編碼器)
(8)IotServerHandler.java(核心業務邏輯處理)
這個項目已經支持了:
6.Netty物聯網服務器常見問題及解決方案
(1)粘包/拆包問題
問題描述
解決方案
(2)高并發性能問題
問題描述
解決方案
(3)連接管理與高可用性
問題描述
解決方案
(4)協議兼容性與擴展性
問題描述
解決方案
(5)資源泄漏與穩定性
問題描述
解決方案
(6)弱網絡環境適配
問題描述
解決方案
(7)安全與加密
問題描述
解決方案
(8)監控與日志
問題描述
解決方案
1.總體設計
模塊 | 說明 |
設備連接(Login) | 設備上線時登錄認證 |
心跳檢測(Heartbeat) | 定時檢測設備是否在線 |
消息上報(Upload) | 設備主動上傳數據到平臺 |
指令下發(PushCommand) | 平臺推送命令到指定設備 |
連接管理(Channel管理) | 按 deviceId 管理連接 |
編解碼器(自定義協議) | 處理消息粘包、拆包 |
日志監控 | 記錄通信過程 |
2.自定義協議設計(簡單版)
設備和平臺約定的消息格式,統一規范通信!
+----------+--------+------------+--------------+
| 消息頭部 | 消息類型 | 設備ID長度 | 消息體長度 |
| 4字節 | 1字節 | 1字節 | 4字節 |
+----------+--------+------------+--------------+
| 設備ID (變長) | 消息體 (變長) |
+-----------------------------------------------+
3.消息類型(1字節)
類型值 | 含義 |
0x01 | 登錄請求 |
0x02 | 心跳 |
0x03 | 上報數據 |
0x04 | 平臺推送命令 |
4.項目結構
iot-netty-server/├── IotServer.java // 啟動服務器├── IotServerInitializer.java // 初始化Pipeline├── IotMessageDecoder.java // 解碼器├── IotMessageEncoder.java // 編碼器├── IotServerHandler.java // 核心業務處理├── DeviceChannelManager.java // 連接管理├── model/│ ├── IotMessage.java // 消息模型└── pom.xml // Maven依賴
5.核心功能代碼
(1)pom.xml(Maven依賴)
<dependencies><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.109.Final</version></dependency>
</dependencies>
(2)IotServer.java(服務器啟動器)
public class IotServer {public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new IotServerInitializer());ChannelFuture f = b.bind(9000).sync();System.out.println("IoT服務器啟動成功,監聽端口9000...");f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
(3)IotServerInitializer.java(Pipeline初始化)
public class IotServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new IotMessageDecoder());pipeline.addLast(new IotMessageEncoder());pipeline.addLast(new IotServerHandler());}
}
(4)DeviceChannelManager.java(設備連接管理器)
public class DeviceChannelManager {private static final Map<String, Channel> deviceChannels = new ConcurrentHashMap<>();public static void register(String deviceId, Channel channel) {deviceChannels.put(deviceId, channel);}public static void remove(Channel channel) {deviceChannels.values().removeIf(value -> value.equals(channel));}public static Channel get(String deviceId) {return deviceChannels.get(deviceId);}
}
(5)model/IotMessage.java(消息實體)
public class IotMessage {private byte type;private String deviceId;private byte[] payload;// getter、setter省略
}
(6)IotMessageDecoder.java(自定義解碼器)
public class IotMessageDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 10) return; // 最小消息長度檢查in.markReaderIndex();int header = in.readInt();byte type = in.readByte();byte deviceIdLength = in.readByte();int payloadLength = in.readInt();if (in.readableBytes(