上一篇:《Spring Boot 從Socket 到Netty網絡編程(上):SOCKET 基本開發(BIO)與改進(NIO)》
前言
????????前文中我們簡單介紹了基于Socket的BIO(阻塞式)與NIO(非阻塞式)網絡編程,無疑NIO在網絡傳輸中具備更先進;但是采用Socket的NIO需要大量的優化,對于開發人員來說我們要站在巨人的肩膀上開發,所以Netty才是真愛。
? ? ? ? TCP/IP? -> Socket ->Netty 是這樣一個關系,? ? ? ? Netty是基于Socket,Socket是基于TCP/IP。
基本開發
Netty
個人總結:基于NIO,提供簡潔的API,開發者專注于業務邏輯實現,而非關注于實現實現細節。
maven
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.94.Final</version>
</dependency>
服務端 ServerBootstrap
服務
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {public static void main(String[] args) throws Exception {// 創建一個線程組,用于接收客戶端的連接EventLoopGroup bossGroup = new NioEventLoopGroup();// 創建數據處理線程組,用于處理已經連接的客戶端的數據讀寫操作EventLoopGroup workerGroup = new NioEventLoopGroup();try {//創建服務端的啟動對象,設置參數ServerBootstrap bootstrap = new ServerBootstrap();//設置兩個線程組boosGroup和workerGroupbootstrap.group(bossGroup, workerGroup)//設置服務端通道實現類型.channel(NioServerSocketChannel.class)//設置線程隊列得到連接個數.option(ChannelOption.SO_BACKLOG, 128)//設置保持活動連接狀態.childOption(ChannelOption.SO_KEEPALIVE, true)//使用匿名內部類的形式初始化通道對象.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//給pipeline管道設置處理器socketChannel.pipeline().addLast(new NettyServerHandler());//服務端處理器}});//給workerGroup的EventLoop對應的管道設置處理器System.out.println("服務已啟動...");//綁定端口號,啟動服務端ChannelFuture channelFuture = bootstrap.bind(6666).sync();//對關閉通道進行監聽channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
方法類
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) throws Exception {NioEventLoopGroup eventExecutors = new NioEventLoopGroup();try {//創建bootstrap對象,配置參數Bootstrap bootstrap = new Bootstrap();//設置線程組bootstrap.group(eventExecutors)//設置客戶端的通道實現類型.channel(NioSocketChannel.class)//使用匿名內部類初始化通道.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//添加客戶端通道的處理器ch.pipeline().addLast(new NettyClientHandler());//給pipeline管道設置處理器}});System.out.println("客戶端準備就緒");//連接服務端ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();//對通道關閉進行監聽channelFuture.channel().closeFuture().sync();} finally {//關閉線程組eventExecutors.shutdownGracefully();}}}
客戶端 Bootstrap
客戶端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) throws Exception {NioEventLoopGroup eventExecutors = new NioEventLoopGroup();try {//創建bootstrap對象,配置參數Bootstrap bootstrap = new Bootstrap();//設置線程組bootstrap.group(eventExecutors)//設置客戶端的通道實現類型.channel(NioSocketChannel.class)//使用匿名內部類初始化通道.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//添加客戶端通道的處理器ch.pipeline().addLast(new NettyClientHandler());//給pipeline管道設置處理器}});System.out.println("客戶端準備就緒");//連接服務端ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();//對通道關閉進行監聽channelFuture.channel().closeFuture().sync();} finally {//關閉線程組eventExecutors.shutdownGracefully();}}}
方法類
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;import java.io.BufferedReader;
import java.io.InputStreamReader;public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String userInput;System.out.println("請輸入消息(輸入 exit 斷開連接):");BufferedReader console = new BufferedReader(new InputStreamReader(System.in));while ((userInput = console.readLine()) != null) {ctx.writeAndFlush(Unpooled.copiedBuffer(userInput , CharsetUtil.UTF_8));if ("exit".equalsIgnoreCase(userInput)) {System.out.println("連接已關閉。");break;}}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收服務端發送過來的消息ByteBuf byteBuf = (ByteBuf) msg;System.out.println("收到服務端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));}
}
效果
優化加強
心跳機制
????????為了解決主從之間的服務的斷開與連接情況是否更深,所以需要用心跳做連接試驗從而在業務上做斷開重連等業務需求實現。
原理設計
- IdleStateHandler 配置心跳
- 繼承類處理心跳事件:userEventTriggered
- Pipeline中處理
?服務
方法
客戶
方法
效果
處理粘包與拆包
????????由于網絡原因可能會出來數據切割不完整的情況,為了在拆解數據時能得到完整的數據需要用一定的約束使得雙方統一成一個固定標準。
原理設計
- 配置類型:一般使用動態長度 LengthFieldBasedFrameDecoder
- 自定義加碼與解碼
- 配置到 pipeline中去
代碼
服務端
客戶端
加碼
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;public class NetyyDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {if (byteBuf.readableBytes() < 4){return;}byteBuf.markReaderIndex();int dataLength = byteBuf.readInt();byte[] data = new byte[dataLength];byteBuf.readBytes(data);list.add(new String(data, "UTF-8"));}
}
解碼
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;import java.nio.charset.StandardCharsets;public class NetyyEncoder extends MessageToByteEncoder<String> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {byte[] bytes = s.getBytes(StandardCharsets.UTF_8);int length = bytes.length;byteBuf.writeInt(length);byteBuf.writeBytes(bytes);}
}
后記
以上代碼環境是JDK17、Spring Boot 3.X+ ,對Netty這個框架做了一些基本的介紹,當然憑這短短的一篇文章不可能做到詳盡,僅供初學都參考。整理不易,如有幫助請收藏,轉發請注明出處,關注博主不斷更新更多技術文章。