Learning links
NIO & Netty - column
- Netty Core Techniques 10 – Netty core source code analysis
- Netty Core Techniques 9 – TCP sticky-packet/split-packet problems and solutions
- Netty Core Techniques 7 – Google Protobuf
- Netty Core Techniques 6 – Netty core modules and components
- Netty Core Techniques 5 – Netty high-performance architecture design
Let's Talk About Netty - column
- Understand Netty's full data-sending flow in one article | every detail you want to know is here
Netty source code analysis - series
- Netty source code analysis (1) ----- NioEventLoopGroup
- Netty source code analysis (2) ----- ServerBootstrap
- Netty source code analysis (3) ----- server startup
- Netty source code analysis (4) ----- ChannelPipeline
- Netty source code analysis (5) ----- how data flows through the pipeline
- Netty source code analysis (6) ----- the client accept process
- Netty source code analysis (7) ----- the read process
- Netty source code analysis (8) ----- the write process
- Netty source code analysis (9) ----- the secrets of frame decoders
- Netty source code analysis (10) ----- LineBasedFrameDecoder
- Netty source code analysis (11) ----- LengthFieldBasedFrameDecoder
- Netty source code analysis (12) ----- IdleStateHandler heartbeat source code analysis
Table of contents
- Learning links
- 1. Source code analysis
  - 1.1 Startup analysis
    - AbstractBootstrap#doBind
    - AbstractBootstrap#initAndRegister
    - ServerBootstrap#init
    - AbstractUnsafe#register
      - ChannelInitializer#initChannel
    - AbstractBootstrap#doBind0
    - AbstractUnsafe#bind
      - NioServerSocketChannel#doBind
      - HeadContext#channelActive
        - AbstractNioChannel#doBeginRead
  - 1.2 NioEventLoop analysis
    - Key parts of NioEventLoop
    - When the selector is created
    - When the nio thread starts
    - SingleThreadEventExecutor#execute
    - *NioEventLoop#run
    - NioEventLoop#select
    - NioEventLoop#processSelectedKeys
      - NioEventLoop#processSelectedKey
  - 1.3 accept analysis
    - AbstractNioMessageChannel.NioMessageUnsafe#read
    - ServerBootstrapAcceptor#channelRead
    - AbstractChannel.AbstractUnsafe#register
    - *AbstractUnsafe#register0
      - HeadContext#channelActive
        - AbstractNioChannel#doBeginRead
  - 1.4 read analysis
    - AbstractNioByteChannel.NioByteUnsafe#read
    - NioSocketChannel#doReadBytes
    - MaxMessageHandle#continueReading
  - 1.5 write analysis
    - write: the write queue
    - flush: flushing the write queue
    - writeAndFlush: write and flush
1. Source code analysis
1.1 Startup analysis
Let's look at how Netty implements the equivalent of the code below.
(First, recall the basic Java NIO startup steps; Netty has to do the same work during its own startup.)
//1 In Netty, a NioEventLoopGroup (the "nio boss" threads) wraps the thread and the selector
Selector selector = Selector.open();

//2 Create the NioServerSocketChannel; this also initializes its handlers and stores a config for the raw ssc
//3 Creating the NioServerSocketChannel creates the underlying Java ServerSocketChannel
NioServerSocketChannel attachment = new NioServerSocketChannel();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);

//4 Start the nio boss thread to carry out the steps that follow
//5 Register (this only associates the selector with the NioServerSocketChannel); no events are subscribed yet
// (Note: the NioServerSocketChannel is attached to the selectionKey, so when this ServerSocketChannel
//  has an acceptable connection, the selectionKey leads back to the corresponding NioServerSocketChannel)
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);

//6 The pipeline becomes head -> initializer -> ServerBootstrapAcceptor -> tail; the initializer is
//  one-shot and exists only to add the acceptor
//  (ServerBootstrapAcceptor is an inbound handler, a ChannelInboundHandlerAdapter)
//7 Bind the port
serverSocketChannel.bind(new InetSocketAddress(8080));

//8 Fire the channel active event; the head context subscribes to OP_ACCEPT
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
The source entry point is the code below.
(We skip NioEventLoopGroup for now; the Selector lives inside NioEventLoop, so selector.open is deferred as well.)
public class TestSourceServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler());
                    }
                })
                .bind(8880); // bind is the entry point
    }
}
AbstractBootstrap#doBind
Entry point: io.netty.bootstrap.ServerBootstrap#bind
Key code: io.netty.bootstrap.AbstractBootstrap#doBind
(1. Watch for the switch between the main thread and the nio thread;
 2. initAndRegister corresponds to creating the ServerSocketChannel and registering it with the selector in plain NIO;
 3. doBind0 corresponds to binding the listening port in plain NIO.)
private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1. Initialize and register. initAndRegister sets regFuture's completion state,
    //    which triggers the callback at 3.2 below
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // 2. Because initAndRegister runs asynchronously, there are two cases to consider
    //    (when debugging, distinguish them with a suspend-type breakpoint)
    // 2.1 Registration already completed (the fast path)
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        // 3.1 Call doBind0 immediately
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else { // 2.2 Not completed yet
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        // 3.2 doBind0 will run as a callback
        regFuture.addListener(new ChannelFutureListener() {
            // (operationComplete is invoked by the nio thread)
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // handle the error...
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    // 3. The registering thread executes doBind0
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
AbstractBootstrap#initAndRegister
Key code: io.netty.bootstrap.AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // (1. This calls NioServerSocketChannel's no-arg constructor to obtain the channel.
        //  2. That constructor creates the Java NIO ServerSocketChannel, stores it inside the
        //     NioServerSocketChannel, and configures it as non-blocking with OP_ACCEPT as the
        //     interest op; the channel is NOT yet registered with a selector - this is just
        //     bookkeeping on the NioServerSocketChannel.
        //     The constructor also creates a NioServerSocketChannelConfig and a
        //     DefaultChannelPipeline and stores both in the NioServerSocketChannel.)
        channel = channelFactory.newChannel();
        // 1.1 Initialize - all this does is add one initializer, a ChannelInitializer
        init(channel);
    } catch (Throwable t) {
        // handle the error...
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // 1.2 Register - registers the raw channel with the selector
    // (one eventLoop is picked from the eventLoopGroup to register the ServerSocketChannel)
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        // handle the error...
    }
    return regFuture;
}
ServerBootstrap#init
Key code: io.netty.bootstrap.ServerBootstrap#init
(This adds one ChannelInitializer to the ServerSocketChannel's pipeline; the initializer runs only once and is removed afterwards.)
// here channel is actually the NioServerSocketChannel
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
    ChannelPipeline p = channel.pipeline();
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }
    // Add the initializer to the NioServerSocketChannel's pipeline!!!
    // (1. Its initChannel method runs only once; afterwards the initializer is removed,
    //     and the removal happens inside ChannelInitializer#initChannel.
    //  2. Note that initChannel is NOT called here.
    //  3. It is called from AbstractChannel's register0 method, by the
    //     pipeline.invokeHandlerAddedIfNeeded() line that runs right after the channel
    //     has been registered with the selector.)
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // (a handler configured on the config ends up in the
            //  serverSocketChannel's pipeline here)
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            // 1. The initializer's job is to add the ServerBootstrapAcceptor to the NioServerSocketChannel
            // 2. ServerBootstrapAcceptor establishes connections when the selector fires an accept event
            // 3. execute() guarantees the add happens on the nio thread
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
AbstractUnsafe#register
Key code: io.netty.channel.AbstractChannel.AbstractUnsafe#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // some checks, omitted...
    AbstractChannel.this.eventLoop = eventLoop;
    // (Check whether the current thread is the eventLoop's thread. Following the flow so far,
    //  we are still on the main thread, so we take the else branch.)
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // The first call to execute starts the nio thread; registration and later
            // operations then run on it.
            // Since there is only one NioServerSocketChannel, there is only one boss nio thread.
            // This line performs the main -> nio boss thread handoff.
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    // (runs on the nio thread; note the promise is passed along so
                    //  other threads can be notified)
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // logging...
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
*AbstractUnsafe#register0
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // 1.2.1 [Register the raw nio channel with the selector].
        // Note: no interest ops are subscribed yet; the attachment is the NioServerSocketChannel
        doRegister();
        neverRegistered = false;
        registered = true;
        // 1.2.2 Run the NioServerSocketChannel initializer's initChannel
        // (1. This reaches the initializer that ServerBootstrap#init added to the
        //     NioServerSocketChannel's pipeline.
        //  2. That initChannel adds the ServerBootstrapAcceptor inbound handler to the pipeline.)
        pipeline.invokeHandlerAddedIfNeeded();
        // (Give the promise a success result; the listener registered earlier in
        //  AbstractBootstrap#doBind then fires operationComplete and proceeds to doBind0
        //  to bind the listening port.)
        // Callback to 3.2 in io.netty.bootstrap.AbstractBootstrap#doBind0
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // the server socket channel is not bound yet, so isActive() is false here
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // Register the Java channel with the eventLoop's selector
            // (no interest ops yet; note that `this` is attached to the selectionKey)
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}
- ChannelInitializer#initChannel
Key code: io.netty.channel.ChannelInitializer#initChannel
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            // 1.2.2.1 Run the initialization!!! (calls the initChannel of the initializer added earlier)
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            // 1.2.2.2 Remove the initializer!!! (once it has run, it is removed)
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}
AbstractBootstrap#doBind0
Key code: io.netty.bootstrap.AbstractBootstrap#doBind0

// doBind0 runs via 3.1 or 3.2
private static void doBind0(final ChannelFuture regFuture, final Channel channel,
                            final SocketAddress localAddress, final ChannelPromise promise) {
    // 1. Make sure this runs on the nio eventLoop thread
    // 2. bind starts from the tail of the pipeline and ends up in HeadContext,
    //    which calls AbstractUnsafe's bind method
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
AbstractUnsafe#bind
Key code: io.netty.channel.AbstractChannel.AbstractUnsafe#bind

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // logging...
    }
    boolean wasActive = isActive();
    try {
        // 3.3 [Bind the port]
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    // Note: the active event fires only after the port is bound.
    // At this point the serverSocketChannel's pipeline is head -> acceptor -> tail and the
    // port is bound, so the active event is fired through every handler on the pipeline.
    // Next, see HeadContext#channelActive.
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 3.4 [Fire the active event]
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}
- NioServerSocketChannel#doBind
3.3 Key code: io.netty.channel.socket.nio.NioServerSocketChannel#doBind

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        // call the Java channel's native port-binding method
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
- HeadContext#channelActive
3.4 Key code: io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

public void channelActive(ChannelHandlerContext ctx) {
    // fire the active event through all handlers
    ctx.fireChannelActive();
    // Only after every handler's active has fired does the channel register its interest ops.
    // Triggering read here registers the channel's events (OP_ACCEPT);
    // see AbstractNioChannel#doBeginRead.
    // (Note: read on a NioServerSocketChannel does not read data; it only triggers
    //  the channel's event registration.)
    readIfIsAutoRead();
}
-- AbstractNioChannel#doBeginRead
Key code: io.netty.channel.nio.AbstractNioChannel#doBeginRead

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    // readInterestOp is 16 (OP_ACCEPT), initialized when the NioServerSocketChannel was created
    if ((interestOps & readInterestOp) == 0) {
        // register the interest op!!!
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
1.2 NioEventLoop analysis
Key parts of NioEventLoop
1. The NioEventLoop class itself has the fields:

private Selector selector;
private Selector unwrappedSelector;

2. Its parent class SingleThreadEventExecutor has the fields:

private volatile Thread thread; // the single eventLoop thread (the same thread the executor below runs)
private final Executor executor; // the eventLoop is single-threaded: other tasks wait in taskQueue and the one thread runs them in order
private final Queue<Runnable> taskQueue;

3. Its grandparent class AbstractScheduledEventExecutor has the field:

// handles scheduled tasks
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

The NioEventLoop thread handles not only IO events but also tasks (both ordinary and scheduled).
When the selector is created
The selector is created in NioEventLoop's only constructor.

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
    super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    // the selector is created here
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

So why are there two selectors?
Because Netty replaces the selector's internal selectionKey Set with an array-based implementation: iterating an array is faster than iterating a Set!
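The idea behind that replacement can be sketched with a minimal array-backed "set" (the name and sizing are illustrative, not Netty's actual SelectedSelectionKeySet):

```java
import java.util.Arrays;

// Minimal sketch in the spirit of Netty's SelectedSelectionKeySet:
// add() is an O(1) append, iteration is a plain array scan (cache friendly,
// no hash buckets or iterator objects). Duplicate checks are unnecessary
// because the selector adds each ready key at most once per select call.
class ArrayBackedSet<T> {
    Object[] elements = new Object[16];
    int size;

    boolean add(T e) {
        if (e == null) return false;
        elements[size++] = e;
        if (size == elements.length) {
            elements = Arrays.copyOf(elements, size << 1); // grow by doubling
        }
        return true;
    }

    void reset() { // clear for the next select cycle
        Arrays.fill(elements, 0, size, null);
        size = 0;
    }
}
```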
When the nio thread starts
(The thread starts on the first call to the eventLoop's execute method; the state field guarantees it is started only once.)

public class TestEventLoop {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        group.next()
                // entry point
                .execute(() -> {
                    System.out.println("Hello");
                });
    }
}
SingleThreadEventExecutor#execute
Task submission code: io.netty.util.concurrent.SingleThreadEventExecutor#execute

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    // Check whether the current thread is the eventLoop's thread;
    // obviously the eventLoop's thread is still null at this point
    boolean inEventLoop = inEventLoop();
    // add the task; the queue is the mpsc lock-free queue provided by jctools
    addTask(task);
    if (!inEventLoop) {
        // inEventLoop == false means execute was called from another thread, i.e. the
        // first call: submitting the eventLoop's first task starts the event loop,
        // reaching doStartThread below
        startThread();
        if (isShutdown()) {
            // if already shut down, reject; code omitted...
        }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        // if the thread is blocked in IO select, the submitting thread must wake the NioEventLoop thread
        wakeup(inEventLoop);
    }
}

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                // start the thread
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

private void doStartThread() {
    assert thread == null;
    // this executor is initialized in MultithreadEventExecutorGroup's constructor
    // (a plain new ThreadPerTaskExecutor)
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // Save the pool's current thread in the member variable for later use
            // (so the thread behind the eventLoop's executor and the thread field
            //  are one and the same thread)
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }
            boolean success = false;
            updateLastExecutionTime();
            try {
                // Call the outer SingleThreadEventExecutor's run method and enter the
                // infinite loop; see run below. [This starts the EventLoop main loop]
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // cleanup, code omitted...
            }
        }
    });
}

@Override
protected void wakeup(boolean inEventLoop) {
    // !inEventLoop: only a task submitted by another, non-nio thread can need to wake the selector
    // (if the eventLoop submits a task to itself, it is clearly running, not blocked,
    //  so there is nothing to wake)
    // wakenUp: if several non-nio threads submit tasks, the selector is woken only once
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        // Wake the thread blocked in select.
        // (If the selector is currently in select, it wakes immediately; if it has not entered
        //  select yet, its next select returns without blocking - much like LockSupport's
        //  park and unpark.)
        selector.wakeup();
    }
}
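The park/unpark analogy can be seen directly with java.util.concurrent.locks.LockSupport: an unpark issued before park leaves a one-shot permit, so the later park returns immediately instead of blocking, just as Selector.wakeup called before select makes the next select return at once. A small demo (illustrative only):

```java
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    public static void main(String[] args) {
        // "wakeup" first: deposit the permit before anyone is waiting
        LockSupport.unpark(Thread.currentThread());

        // "select" second: the stored permit is consumed, so this does NOT block
        LockSupport.park();

        // A park without a pending permit would block, so this one gets a
        // 100ms timeout to show the difference
        long start = System.nanoTime();
        LockSupport.parkNanos(100_000_000L);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
        System.out.println("second park waited ~" + elapsedMs + " ms");
    }
}
```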
*NioEventLoop#run
io.netty.channel.nio.NioEventLoop#run
Its main job is an infinite loop that keeps checking for new tasks, scheduled tasks, and IO events, and executes whatever it finds.

// the infinite loop
for (;;) {
    try {
        try {
            // calculateStrategy's logic is:
            //   hasTasks ? selectNow() : SelectStrategy.SELECT;
            // If there are tasks, run selectNow() once to pick up any IO events and clear the
            // previous wakeup; with or without IO events, the switch is skipped
            // (with tasks pending there is work to do anyway, so blocking is pointless).
            // With no tasks, the result is SelectStrategy.SELECT: consider blocking
            // (no tasks means we wait for IO events before working, so we block with a timeout,
            //  while non-nio threads submitting tasks during the wait can wake the selector.
            //  How long is the default block? See NioEventLoop#select(boolean oldWakenUp):
            //  1s + 0.5ms by default, subject to pending scheduled tasks.)
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE: // -2
                    continue;
                case SelectStrategy.BUSY_WAIT: // -3
                case SelectStrategy.SELECT: // -1
                    // Both the IO thread and task-submitting threads may call wakeup, and wakeup
                    // is a fairly expensive operation, so the atomic boolean wakenUp tracks it:
                    // true means the current thread is responsible for the wakeup.
                    // About to block in select, so reset the wakeup flag to false
                    boolean oldWakenUp = wakenUp.getAndSet(false);
                    // select below blocks in select(timeoutMillis). When does it wake?
                    //  - automatically, when an IO event arrives
                    //  - automatically, on timeout
                    //  - explicitly, when a task is submitted, so non-IO work runs promptly
                    // If at this exact point a non-EventLoop thread gets in first, sets wakenUp
                    // to true, and calls wakeup, the select below will not block.
                    // Will tasks added after runAllTasks, before the loop comes around again,
                    // run promptly? oldWakenUp would then be true, so without the check below
                    // the select would block until timeout and delay them needlessly.
                    select(oldWakenUp);
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }
        } catch (IOException e) {
            rebuildSelector0();
            handleLoopException(e);
            continue;
        }
        // Reached when there are tasks, or the wait for IO was cut short by a wakeup,
        // or IO events have arrived
        cancelledKeys = 0;
        needsToSelectAgain = false;
        // (if the eventLoop spends too long on non-IO tasks, IO handling inevitably suffers)
        // ioRatio defaults to 50
        final int ioRatio = this.ioRatio;
        // With ioRatio == 100, all ordinary tasks run to completion.
        // Otherwise, the time spent handling IO determines the time budget for ordinary tasks.
        // That budget is only checked between tasks, to decide whether to run the next one;
        // a single long-running task is never interrupted mid-way (and would have to honour
        // interruption for that to even be possible).
        if (ioRatio == 100) {
            try {
                processSelectedKeys();
            } finally {
                // with ioRatio 100, always run all non-IO tasks
                runAllTasks();
            }
        } else {
            final long ioStartTime = System.nanoTime();
            try {
                processSelectedKeys();
            } finally {
                // record how long IO handling took
                final long ioTime = System.nanoTime() - ioStartTime;
                // run non-IO tasks; runAllTasks exits once the budget is exceeded
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
        }
    } catch (Throwable t) {
        handleLoopException(t);
    }
    try {
        if (isShuttingDown()) {
            closeAll();
            if (confirmShutdown()) {
                return;
            }
        }
    } catch (Throwable t) {
        handleLoopException(t);
    }
}
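The ioRatio arithmetic is easy to check by hand; this small sketch (illustrative names, same formula as the runAllTasks call above) computes the non-IO task budget for a few ratios:

```java
class IoRatioDemo {
    // Time budget for non-IO tasks, given how long IO processing took:
    // the same expression as runAllTasks(ioTime * (100 - ioRatio) / ioRatio)
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 2_000_000L; // IO handling took 2ms
        // ioRatio 50: tasks get exactly as much time as IO did -> 2ms
        System.out.println(taskBudgetNanos(ioTime, 50));
        // ioRatio 80: tasks get a quarter of the IO time -> 0.5ms
        System.out.println(taskBudgetNanos(ioTime, 80));
    }
}
```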
NioEventLoop#select
io.netty.channel.nio.NioEventLoop#select
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        // Compute the wait time:
        //  * no scheduled tasks: timeout is 1s
        //  * scheduled tasks exist: timeout is `next scheduled task's time - now`
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            // on timeout, exit the loop
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
            // If tasks arrived in the meantime, exit; without this check a task would wait
            // until the next select timeout before running.
            // wakenUp.compareAndSet(false, true) spares non-NioEventLoop threads another wakeup
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            // select with a bounded timeout.
            // Note the nio bug: when it strikes, select returns even though no events occurred,
            // which causes a busy spin and 100% CPU usage.
            // (That is why selectCnt++ counts the iterations: when the bug occurs the loop spins
            //  very fast, selectCnt shoots up, and the bug is detected.)
            int selectedKeys = selector.select(timeoutMillis);
            // increment the counter
            selectCnt ++;
            // After waking: exit if there are IO events, or a non-EventLoop thread woke us,
            // or there are tasks
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            if (Thread.interrupted()) {
                // the thread was interrupted, exit the loop
                // logging...
                selectCnt = 1;
                break;
            }
            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // a genuine timeout: reset the counter to 1 so the next iteration breaks
                selectCnt = 1;
            }
            // The counter exceeded the threshold set by io.netty.selectorAutoRebuildThreshold
            // (default 512). This is the workaround for the nio busy-spin bug
            // (a brand-new selector is created to replace the old one and escape the spin)
            else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // rebuild the selector
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
        }
        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            // logging...
        }
    } catch (CancelledKeyException e) {
        // logging...
    }
}
NioEventLoop#processSelectedKeys
Key handling: io.netty.channel.nio.NioEventLoop#processSelectedKeys

private void processSelectedKeys() {
    if (selectedKeys != null) {
        // Via reflection, the Selector implementation's ready-key set has been replaced with a
        // SelectedSelectionKeySet, which is array-backed for faster iteration (originally a HashSet)
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized() {
    // iterate over all selectionKeys
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out the slot once the key is taken
        selectedKeys.keys[i] = null;
        // the attachment is the NioServerSocketChannel
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            // process the selectionKey
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}
- NioEventLoop#processSelectedKey
io.netty.channel.nio.NioEventLoop#processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // a cancelled or closed key becomes invalid
    if (!k.isValid()) {
        // handle the invalid key...
        return;
    }
    try {
        int readyOps = k.readyOps();
        // Connect event
        // (only clients watch for this one; servers can ignore it)
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        // OP_ACCEPT vs OP_CONNECT:
        // ServerBootstrap handles the server side, Bootstrap the client side. After the server
        // binds its port it registers OP_ACCEPT and waits for clients; every incoming connection
        // fires the event, and a child Channel is created to handle the conversation. The client
        // starts a non-blocking connect and registers OP_CONNECT; once the connection completes,
        // the event fires and reads/writes can begin.
        // OP_CONNECT is registered only while connecting; once it fires, the interest ops
        // typically change to OP_READ and the like. Note that a FAILED connect also fires
        // OP_CONNECT; Netty then routes the failure through its exception handling,
        // e.g. the channel's exceptionCaught.
        // In short: OP_ACCEPT lets the server accept new connections; OP_CONNECT tells the
        // client its connection attempt has finished.
        //
        // Readable: when data arrives in the receive buffer, the program is notified that there
        // is data to read; a read event fires and the handler reads from the channel. Suppose a
        // handler reads only half the data and stops, moving on to the next selectionKey: the
        // next selector.select call still wakes up, again with a readable event for that
        // channel's selectionKey, because data remains.
        // Writable: to send data, subscribe to the writable event; when the send buffer has room
        // the event fires and the channel writes data into it. If one write does not finish,
        // stay subscribed to the writable event until all data is written, then unsubscribe;
        // when more data needs sending, subscribe again, write, and unsubscribe once done.
        //
        // Writable event
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        // Readable or acceptable event
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // (One method handles both accept and read: a NioServerSocketChannel is interested
            //  in OP_ACCEPT while a NioSocketChannel is interested in OP_READ, and their
            //  unsafe implementations differ.)
            // accept -> AbstractNioMessageChannel.NioMessageUnsafe#read
            // read   -> AbstractNioByteChannel.NioByteUnsafe#read
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
1.3 accept analysis
The first three steps below were already covered in the NioEventLoop#processSelectedKey analysis.
The flow in Netty corresponding to this plain NIO code:

//1 block until an event occurs
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
    //2 take one event
    SelectionKey key = iter.next();
    //3 if it is an accept event
    if (key.isAcceptable()) {
        //4 perform the accept
        SocketChannel channel = serverSocketChannel.accept();
        channel.configureBlocking(false);
        //5 subscribe to the read event
        channel.register(selector, SelectionKey.OP_READ);
    }
    // ...
}
Entry code

// server
public class TestSourceServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler());
                    }
                })
                .bind(8888);
    }
}

// client
public class TestSourceClient {
    public static void main(String[] args) {
        new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8888));
    }
}
AbstractNioMessageChannel.NioMessageUnsafe#read
First, the handling of the accept event:
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                // doReadMessages performs the accept and puts the new [NioSocketChannel]
                // into readBuf as a message; readBuf is an ArrayList used as a message buffer
                // (see doReadMessages below)
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                // localRead is 1: one message, i.e. one accepted client connection
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t; // ignore transient errors
        }
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // Fire the read event so the handlers on the NioServerSocketChannel's pipeline
            // process it - which here means ServerBootstrapAcceptor#channelRead
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        // fire the read-complete event
        pipeline.fireChannelReadComplete();
        if (exception != null) {
            closed = closeOnReadError(exception);
            // fire the exception event
            pipeline.fireExceptionCaught(exception);
        }
        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // obtain the SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // create the NioSocketChannel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
    return 0;
}
ServerBootstrapAcceptor#channelRead
Key code: io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // here msg is the NioSocketChannel
    final Channel child = (Channel) msg;
    // Add the childHandler, i.e. the initializer, to the NioSocketChannel
    // (what gets added here is the initializer)
    child.pipeline().addLast(childHandler);
    // set options
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e : childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }
    try {
        // Register the NioSocketChannel with a nio worker thread; all further
        // processing moves to that nio worker thread
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
AbstractChannel.AbstractUnsafe#register
Back in the familiar io.netty.channel.AbstractChannel.AbstractUnsafe#register method:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // some checks, omitted...
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // This line performs the nio boss -> nio worker thread handoff!!!
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    // call the registration method
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // logging...
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
*AbstractUnsafe#register0
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // AbstractNioChannel's doRegister() registers the channel with the selector
        doRegister();
        neverRegistered = false;
        registered = true;
        // [Key code - note what runs before and after the initializer. This is where the
        //  custom handlers are added to the NioSocketChannel.]
        // Run the initializer; before it, the pipeline is head -> initializer -> tail
        pipeline.invokeHandlerAddedIfNeeded();
        // after it, the pipeline is head -> logging handler -> my handler -> tail
        // the client channel is now fully configured, so mark the promise successful
        safeSetSuccess(promise);
        // fire the handlers' channelRegistered event
        pipeline.fireChannelRegistered();
        if (isActive()) {
            if (firstRegistration) {
                // Fire the active event on the pipeline
                // (HeadContext#channelActive then makes the channel subscribe to the read event)
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
- HeadContext#channelActive
Back to the familiar io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive:

public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();
    // Trigger read (for a NioSocketChannel this read only triggers the channel's event
    // registration; no data is read yet) - i.e. it registers the readable event
    readIfIsAutoRead();
}

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        // this call travels down the pipeline link by link and ends up in HeadContext's read method
        channel.read();
    }
}

@Override
public void read(ChannelHandlerContext ctx) {
    // which in turn calls AbstractNioChannel#doBeginRead
    unsafe.beginRead();
}
-- AbstractNioChannel#doBeginRead
io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    // interestOps is 0 at this point
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // subscribe to the read event!!!
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
1.4 read analysis
Continuing from the NioEventLoop#processSelectedKey section: what happens when the peer sends data.
AbstractNioByteChannel.NioByteUnsafe#read
Now the readable event: io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read.
Note that the sent data may not be fully readable in one pass, so it can trigger several nio read events; within one event the pipeline read may fire several times, while pipeline read complete fires once per event.

public final void read() {
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    // io.netty.allocator.type selects the allocator implementation
    final ByteBufAllocator allocator = config.getAllocator();
    // allocates the byteBuf and determines the size of a single read
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);
    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator); // still an empty byteBuf here
            // Read: copies the data from the receive buffer into the byteBuf
            // (calls NioSocketChannel#doReadBytes)
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    readPending = false;
                }
                break;
            }
            allocHandle.incMessagesRead(1);
            readPending = false;
            // Fire the read event so the handlers process it - this time the
            // handlers on the NioSocketChannel's pipeline
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading()); // keep looping?
        allocHandle.readComplete();
        // fire the read complete event
        pipeline.fireChannelReadComplete();
        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
NioSocketChannel#doReadBytes
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    // read the data
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
MaxMessageHandle#continueReading
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)
```java
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
    return // usually true
           config.isAutoRead() &&
           // respectMaybeMoreData defaults to true;
           // maybeMoreDataSupplier returns true when the bytes actually read
           // equal the bytes we attempted to read (the buffer was filled)
           (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
           // below the maximum count; maxMessagePerRead defaults to 16
           totalMessages < maxMessagePerRead &&
           // some data was actually read
           totalBytesRead > 0;
}
```
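To make the four conditions concrete, here is a small self-contained model of the decision (a sketch with flattened, hypothetical parameters in place of the real fields and supplier, not the Netty class):

```java
// Combines the same four conditions as MaxMessageHandle#continueReading.
public class ContinueReadingSketch {
    static boolean continueReading(boolean autoRead,
                                   boolean respectMaybeMoreData,
                                   boolean maybeMoreData,
                                   int totalMessages, int maxMessagePerRead,
                                   int totalBytesRead) {
        return autoRead
                && (!respectMaybeMoreData || maybeMoreData)
                && totalMessages < maxMessagePerRead   // default limit: 16
                && totalBytesRead > 0;                 // actually read something
    }

    public static void main(String[] args) {
        // 16th message in this event: the loop stops even if data may remain
        System.out.println(continueReading(true, true, true, 16, 16, 1024)); // false
        // buffer was filled and we are under the limit: keep reading
        System.out.println(continueReading(true, true, true, 3, 16, 1024));  // true
    }
}
```

Stopping after 16 reads is what guarantees one channel cannot monopolize the event loop; remaining data simply triggers another nio read event.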
1.5 write analysis
We will trace the write path with the following demo server and client.

```java
public class TestSourceServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler());
                    }
                })
                .bind(8888);
    }
}
```
```java
public class TestSourceClient {
    public static void main(String[] args) {
        new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler());
                        ch.pipeline().addLast(new ChannelDuplexHandler() {
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                ctx.channel().write("halo");
                            }
                        });
                    }
                })
                .connect(new InetSocketAddress("localhost", 8888));
    }
}
```
HeadContext
```java
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}
```
write: the write queue
Let's look at the write method of the channel's unsafe, starting with one of its fields.
AbstractUnsafe
```java
protected abstract class AbstractUnsafe implements Unsafe {

    private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
```
Let's take a look at the ChannelOutboundBuffer class.
```java
public final class ChannelOutboundBuffer {

    private final Channel channel;

    private ChannelOutboundBuffer.Entry flushedEntry;
    private ChannelOutboundBuffer.Entry unflushedEntry;
    private ChannelOutboundBuffer.Entry tailEntry;
```
ChannelOutboundBuffer internally maintains a linked list of Entry nodes and wraps each msg in an Entry. Its fields are covered in detail below.

Back to the main thread: let's continue with unsafe.write(msg, promise).
AbstractUnsafe
```java
@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    outboundBuffer.addMessage(msg, size, promise);
}
```
1. Call filterOutboundMessage() to filter the object to be written: anything that is neither a ByteBuf nor a FileRegion is rejected, and any non-direct buffer is converted to direct (off-heap) memory, a DirectBuffer.
```java
@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }
        return newDirectBuffer(buf);
    }
    if (msg instanceof FileRegion) {
        return msg;
    }
    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
```
2. Next, estimate the size of the ByteBuf to be written.
3. Finally, call ChannelOutboundBuffer's addMessage(msg, size, promise) method, so next we need to take a close look at what that method does.
ChannelOutboundBuffer
```java
public void addMessage(Object msg, int size, ChannelPromise promise) {
    // create a node for the message waiting to be written out
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    incrementPendingOutboundBytes(size, false);
}
```
To understand this code, you must first understand the pointers into the write buffer, shown in the figure below.

ChannelOutboundBuffer's data structure is a singly linked list; each node is an Entry holding the ByteBuf to be written out and its callback promise. The three pointers work as follows:
1. flushedEntry points to the first node being written to the OS socket buffer
2. unflushedEntry points to the first node not yet written to the OS socket buffer
3. tailEntry points to the last node of the ChannelOutboundBuffer
After the first call to addMessage, the pointers look like this:

flushedEntry points to null, while unflushedEntry and tailEntry both point to the newly added node.
After the second call to addMessage, the pointers look like this:

After the n-th call to addMessage, the pointers look like this:
As you can see, after n calls to addMessage, flushedEntry still points to NULL, meaning no node needs to be written to the socket buffer yet, while n nodes follow unflushedEntry, meaning n nodes are still waiting to be written out to the socket buffer.
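The pointer movements described above can be reproduced with a toy model. This is a deliberately simplified sketch (no sizes, promises, cancellation, or the flushed counter; every name is merely an analogue of the real field), showing how addMessage appends at the tail, addFlush promotes the unflushed chain, and remove drops the head:

```java
// Toy model of the three ChannelOutboundBuffer pointers.
public class OutboundBufferSketch {
    static class Entry { Object msg; Entry next; Entry(Object m) { msg = m; } }

    Entry flushedEntry;    // first entry already marked for flushing
    Entry unflushedEntry;  // first entry not yet marked for flushing
    Entry tailEntry;       // last entry in the list

    void addMessage(Object msg) {          // append a new node at the tail
        Entry e = new Entry(msg);
        if (tailEntry == null) { flushedEntry = null; } else { tailEntry.next = e; }
        tailEntry = e;
        if (unflushedEntry == null) unflushedEntry = e;
    }

    void addFlush() {                      // promote all unflushed nodes
        if (unflushedEntry != null) {
            if (flushedEntry == null) flushedEntry = unflushedEntry;
            unflushedEntry = null;
        }
    }

    Object remove() {                      // logically remove the flushed head
        Entry e = flushedEntry;
        if (e == null) return null;
        flushedEntry = e.next;
        if (e == tailEntry) { tailEntry = null; unflushedEntry = null; }
        return e.msg;
    }

    public static void main(String[] args) {
        OutboundBufferSketch b = new OutboundBufferSketch();
        b.addMessage("a");
        b.addMessage("b");
        System.out.println(b.flushedEntry == null); // true: nothing flushable yet
        b.addFlush();
        System.out.println(b.remove());             // a
        System.out.println(b.remove());             // b
    }
}
```

Note that addFlush only moves pointers; the actual socket writes happen later, in doWrite.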
flush: flushing the write queue

Whether you call channel.flush() or ctx.flush(), the call eventually lands on the head node of the pipeline.
HeadContext
```java
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}
```
From there we enter AbstractUnsafe.
AbstractUnsafe
```java
public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    outboundBuffer.addFlush();
    flush0();
}
```
The flush method first calls outboundBuffer.addFlush().
ChannelOutboundBuffer
```java
public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null;
    }
}
```
Looking back at the earlier figures: it first grabs the unflushedEntry pointer, then points flushedEntry at the node unflushedEntry was pointing to. After the call, the three pointers look like this:

In effect, every node is now about to be pushed out.

Next, flush0() is called.
AbstractUnsafe
```java
protected void flush0() {
    doWrite(outboundBuffer);
}
```
The core of this method is a single doWrite call, so let's keep following it.
AbstractNioByteChannel
```java
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;
    boolean setOpWrite = false;
    for (;;) {
        // get the data of the first node that needs to be flushed
        Object msg = in.current();
        if (msg instanceof ByteBuf) {
            // cast to ByteBuf; if nothing is left to read, the node is removed below
            ByteBuf buf = (ByteBuf) msg;
            boolean done = false;
            long flushedAmount = 0;
            // get the spin count
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            // spin, writing out the current node
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }
                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
                    done = true;
                    break;
                }
            }
            in.progress(flushedAmount);
            // once the write is finished, remove the current node
            if (done) {
                in.remove();
            } else {
                break;
            }
        }
    }
}
```
This is slightly involved, so let's break it down.
1. First, call current() to get the data of the first node that needs to be flushed.
ChannelOutboundBuffer
```java
public Object current() {
    Entry entry = flushedEntry;
    if (entry == null) {
        return null;
    }
    return entry.msg;
}
```
2. Second, get the spin count.
```java
if (writeSpinCount == -1) {
    writeSpinCount = config().getWriteSpinCount();
}
```
3. Spin, writing the ByteBuf out to the jdk nio Channel.
```java
for (int i = writeSpinCount - 1; i >= 0; i --) {
    int localFlushedAmount = doWriteBytes(buf);
    if (localFlushedAmount == 0) {
        setOpWrite = true;
        break;
    }
    flushedAmount += localFlushedAmount;
    if (!buf.isReadable()) {
        done = true;
        break;
    }
}
```
Following doWriteBytes:
```java
protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
```
The appearance of javaChannel() shows we have crossed into jdk nio Channel territory. Let's look at buf.readBytes(javaChannel(), expectedWrittenBytes).
```java
public int readBytes(GatheringByteChannel out, int length) throws IOException {
    this.checkReadableBytes(length);
    int readBytes = this.getBytes(this.readerIndex, out, length);
    this.readerIndex += readBytes;
    return readBytes;
}
```
The key line is this.getBytes(this.readerIndex, out, length).
```java
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
    this.checkIndex(index, length);
    if (length == 0) {
        return 0;
    } else {
        ByteBuffer tmpBuf;
        if (internal) {
            tmpBuf = this.internalNioBuffer();
        } else {
            tmpBuf = ((ByteBuffer) this.memory).duplicate();
        }
        index = this.idx(index);
        tmpBuf.clear().position(index).limit(index + length);
        // write the data in tmpBuf to out
        return out.write(tmpBuf);
    }
}
```
Now let's look at out.write(tmpBuf).
```java
public int write(ByteBuffer src) throws IOException {
    ensureOpen();
    if (!writable)
        throw new NonWritableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                n = IOUtil.write(fd, src, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}
```
Just like the read path, SocketChannelImpl's write method delegates to IOUtil.write; the key line is n = IOUtil.write(fd, src, -1, nd).
```java
static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    // if it is a DirectBuffer, write directly: the off-heap data is copied
    // into the kernel buffer and sent
    if (var1 instanceof DirectBuffer) {
        return writeFromNativeBuffer(var0, var1, var2, var4);
    } else {
        // not a DirectBuffer
        // get the position already read up to
        int var5 = var1.position();
        // get the readable limit
        int var6 = var1.limit();
        assert var5 <= var6;
        // allocate a DirectByteBuffer as large as the readable part of the original buffer
        int var7 = var5 <= var6 ? var6 - var5 : 0;
        ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
        int var10;
        try {
            var8.put(var1);
            var8.flip();
            var1.position(var5);
            // write through the DirectBuffer: the off-heap data is copied
            // into the kernel buffer and sent
            int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
            if (var9 > 0) {
                var1.position(var5 + var9);
            }
            var10 = var9;
        } finally {
            // recycle the allocated DirectByteBuffer
            Util.offerFirstTemporaryDirectBuffer(var8);
        }
        return var10;
    }
}
```
We won't walk through this logic again since the comments cover it. The point to notice is the earlier filterOutboundMessage() method: it filters out everything that is not a ByteBuf or FileRegion and converts all non-direct memory into direct memory (DirectBuffer).

That means by this step var1 is already a DirectBuffer, so the else branch is never taken and the data does not need to be copied a second time.
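The heap-versus-direct distinction is observable with plain JDK channels. A minimal demo (a FileChannel stands in for the socket, and the temp file name is arbitrary): both buffers arrive intact, but per IOUtil.write above, only the heap one is first copied into a temporary DirectByteBuffer inside the JDK.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DirectVsHeapWrite {
    static String writeBoth() throws IOException {
        Path p = Files.createTempFile("direct-vs-heap", ".bin");
        try (FileChannel ch = FileChannel.open(p, StandardOpenOption.WRITE)) {
            ByteBuffer heap = ByteBuffer.wrap("heap".getBytes());
            ByteBuffer direct = ByteBuffer.allocateDirect(6);
            direct.put("direct".getBytes());
            direct.flip();
            ch.write(heap);   // JDK copies into a temporary DirectByteBuffer first
            ch.write(direct); // handed to the native write without the extra copy
        }
        String s = new String(Files.readAllBytes(p));
        Files.delete(p);
        return s;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(writeBoth()); // heapdirect
    }
}
```

This extra copy for heap buffers is exactly what Netty's filterOutboundMessage avoids by converting everything to direct memory up front.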
4. Remove the node.
The node's data has been fully written, so the node now needs to be removed.
ChannelOutboundBuffer
```java
public boolean remove() {
    Entry e = flushedEntry;
    Object msg = e.msg;
    ChannelPromise promise = e.promise;
    int size = e.pendingSize;
    removeEntry(e);
    if (!e.cancelled) {
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
    }
    // recycle the entry
    e.recycle();
    return true;
}
```
It first grabs the node currently being flushed (pointed to by flushedEntry) and its callback object, the ChannelPromise, then calls removeEntry() to remove the node.
```java
private void removeEntry(Entry e) {
    if (-- flushed == 0) {
        flushedEntry = null;
        if (e == tailEntry) {
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
        flushedEntry = e.next;
    }
}
```
The removal here is logical: it just moves the flushedEntry pointer to the next node. After the call, the nodes look like this:
writeAndFlush: write to the queue and flush

Having understood write and flush, writeAndFlush is straightforward.
```java
public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);
    return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(msg, promise);
        } else {
            next.invokeWrite(msg, promise);
        }
    }
}
```
As you can see, a single boolean ultimately decides whether invokeWriteAndFlush or invokeWrite is called; invokeWrite is exactly the write path described above.
```java
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    invokeWrite0(msg, promise);
    invokeFlush0();
}
```
The underlying methods it ends up calling are the same ones used by separate write and flush calls.
```java
private void invokeWrite(Object msg, ChannelPromise promise) {
    invokeWrite0(msg, promise);
}

private void invokeFlush() {
    invokeFlush0();
}
```
So invokeWriteAndFlush is essentially equivalent to a write followed immediately by a flush.
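The write/flush split can be modeled in a few lines. This toy context (an analogue with hypothetical names, not Netty code) queues messages on write and drains them to the "wire" on flush, so write(msg, true) behaves like writeAndFlush:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Mirrors the boolean-selected path in AbstractChannelHandlerContext#write:
// write only queues; flush drains the queue out.
public class WriteAndFlushSketch {
    private final Deque<Object> pending = new ArrayDeque<>(); // the "write queue"
    private final StringBuilder wire = new StringBuilder();   // what was flushed out

    private void invokeWrite0(Object msg) { pending.add(msg); }
    private void invokeFlush0() { while (!pending.isEmpty()) wire.append(pending.poll()); }

    void write(Object msg, boolean flush) { // mirrors write(msg, flush, promise)
        invokeWrite0(msg);
        if (flush) invokeFlush0();
    }

    String transmitted() { return wire.toString(); }

    public static void main(String[] args) {
        WriteAndFlushSketch ctx = new WriteAndFlushSketch();
        ctx.write("a", false);                       // plain write: queued only
        System.out.println(ctx.transmitted().isEmpty()); // true
        ctx.write("b", true);                        // writeAndFlush: queue, then drain
        System.out.println(ctx.transmitted());       // ab
    }
}
```

This also explains the client demo at the top of this section: a bare ctx.channel().write(...) with no flush leaves the data sitting in the write queue.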