參考文章 1
參考文章 2
官網API文檔
Reactor
模型
Netty模型
Netty主要基于主從Reactor多線程
模型進行了一定的修改,該模型包括以下幾個組件:
-
MainReactor
(主Reactor):負責處理客戶端的連接請求。它監聽服務器上的端口,接收客戶端的連接請求,并將請求分發給SubReactor進行處理。 -
SubReactor
(從Reactor):負責處理連接成功后的通道的IO讀寫請求。每個SubReactor負責管理一組通道,它們使用多路復用技術(如Java NIO)來監聽通道上的事件,例如可讀、可寫等事件。一般情況下,每個SubReactor都對應一個線程。 -
Worker Threads
(工作線程):負責處理非IO請求,即具體的業務邏輯處理。當有非IO請求需要處理時,這些任務會被寫入一個隊列中,等待工作線程來處理。工作線程可以是線程池中的線程,也可以是其他類型的線程。
模塊組件
Bootstrap
、ServerBootstrap
: 在Netty中,Bootstrap
和ServerBootstrap
是用于啟動和配置Netty應用程序的引導類。Bootstrap
類是用于客戶端程序的啟動引導類,ServerBootstrap
類是服務端啟動引導類。NioEventLoopGroup
:NioEventLoopGroup
是Netty中用于管理NioEventLoop
的組件,它是一個線程池,包含多個NioEventLoop
實例,它對應著主從Reactor多線程模型中Reactor。NioEventLoopGroup
負責創建、管理和分配NioEventLoop
,處理異常情況,并提供優雅關閉的機制。它是Netty實現高性能的重要組件之一。NioEventLoop
:NioEventLoop
是Netty中的核心組件,負責處理I/O事件和執行任務。它使用Selector
來監聽和處理注冊在其上的Channel
的I/O事件,同時支持異步提交和執行任務。NioEventLoop
還管理定時任務和處理異常情況,是實現高性能和事件驅動的重要組成部分。
什么是Selector
-
Selector
:Netty基于Selector
對象實現了I/O多路復用。通過將多個Channel
注冊到一個Selector
中,并使用一個線程來監聽和處理這些Channel
的事件,可以高效地管理多個Channel
。Selector
會自動不斷地查詢這些注冊的Channel
,以檢查它們是否有已就緒的I/O事件。 -
Channel
:在Netty中,Channel
表示一個開放的網絡連接,可以用于讀取、寫入和處理網絡數據。它是網絡通信的基本單元,負責處理底層的數據傳輸和事件通知。NioSocketChannel
:異步的客戶端 TCP Socket 連接NioServerSocketChannel
:異步的服務器端 TCP Socket 連接NioDatagramChannel
:異步的 UDP 連接NioSctpChannel
:異步的客戶端 Sctp 連接NioSctpServerChannel
:異步的 Sctp 服務器端連接
-
ChannelHandler
:在Netty中,ChannelHandler
是一個接口,用于處理I/O事件或攔截I/O操作,并將其轉發到ChannelPipeline
中的下一個處理程序。ChannelHandler
是Netty的核心組件之一,它負責處理各種事件,如連接建立、數據讀寫、異常發生等。ChannelInboundHandler
:用于處理入站I/O事件ChannelOutboundHandler
:用于處理出站I/O操作
-
ChannelHandlerContext
:ChannelHandlerContext
保存了與特定Channel
相關的所有上下文信息,同時關聯一個ChannelHandler
對象,并提供了訪問Channel
、ChannelHandler
和ChannelPipeline
的方法。 -
ChannelPipline
: 它是一個保存ChannelHandler
的列表,用于處理或攔截Channel
的入站事件和出站操作。ChannelPipeline
實現了一種高級形式的攔截過濾器模式,使用戶可以完全控制事件的處理方式以及Channel
中各個的ChannelHandler
如何相互交互。
在 Netty 中每個Channel
都有且僅有一個ChannelPipeline
與之對應, 它們的組成關系如下:
一個Channel
包含了一個ChannelPipeline
,而ChannelPipeline
中維護了一個由ChannelHandlerContext
組成的雙向鏈表。入站事件和出站事件在這個雙向鏈表中進行傳遞,入站事件從鏈表的head
往后傳遞到最后一個入站的ChannelHandler
,出站事件從鏈表的tail
往前傳遞到最前一個出站的ChannelHandler
,兩種類型的ChannelHandler
互不干擾。通過這種設計,ChannelPipeline
實現了事件的順序傳遞和處理。 -
Future
、ChannelFuture
: Netty中的IO操作都是異步的,這意味著在發起一個IO操作后,無需等待其完成就可以繼續執行后續的代碼邏輯。為了獲取操作的執行結果或者在操作完成時得到通知,Netty提供了Future
和ChannelFuture
。
實例代碼
服務端
public class MyServer {public static void main(String[] args) throws Exception {//創建兩個線程組 boosGroup、workerGroupEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {//創建服務端的啟動對象,設置參數ServerBootstrap bootstrap = new ServerBootstrap();//設置兩個線程組boosGroup和workerGroupbootstrap.group(bossGroup, workerGroup)//設置服務端通道實現類型 .channel(NioServerSocketChannel.class)//設置線程隊列得到連接個數 .option(ChannelOption.SO_BACKLOG, 128)//設置保持活動連接狀態 .childOption(ChannelOption.SO_KEEPALIVE, true)//使用匿名內部類的形式初始化通道對象 .childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//給pipeline管道設置處理器socketChannel.pipeline().addLast(new MyServerHandler());}});//給workerGroup的EventLoop對應的管道設置處理器System.out.println("java技術愛好者的服務端已經準備就緒...");//綁定端口號,啟動服務端ChannelFuture channelFuture = bootstrap.bind(6666).sync();//對關閉通道進行監聽channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
/*** 自定義的Handler需要繼承Netty規定好的HandlerAdapter* 才能被Netty框架所關聯,有點類似SpringMVC的適配器模式**/
public class MyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//獲取客戶端發送過來的消息ByteBuf byteBuf = (ByteBuf) msg;System.out.println("收到客戶端" + ctx.channel().remoteAddress() + "發送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//發送消息給客戶端ctx.writeAndFlush(Unpooled.copiedBuffer("服務端已收到消息,并給你發送一個問號?", CharsetUtil.UTF_8));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//發生異常,關閉通道ctx.close();}
}
客戶端
public class MyClient {public static void main(String[] args) throws Exception {NioEventLoopGroup eventExecutors = new NioEventLoopGroup();try {//創建bootstrap對象,配置參數Bootstrap bootstrap = new Bootstrap();//設置線程組bootstrap.group(eventExecutors)//設置客戶端的通道實現類型 .channel(NioSocketChannel.class)//使用匿名內部類初始化通道.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//添加客戶端通道的處理器ch.pipeline().addLast(new MyClientHandler());}});System.out.println("客戶端準備就緒,隨時可以起飛~");//連接服務端ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();//對通道關閉進行監聽channelFuture.channel().closeFuture().sync();} finally {//關閉線程組eventExecutors.shutdownGracefully();}}
}
public class MyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//發送消息到服務端ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~茉莉~Are you good~馬來西亞~", CharsetUtil.UTF_8));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收服務端發送過來的消息ByteBuf byteBuf = (ByteBuf) msg;System.out.println("收到服務端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));}
}