Netty進階
1.黏包半包
1.1.黏包
服務端代碼
public class HelloWorldServer {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(bossGroup, workerGroup);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();} catch (Exception e) {logger.error("server error !", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
客戶端代碼
public class HelloWorldClient {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 會在連接 channel 成功后,觸發active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);// 連接建立后,模擬發送數據,每次發送 16個字節 一共發送 10 次for (int i = 0; i < 10; i++) {ByteBuf buffer = ctx.alloc().buffer(16);buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});// 寫入channelchannel.writeAndFlush(buffer);}}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (Exception e) {logger.error("client error!");} finally {worker.shutdownGracefully();}}
}
半包
需要對服務端 和客戶端 的代碼稍微修改下
// 設置每次接收緩沖區的大小,所以但是客戶端每次發送的是16個字節 所以可以模擬半包情況
serverBootstrap.option(ChannelOption.SO_RCVBUF,10);// 注意 如果不生效的話,建議服務端也設置響應的緩沖區大小
// 設置發送方緩沖區大小
bootstrap.option(ChannelOption.SO_SNDBUF, 10);
1.2.滑動窗口
TCP以一個段(
segment
)為單位,每次發送一個段就需要進行一次確認應答(ACK
),為了保證消息傳輸過程的穩定性,但是這樣做的缺點就是會導致包的往返時間越長,性能就越差。
為了解決這個問題,引入窗口的概念,窗口的大小決定了無需等待應答而可以繼續發送數據的最大值
窗口實際就起到一個緩沖區的作用,同時也能起到流量控制的作用
- 圖中深色的部門表示即將要發送的數據,高亮的部分就是窗口
- 窗口內的數據才允許被發送,當應答未到達前,窗口必須停止滑動
- 如果1001 - 2000 這個段的數據ACK回來了,窗口就可以向前滑動
- 接收方也會維護一個窗口,只有落在窗口內的數據才允許接收
1.3.黏包半包現象分析
- 黏包
- 現象
- 發送 abc def 接收 abdcef
- 原因
- 應用層:接收方ByteBuf設置太大(Netty默認1024)
- 滑動窗口:假設發送方 256 bytes 表示一個完整報文,但是由于接收方處理不及時,且窗口大小足夠大,這256 bytes 字節就會緩沖在接收方的滑動窗口中,當滑動窗口緩沖了多個報文就會黏包
- Nagle算法:會造成黏包
- 現象
- 半包
- 現象:發送 abcefg 接收方 abc efg
- 原因
- 應用層:接收方ByteBuf 設置容量大小,小于實際發送的數據量
- 滑動窗口:假設接收方的窗口只剩下了,128byte,發送方的報文大小是 256 byte,這時就會放不下,只能先發送 128 byte數據,然后等待ack確認后,才能發送剩下的部門,這時就造成了半包。
- MSS限制:當發送的數據超過了MSS的限制后,會將數據切割,然后分批發送,就會造成半包
- 為什么在數據傳輸截斷存在數據分割呢?一個
TCP報文的有效數據(凈荷數據)
是有大小容量限制的,這個報文有效數據的大小就被稱為**MSS(Mixinum Segment Size) 最大報文字段長度**
。具體MSS的值會在三次握手階段進行協商,但是最大長度不會超過**1460
**個字節
- 為什么在數據傳輸截斷存在數據分割呢?一個
出現黏包半包的主要原因就是 TCP的消息沒有邊界
1.4.黏包半包解決
1.4.1.短鏈接(解決黏包)
客戶端發送完后立馬進行斷開
短鏈接并不能半包問題
短鏈接雖然能解決黏包問題,但是缺點也是很明顯的
- 連接建立開銷高,因為需要進行握手等操作。
- 頻繁的連接管理會增加服務器負擔。
- 可能導致資源浪費,如 TCP 連接的建立和釋放。
- 存在網絡擁塞風險,特別是在高并發情況下。
- 難以維護狀態,增加開發和維護的復雜性。
public class HelloWorldClient {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// 短鏈接發發送for (int i = 0; i < 10; i++) {shortLinkedSend();}}/*** 短鏈接發送 測試*/private static void shortLinkedSend() {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);// 設置發送方緩沖區大小bootstrap.option(ChannelOption.SO_SNDBUF, 10);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 會在連接 channel 成功后,觸發active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);// 連接建立后,模擬發送數據ByteBuf buffer = ctx.alloc().buffer(16);buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});// 發送數據ctx.writeAndFlush(buffer);// 主動斷開鏈接ctx.channel().close();}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (Exception e) {logger.error("client error!");} finally {worker.shutdownGracefully();}}}
1.4.2.定長解碼器
- 固定長度限制:消息長度必須是固定的,這限制了處理可變長度消息的能力。
- 資源浪費:對于短消息,會浪費網絡帶寬和系統資源。
- 消息邊界問題:無法處理不符合固定長度的消息,可能導致解碼器阻塞或消息邊界錯誤。
- 不適用于多種消息類型:無法處理多種長度不同的消息類型。
- 性能影響:對于長消息,可能會影響性能。
客戶端代碼
public static void main(String[] args) {fixedLengthDecoder();}/*** 定長解碼器 測試*/private static void fixedLengthDecoder () {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);// 設置發送方緩沖區大小bootstrap.option(ChannelOption.SO_SNDBUF, 10);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 會在連接 channel 成功后,觸發active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);// 連接建立后,模擬發送數據ByteBuf buffer = ctx.alloc().buffer(16);for (int i = 0; i < 10; i++) {String s = "hello," + new Random().nextInt(100000000);logger.error("send data:{}", s);buffer.writeBytes(fillString(16, s));}// 發送數據ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (Exception e) {logger.error("client error!");} finally {worker.shutdownGracefully();}}/*** 編寫要給方法 給定一個長度,和數值,* 例如長度 16 數值 abc 剩下的填充**/private static byte[] fillString(int length, String value) {if (value.length() > length) {return value.substring(0, length).getBytes();}StringBuilder sb = new StringBuilder(value);for (int i = 0; i < length - value.length(); i++) {sb.append("*");}return sb.toString().getBytes();}
服務端
服務端的代碼沒有太大改動
@Override
protected void initChannel(SocketChannel channel) throws Exception {// 在打印日志前添加了定長解碼器// 添加定長解碼器 16 消息長度必須發送方 和 接收方一致// 注意順序,必須要先解碼,然后才能打印日志channel.pipeline().addLast(new FixedLengthFrameDecoder(16));channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
1.4.3.行解碼器(分隔符)
\r \r\n
客戶端
這里的客戶端 代碼 和上面一致,我們只針對客戶端消息代碼進行修改
// 每次發送消息的結尾加上換行符
String s = "hello," + new Random().nextInt(100000000) + "\n";
服務端
用的不多
// 添加行解碼器,設置每次接收的數據大小
// 注意順序,必須要先解碼,然后才能打印日志
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
1.4.4.LTC解碼器
LengthFieldBasedFrameDecoder
方法的工作原理以及各個參數的含義:
- maxFrameLength(最大幀長度):這個參數指定了一個幀的最大長度。當接收到的幀長度超過這個限制時,解碼器會拋出一個異常。設置一個適當的最大幀長度可以防止你的應用程序受到惡意或錯誤消息的影響。
- lengthFieldOffset(長度字段偏移量):這個參數表示長度字段的偏移量,也就是在接收到的字節流中,長度字段從哪里開始的位置。通常,這個偏移量是相對于字節流的起始位置而言的。
- lengthFieldLength(長度字段長度):這個參數指定了長度字段本身所占用的字節數。在接收到的字節流中,長度字段通常是一個固定長度的整數,用來表示消息的長度。
- lengthAdjustment(長度調整值):在某些情況下,長度字段可能包括了消息頭的長度,而不是整個消息的長度。這個參數允許你進行一些調整,以便準確地計算出消息的實際長度。
- initialBytesToStrip(要剝離的初始字節數):在解碼器將幀傳遞給處理器之前,會先從幀中剝離一些字節。通常,這些字節是長度字段本身,因為處理器只需要處理消息的有效負載部分。這個參數告訴解碼器要剝離的初始字節數。
假設有一個網絡協議,它的消息格式如下:
- 消息長度字段占據前4個字節。
- 長度字段之后是實際的消息內容。
現在假設你收到了一個包含以上格式的字節流。你希望用Netty的LengthFieldBasedFrameDecoder
來解碼這個消息。
在這種情況下,你需要設置以下參數:
lengthFieldOffset
: 偏移量為0,因為長度字段從消息的開頭開始。lengthFieldLength
: 長度字段本身是4個字節。lengthAdjustment
: 在這種情況下,長度字段表示的是消息內容的長度,不包括長度字段本身,所以這個值是0。initialBytesToStrip
: 需要剝離長度字段本身,也就是4個字節。(因為用4個字節表示了字段的長度)
假設你收到的字節流如下:
[消息長度字段] [消息內容]
[0, 0, 0, 5] [72, 101, 108, 108, 111]
- 長度字段
[0, 0, 0, 5]
表示消息長度為5個字節。 - 后面的5個字節
[72, 101, 108, 108, 111]
則是實際的消息內容,代表著 “Hello”。
LengthFieldBasedFrameDecoder
將會將這個字節流解析成一條消息,其中包含了 “Hello” 這個字符串。
測試
public class TestLengthFiledDecoder {private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());public static void main(String[] args) {// 創建一個 EmbeddedChannel 并添加一個 LengthFieldBasedFrameDecoder// 該解碼器會根據長度字段的值來解碼數據// EmbeddedChannel 是一個用于測試的 Channel 實現EmbeddedChannel channel = new EmbeddedChannel(/** maxFrameLength: 最大幀長度* lengthFieldOffset: 長度字段的偏移量* lengthFieldLength: 長度字段的長度* lengthAdjustment: 長度字段的值表示的長度與整個幀的長度之間的差值(如果消息后面再加上一個長度字段,那么這個字段的值就是lengthAdjustment* sendInfo("Netty",buffer);后面再加上一個長度字段,那么這個字段的值就是lengthAdjustment) 不加會報錯* initialBytesToStrip: 解碼后的數據需要跳過的字節數*/new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4),new LoggingHandler(LogLevel.DEBUG));ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();// 4 個字節內容的長度 實際內容sendInfo("Hello,World111111111111111111111111111111111", buffer);sendInfo("Hello", buffer);sendInfo("Netty",buffer);// 模擬寫入數據channel.writeInbound(buffer);}private static void sendInfo(String s, ByteBuf buffer) {byte[] bytes = s.getBytes();// 寫入內容 大端模式 寫入長度 4 個字節int length = bytes.length;buffer.writeInt(length);buffer.writeBytes(bytes);}
}