1.關于Netty客戶端連接接入問題整理
一.Netty是在哪里檢測有新連接接入的?
答:boss線程第一個過程輪詢出ACCEPT事件,然后boss線程第二個過程通過JDK底層Channel的accept()方法創建一條連接。
二.新連接是怎樣注冊到NioEventLoop線程的?
答:boss線程調用chooser的next()方法拿到一個NioEventLoop,然后將新連接注冊到NioEventLoop的Selector上。
2.Reactor線程模型和服務端啟動流程
(1)Netty中的Reactor線程模型
Netty中最核心的是兩種類型的Reactor線程,這兩種類型的Reactor線程可以看作Netty中的兩組發動機,驅動著Netty整個框架的運轉。一種類型是boss線程,專門用來接收新連接,然后將連接封裝成Channel對象傳遞給worker線程。另一種類型是worker線程,專門用來處理連接上的數據讀寫。
boss線程和worker線程所做的事情均分為3步。第一是輪詢注冊在Selector上的IO事件,第二是處理IO事件,第三是執行異步任務。對boss線程來說,第一步輪詢出來的基本都是ACCEPT事件,表示有新的連接。對worker線程來說,第一步輪詢出來的基本都是READ事件或WRITE事件,表示網絡的讀寫。
(2)服務端啟動流程
服務端是在用戶線程中開啟的,通過ServerBootstrap.bind()方法,在第一次添加異步任務的時候啟動boss線程。啟動之后,當前服務器就可以開啟監聽。
3.Netty新連接接入的整體處理邏輯
新連接接入的處理總體就是:檢測新連接 + 注冊Reactor線程,具體就可以分為如下4個過程。
一.檢測新連接
服務端Channel對應的NioEventLoop會輪詢該Channel綁定的Selector中是否發生了ACCEPT事件,如果是則說明有新連接接入了。
二.創建NioSocketChannel
檢測出新連接之后,便會基于JDK NIO的Channel創建出一個NioSocketChannel,也就是客戶端Channel。
三.分配worker線程及注冊Selector
接著Netty給客戶端Channel分配一個NioEventLoop,也就是分配worker線程。然后把這個客戶端Channel注冊到這個NioEventLoop對應的Selector上,之后這個客戶端Channel的讀寫事件都會由這個NioEventLoop進行處理。
四.向Selector注冊讀事件
最后向這個客戶端Channel對應的Selector注冊READ事件,注冊的邏輯和服務端Channel啟動時注冊ACCEPT事件的一樣。
4.新連接接入之檢測新連接
(1)何時會檢測到有新連接
當調用輔助啟動類ServerBootstrap的bind()方法啟動服務端之后,服務端的Channel也就是NioServerSocketChannel就會注冊到boss的Reactor線程上。boss的Reactor線程會不斷檢測是否有新的事件,直到檢測出有ACCEPT事件發生即有新連接接入。此時boss的Reactor線程將通過服務端Channel的unsafe變量來進行實際操作。
注意:服務端Channel的unsafe變量是一個NioMessageUnsafe對象,客戶端Channel的unsafe變量是一個NioByteUnsafe對象。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {Selector selector;private SelectedSelectionKeySet selectedKeys;private boolean needsToSelectAgain;private int cancelledKeys;...@Overrideprotected void run() {for (;;) {...//1.調用select()方法執行一次事件輪詢select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}...//2.處理產生IO事件的ChannelneedsToSelectAgain = false;processSelectedKeys();...//3.執行外部線程放入TaskQueue的任務runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}private void processSelectedKeys() {if (selectedKeys != null) {//selectedKeys.flip()會返回一個數組processSelectedKeysOptimized(selectedKeys.flip());} else {processSelectedKeysPlain(selector.selectedKeys());}}private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {for (int i = 0;; i ++) {//1.首先取出IO事件final SelectionKey k = selectedKeys[i];if (k == null) {break;}selectedKeys[i] = null;//Help GC//2.然后獲取對應的Channel和處理該Channel//默認情況下,這個a就是NioChannel,也就是服務端啟動時經過Netty封裝的Channelfinal Object a = k.attachment();if (a instanceof AbstractNioChannel) {//網絡事件的處理processSelectedKey(k, (AbstractNioChannel) a);} else {//NioTask主要用于當一個SelectableChannel注冊到Selector時,執行的一些任務NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}//3.最后判斷是否應該再進行一次輪詢if (needsToSelectAgain) {for (;;) {i++;if (selectedKeys[i] == null) {break;}selectedKeys[i] = null;}selectAgain();//selectedKeys.flip()會返回一個數組selectedKeys = this.selectedKeys.flip();i = -1;}}}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {//If the channel implementation throws an exception because there is no event loop, //we ignore this because we are only trying to determine if ch is registered to this event loop and thus has authority to close ch.return;}//Only close ch if ch is still registerd to this EventLoop. //ch could have deregistered from the event loop and thus the SelectionKey could be cancelled as part of the deregistration process, //but the channel is still healthy and should not be closed.if (eventLoop != this || eventLoop == null) {return;}//close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();//We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise//the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//remove OP_CONNECT as otherwise Selector.select(..) will always return without blockingint ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}//Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {//Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}//Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop//boss的Reactor線程已經輪詢到有ACCEPT事件,即表明有新連接接入//此時將調用Channel的unsafe變量來進行實際操作if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//進行新連接接入處理unsafe.read();if (!ch.isOpen()) {//Connection already closed - no need to handle write.return;}}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}...
}
(2)新連接接入的流程梳理
一.NioMessageUnsafe的read()方法說明
首先使用一條斷言確保該read()方法必須來自Reactor線程調用,然后獲得Channel對應的Pipeline和RecvByteBufAllocator.Handle。
接著調用NioServerSocketChannel的doReadMessages()方法不斷地讀取新連接到readBuf容器。然后使用for循環處理readBuf容器里的新連接,也就是通過pipeline.fireChannelRead()方法讓每個新連接都經過一層服務端Channel的Pipeline邏輯處理,最后清理容器并執行pipeline.fireChannelReadComplete()。
//AbstractNioChannel base class for Channels that operate on messages.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {...private final class NioMessageUnsafe extends AbstractNioUnsafe {//臨時存放讀到的連接NioSocketChannelprivate final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {//斷言確保該read()方法必須來自Reactor線程調用assert eventLoop().inEventLoop();//獲得Channel對應的Pipelinefinal ChannelPipeline pipeline = pipeline();//獲得Channel對應的RecvByteBufAllocator.Handlefinal RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();do {//1.調用NioServerSocketChannel的doReadMessages()方法創建NioSocketChannel//通過JDK的accept()方法去創建JDK Channel,然后把它包裝成Netty自定義的Channelint localRead = doReadMessages(readBuf);if (localRead == 0) {break;}} while (allocHandle.continueReading());//控制連接的接入速率,默認一次性讀取16個連接//2.設置并綁定NioSocketChannelint size = readBuf.size();for (int i = 0; i < size; i ++) {pipeline.fireChannelRead(readBuf.get(i));}//3.清理容器并觸發pipeline.fireChannelReadComplete()readBuf.clear();pipeline.fireChannelReadComplete();}}//Read messages into the given array and return the amount which was read.protected abstract int doReadMessages(List<Object> buf) throws Exception;...
}
二.新連接接入的流程梳理
首先會從服務端Channel對應的NioEventLoop的run()方法的第二個步驟處理IO事件開始。然后會調用服務端Channel的unsafe變量的read()方法,也就是NioMessageUnsafe對象的read()方法。
接著循環調用NioServerSocketChannel的doReadMessages()方法來創建新連接對象NioSocketChannel。其中創建新連接對象最核心的方法就是調用JDK Channel的accept()方法來創建JDK Channel。
與服務端啟動一樣,Netty會把JDK底層Channel包裝成Netty自定義的NioSocketChannel。
NioEventLoop.processSelectedKeys(key, channel) //入口NioMessageUnsafe.read() //新連接接入處理NioServerSocketChannel.doReadMessages() //創建新連接對象NioSocketChanneljavaChannel.accept() //創建JDK Channel
(3)新連接接入的總結
在服務端Channel對應的NioEventLoop的run()方法的processSelectedKeys()方法里,發現產生的IO事件是ACCEPT事件之后,會通過JDK Channel的accept()方法取創建JDK的Channel,并把它包裝成Netty自定義的NioSocketChannel。在這個過程中會通過一個RecvByteBufAllocator.Handle對象控制連接接入的速率,默認一次性讀取16個連接。
5.新連接接入之創建NioSocketChannel
(1)doReadMessages()方法相關說明
首先通過javaChannel().accept()創建一個JDK的Channel,即客戶端Channel。然后把服務端Channel和這個客戶端Channel作為參數傳入NioSocketChannel的構造方法中,從而把JDK的Channel封裝成Netty自定義的NioSocketChannel。最后把封裝好的NioSocketChannel添加到一個List里,以便外層可以遍歷List進行處理。
//A ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections.
public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel {private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();private final ServerSocketChannelConfig config;...@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {//1.創建JDK的ChannelSocketChannel ch = javaChannel().accept();//2.封裝成Netty的Channel,即把服務端Channel和客戶端Channel當作參數傳遞到NioSocketChannel的構造方法里if (ch != null) {//先創建一個NioSocketChannel對象,再添加到buf里buf.add(new NioSocketChannel(this, ch));return 1;}return 0;}//Create a new instancepublic NioServerSocketChannel() {//創建服務端Channelthis(newSocket(DEFAULT_SELECTOR_PROVIDER));}private static ServerSocketChannel newSocket(SelectorProvider provider) {//創建服務端Channelreturn provider.openServerSocketChannel();}//Create a new instance using the given ServerSocketChannel.public NioServerSocketChannel(ServerSocketChannel channel) {//創建服務端Channel,關注ACCEPT事件super(null, channel, SelectionKey.OP_ACCEPT);//javaChannel().socket()會調用JDK Channel的socket()方法config = new NioServerSocketChannelConfig(this, javaChannel().socket());} @Overrideprotected ServerSocketChannel javaChannel() {//返回一個JDK的Channel -> ServerSocketChannelreturn (ServerSocketChannel) super.javaChannel();}...
}//AbstractNioChannel base class for Channels that operate on messages.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {...//創建服務端Channelprotected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);}@Overrideprotected AbstractNioUnsafe newUnsafe() {return new NioMessageUnsafe();}...
}//SocketChannel which uses NIO selector based implementation.
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {private final SocketChannelConfig config;...//Create a new instance//@param parent,the Channel which created this instance or null if it was created by the user//@param socket,the SocketChannel which will be usedpublic NioSocketChannel(Channel parent, SocketChannel socket) {//創建客戶端Channelsuper(parent, socket);config = new NioSocketChannelConfig(this, socket.socket());}@Overrideprotected SocketChannel javaChannel() {//返回一個JDK的Channel -> ServerSocketChannelreturn (SocketChannel) super.javaChannel();}private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {super(channel, javaSocket);}...}...
}//The default SocketChannelConfig implementation.
public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {protected final Socket javaSocket;//Creates a new instance.public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {...this.javaSocket = javaSocket;setTcpNoDelay(true);//禁止Nagle算法...}...
}//AbstractNioChannel base class for Channels that operate on bytes.
public abstract class AbstractNioByteChannel extends AbstractNioChannel {...//Create a new instance//@param parent,the parent Channel by which this instance was created. May be null//@param ch,the underlying SelectableChannel on which it operatesprotected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {//創建客戶端Channel,關注READ事件super(parent, ch, SelectionKey.OP_READ);}@Overrideprotected AbstractNioUnsafe newUnsafe() {return new NioByteUnsafe();}...
}//Abstract base class for Channel implementations which use a Selector based approach.
public abstract class AbstractNioChannel extends AbstractChannel {private final SelectableChannel ch;protected final int readInterestOp;...//Create a new instance//@param parent,the parent Channel by which this instance was created. May be null//@param ch,the underlying SelectableChannel on which it operates//@param readInterestOp,the ops to set to receive data from the SelectableChannelprotected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;ch.configureBlocking(false);...}protected SelectableChannel javaChannel() {return ch;}@Overridepublic NioUnsafe unsafe() {return (NioUnsafe) super.unsafe();}...
}//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private final Channel parent;private final ChannelId id;private final Unsafe unsafe;private final DefaultChannelPipeline pipeline;...//Creates a new instance.//@param parent,the parent of this channel. null if there's no parent.protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}//Returns a new DefaultChannelId instance. //Subclasses may override this method to assign custom ChannelIds to Channels that use the AbstractChannel#AbstractChannel(Channel) constructor.protected ChannelId newId() {return DefaultChannelId.newInstance();}//Create a new AbstractUnsafe instance which will be used for the life-time of the Channelprotected abstract AbstractUnsafe newUnsafe();//Returns a new DefaultChannelPipeline instance.protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}@Overridepublic Unsafe unsafe() {return unsafe;}@Overridepublic ChannelPipeline pipeline() {return pipeline;}@Overridepublic EventLoop eventLoop() {EventLoop eventLoop = this.eventLoop;if (eventLoop == null) throw new IllegalStateException("channel not registered to an event loop");return eventLoop;}protected abstract class AbstractUnsafe implements Unsafe {@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {...//綁定事件循環器,即綁定一個NioEventLoop到該Channel上AbstractChannel.this.eventLoop = eventLoop;//注冊Selector,并啟動一個NioEventLoopif (eventLoop.inEventLoop()) {register0(promise);} else {...//通過啟動這個NioEventLoop線程來調用register0()方法將這個服務端Channel注冊到Selector上//其實執行的是SingleThreadEventExecutor的execute()方法eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});...}}...}...
}
(2)創建NioSocketChannel的流程梳理
NioServerSocketChannel和NioSocketChannel都有同一個父類AbstractNioChannel,所以創建NioSocketChannel的模版和創建NioServerSocketChannel保持一致。
但要注意的是:客戶端Channel是通過new關鍵字創建的,服務端Channel是通過反射的方式創建的。
此外,Nagle算法會讓小數據包盡量聚合成大的數據包再發送出去,Netty為了使數據能夠及時發送出去會禁止該算法。
new NioSocketChannel(p, ch) //入口,客戶端Channel是通過new關鍵字創建的,服務端Channel是通過反射的方式創建的new AbstractNioByteChannel(p, ch) //逐層調用父類的構造方法new AbstractNioChannel(p, ch, op_read) //逐層調用父類的構造方法ch.configureBlocking(false) + save op //配置此Channel為非阻塞,以及將感興趣的讀事件保存到成員變量以方便后續注冊到Selector new AbstractChannel() //創建Channel的相關組件:newId() //id作為Channel的唯一標識newUnsafe() //unsafe用來進行底層數據讀寫newChannelPipeline() //pipeline作為業務邏輯載體new NioSocketChannelConfig() //創建和NioSocketChannel綁定的配置類setTcpNoDelay(true) //禁止Nagle算法
(3)創建NioSocketChannel的總結
創建NioSocketChannel的邏輯可以分成兩部分。
第一部分是逐層調用父類的構造方法,其中會設置這個客戶端Channel的阻塞模式為false,然后再把感興趣的讀事件OP_READ保存到這個Channel的成員變量中以便后續注冊到Selector,接著會創建一系列的組件,包括作為Channel唯一標識的Id組件、用來進行底層數據讀寫的unsafe組件、用來作為業務邏輯載體的pipeline組件。
第二部分是創建和這個客戶端Channel相關的config對象,該config對象會設置關閉Nagle算法,從而讓小數據包盡快發送出去、降低延時。
(4)Netty中的Channel分類
說明一:
Channel繼承Comparable表示Channel是一個可以比較的對象。
說明二:
Channel繼承AttributeMap表示Channel是一個可以綁定屬性的對象,我們經常在代碼中使用channel.attr(...)來給Channel綁定屬性,其實就是把屬性設置到AttributeMap中。
說明三:
AbstractChannel用來實現Channel的大部分方法,在AbstractChannel的構造方法中會創建一個Channel對象所包含的基本組件,這里的Channel通常是指SocketChannel和ServerSocketChannel。
說明四:
AbstractNioChannel繼承了AbstractChannel,然后通過Selector處理一些NIO相關的操作。比如它會保存JDK底層SelectableChannel的引用,并且在構造方法中設置Channel為非阻塞模式。注意:設置非阻塞模式是NIO編程必須的。
說明五:
Netty的兩大Channel是指:服務端的NioServerSocketChannel和客戶端NioSocketChannel,分別對應著服務端接收新連接的過程和服務端新連接讀寫數據的過程。
說明六:
服務端Channel和客戶端Channel的區別是:服務端Channel通過反射方式創建,客戶端Channel通過new關鍵字創建。服務端Channel注冊的是ACCEPT事件,對應接收新連接。客戶端Channel注冊的是READ事件,對應新連接讀寫。服務端Channel和客戶端Channel底層都會依賴一個unsafe對象,這個unsafe對象會用來實現這兩種Channel底層的數據讀寫操作。對于讀操作,服務端的讀是讀一條連接doReadMessages(),客戶端的讀是讀取數據doReadBytes()。最后每一個Channel都會綁定一個ChannelConfig,每一個ChannelConfig都會實現Channel的一些配置。
6.新連接接入之綁定NioEventLoop線程
(1)將新連接綁定到Reactor線程的入口
創建完NioSocketChannel后,接下來便要對NioSocketChannel進行一些設置,并且需要將它綁定到一個正在執行的Reactor線程中。
NioMessageUnsafe.read()方法里的readBuf容器會承載著所有新建的連接,如果某個時刻Netty輪詢到多個連接,那么通過使用for循環就可以批量處理這些NioSocketChannel連接。
處理每個NioSocketChannel連接時,是通過NioServerSocketChannel的pipeline的fireChannelRead()方法來處理的。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {Selector selector;private SelectedSelectionKeySet selectedKeys;private boolean needsToSelectAgain;private int cancelledKeys;...@Overrideprotected void run() {for (;;) {...//1.調用select()方法執行一次事件輪詢select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}...//2.處理產生IO事件的ChannelneedsToSelectAgain = false;processSelectedKeys();...//3.執行外部線程放入TaskQueue的任務runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}private void processSelectedKeys() {if (selectedKeys != null) {//selectedKeys.flip()會返回一個數組processSelectedKeysOptimized(selectedKeys.flip());} else {processSelectedKeysPlain(selector.selectedKeys());}}private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {for (int i = 0;; i ++) {//1.首先取出IO事件final SelectionKey k = selectedKeys[i];if (k == null) {break;}selectedKeys[i] = null;//Help GC//2.然后獲取對應的Channel和處理該Channel//默認情況下,這個a就是NioChannel,也就是服務端啟動時經過Netty封裝的Channelfinal Object a = k.attachment();if (a instanceof AbstractNioChannel) {//網絡事件的處理processSelectedKey(k, (AbstractNioChannel) a);} else {//NioTask主要用于當一個SelectableChannel注冊到Selector時,執行的一些任務NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}//3.最后判斷是否應該再進行一次輪詢if (needsToSelectAgain) {for (;;) {i++;if (selectedKeys[i] == null) {break;}selectedKeys[i] = null;}selectAgain();//selectedKeys.flip()會返回一個數組selectedKeys = this.selectedKeys.flip();i = -1;}}}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();...try {int readyOps = k.readyOps();//We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise//the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//remove OP_CONNECT as otherwise Selector.select(..) will always return without blockingint ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}//Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {//Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}//Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop//boss的Reactor線程已經輪詢到有ACCEPT事件,即表明有新連接接入//此時將調用Channel的unsafe變量來進行實際操作if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//進行新連接接入處理unsafe.read();if (!ch.isOpen()) {//Connection already closed - no need to handle write.return;}}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}...
}//AbstractNioChannel base class for Channels that operate on messages.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {...private final class NioMessageUnsafe extends AbstractNioUnsafe {//臨時存放讀到的連接NioSocketChannelprivate final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {//斷言確保該read()方法必須來自Reactor線程調用assert eventLoop().inEventLoop();//獲得Channel對應的Pipelinefinal ChannelPipeline pipeline = pipeline();//獲得Channel對應的RecvByteBufAllocator.Handlefinal RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();do {//1.調用NioServerSocketChannel的doReadMessages()方法創建NioSocketChannel//通過JDK的accept()方法去創建JDK Channel,然后把它包裝成Netty自定義的Channelint localRead = doReadMessages(readBuf);if (localRead == 0) {break;}} while (allocHandle.continueReading());//控制連接的接入速率,默認一次性讀取16個連接//2.設置并綁定NioSocketChannelint size = readBuf.size();for (int i = 0; i < size; i ++) {//調用DefaultChannelPipeline的fireChannelRead()方法//開始處理每個NioSocketChannel連接pipeline.fireChannelRead(readBuf.get(i));}//3.清理容器并觸發DefaultChannelPipeline的fireChannelReadComplete()方法readBuf.clear();//結束處理每個NioSocketChannel連接pipeline.fireChannelReadComplete();}}//Read messages into the given array and return the amount which was read.protected abstract int doReadMessages(List<Object> buf) throws Exception;...
}
(2)服務端Channel的Pipeline介紹
在Netty的各種類型的Channel中,都會包含一個Pipeline。Pipeline可理解為一條流水線,流水線有起點有結束,中間還會有各種各樣的流水線關卡。對Channel的處理會在流水線的起點開始,然后經過各個流水線關卡的加工,最后到達流水線的終點結束。
流水線Pipeline的開始是HeadContext,結束是TailContext。HeadContext中會調用Unsafe進行具體的操作,TailContext中會向用戶拋出流水線Pipeline中未處理異常和未處理消息的警告。
在服務端的啟動過程中,Netty會給服務端Channel自動添加一個Pipeline處理器ServerBootstrapAcceptor,并且會將用戶代碼中設置的一系列參數傳入到這個ServerBootstrapAcceptor的構造方法中。
服務端Channel的Pipeline如下所示:
所以服務端Channel的Pipeline在傳播ChannelRead事件時首先會從HeadContext處理器開始,然后傳播到ServerBootstrapAcceptor處理器,最后傳播到TailContext處理器結束。
(3)服務端Channel默認的Pipeline處理器
首先,服務端啟動時會給服務端Channel的Pipeline添加一個ServerBootstrapAcceptor處理器。
//Bootstrap sub-class which allows easy bootstrap of ServerChannel
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {...@Overridevoid init(Channel channel) throws Exception {//1.設置服務端Channel的Option與Attrfinal Map<ChannelOption<?>, Object> options = options0();synchronized (options) {channel.config().setOptions(options);}final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}//2.設置客戶端Channel的Option與Attrfinal EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}//3.配置服務端啟動邏輯ChannelPipeline p = channel.pipeline();//p.addLast()用于定義服務端啟動過程中需要執行哪些邏輯p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {//一.添加用戶自定義的Handler,注意這是handler,而不是childHandlerfinal ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) pipeline.addLast(handler);//二.添加一個特殊的Handler用于接收新連接//自定義的childHandler會作為參數傳入連接器ServerBootstrapAcceptorch.eventLoop().execute(new Runnable() {@Overridepublic void run() {//調用DefaultChannelPipeline的addLast()方法pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}...
}//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private final DefaultChannelPipeline pipeline;...//Creates a new instance.protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}@Overridepublic ChannelPipeline pipeline() {return pipeline;}...
}//The default ChannelPipeline implementation.
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;...protected DefaultChannelPipeline(Channel channel) {...tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}@Overridepublic final ChannelPipeline addLast(ChannelHandler... handlers) {return addLast(null, handlers);}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {...for (ChannelHandler h: handlers) {if (h == null) break;addLast(executor, null, h);}return this;}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {checkMultiplicity(handler);newCtx = newContext(group, filterName(name, handler), handler);addLast0(newCtx);...}...}//往Pipeline中添加ChannelHandler處理器private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;}...
}
然后,新連接接入調用到服務端Channel的Pipeline的fireChannelRead()方法時,便會觸發調用ServerBootstrapAcceptor處理器的channelRead()方法。最終會調用NioEventLoop的register()方法注冊這個新連接Channel,即給新連接Channel綁定一個Reactor線程。
//The default ChannelPipeline implementation.
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;...protected DefaultChannelPipeline(Channel channel) {...tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}@Overridepublic final ChannelPipeline fireChannelRead(Object msg) {//從Pipeline的第一個HeadContext處理器開始調用AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;}final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {...@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//調用AbstractChannelHandlerContext的fireChannelRead()方法ctx.fireChannelRead(msg);}@Overridepublic ChannelHandler handler() {return this;}...}...
}abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {...static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}}private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {//首先調用的是Pipeline的第一個處理器HeadContext的channelRead()方法//注意:HeadContext繼承了AbstractChannelHandlerContext((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRead(msg);}}@Overridepublic ChannelHandlerContext fireChannelRead(final Object msg) {invokeChannelRead(findContextInbound(), msg);return this;}private AbstractChannelHandlerContext findContextInbound() {AbstractChannelHandlerContext ctx = this;do {//注意:HeadContext繼承了AbstractChannelHandlerContext//所以如果this是HeadContext,那么這里會獲取下一個節點ServerBootstrapAcceptorctx = ctx.next;} while (!ctx.inbound);return ctx;}...
}public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {...private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;...//channelRead()方法在新連接接入時被調用@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;//1.給新連接的Channel添加用戶自定義的Handler處理器//這里的childHandler其實是一個特殊的Handler: ChannelInitializerchild.pipeline().addLast(childHandler);//2.設置ChannelOption,主要和TCP連接一些底層參數及Netty自身對一個連接的參數有關for (Entry<ChannelOption<?>, Object> e: childOptions) {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}}//3.設置新連接Channel的屬性for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}//4.綁定Reactor線程//childGroup是一個NioEventLoopGroup,所以下面會調用其父類的register()方法childGroup.register(child);}...}...
}// MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {......
}//Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {...@Overridepublic ChannelFuture register(Channel channel) {//最終會調用NioEventLoop的register()方法注冊這個新連接Channelreturn next().register(channel);}@Overridepublic EventLoop next() {//獲取一個NioEventLoopreturn (EventLoop) super.next();}...
}//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {private final EventExecutorChooserFactory.EventExecutorChooser chooser;...//Create a new instance.protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);}//Create a new instance.protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);}protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {...children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {children[i] = newChild(executor, args);...}//創建chooserchooser = chooserFactory.newChooser(children);...}@Overridepublic EventExecutor next() {//調用chooser的next()方法獲得一個NioEventLoopreturn chooser.next();}...
}public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();private DefaultEventExecutorChooserFactory() { }@SuppressWarnings("unchecked")@Overridepublic EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTowEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}}private static boolean isPowerOfTwo(int val) {return (val & -val) == val;}private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;PowerOfTowEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}}private static final class GenericEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}}
}
(4)服務端Channel處理新連接的步驟
ServerBootstrapAcceptor處理新連接的步驟:
一.給客戶端Channel添加childHandler
給客戶端Channel添加childHandler也就是將用戶自定義的childHandler添加到新連接的pipeline里。
pipeline.fireChannelRead(NioSocketChannel)最終會調用到ServerBootstrapAcceptor的channelRead()方法,而且這個channelRead()方法一上來就會把入參的msg強制轉換為Channel。
拿到新連接的Channel后就可以拿到其對應的Pipeline,這個Pipeline是在調用AbstractChannel構造方法時創建的。于是可以將用戶代碼中的childHandler添加到Pipeline中,而childHandler其實就是用戶代碼中的ChannelInitializer。所以新連接Channel的Pipeline的構成是:Head -> ChannelInitializer -> Tail。
二.設置客戶端Channel的options和attr
所設置的childOptions和childAttrs也是在用戶代碼中設置的,這些設置項最終會傳遞到ServerBootstrapAcceptor的channelRead()方法中進行具體設置。
三.選擇NioEventLoop綁定客戶端Channel
childGroup.register(child)中的childGroup就是用戶代碼里創建的workerNioEventLoopGroup。NioEventLoopGroup的register()方法會調用next()由其父類通過線程選擇器chooser返回一個NioEventLoop。所以childGroup.register(child)最終會調用到NioEventLoop的register()方法,這和注冊服務端Channel時調用config().group().register(channel)一樣。
(5)總結
服務端Channel在檢測到新連接并且創建完客戶端Channel后,會通過服務端Channel的Pipeline的一個處理器ServerBootstrapAcceptor做一些處理。這些處理包括:給客戶端Channel的Pipeline添加childHandler處理器、設置客戶端Channel的options和attrs、調用線程選擇器chooser選擇一個NioEventLoop進行綁定。綁定時會將該客戶端Channel注冊到NioEventLoop的Selector上,此時還不會關心事件。
7.新連接接入之注冊Selector和注冊讀事件
NioEventLoop的register()方法是由其父類SingleThreadEventLoop實現的,并最終調用到AbstractChannel的內部類AbstractUnsafe的register0()方法。
步驟一:注冊Selector
和服務端啟動過程一樣,先調用AbstractNioChannel的doRegister()方法進行注冊。其中javaChannel().register()會將新連接NioSocketChannel綁定到Reactor線程的Selector上,這樣后續這個新連接NioSocketChannel所有的事件都由綁定的Reactor線程的Selector來輪詢。
步驟二:配置自定義Handler
此時新連接NioSocketChannel的Pipeline中有三個Handler:Head -> ChannelInitializer -> Tail。invokeHandlerAddedIfNeeded()最終會調用ChannelInitializer的handlerAdded()方法。
步驟三:傳播ChannelRegistered事件
pipeline.fireChannelRegistered()會把新連接的注冊事件從HeadContext開始往下傳播,調用每一個ChannelHandler的channelRegistered()方法。
步驟四:注冊讀事件
接著還會傳播ChannelActive事件。傳播完ChannelActive事件后,便會繼續調用HeadContetx的readIfIsAutoRead()方法注冊讀事件。由于創建NioSocketChannel時已將SelectionKey.OP_READ的事件代碼保存到其成員變量中,所以AbstractNioChannel的doBeginRead()方法,就可以將SelectionKey.OP_READ事件注冊到Selector中完成讀事件的注冊。
//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private final ChannelId id;private final Unsafe unsafe;private final DefaultChannelPipeline pipeline;private volatile EventLoop eventLoop;...//Creates a new instance.protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}//Returns a new DefaultChannelPipeline instance.protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}//Unsafe implementation which sub-classes must extend and use.protected abstract class AbstractUnsafe implements Unsafe {...@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {...//綁定事件循環器,即綁定一個NioEventLoop到該Channel上AbstractChannel.this.eventLoop = eventLoop;//注冊Selector,并啟動一個NioEventLoopif (eventLoop.inEventLoop()) {register0(promise);} else {...//通過啟動這個NioEventLoop線程來調用register0()方法將這個Channel注冊到Selector上eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});...}}private void register0(ChannelPromise promise) {...boolean firstRegistration = this.neverRegistered;//1.調用JDK底層注冊Channel到Selector上doRegister();this.neverRegistered = false;this.registered = true;//2.配置自定義Handlerthis.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);//3.傳播channelRegisterd事件this.pipeline.fireChannelRegistered();//4.注冊讀事件if (isActive()) {if (firstRegistration) {//會進入這個方法,傳播完ChannelActive事件后,再注冊讀事件this.pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}...}@Overridepublic final void beginRead() {...//調用AbstractNioChannel實現的doBeginRead()方法doBeginRead();...}...}//Is called after the Channel is registered with its EventLoop as part of the register process.//Sub-classes may override this methodprotected void doRegister() throws Exception {// NOOP}//Schedule a read operation.protected abstract void doBeginRead() throws Exception;@Overridepublic Channel read() {//調用DefaultChannelPipeline的read()方法pipeline.read();return this;}...
}//Abstract base class for Channel implementations which use a Selector based approach.
public abstract class AbstractNioChannel extends AbstractChannel {private final SelectableChannel ch;//這是NIO中的Channelprotected final int readInterestOp;volatile SelectionKey selectionKey;...//Create a new instance//@param parent,the parent Channel by which this instance was created. May be null.//@param ch,he underlying SelectableChannel on which it operates//@param readInterestOp,the ops to set to receive data from the SelectableChannelprotected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);//NioServerSocketChannel.newSocket()方法通過JDK底層創建的Channel對象會被緩存在其父類AbstractNioChannel的變量ch中//可以通過NioServerSocketChannel.javaChannel()方法獲取其父類AbstractNioChannel的變量chthis.ch = ch;this.readInterestOp = readInterestOp;...//設置Channel對象為非阻塞模式ch.configureBlocking(false);...}@Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {...//首先獲取前面創建的JDK底層NIO的Channel,然后調用JDK底層NIO的register()方法,//將this也就是NioServerSocketChannel對象當作attachment綁定到JDK的Selector上;//這樣綁定是為了后續從Selector拿到對應的事件后,可以把Netty領域的Channel拿出來;//而且注冊的ops值是0,表示此時還不關注任何事件;selectionKey = javaChannel().register(eventLoop().selector, 0, this);return;...}}protected SelectableChannel javaChannel() {return ch;}@Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {//將SelectionKey.OP_READ讀事件注冊到Selector上,表示這個客戶端Channel可以處理讀事件了selectionKey.interestOps(interestOps | readInterestOp);}}...
}//The default ChannelPipeline implementation.
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;...@Overridepublic final ChannelPipeline fireChannelActive() {//調用HeadContext的channelActive()方法AbstractChannelHandlerContext.invokeChannelActive(head);return this;}@Overridepublic final ChannelPipeline read() {//從TailContext開始,最終會調用到HeadContext的read()方法tail.read();return this;}final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {private final Unsafe unsafe;...@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();//傳播ChannelActive事件readIfIsAutoRead();}private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {//調用AbstractChannel的read()方法channel.read();}}@Overridepublic void read(ChannelHandlerContext ctx) {//調用AbstractChannel.AbstractUnsafe的beginRead()方法unsafe.beginRead();}...}
}
8.注冊Reactor線程總結
一.首先當boss Reactor線程在檢測到有ACCEPT事件之后,會創建JDK底層的Channel。
二.然后使用一個NioSocketChannel包裝JDK底層的Channel,把用戶設置的ChannelOption、ChannelAttr、ChannelHandler都設置到該NioSocketChannel中。
三.接著從worker Reactor線程組中,也就是worker NioEventLoopGroup中,通過線程選擇器chooser選擇一個NioEventLoop出來。
四.最后把NioSocketChannel包裝的JDK底層Channel當作key,自身NioSocketChannel當作attachment,注冊到NioEventLoop對應的Selector上。這樣后續有讀寫事件發生時,就可以從底層Channel直接獲得attachment即NioSocketChannel來進行讀寫數據的邏輯處理。
9.新連接接入總結
新連接接入整體可以分為兩部分:一是檢測新連接,二是注冊Reactor線程。
一.首先在Netty服務端的Channel(也就是NioServerSocketChannel)綁定的NioEventLoop(也就是boss線程)中,輪詢到ACCEPT事件。
二.然后調用JDK的服務端Channel的accept()方法獲取一個JDK的客戶端Channel,并且將其封裝成Netty的客戶端Channel(即NioSocketChannel)。
三.封裝過程中會創建這個NioSocketChannel一系列的組件,如unsafe組件和pipeline組件。unsafe組件主要用于進行Channel的讀寫,pipeline組件主要用于處理Channel數據的業務邏輯。
四.接著Netty服務端Channel的Pipeline的一個處理器ServerBootstrapAcceptor,會給當前Netty客戶端Channel分配一個NioEventLoop并將客戶端Channel綁定到Selector上。
五.最后會傳播ChannelRegistered事件和ChannelActive事件,并將客戶端Channel的讀事件注冊到Selector上。
至此,新連接NioSocketChannel便可以開始正常讀寫數據了。
文章轉載自:東陽馬生架構
原文鏈接:Netty源碼—4.客戶端接入流程 - 東陽馬生架構 - 博客園
體驗地址:引邁 - JNPF快速開發平臺_低代碼開發平臺_零代碼開發平臺_流程設計器_表單引擎_工作流引擎_軟件架構