文章目錄
- 一、前言
- 二、流程分析
- 1. 創建 EventLoopGroup
- 2. 指定 Channel 類型
- 2.1 Channel 的創建
- 2.2 Channel 的初始化
- 3. 配置自定義的業務處理器 Handler
- 3.1 ServerBootstrap#childHandler
- 3.2 handler 與 childHandler 的區別
- 4. 綁定端口服務啟動
- 三、bossGroup 與 workerGroup
- 1. EventLoopGroup 的指定
- 2. ServerBootstrap#bind
- 2.1 AbstractBootstrap#initAndRegister
- 2.1.1 反射創建 Channel
- 2.1.2 ServerBootstrap#init
- 2.1.3 Channel 注冊
- 2.2 AbstractBootstrap#doBind0
- 四、ServerBootstrapAcceptor
- 1. ServerBootstrapAcceptor#channelRead
- 1.1 child.pipeline().addLast(childHandler)
- 1.2 childGroup.register(child)
- 1.3 ChannelInitializer#initChannel 的觸發邏輯
- 2. ServerBootstrapAcceptor#channelRead 的觸發時機
- 3. 總結
- 五、 服務端 Selector 事件輪詢
- 1. SingleThreadEventExecutor#inEventLoop
- 2. SingleThreadEventExecutor#addTask
- 3. SingleThreadEventExecutor#startThread
- 3.1 SingleThreadEventExecutor#doStartThread
- 4. SingleThreadEventExecutor#wakeup
- 六、總結
- 1. 服務端啟動
- 2. 客戶端連接
- 七、參考內容
一、前言
本系列雖說本意是作為 《Netty4 核心原理》一書的讀書筆記,但在實際閱讀記錄過程中加入了大量個人閱讀的理解和內容,因此對書中內容存在大量刪改。
本篇涉及內容 :第七章 揭開Bootstrap的神秘面紗
本系列內容基于 Netty 4.1.73.Final 版本,如下:
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.73.Final</version></dependency>
系列文章目錄:
【Netty4核心原理】【全系列文章目錄】
在 【Netty4核心原理⑥】【揭開Bootstrap的神秘面紗 - 客戶端Bootstrap ?】 內容中我們對 客戶端Bootstrap 進行了分析,本篇來對 服務端 Bootstrap 進行分析。
(核心流程的分析比較瑣碎和混亂,我盡力了,改了刪,刪了改一周,感覺也沒辦法講清楚 )
二、流程分析
我們以下面的例子來分析:
public static class ChatServer {public void start(int port) throws InterruptedException {// 1. 創建 boss 和 worker 線程NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup)// 2. 指定 NioServerSocketChannel 類型.channel(NioServerSocketChannel.class)// 3. 配置自定義的業務處理器 Handler.childHandler(new ChannelInitializer<SocketChannel>() {// 客戶端初始化@Overrideprotected void initChannel(SocketChannel client) throws Exception {...}})// 針對主線程配置 : 分配線程數量最大 128.option(ChannelOption.SO_BACKLOG, 128)// 針對子線程配置 保持長連接.childOption(ChannelOption.SO_KEEPALIVE, true);// 4. 綁定端口服務啟動ChannelFuture channelFuture = bootstrap.bind(port).sync();System.out.println("服務啟動成功, 端口 : " + port);// 阻塞主線程,防止直接執行 finally 中語句導致服務關閉,當有關閉事件到來時才會放行channelFuture.channel().closeFuture().sync();} finally {// 關閉線程池bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
從上面的可以的代碼可以看出,服務端基本寫法與客戶端基本相同,基本上也是進行如下幾個部分的初始化。
- 創建 EventLoopGroup :無論是服務端還是客戶端,都必須指定 EventLoopGroup。在上面的代碼中,指定了 NioEventLoopGroup ,表示一個 NIO 的 EventLoopGroup,不過服務端需要指定兩個EventLoopGroup,一個是 bossGroup,用于處理客戶端的連接請求,另一個是 workerGroup ,用于處理與各個客戶端連接的 IO 操作。
- 指定 Channel 類型。服務端使用 NioServerSocketChannel(客戶端使用的是 NioSocketChannel 類型)。在后面會通過反射來創建對應類型的 Channel。
- 配置自定義的業務處理器 Handler。
- 綁定端口服務啟動。
我們按照上面的注釋的部分來對每一步具體分析:
1. 創建 EventLoopGroup
在服務端初始化時,我們指定了兩個 NioEventLoopGroup 對象,一個是 bossGroup ,另一個是 workerGroup。bossGroup 只用于服務端的 accept 事件,也就是用于處理客戶端新連接接入的請求,而 workerGroup 負責客戶端連接通道的 IO操作。(具體過程在下面【bossGroup 與 workerGroup】部分進行詳細分析。)
2. 指定 Channel 類型
2.1 Channel 的創建
Channel 是對 Java 底層 Socket 連接的抽象,在 【Netty4核心原理⑥】【揭開Bootstrap的神秘面紗 - 客戶端Bootstrap ?】 的 【NioSocketChannel 的創建】部分我們分析過客戶端的 NioSocketChannel 的創建過程,這里 NioServerSocketChannel 的創建也基本與之相同:簡單來說就是通過 ReflectiveChannelFactory#newChannel 反射創建 NioServerSocketChannel 實例,這里不再過多贅述。
2.2 Channel 的初始化
當我們通過反射創建 NioServerSocketChannel 實例時,在 NioServerSocketChannel 的構造函數中還存在一定的初始化邏輯,因此下面我們來看下 NioServerSocketChannel 的初始化過程,NioServerSocketChannel 的繼承層次結構如下:
NioServerSocketChannel 存在幾個重載的構造函數,不過最終都會調到如下方法:
public NioServerSocketChannel(ServerSocketChannel channel) {// 調用父類構造函數 AbstractChannel#AbstractChannelsuper(null, channel, SelectionKey.OP_ACCEPT);config = new NioServerSocketChannelConfig(this, javaChannel().socket());}
在這個方法中,調用父類構造函數時的入參是 SelectionKey.OP_ACCEPT,而客戶端傳入的參數則是 SelectionKey.OP_READ。因為服務端在啟動后需要監聽客戶端的連接請求,因此這里監聽的時間類型是 SelectionKey.OP_ACCEPT,也就是告知 Selector 當前服務對客戶端連接的事件感興趣。
上面調用的父類構造函數 AbstractChannel#AbstractChannel,其代碼如下:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {// 調用父類構造函數 AbstractChannel#AbstractChannelsuper(parent);this.ch = ch;// 保存興趣事件this.readInterestOp = readInterestOp;try {// 設置為非阻塞(NIO)模式ch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {...}throw new ChannelException("Failed to enter non-blocking mode.", e);}}
這里又調用了父類構造函數 AbstractChannel#AbstractChannel, 代碼如下:
protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();// 1. Unsafe 屬性的初始化:服務端這里的類型是 NioMessageUnsafe,客戶端的類型是 NioByteUnsafeunsafe = newUnsafe();// 2. ChannelPipeline 的初始化pipeline = newChannelPipeline();}
上面的方法中主要有兩個方面:
unsafe = newUnsafe();
:Unsafe 屬性的初始化pipeline = newChannelPipeline();
:ChannelPipeline 的初始化
這里的邏輯基本與客戶端的初始化相同,具體邏輯不再贅述(如有需要可詳參 【Netty4核心原理⑥】【揭開Bootstrap的神秘面紗 - 客戶端Bootstrap ?】的 【AbstractNioByteChannel#AbstractNioByteChannel】部分 )。
這里我們提一下 unsafe
的類型:newUnsafe()
是個抽象方法,而客戶端這里的 Channel 類型是 NioSocketChannel,而 服務端這邊的類型是 NioServerSocketChannel。這兩個類的 newUnsafe
方法并不相同,如下:
-
NioSocketChannel#newUnsafe 方法如下:
@Overrideprotected AbstractNioUnsafe newUnsafe() {return new NioSocketChannelUnsafe();}
-
NioServerSocketChannel#newUnsafe 方法如下:
@Overrideprotected AbstractNioUnsafe newUnsafe() {return new NioMessageUnsafe();}
因此,這里可以知道,客戶端創建的 Unsafe 類型是 NioSocketChannelUnsafe,而服務端創建的則是 NioMessageUnsafe 類型。
客戶端只有一種情況:
- 客戶端啟動時會與服務端創建連接,過程中會創建 NioSocketChannel 類型的 Channel,此時創建的 Unsafe 類型為 NioSocketChannelUnsafe。此時的興趣事件是 OP_READ
服務端有兩種情況:
- 服務端自身啟動時,綁定端口時會創建 NioServerSocketChannel 類型,此時會創建 NioMessageUnsafe 類型。此時的興趣事件是 OP_ACCEPT
- 當服務端啟動后,客戶端連接服務端時,服務端會為每個客戶端創建 Channel,此時連接的客戶端的 Channel 類型是 NioSocketChannel,因此此時會為每個客戶端 Channel 創建的 Unsafe 類型為 NioSocketChannelUnsafe。(在下面【ServerBootstrapAcceptor#channelRead 的觸發時機】部分會說明這個場景)此時的興趣事件是 OP_READ
綜上:NioMessageUnsafe 用來處理 OP_ACCEPT 事件,NioSocketChannelUnsafe 用來處理 OP_READ 事件
3. 配置自定義的業務處理器 Handler
3.1 ServerBootstrap#childHandler
在 【Netty4核心原理⑥】【揭開Bootstrap的神秘面紗 - 客戶端Bootstrap ?】 我們有分析過 【Handler 添加過程】,在客戶端中是調用 AbstractBootstrap#handler 方法添加 Handler, 而在服務端是調用 ServerBootstrap#childHandler 添加,除此之外基本邏輯相同,因此也不在贅述。
3.2 handler 與 childHandler 的區別
在客戶端時我們調用的是 Bootstrap#handler 來添加處理器(這里 Bootstrap#handler 實際上是 AbstractBootstrap#handler 方法) ,而服務端這里則是調用的 ServerBootstrap#childHandler,下面我們來介紹下這兩個方法:
-
方法所屬類及用途概述
- AbstractBootstrap#handler(ChannelHandler):AbstractBootstrap 是 Bootstrap(客戶端引導類)和 ServerBootstrap(服務器引導類)的父類。handler 方法可用于客戶端和服務器,它設置的處理器會應用于 Bootstrap 或 ServerBootstrap 自身的 Channel。
- ServerBootstrap#childHandler(ChannelHandler):ServerBootstrap 專門用于創建和配置 Netty 服務器。childHandler 方法設置的處理器會應用于服務器接受的新連接所創建的子 Channel。
-
作用對象
- AbstractBootstrap#handler(ChannelHandler):對于客戶端而言,該處理器應用于客戶端連接服務器的 Channel,也就是客戶端用于與服務器通信的通道。
對于服務器端,該處理器應用于負責接受新連接的 ServerSocketChannel。這個 Channel 的主要任務是監聽端口并接受新的客戶端連接。 - ServerBootstrap#childHandler(ChannelHandler):僅適用于服務器端,該處理器應用于服務器接受新連接后創建的子 Channel(即 SocketChannel)。這些子 Channel 用于與各個客戶端進行實際的數據通信。
- AbstractBootstrap#handler(ChannelHandler):對于客戶端而言,該處理器應用于客戶端連接服務器的 Channel,也就是客戶端用于與服務器通信的通道。
-
使用場景
- AbstractBootstrap#handler(ChannelHandler):客戶端:可以用于設置一些與客戶端連接建立過程相關的處理器,例如在連接建立前進行一些初始化操作,或者對連接狀態進行監控。
服務器端:可以用于處理服務器的啟動和關閉事件,或者對新連接的接受過程進行一些額外的處理。 - ServerBootstrap#childHandler(ChannelHandler):主要用于處理服務器與客戶端之間的實際數據交互。可以添加編解碼器、業務邏輯處理器等,對客戶端發送的數據進行解碼、處理,并將處理結果編碼后返回給客戶端。
- AbstractBootstrap#handler(ChannelHandler):客戶端:可以用于設置一些與客戶端連接建立過程相關的處理器,例如在連接建立前進行一些初始化操作,或者對連接狀態進行監控。
4. 綁定端口服務啟動
在 ServerBootstrap 做好準備工作后會調用 ServerBootstrap#bind 來綁定服務端口,在本文下面【ServerBootstrap#bind】中我們進行了詳細介紹。
三、bossGroup 與 workerGroup
在服務端初始化時,我們指定了兩個 NioEventLoopGroup 對象,一個是 bossGroup ,另一個是 workerGroup。bossGroup 只用于服務端的 accept 事件,也就是用于處理客戶端新連接接入的請求(在 Netty 的單端口模式下,bossGroup 線程池?只會使用其中一個線程?處理連接請求,其他線程處于空閑狀態),而 workerGroup 負責客戶端連接通道的 IO操作。如下圖:
這里思想是 多 Reactor 多線程模型的思想,在 【Netty4核心原理⑩】【大名鼎鼎的 EventLoop】 的【Reactor 線程模型】中詳細介紹。
簡單來說:服務端的 bossGroup 會不斷監聽是否有客戶端連接,當發現有一個新的客戶端連接到來時, bossGroup 就會為此連接初始化各項資源;然后從 workerGroup 中選出一個 EventLoop 綁定到此客戶端連接中;接下來服務端與客戶端的交互過程將全部在此分配的 EventLoop 中完成。
根據上面的內容,我們可以看到與 bossGroup 和 workerGroup 相關的有兩部分:
- EventLoopGroup 的指定 :通過
bootstrap.group(bossGroup, workerGroup)
將 bossGroup 和 workerGroup 綁定到 ServerBootstrap 中 - ServerBootstrap#bind :通過
bootstrap.bind(port).sync()
啟動服務,當接收到客戶端消息時,會根據規則分配給 bossGroup 或 workerGroup 來處理。
下面我們來詳細看看這兩部分的內容。
1. EventLoopGroup 的指定
在 ServerBootstrap 初始化時會調用 bootstrap.group(bossGroup, workerGroup)
方法設置了 workerGroup 和 bossGroup 兩個 EventLoopGroup,因此這里我們來看下ServerBootstrap#group的實現,代碼如下:
// io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {// 調用父類 AbstractBootstrap#group(io.netty.channel.EventLoopGroup) 構造函數,執行 this.group = group 的操作super.group(parentGroup);if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}// 保存到 ServerBootstrap 的 childGroup 屬性中this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");return this;}
AbstractBootstrap#group(io.netty.channel.EventLoopGroup) 代碼如下:
public B group(EventLoopGroup group) {ObjectUtil.checkNotNull(group, "group");if (this.group != null) {throw new IllegalStateException("group set already");}// 保存到 ServerBootstrap 的 group 屬性中this.group = group;return self();}
這里可以看到 ServerBootstrap#group 就是將 bossGroup 和 workerGroup 保存到 ServerBootstrap 的 group 和 childGroup 屬性中。
需要注意的是:在 Netty 的單端口模式下,bossGroup 線程池?只會使用其中一個線程?處理連接請求,其他線程處于空閑狀態。這個在我們下面源碼分析過程中也可以看到。
- 每個監聽的端口對應一個 NioServerSocketChannel,該 Channel 在初始化時從 bossGroup 中通過輪詢算法(next() 方法)選擇一個 EventLoop(線程)綁定,負責監聽該端口的 TCP 連接事件?。即使 bossGroup 配置了多個線程,單端口場景下僅需一個線程處理 accept 事件,其余線程不會參與工作?
- 官方建議在單端口場景中將 bossGroup 的線程數設為 1(如 new NioEventLoopGroup(1)),避免資源浪費?。多線程配置的 bossGroup 僅用于容錯(如線程異常終止后重啟)或多端口監聽場景,單端口模式下無實際作用?
2. ServerBootstrap#bind
在上面我們提到服務啟動時會調用 bootstrap.bind(port)
方法,因此這里我們來看下ServerBootstrap#bind 方法(這里調用的是父類 AbstractBootstrap#bind 方法)。而 AbstractBootstrap#bind 方法直接調用了AbstractBootstrap#doBind 方法,因此我們這里來看 AbstractBootstrap#doBind 的實現:
// doBind 方法主要負責執行 Channel 的綁定操作,即將 Channel 綁定到指定的本地地址(如本地的 IP 地址和端口號)上。// 它在整個過程中需要先處理 Channel 的初始化與注冊相關事宜,然后根據注冊結果來決定如何進行后續的綁定操作,同時要妥善處理各種成功、失敗以及異步等待的情況。private ChannelFuture doBind(final SocketAddress localAddress) {// 1.初始化并注冊 Channelfinal ChannelFuture regFuture = initAndRegister();// 獲取創建的 Channel, 并判斷創建過程中是否出現異常final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}// 當 regFuture.isDone() 返回 true,表明 Channel 的初始化和注冊操作已經完成。// 此時創建一個新的 ChannelPromise(用于表示綁定操作的結果),然后調用 doBind0 方法來執行實際的綁定操作,傳遞注冊相關的 ChannelFuture(regFuture)、Channel 實例、要綁定的本地地址以及剛創建的 ChannelPromise。最后返回這個 ChannelPromise,以便調用者后續可以通過它來跟蹤綁定操作的完成情況及獲取結果。if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();// 2. 如果注冊完成則調用 doBind0 方法doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.// 當 regFuture.isDone() 返回 false,也就是注冊操作尚未完成時,創建一個 PendingRegistrationPromise 實例(它也是 ChannelPromise 的一種實現,用于處理這種延遲完成的情況)。final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {promise.setFailure(cause);} else {promise.registered();// 2. 如果注冊完成則調用 doBind0 方法doBind0(regFuture, channel, localAddress, promise);}}});return promise;}}
上面注釋寫的比較清楚,核心就注釋中標注的兩個方法,我們按照注釋順序來看下面兩個方法:
- AbstractBootstrap#initAndRegister :該方法內部會完成 Channel 的初始化以及將其注冊到對應的 EventLoop 上,返回的 ChannelFuture(regFuture)用于表示這個初始化和注冊操作的結果。通過 regFuture.channel可以獲取到與之關聯的 Channel 實例。
- AbstractBootstrap#doBind0 :該方法主要用于服務端啟動時將 Channel 綁定到指定本地地址的具體操作。
關于 ChannelFuture 和 ChannelPromise 的介紹詳參 【Netty4核心原理?】【異步處理雙子星 Future 與 Promise】
下面我們來詳細看看這兩個方法:
2.1 AbstractBootstrap#initAndRegister
AbstractBootstrap#initAndRegister 方法的內容我們在 【Netty4核心原理⑦】【揭開Bootstrap的神秘面紗 - 客戶端Bootstrap ?】 一文中有過詳細分析,當時分析的是客戶端(Bootstrap)初始化的過程,而我們這里是服務端(ServerBootstrap)的初始化過程。不過邏輯基本類似,這里簡單來看下:
final ChannelFuture initAndRegister() {Channel channel = null;try {// 1. 反射創建 Channel ,這里是 NioServerSocketChannel 類型channel = channelFactory.newChannel();// 2. ServerBootstrap 重寫了該方法,這里調用的是 ServerBootstrap#init // 對新創建的 Channel 進行初始化。init 方法通常用于配置 Channel 的一些屬性,以及向 ChannelPipeline 中添加 ChannelHandlerinit(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}...// 3. 將 Channel 注冊到 Selector 上,這里 config().group() 返回的是 EventLoop 是 bossGroup ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}...return regFuture;}
2.1.1 反射創建 Channel
channelFactory.newChannel()
通過反射創建 Channel,這里已經在 【Netty4核心原理⑥】【揭開Bootstrap的神秘面紗 - 客戶端Bootstrap ?】 中做了詳細介紹,這里不再贅述。
這里與客戶端不同的點在于:服務端這里創建的類型是 NioServerSocketChannel,而客戶端的類型是 NioSocketChannel。
2.1.2 ServerBootstrap#init
這里的方法被 ServerBootstrap 重寫了,因此這里調用的是 ServerBootstrap#init。該方法主要負責對 Channel 進行初始化配置。這個初始化過程涵蓋了設置 Channel 的選項、屬性,以及向 Channel 的 ChannelPipeline 中添加特定的 ChannelHandler,特別是添加了一個 ChannelInitializer,它會在后續進一步完善 ChannelPipeline 的配置,為服務器端接收客戶端連接并處理相關事務做準備。
具體實現如下:
// channel 是 服務端的 channel void init(Channel channel) {// 為給定的 Channel 設置一系列的選項參數,這些選項可能涉及網絡連接的各種配置,比如緩沖區大小、超時時間等,同時會通過傳入的 logger 記錄相關的配置信息setChannelOptions(channel, newOptionsArray(), logger);// 給 Channel 設置一些自定義的屬性,這些屬性可以在后續的代碼中用于識別、區分或者傳遞與 Channel 相關的特定信息。setAttributes(channel, newAttributesArray());// 獲取 Channel 對應的 ChannelPipeline,后續將向這個管道中添加各種 ChannelHandler,用于處理不同階段和類型的事件。ChannelPipeline p = channel.pipeline();// 準備相關配置參數final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);// 添加 ChannelInitializer 到 ChannelPipelinep.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {// 首先獲取當前 Channel(ch)的 ChannelPipelinefinal ChannelPipeline pipeline = ch.pipeline();// 嘗試從配置(config)中獲取一個 ChannelHandlerChannelHandler handler = config.handler();if (handler != null) {// 如果不為 null,則將其添加到 pipeline 中,這個 ChannelHandler 可能是用于處理服務器端一些通用的業務邏輯或者預處理操作等。pipeline.addLast(handler);}// 通過 ch.eventLoop().execute 將添加 ServerBootstrapAcceptor 的操作提交到 Channel 的事件循環(EventLoop)中執行。// 這樣做確保了添加操作在合適的線程上下文中進行,避免了多線程并發問題。ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// ServerBootstrapAcceptor 是一個關鍵的 ChannelHandler,它主要用于在服務器端接受客戶端連接后,對新建立的客戶端連接對應的 Channel(子 Channel)進行進一步的配置和處理,// 比如將其關聯到指定的 EventLoopGroup,并添加相應的 ChannelHandler 等,使用之前準備好的 currentChildGroup、currentChildHandler、currentChildOptions 和 currentChildAttrs 等參數來完成這些配置pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}
上面的注釋寫的比較清楚,具體不在贅述。需要注意上面代碼中有下面一句:
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
這里的代碼是將 ServerBootstrapAcceptor 添加到服務端 Channel 的 ChannelPipeline 中。關于 ServerBootstrapAcceptor 的內容我們下面 【ServerBootstrapAcceptor】部分會詳細說明。
2.1.3 Channel 注冊
上述代碼中通過 config().group().register(channel);
完成了 Channel 的注冊,在 【Netty4核心原理⑥】【揭開Bootstrap的神秘面紗 - 客戶端Bootstrap ?】 有過對客戶端的 Channel 注冊分析,服務端與客戶端在代碼實現的關鍵邏輯上都是一致的,因此這里不再贅述。
客戶端與服務端的不同之處:
config()
:客戶端的config()
的實現在 Bootstrap#config,返回的類型是 BootstrapConfig,而服務端的是在 ServerBootstrap#config,返回的類型是 ServerBootstrapConfigconfig().group()
: BootstrapConfig#group() 和 ServerBootstrap#group() 都會調用 io.netty.bootstrap.AbstractBootstrap#group() 方法,該方法都會返回 AbstractBootstrap#group屬性,因此從后續代碼執行的邏輯上來說,服務端和客戶端config().group().register(channel);
都會調用 EventLoopGroup#register(io.netty.channel.ChannelPromise) 方法(實現類是 MultithreadEventLoopGroup#register(Channel)),因此二者代碼執行上并無區別。
2.2 AbstractBootstrap#doBind0
AbstractBootstrap#doBind0 方法主要用于執行實際的 Channel 綁定操作,即將 Channel 綁定到指定的本地地址(例如本地 IP 地址和端口號)上。若注冊成功則進行綁定,若注冊失敗則相應地設置綁定操作的 ChannelPromise 為失敗狀態,以此來處理 Channel 綁定這一關鍵步驟,并通過合適的異步機制確保操作在 Channel 的事件循環線程中執行。
AbstractBootstrap#doBind0 具體實現如下:
// 入參// regFuture : 保存了 Channel 注冊結果// channel : 注冊的 服務端channel// localAddress : channel 將要綁定的本機地址// promise :保存 channel 與 localAddress 的綁定結果private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// 下面的注釋指出該方法在 channelRegistered() 被觸發之前調用,目的是給用戶自定義的處理器(user handlers)提供機會,使其能夠在 channelRegistered() 方法實現中對 ChannelPipeline 進行相關設置。// 這體現了方法調用在 Channel 生命周期中的相對位置以及與其他相關操作的關聯性,確保整個流程的連貫性和靈活性,方便開發者基于 Channel 生命周期的不同階段進行定制化處理。// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.// 將操作放在 EventLoop 中執行,能夠保證操作在 Channel 對應的正確線程上下文中進行,避免多線程并發帶來的一些諸如數據不一致、資源競爭等問題,遵循了 Netty 的事件驅動和線程模型設計原則。channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {// 1. 如果Channel 注冊操作成功,則嘗試進行綁定操作,將 Channel 綁定到指定的本地地址 localAddress,并傳入 ChannelPromise(promise)用于跟蹤綁定操作的結果channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {// Channel 注冊失敗,直接將失敗結果保存等待返回promise.setFailure(regFuture.cause());}}});}
上面代碼可以看到,如果Channel 注冊操作成功,則會調用 channel.bind(localAddress, promise)
方法進行綁定操作(而這個執行線程是我們上面提到的 Channel 綁定的 NioEventLoop 對應的本地線程,下面 【服務端 Selector 事件輪詢】部分詳細說明),而這里 channel.bind(localAddress, promise)
調用的是 AbstractChannel#bindSocketAddress, ChannelPromise) 方法,內部會調用 DefaultChannelPipeline#bind(SocketAddress, ChannelPromise) 方法,如下:
@Overridepublic final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {// 調用 AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)return tail.bind(localAddress, promise);}
這里 tail 是我們之前提到過的 ChannelPipeline內部雙向鏈表的尾節點 TailContext 實例:
- 在 DefaultChannelPipeline 中維護了一個以 AbstractChannelHandlerContext 為節點元素的雙向鏈表,而 head 和 tail 分別指向雙向鏈表的頭尾節點。
- 這里 TailContext#bind 是出站方法,會從雙向鏈表的尾部Tail 節點往 頭部Head 節點傳播。
tail.bind(localAddress, promise)
實際調用的是 AbstractChannelHandlerContext#bind(SocketAddress, ChannelPromise) ,其實現如下:
// 將 Channel 綁定到指定的本地地址(localAddress)@Overridepublic ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {ObjectUtil.checkNotNull(localAddress, "localAddress");// 校驗是否合法if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}// 1. 查找用于處理 bind 操作的 AbstractChannelHandlerContext// MASK_BIND 是一個掩碼,用于確定查找的是與連接操作相關的出站處理器。這個處理器將負責實際的連接邏輯。final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);EventExecutor executor = next.executor();// 如果當前線程是 EventExecutor 的事件循環線程(executor.inEventLoop()),則直接調用 next.invokeBind(localAddress, promise) 方法if (executor.inEventLoop()) {// 2. 調用綁定操作next.invokeBind(localAddress, promise);} else {// 如果當前線程不是 EventExecutor 的事件循環線程,通過 safeExecute 方法將連接操作提交到 EventExecutor 中執行。// safeExecute 方法會確保任務在 EventExecutor 中安全執行,并在執行過程中處理可能出現的異常。safeExecute(executor, new Runnable() {@Overridepublic void run() {next.invokeBind(localAddress, promise);}}, promise, null, false);}return promise;}
關于 Pipeline 的傳播過程,在 【Netty4核心原理?】【Netty 大動脈 Pipeline】 中我們有詳細介紹,簡單來說就是從 雙向鏈表的頭或尾節點開始遍歷,找到能處理當前事件的 ChannelHandler 進行處理,篇幅所限這里不做過多介紹。這里我們簡單來看兩點:
-
findContextOutbound(MASK_BIND);
:該方法調用是 AbstractChannelHandlerContext#findContextOutbound ,其作用簡單來說就是從 DefaultChannelPipeline 內部的雙向鏈表的 Tail 開始,不斷向前找到第一個出站且支持指定操作的 ChannelHandlerContext AbstractChannelHandlerContext,然后調用它的 invokeBind 方法。例如,當執行 connect 操作時,會傳入與 connect 操作對應的掩碼 MASK_CONNECT。該方法會在 ChannelPipeline 中從當前位置開始向后查找,找到第一個出站且支持 connect 操作的 ChannelHandlerContext。這個找到的 ChannelHandlerContext 對應的 ChannelHandler 將負責處理 connect 操作的具體邏輯。
通過這種方式,Netty 能夠在 ChannelPipeline 中靈活地定位到合適的 ChannelHandler 來處理各種出站操作,實現了強大的可擴展性和定制性。而從 雙向鏈表的Tail 節點開始查找到第一個支持 bind 出站事件的 ChannelHandler 就是 Head 節點,因此這里調用的就是 HeadContext#invokeBind
-
next.invokeBind(localAddress, promise)
:該方法的實現是 AbstractChannelHandlerContext#invokeBind,其實現如下:private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {// 確定是否需要調用當前 Handler 來處理 channelRegistered 事件。// 這個方法會檢查 Handler 的狀態 以決定是否執行該 Handler 的邏輯if (invokeHandler()) {try {// 將 handler 轉換為 ChannelOutboundHandler 并調用其 bind 方法進行綁定操作// 這里 handler() 返回的是 HeadContext, 即雙向鏈表的 頭節點((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);} catch (Throwable t) {// 如果發生異常,通知異常情況notifyOutboundHandlerException(t, promise);}} else {// 如果 invokeHandler() 返回 false,表示不需要當前 Handler 處理 bind事件。// 此時,調用 bind(localAddress, promise) 方法將 bind事件傳遞給 ChannelPipeline 中的下一個 Handler 繼續處理。bind(localAddress, promise);}}
上面可以看到核心在
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
中,而 HeadContext#handler 返回的是自身,也就是 HeadContext,因此這里調用的就是 HeadContext#bind 方法,如下:
@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {// 交由 AbstractChannel.AbstractUnsafe#bindunsafe.bind(localAddress, promise);}
AbstractUnsafe#bind 會調用 AbstractUnsafe#doBind 來完成綁定,如下:
protected void doBind(SocketAddress localAddress) throws Exception {// 判斷 Java 版本,調用不同的 APIif (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}}
至此完成了服務端口的綁定,也即代表著 服務端可以正式提供服務了。
四、ServerBootstrapAcceptor
在上面【ServerBootstrap#init】部分我們提到了 ServerBootstrap#init 中會創建一個 ServerBootstrapAcceptor 添加到服務端 Channel 的 pipeline中, 如下:
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
ServerBootstrapAcceptor 是一個關鍵的 ChannelHandler,它主要用于在服務器端接受客戶端連接后,對新建立的客戶端連接對應的 Channel(子 Channel)進行進一步的配置和處理。也就是說,當有新的客戶端 Channel 連接到服務器后,ServerBootstrapAcceptor 會對其做一定的處理(通過 ServerBootstrapAcceptor#channelRead 方法)。
ServerBootstrapAcceptor 繼承結構如下:
這里我們可以看到 ServerBootstrapAcceptor 繼承了 ChannelInboundHandlerAdapter。實際上 ServerBootstrapAcceptor 還重寫了 ChannelInboundHandlerAdapter 的 channelRead 方法。
ServerBootstrapAcceptor#channelRead 是當 Channel 從網絡讀取到數據時觸發,數據會被傳遞給 ChannelPipeline 中的入站處理器進行處理,簡單來說就是當客戶端發起連接時會觸發該方法。
ChannelInboundHandlerAdapter 是 ChannelInboundHandler 接口的實現類,在 【Netty4核心原理⑦】【揭開Bootstrap的神秘面紗 - 客戶端Bootstrap ?】== 的 【入站 和 出站】部分我們詳細介紹了 ChannelInboundHandler 各個方法的調用時機和作用。
下面我們詳細來看這個 ServerBootstrapAcceptor#channelRead 方法。
1. ServerBootstrapAcceptor#channelRead
ServerBootstrapAcceptor#channelRead 方法在新的客戶端連接被服務器接受時會被調用:當服務器監聽到新的連接請求時,會接受這個連接,并創建一個新的 Channel 來表示這個連接。這個新的 Channel 會被封裝成一個消息對象,通過 ChannelPipeline 進行傳遞,當這個新連接的消息對象傳遞到 ServerBootstrapAcceptor 時,channelRead 方法會被調用。
ServerBootstrapAcceptor#channelRead 實現如下:
/*** 重寫的 channelRead 方法,當有新的通道連接事件觸發時會被調用。* 該方法主要用于處理新連接的通道,為其添加處理器、設置通道選項和屬性,并將其注冊到子事件循環組中。** @param ctx 通道處理器上下文對象,提供了與通道和處理器相關的操作方法* @param msg 接收到的消息對象,這里表示新連接的通道*/
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {// 將接收到的消息對象強制轉換為 Channel 類型,代表新的客戶端的連接通道final Channel child = (Channel) msg;// 1.為新連接的通道的通道處理器鏈(pipeline)添加 處理器childHandler// childHandler 是通過 ServerBootstrap#childHandler 方法指定的處理器,因此這里是將我們自定義的 ChannelInitializer 添加到 客戶端的 Channel 上child.pipeline().addLast(childHandler);// 為新連接的通道設置通道選項(如 TCP_NODELAY 等)// childOptions 是一個包含通道選項的集合// logger 用于記錄操作過程中的日志信息setChannelOptions(child, childOptions, logger);// 為新連接的通道設置屬性// childAttrs 是一個包含通道屬性的集合setAttributes(child, childAttrs);try {// 2. 將新連接的通道注冊到子事件循環組(childGroup)中// 注冊操作是異步的,返回一個 ChannelFuture 對象表示操作的未來結果// childGroup 是 構造時傳入的 currentChildGroup,也就是 workerGroup 對象// childGroup.register(child) 將 workerGroup 中的某個 EventLoop 與 NioSocketChannel 進行關聯childGroup.register(child).addListener(new ChannelFutureListener() {/*** 當通道注冊操作完成時,該方法會被調用。** @param future 表示通道注冊操作的未來結果* @throws Exception 可能拋出的異常*/@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// 檢查通道注冊操作是否成功if (!future.isSuccess()) {// 如果注冊操作失敗,調用 forceClose 方法強制關閉該通道// 并傳入失敗的原因(通過 future.cause() 獲取)forceClose(child, future.cause());}}});} catch (Throwable t) {// 如果在注冊通道的過程中發生異常,調用 forceClose 方法強制關閉該通道// 并傳入捕獲到的異常對象forceClose(child, t);}
}
上面代碼注釋比較清楚,我們按照注釋來看其中兩點:
1.1 child.pipeline().addLast(childHandler)
child.pipeline().addLast(childHandler)
: 將我們創建的 匿名類 ChannelInitializer ( childHandler
)添加到客戶端的 Channel 的 ChannelPipeline 中。
其中 child
就是 新連接的客戶端的連接通道 Channel, childHandler
則是我們通過 ServerBootstrap#childHandler 方法指定的處理器,也就是我們添加的一開始的匿名類 ChannelInitializer 。如下圖:
也就是說,這一步是會為新連接客戶端的 Channel 的 Pipeline 上添加了我們自定義的 childHandler,也就是說此時這個客戶端的 Channel 的 ChannelPipeline 結構如下:
1.2 childGroup.register(child)
childGroup.register(child)
:將 workerGroup 中的某個 EventLoop 與 NioSocketChannel 進行關聯。當這個 NioSocketChannel 有事件發生時,可以直接交由當前綁定的 EventLoop 來處理(EventLoop 可以簡單認為是一個本地線程的封裝。其中 childGroup
是調用 ServerBootstrap#group 方法時傳入的 currentChildGroup
,也就是示例中的 workerGroup
對象,實際類型是 NioEventLoopGroup;而 child
是 NioSocketChannel 的實例。
所以這里調用的是 NioEventLoopGroup#register,根據調用鏈路會調用到 SingleThreadEventLoop#register 方法,該方法在 【Netty4核心原理⑦】【揭開Bootstrap的神秘面紗 - 客戶端Bootstrap ?】 一文的【SingleThreadEventLoop#register 】部分有過詳細分析,這里不再贅述。)
在 Netty 中,NioEventLoop 和 NioSocketChannel 是一對多的綁定關系:
- NioEventLoop 職責:NioEventLoop 本質上是一個事件循環,其主要工作是負責監聽多個 Channel 上的 I/O 事件,像連接、讀寫等操作。它借助 Java NIO 的 Selector 來實現多路復用,能夠同時處理多個 Channel 的事件。
- NioSocketChannel 職責:NioSocketChannel 代表的是一個基于 Java NIO 的 TCP 套接字通道,用于網絡數據的讀寫。
1.3 ChannelInitializer#initChannel 的觸發邏輯
結合上面,我們可以得知:
-
當有新客戶端連接時,會觸發 ServerBootstrapAcceptor#channelRead 方法,該方法中會執行
child.pipeline().addLast(childHandler)
邏輯,將創建的 匿名類 ChannelInitializer (childHandler
) 添加到客戶端的 Channel 的 ChannelPipeline 中。此時 ChannelPipeline 雙向鏈表的結構如下圖所示:
-
隨后會調用
childGroup.register(child)
中會調用到 AbstractChannel.AbstractUnsafe#register0 方法,在該方法中會調用 DefaultChannelPipeline#invokeHandlerAddedIfNeeded 方法,而該方法會觸發 客戶端 Channel 中的 ChannelPipeline 每個 ChannelHandler 節點的 handlerAdded 方法。 -
因此這里觸發 匿名類 ChannelInitializer (
childHandler
)的 handlerAdded方法,而 ChannelInitializer#handlerAdded 中會調用 ChannelInitializer#initChannel 方法,也就是我們自己自定義的邏輯,從而將自定義的 ChannelHandler 添加到 客戶端的 ChannelPipeline 中,并且會將自身(ChannelInitializer)從 Pipeline 中移除,此時 ChannelPipeline 雙向鏈表的結構如下圖所示:(詳細分析在 【Netty4核心原理⑥】【揭開Bootstrap的神秘面紗 - 客戶端Bootstrap ?】 的 【Handler 添加過程】部分有過介紹)
2. ServerBootstrapAcceptor#channelRead 的觸發時機
上面我們介紹了 ServerBootstrapAcceptor#channelRead 方法,知道了當有新的客戶端連接時會觸發該方法,下面我們來詳細看看 ServerBootstrapAcceptor#channelRead 的觸發時機,如下:
-
當有新的客戶端連接請求到達服務器時,Selector 會檢測到 OP_ACCEPT(連接接受)事件接著就會調用 NioServerSocketChannel#doReadMessages 方法,該方法的作用是 :嘗試從底層的 Java NIO 通道中讀取新連接的客戶端信息,并將其封裝為 Netty 中的 NioSocketChannel 對象,添加到傳入的 List 中,以此來表示接收到了新的客戶端連接。具體實現如下:
@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {// 獲取客戶端新連接的 SocketChannel 對象SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {// this 即 NioServerSocketChannel 對象,ch 是與客戶端新創建的連接通道// 這里會創建一個新的 NioSocketChannel 對象添加到 buf 中buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {...}return 0;}
-
隨后會利用 Netty 的 ChannelPipeline 機制,將讀取事件逐級發送到各個 Handler 中,于是就會觸發ServerBootstrapAcceptor#channelRead 方法。
ChannelPipeline 是一個由多個 ChannelHandler 組成的管道。當 NioEventLoop 觸發OP_ACCEPT事件處理時,會從 ChannelPipeline 的頭部開始,依次調用每個 ChannelHandler 的channelRead方法(如果 ChannelHandler 實現了 channelRead 接口),在 【Netty4核心原理?】【Netty 大動脈 Pipeline】 一文中詳細介紹。
-
ServerBootstrapAcceptor 作為 ChannelPipeline中的一個 ChannelHandler,在這個事件傳播過程中,當輪到它處理時,其 channelRead方法就會被觸發,完成 NioEventLoop 和 NioSocketChannel 的綁定。
3. 總結
這里我們先簡單總結下 ServerBootstrapAcceptor 的作用:
-
在服務端啟動時會創建一個 Channel 代表本機與監聽端口的通道, 在 ServerBootstrap#init 中會為這個 Channel 添加一個 ServerBootstrapAcceptor 的 ChannelHandler,代表當這個 Channel 有事件發生時會觸發 ServerBootstrapAcceptor 的對應方法。
-
而作為服務端關系的事件是 ON_ACCEPT ,即客戶端連接事件。所以當有客戶端與服務端建立連接時,服務端會為每個客戶端連接創建一個 客戶端 Channel,隨后會觸發 ServerBootstrapAcceptor#channelRead。
調用鏈路簡化后如下:
- AbstractNioMessageChannel.NioMessageUnsafe#read -> ChannelPipeline#fireChannelRead -> AbstractChannelHandlerContext#invokeChannelRead -> AbstractChannelHandlerContext#invokeChannelRead -> ChannelInboundHandler#channelRead -> ChannelHandlerContext#fireChannelRead
-
ServerBootstrapAcceptor#channelRead 中會在 客戶端 Channel 的 Pipeline 上注冊
childHandler
,并且會從childGroup
中選擇一個 NioEventLoop 來負責這個 客戶端 Channel 的相關事件的處理,這樣就完成 NioEventLoop 和 NioSocketChannel 的綁定。
五、 服務端 Selector 事件輪詢
服務端在啟動時會調用 ServerBootstrap#bind 來完成 端口監聽與事件處理,上面【AbstractBootstrap#doBind0 】部分我們看到了如下代碼:
channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {// 如果服務端注冊操作成功,則嘗試進行綁定操作,將 Channel 綁定到指定的本地地址 localAddress,并傳入 ChannelPromise(promise)用于跟蹤綁定操作的結果channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});
這里 channel.eventLoop()
會返回一個 EventLoop (實際是 bossGroup 池中的一個 NioEventLoop 實例),所以這里的 ExecutoLoop#execute 方法實際是 NioEventLoop#execute。
在上面 【ServerBootstrapAcceptor#channelRead】部分我們提到 Netty 會為將 EventLoop 與 NioSocketChannel 進行關聯,因此每個 Channel 都有一個自己的綁定的 EventLoop(但一個 EventLoop 可能對應多個 Channel)。
NioEventLoop#execute 實現在父類 SingleThreadEventExecutor#execute 中, SingleThreadEventExecutor#execute 的代碼如下:
/*** 執行給定的任務。該方法會根據當前線程是否在事件循環中,以及事件循環的狀態,來決定如何處理任務。** @param task 要執行的任務,以 Runnable 接口的形式表示* @param immediate 一個布爾值,指示是否立即喚醒事件循環來執行任務*/private void execute(Runnable task, boolean immediate) {// 1. 檢查當前線程是否在事件循環中boolean inEventLoop = inEventLoop();// 2. 將任務添加到任務隊列中addTask(task);// 如果當前線程不是 EventLoop 線程if (!inEventLoop) {// 3. 嘗試啟動一個 EventLoop線程,交由這個線程來處理任務。startThread();// 判斷線程狀態是否已經關閉,如果關閉則移除任務if (isShutdown()) {boolean reject = false;try {// 嘗試從任務隊列中移除剛剛添加的任務if (removeTask(task)) {// 如果移除成功,標記任務需要被拒絕reject = true;}} catch (UnsupportedOperationException e) {// 當任務隊列不支持移除操作時,捕獲該異常// 這種情況下,我們只能繼續執行,期望在任務完全終止前能夠處理它// 在最壞的情況下,我們會在終止時記錄日志}// 如果任務需要被拒絕if (reject) {// 調用拒絕任務的方法:拋出 RejectedExecutionException 異常 reject();}}}// 如果添加任務不會自動喚醒事件循環,并且 immediate 參數為 trueif (!addTaskWakesUp && immediate) {// 4. 喚醒事件循環,使其能夠立即處理任務wakeup(inEventLoop);}}
我們這里來簡單梳理下邏輯:
- 通過
inEventLoop()
來獲取當前線程的標識 ,該方法會檢查 當前線程是否是 SingleThreadEventExecutor#thread 線程。- Netty 采用單線程模型來處理 Channel 的 I/O 操作和相關任務。每個 EventExecutor 通常對應一個線程,這個線程負責執行注冊到該 EventExecutor 上的 Channel 的 I/O 事件處理邏輯以及任務隊列中的任務。
- 在 SingleThreadEventExecutor 類中,定義了一個
private volatile Thread thread
成員變量,用于存儲與該 EventExecutor 關聯的線程對象。 inEventLoop()
為 false 的情況有兩種:- SingleThreadEventExecutor#thread 為空 :這種情況說明 SingleThreadEventExecutor 綁定的線程還沒初始化,下面會調用
startThread()
來開啟一個新線程 - SingleThreadEventExecutor#thread 不為空,但是并不等于當前線程:這種情況說明當前調用線程并非是 SingleThreadEventExecutor#thread 線程,
- SingleThreadEventExecutor#thread 為空 :這種情況說明 SingleThreadEventExecutor 綁定的線程還沒初始化,下面會調用
- 將當前需要處理的任務通過
addTask(task)
方法添加到任務隊列(SingleThreadEventExecutor#taskQueue) 中。- SingleThreadEventExecutor 內部存在一個任務隊列 taskQueue 屬性
- 無論 inEventLoop() 返回什么結果, 都會將 task 添加到 SingleThreadEventExecutor 的任務隊列中。因此在某些情況下可能會存在將任務添加到錯誤的 SingleThreadEventExecutor 隊列中。
- SingleThreadEventExecutor(實際類型是 NioEventLoop) 會通過一個循環不停的從 taskQueue 中獲取任務執行。
- 如果當前線程并不是 SingleThreadEventExecutor#thread 線程中,則調用
startThread()
啟動一個新的 EventLoop 線程。startThread()
并不一定會啟動一個新線程,如果 SingleThreadEventExecutor 已經綁定了線程(SingleThreadEventExecutor#thread 不為空)則不會做任何處理
- 接著通過
isShutdown()
方法檢查事件循環是否已關閉,若已關閉,嘗試從任務隊列中移除該任務。若移除成功,調用reject()
方法拒絕該任務。 - 喚醒事件循環:如果添加任務不會自動喚醒事件循環,且 immediate 參數為 true,則調用
wakeup()
方法喚醒事件循環,使其能立即處理任務。
下面我們我們針對上面注釋的幾點來進行詳細分析:
1. SingleThreadEventExecutor#inEventLoop
SingleThreadEventExecutor#inEventLoop 用于判斷當前調用線程是否是 Channel 綁定的線程。Netty 會為每個 Channel 綁定一個 NioEventLoop,而每個 NioEventLoop 綁定一個 本地線程 Thread,因此 每個 Channel 都會綁定一個線程,這里就是判斷:調用該方法的當前線程是否是 該 Channel 綁定的線程。
SingleThreadEventExecutor#inEventLoop 調用的是其父類 AbstractEventExecutor#inEventLoop() 方法,實現如下:
// io.netty.util.concurrent.AbstractEventExecutor#inEventLoop@Overridepublic boolean inEventLoop() {// 接口方法,由其子類 SingleThreadEventExecutor 實現, 所以這里調用的是 SingleThreadEventExecutor#inEventLoopreturn inEventLoop(Thread.currentThread());}
而 SingleThreadEventExecutor#inEventLoop 實現如下:
@Overridepublic boolean inEventLoop(Thread thread) {// 判斷傳入的線程是否是 SingleThreadEventExecutor 所持有的線程。return thread == this.thread;}
2. SingleThreadEventExecutor#addTask
SingleThreadEventExecutor#addTask 是將任務添加到隊列中等待執行 ,實現如下:
可以看到無論 inEventLoop() 返回什么結果, 都會將 task 添加到 SingleThreadEventExecutor 的任務隊列中。因此在某些情況下可能會存在將任務添加到錯誤的 SingleThreadEventExecutor 隊列中。
// 添加任務到任務隊列中protected void addTask(Runnable task) {ObjectUtil.checkNotNull(task, "task");if (!offerTask(task)) {// 如果添加失敗則拒絕任務reject(task);}}final boolean offerTask(Runnable task) {if (isShutdown()) {reject();}// 將任務添加到任務隊列return taskQueue.offer(task);}// 拒絕任務protected final void reject(Runnable task) {rejectedExecutionHandler.rejected(task, this);}
邏輯比較簡單:就是將任務添加到 taskQueue 隊列中, 當任務添加失敗時會觸發拒絕策略。
3. SingleThreadEventExecutor#startThread
SingleThreadEventExecutor#startThread 的調用前提是 SingleThreadEventExecutor#inEventLoop 返回 false。每個 SingleThreadEventExecutor 都會綁定一個本地線程,并且只會初始化一次,所以在 SingleThreadEventExecutor#startThread中會通過 state
判斷 SingleThreadEventExecutor 是否進行過初始化,如果沒有則會調用 doStartThread()
方法進行初始化。
- STATE_UPDATER 是 SingleThreadEventExecutor 內部維護的一個屬性,他的作用是標識當前 Thread 的狀態。在初始的時候,STATE_UPDATER == ST_NOT_STARTED,因此第一次調用 startThread 方法時會進入 if 語句中,便會調用 doStartThread() 方法。
- 因為可能會存在多個 Channel 注冊到同一個 NioEventLoop上,所以存在 NioEventLoop 在當前 Channel 注冊前就已經被其他 Channel 注冊的情況,此時 NioEventLoop 的 thread 就已經初始化過了,這里便不會再調用 doStartThread() 方法。
SingleThreadEventExecutor#startThread 具體代碼如下:
private void startThread() {// CAS 確保原子性if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {// 里面會調用 SingleThreadEventExecutor.this.run(),這個 this 就是 NioEventLoop 對象doStartThread();success = true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}}
下面我們來看 SingleThreadEventExecutor#doStartThread 的實現。
3.1 SingleThreadEventExecutor#doStartThread
SingleThreadEventExecutor#doStartThread 方法是 SingleThreadEventExecutor 類中的一個關鍵方法,其主要功能是啟動一個新線程來執行事件循環邏輯。該方法會將事件循環的執行封裝在一個 Runnable 任務中,并通過 executor 來執行這個任務。在執行過程中,會處理線程中斷、事件循環的執行、狀態更新、任務清理以及線程資源的釋放等操作,確保事件循環的正常啟動和優雅關閉。其代碼如下:
private void doStartThread() {// 使用 assert 語句確保 thread 為 null,即當前線程還未啟動。這是一個防御性編程的檢查,防止線程被重復啟動。assert thread == null;// 交給線程池執行一個任務executor.execute(new Runnable() {@Overridepublic void run() {// 將當前正在執行的線程賦值給 thread 變量,以便后續對該線程進行管理。thread = Thread.currentThread();// 如果 interrupted 標志為 true,則中斷當前線程if (interrupted) {thread.interrupt();}boolean success = false;// 更新最后執行時間,用于記錄事件循環的執行時間updateLastExecutionTime();try {// 調用 SingleThreadEventExecutor 類的 run 方法,開始執行事件循環邏輯。run 方法通常包含了事件處理、任務執行等核心邏輯。SingleThreadEventExecutor.this.run();// 如果事件循環執行成功,將 success 標志設置為 truesuccess = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {// 對狀態更新和任務清理....}});}
這里我們來看下面兩點 :
-
executor.execute
: 在 NioEventLoopGroup 構建時默認會創建一個 ThreadPerTaskExecutor,它會為每個任務創建一個新線程,線程工廠使用 newDefaultThreadFactory() 創建(在 【Netty4核心原理⑥】【揭開Bootstrap的神秘面紗 - 客戶端Bootstrap ?】 【創建工作線程】部分有提及) -
SingleThreadEventExecutor.this.run()
:當線程創建完成后會調用 SingleThreadEventExecutor#run 方法,該方法是抽象方法,實現在 NioEventLoop#run 中。在 NioEventLoop#run 內部有一個無限循環,在循環內部中 NioEventLoop 會做兩件事:- 會通過 Java NIO 的 Selector 監聽注冊在其上的 Channel 的 I/O 事件,當 Selector 檢測到有 Channel 發生 I/O 事件時,NioEventLoop 會調用相應的 Channel 處理器來處理這些事件;
- 執行任務隊列中積累的任務,這些任務可以是用戶提交的自定義任務,也可以是內部產生的系統任務。
因為這是個無限循環,這也就意味著,這個線程將 “終生” 陷入這個循環中,這樣也就完成了這個這個 NioEventLoop 所謂的事件循環(EventLoop)
Java NIO 的 Selector 允許一個線程同時監聽多個 Channel 的 IO 事件,因此當多個 Channel 注冊到同一個 NioEventLoop 上時會同時注冊興趣事件到 NioEventLoop#selector 屬性上。
關于 NioEventLoop#run 的具體內容,本篇篇幅所限,詳參 【Netty4核心原理⑨】【揭開Bootstrap的神秘面紗 - 服務端Bootstrap?】中 【NioEventLoop#run】部分。
4. SingleThreadEventExecutor#wakeup
上面代碼在執行最后會嘗試喚醒任務,如下:
// 如果添加任務不會自動喚醒事件循環,并且 immediate 參數為 trueif (!addTaskWakesUp && immediate) {// 4. 喚醒事件循環,使其能夠立即處理任務wakeup(inEventLoop);}
這里我們看下調用前的兩個條件:
addTaskWakesUp
:由 SingleThreadEventExecutor 構造函數入參傳入,在 NioEventLoop 中默認為 false。immediate
:取值來自于!(task instanceof LazyRunnable) && wakesUpForTask(task)
,默認為 true- task 類型為 Runnable,所以
!(task instanceof LazyRunnable)
為 true wakesUpForTask(task)
方法沒有邏輯,直接返回 true。
- task 類型為 Runnable,所以
綜上,我們這里是必定會調用 wakeup(inEventLoop)
,其實現為 NioEventLoop#wakeup,其主要作用是喚醒正在阻塞的 Selector。
在 NioEventLoop#run 中會判斷如果任務隊列中沒有任務,則會調用 Selector#select 來等待 NIO 事件發生,所以這里需要再將任務添加后喚醒 Selector。關于NioEventLoop#run的內容我們在 【Netty4核心原理⑨】【揭開Bootstrap的神秘面紗 - 服務端Bootstrap?】 中會詳細介紹。
如下:
@Overrideprotected void wakeup(boolean inEventLoop) {if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {// 喚醒 Selectorselector.wakeup();}}
其中 nextWakeupNanos.getAndSet(AWAKE) != AWAKE
:
nextWakeupNanos
是一個 AtomicLong 類型的變量,用于記錄下一次喚醒 Selector 的時間。getAndSet(AWAKE)
方法會先獲取 nextWakeupNanos 的當前值,然后將其設置為 AWAKE。!= AWAKE
表示當前 nextWakeupNanos 的值不是 AWAKE,即 Selector 處于阻塞狀態,需要被喚醒。
六、總結
上面的分析有點零碎,我們這里來一個完整總結。
1. 服務端啟動
-
服務端啟動引導(ServerBootstrapbootstrap )創建與配置
-
創建 ServerBootstrapbootstrap 對象:
- 首先會創建一個Bootstrap實例,它是 Netty 服務端的啟動引導類。例如:
ServerBootstrapbootstrap = new ServerBootstrap();。
- 首先會創建一個Bootstrap實例,它是 Netty 服務端的啟動引導類。例如:
-
設置線程模型(EventLoopGroup):
- 為 ServerBootstrap 配置 EventLoopGroup。通常來說,我們基于 主從 Reactor 多線程模型 會指定兩個 NioEventLoopGroup (bossGroup 和 workerGroup) 來處理服務端網絡事件和客戶端連接后的事件。
NioEventLoopGroup 內部會根據指定的線程數量通過 NioEventLoopGroup#newChild 方法來循環創建 NioEventLoop。每個 NioEventLoop 內部都與一個 JVM 本地線程綁定,因此 NioEventLoop實際上是一個單線程的執行器(Executor),它會在一個循環中不斷地獲取和處理事件。這個循環被稱為事件循環(Event Loop),這也是NioEventLoop名字的由來。
- 為 ServerBootstrap 配置 EventLoopGroup。通常來說,我們基于 主從 Reactor 多線程模型 會指定兩個 NioEventLoopGroup (bossGroup 和 workerGroup) 來處理服務端網絡事件和客戶端連接后的事件。
-
指定通道類型(Channel):
-
通過
bootstrap.channel(NioServerSocketChannel.class)
指定服務端使用的通道類型為 NioServerSocketChannel(基于 NIO 的套接字通道)。不同的通道類型適用于不同的網絡協議和傳輸方式。bootstrap.channel(NioServerSocketChannel.class) 是記錄當前使用的 Channel 類型,在 Bootstrap#bind方法中會通過反射創建 NioServerSocketChannel對象。
-
-
添加處理器(ChannelHandler)到ChannelPipeline:
-
使用handler方法添加一個ChannelInitializer,它是一個特殊的ChannelHandler,用于在Channel初始化時向ChannelPipeline添加其他ChannelHandler。例如:
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 在這里添加各種ChannelHandler,如編解碼器、業務邏輯處理器等ch.pipeline().addLast(new SomeDecoder());ch.pipeline().addLast(new MyBusinessHandler());ch.pipeline().addLast(new SomeEncoder());} });
ChannelInitializer#initChannel 方法只會執行一遍,當方法執行結束后會將其從 ChannelPipeline 中 移除,通過 ChannelPipeline#addLast 方法將用戶添加的自定義的 ChannelHandler 會在 ChannelPipeline內部形成一個雙向鏈表。
-
ChannelPipeline是一個由多個ChannelHandler組成的管道,用于處理Channel上的事件。每個ChannelHandler負責處理特定類型的事件或者對數據進行特定的操作,數據會按照添加的順序在這些ChannelHandler之間流動。
ChannelPipeline#addLast 方法會將添加的 ChannelHandler 包裝成 AbstractChannelHandlerContext,并會根據其實現的方法判斷當前 Handler 支持處理哪些事件,當對應事件來臨時會找到可以處理該事件的 Handler 并調用。
-
-
-
在上面對 ServerBootstrap 的基礎配置完成后會調用
bootstrap.bind(port)
方法完成服務端的端口綁定與監聽,在這個方法首先會調 AbstractBootstrap#initAndRegister 通過反射創建一個 NioServerSocketChannel 實例,并且會通過config().group().register(channel)
將NioServerSocketChannel 注冊到 NioEventLoop 上。如下:// 省略部分邏輯final ChannelFuture initAndRegister() {...// 反射創建 Channelchannel = channelFactory.newChannel();// 該方法的實現在 ServerBootstrap 中init(channel);...// 將 Channel 注冊到一個 NioEventLoop 上ChannelFuture regFuture = config().group().register(channel);...return regFuture;}
channelFactory.newChannel();
會通過工廠類通過反射創建一個 Channel,Channel 的類型是我們通過 AbstractBootstrap#channel 方法指定的類型,即 NioServerSocketChannel 類型。init(channel)
方法的實現在 ServerBootstrap 中,目的是完成 Channel創建后的一些初始化操作,在這里會往當前服務端 Channel 的 Pipeline 添加一個 ServerBootstrapAcceptor 對象,此時 Channel 的 Pipeline 鏈表指向是 :Head -> ServerBootstrapAcceptor -> Tail
config().group()
獲取到的是bossGroup
,config().group().register(channel)
調用的是 NioEventLoopGroup#register 方法,該方法會從bossGroup
中根據規則選擇出一個 NioEventLoop ,因此這里config().group().register(channel)
調用的就是 NioEventLoop#register。
-
NioEventLoop#register 中會調用到 AbstractChannel.AbstractUnsafe#register(在 【Netty4核心原理⑦】【揭開Bootstrap的神秘面紗 - 客戶端Bootstrap ?】== 中的【 AbstractChannel.AbstractUnsafe#register】有所提及)AbstractChannel.AbstractUnsafe#register 中存在如下邏輯:
// AbstractChannel.AbstractUnsafe#register 部分代碼... if (eventLoop.inEventLoop()) {register0(promise);} else {...// 調用 SingleThreadEventExecutor#executeeventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});...}...
- AbstractChannel.AbstractUnsafe#register 方法會通過
eventLoop.inEventLoop()
判斷 當前線程是否是 SingleThreadEventExecutor#thread 線程。由于我們這里是服務端的啟動過程,所以 SingleThreadEventExecutor#thread 為空,因此eventLoop.inEventLoop()
為 false- Netty 采用單線程模型來處理 Channel 的 I/O 操作和相關任務。每個 EventExecutor 通常對應一個線程,這個線程負責執行注冊到該 EventExecutor 上的 Channel 的 I/O 事件處理邏輯以及任務隊列中的任務。
- 在 SingleThreadEventExecutor 類中,定義了一個
private volatile Thread thread
成員變量,用于存儲與該 EventExecutor 關聯的線程對象。 inEventLoop()
為 false 的情況有兩種:- SingleThreadEventExecutor#thread 為空 :這種情況說明 SingleThreadEventExecutor 綁定的線程還沒初始化。
- SingleThreadEventExecutor#thread 不為空,但是并不等于當前線程:這種情況說明當前調用線程并非是 SingleThreadEventExecutor#thread 線程,
- 當
eventLoop.inEventLoop()
為 false 時會調用會調用eventLoop.execute
。eventLoop.execute
方法會將register0(promise);
封裝成一個 Runnable 添加到 SingleThreadEventExecutor#taskQueue 任務隊列中。然后判斷 NioEventLoop#thread 是否已經初始化(我們這個場景沒有初始化),如果沒有初始化則會通過 SingleThreadEventExecutor#doStartThread 新建一個線程thread
并啟動,然后將其賦值給 NioEventLoop#thread,確保 NioEventLoop#thread 只會初始化一次。
// SingleThreadEventExecutor#doStartThread 簡化代碼如下private void doStartThread() {assert thread == null;// 通過線程工廠創建一個線程executor.execute(new Runnable() {@Overridepublic void run() {...// 賦值給 SingleThreadEventExecutor#threadthread = Thread.currentThread();...// 調用 NioEventLoop#run 方法SingleThreadEventExecutor.this.run();...});}
- 新建的
thread
啟動時會執行SingleThreadEventExecutor.this.run();
來調用 NioEventLoop#run 方法,在 NioEventLoop#run 方法中存在一個死循環(事件循環),循環中的邏輯是監聽 Channel 事件 和 處理 SingleThreadEventExecutor#taskQueue 任務隊列中的任務。也就是說 NioEventLoop#thread 創建就 “終身” 陷在 “事件循環” 中不停的執行如下邏輯 :
- 會通過 Java NIO 的 Selector 監聽注冊在其上的 Channel 的 I/O 事件,當 Selector 檢測到有 Channel 發生 I/O 事件時,NioEventLoop 會調用相應的 Channel 處理器來處理這些事件;
- 執行任務隊列 taskQueue 中積累的任務,這些任務可以是用戶提交的自定義任務,也可以是內部產生的系統任務。
- 在 NioEventLoop#thread 處理任務隊列任務的時候,會將
register0(promise);
任務從隊列中取出,并在 NioEventLoop#thread 線程內部執行。這既是 Netty 的 無鎖化的串行理念。register0(promise);
中不僅會完成將當前的服務端 Channel 注冊到 Java NIO 的 Selector 上。在注冊前后會調用一些前后擴展邏輯。如在注冊后會調用 DefaultChannelPipeline#fireChannelRegistered 觸發 Channel 的 ChannelPipeline 中的 ChannelInboundHandler#channelRegistered 方法。
- AbstractChannel.AbstractUnsafe#register 方法會通過
-
至此,我們知道了 AbstractBootstrap#initAndRegister 會創建一個 Channel ,并注冊到 NioEventLoop 上,而在注冊過程中會為 NioEventLoop 創建一個線程與 NioEventLoop 綁定,并這個線程在 無限循環 中完成 監聽處理 Selector 上的 Channel 事件和處理任務隊列中的任務兩件事。
-
回到 AbstractBootstrap#doBind 中,當 AbstractBootstrap#initAndRegister 執行結束后會調用 AbstractBootstrap#doBind0 方法完成方法綁定,而在 AbstractBootstrap#doBind0 方法中則是調用
channel.eventLoop().execute
來執行channel.bind(localAddress, promise)
邏輯。 最終會會調用到 io.netty.channel.Channel.Unsafe#bind 方法來完成端口的綁定。- 這里的
channel.eventLoop().execute
調用的也是 SingleThreadEventExecutor#execute,方法,而在 AbstractBootstrap#initAndRegister 方法中我們已經對 channel 綁定的 NioEventLoop 做了線程初始化,這里調用的 SingleThreadEventExecutor#execute 方法中并不會再創建新的線程。 - 這里也是將
channel.bind(localAddress, promise)
封裝成一個 Runnable 任務添加到任務隊列 taskQueue 中,等待channel.eventLoop()
在 “事件循環” 中從任務隊列中將其取出并執行。 - 這里執行
channel.bind(localAddress, promise)
的線程是當前 Channel 綁定的 NioEventLoop 持有的線程。
- 這里的
// 入參// regFuture : 保存了 Channel 注冊結果// channel : 注冊的 channel// localAddress : channel 將要綁定的本機地址// promise :保存 channel 與 localAddress 的綁定結果private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// 下面的注釋指出該方法在 channelRegistered() 被觸發之前調用,目的是給用戶自定義的處理器(user handlers)提供機會,使其能夠在 channelRegistered() 方法實現中對 ChannelPipeline 進行相關設置。channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {// 1. 如果Channel 注冊操作成功,則嘗試進行綁定操作,將 Channel 綁定到指定的本地地址 localAddress,并傳入 ChannelPromise(promise)用于跟蹤綁定操作的結果channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {// Channel 注冊失敗,直接將失敗結果保存等待返回promise.setFailure(regFuture.cause());}}});}
- 至此,服務端的啟動過程大體就已經完成,可以得知:服務端在啟動時會創建了一個 ServerSocketChannel 并在注冊到 bossGroup 中的某個 NioEventLoop 上,這個 NioEventLoop 會將其綁定到指定的端口上。綁定成功后,ServerSocketChannel 開始監聽客戶端的連接請求。當有新的連接到來時,bossGroup 中的線程會接受這個連接,并將新連接的 SocketChannel 注冊到 workerGroup 中的某個 EventLoop 上。
2. 客戶端連接
服務端啟動后,客戶端就可以發起連接客戶端的請求了,當客戶端發起連接請求時會與服務端端口建立 TCP 連接,我們在上面提到過,服務端在啟動后會將服務端的 NioServerSocketChannel 注冊到一個 NioEventLoop上,這個 NioEventLoop 會調用 run 方法,在這個方法中會通過 Java NIO 的 Selector 不斷監聽事件。因此,當客戶端發起連接時,NioEventLoop#run 方法中就會監聽到。
-
在 NioEventLoop#run 的事件循環過程中,服務端監聽到事件后后會通過 NioEventLoop#processSelectedKeys 來處理興趣事件,隨后會調用到 NioEventLoop#processSelectedKey 方法來處理事件,在這個方法中會判斷事件類型(OP_CONNECT、OP_WRITE、OP_READ、OP_ACCEPT),我們在上面分析代碼時說過服務端這邊關心的事件類型是 OP_ACCEPT,而這里遇到 OP_ACCEPT 事件后會調用AbstractNioMessageChannel.NioMessageUnsafe#read 來處理。
客戶端只有一種情況:
- 客戶端啟動時會與服務端創建連接,過程中會創建 NioSocketChannel 類型的 Channel,此時創建的 Unsafe 類型為 NioSocketChannelUnsafe。此時的興趣事件是 OP_READ
服務端有兩種情況:
- 服務端自身啟動時,綁定端口時會創建 NioServerSocketChannel 類型,此時會創建 NioMessageUnsafe 類型。此時的興趣事件是 OP_ACCEPT
- 當服務端啟動后,客戶端連接服務端時,服務端會為每個客戶端創建 Channel,此時連接的客戶端的 Channel 類型是 NioSocketChannel,因此此時會為每個客戶端 Channel 創建的 Unsafe 類型為 NioSocketChannelUnsafe。
綜上:NioMessageUnsafe 用來處理 OP_ACCEPT 事件,NioSocketChannelUnsafe 用來處理 OP_READ 事件。(
關于 NioMessageUnsafe 和 NioSocketChannelUnsafe 的分析詳參 【Netty4核心原理⑨】【揭開Bootstrap的神秘面紗 - 服務端Bootstrap?】 的【unsafe.read()】部分) -
AbstractNioMessageChannel.NioMessageUnsafe#read 中會通過 AbstractNioMessageChannel#doReadMessages 方法讀取連接消息,同時會獲取客戶端連接的 Channel ,并封裝成一個 NioSocketChannel 對象, 如下:
@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {...// 獲取客戶端連接 ChannelSocketChannel ch = SocketUtils.accept(javaChannel());...// 封裝成 NioSocketChannel保存到 buf 中供后面獲取if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}...return 0;}
-
之后 AbstractNioMessageChannel.NioMessageUnsafe#read 會通過
pipeline.fireChannelRead(readBuf.get(i));
觸發 ChannelInboundHandler 的 ChannelRead 方法,這里便會觸發到 服務端的 ServerBootstrap.ServerBootstrapAcceptor#channelRead 方法到目前為止還是處于服務端的邏輯,執行線程都是服務端 Channel 綁定的 NioEventLoop 所持有的 Thread。
-
ServerBootstrap.ServerBootstrapAcceptor#channelRead 方法中存在如下邏輯:
- 代碼
child.pipeline().addLast(childHandler);
會將childHandler
(自定義的 ChannelInitializer )添加到當前 Channel (客戶端 Channel)的 Pipeline 中 ,此時客戶端 Channel 的 ChannelPipeline 雙向鏈表的結構是Head -> ChannelInitializer -> Tail
- 代碼
childGroup.register(child)
會從 workerGroup 中選擇一個 NioEventLoop 并將 客戶端 Channel 注冊到這個 NioEventLoop 中,而在注冊過程中會調用 AbstractChannel.AbstractUnsafe#register,這里就與服務端 Channel 注冊的邏輯相同了,簡單來說就是會初始化 NioEventLoop 中的 thread (如果需要的話),并啟動這個線程并調用SingleThreadEventExecutor.this.run();
使得這個線程陷入 “事件循環”中。這樣這個 NioEventLoop 就會在 “事件循環” 中不斷監聽當前客戶端 Channel 的事件并分發處理以及處理 NioEventLoop 中的任務一個 NioEventLoop 可能會對應多個 Channel,因此如果這個 NioEventLoop 已經綁定過 Channel ,那么說明這個 NioEventLoop 的 thread 已經初始化完成了,那么此時就不會再初始化 thread,那么當前所需要做的就是將 客戶端 Channel 注冊到 NioEventLoop 的 Selector 上,因為Java NIO Selector 允許一個線程監聽多個 Channel,所以一個 NioEventLoop 可以處理多個Channel。
- 這樣就完成了 客戶端 Channel 注冊代 NioEventLoop 的邏輯,這個 NioEventLoop 會一直關注 客戶端 Channel 的信息,當這個 Channel 有事件發生時 NioEventLoop 便可以及時響應處理,并分發到 ChannelPipeline 中對應事件的 ChannelHandler 中處理。
這里客戶端類型是 NioSocketChannel 類型,持有的 Unsafe 類型是 NioSocketChannelUnsafe,關心的事件類型是 OP_READ,因此當客戶端發送消息時,OP_READ 事件會就緒,NioEventLoop 中的事件循環會感知到并調用 NioSocketChannelUnsafe#read 來處理, NioSocketChannelUnsafe#read 與 NioMessageUnsafe#read 的邏輯并不相同,NioSocketChannelUnsafe#read 會從SocketChannel 中讀取數據并進行相應的處理和事件通知。
- 代碼
七、參考內容
- 《Netty4核心原理》
- 豆包