大綱
1.關于NioEventLoop的問題整理
2.理解Reactor線程模型主要分三部分
3.NioEventLoop的創建
4.NioEventLoop的啟動
4.NioEventLoop的啟動
(1)啟動NioEventLoop的兩大入口
(2)判斷當前線程是否是NioEventLoop線程
(3)創建一個線程并啟動
(4)NioEventLoop的啟動總結
(1)啟動NioEventLoop的兩大入口
入口一:服務端啟動,注冊服務端Channel到Selector時
入口二:新連接接入,通過chooser綁定一個NioEventLoop時
下面先看入口一:
調用ServerBootstrap的bind()方法其實會調用AbstractBootstrap的doBind()方法,然后會調用AbstractBootstrap的initAndRegister()方法,接著執行config().group().register(channel)注冊服務端Channel。最后會逐層深入調用到AbstractChannel.AbstractUnsafe的register()方法,來啟動一個NioEventLoop將服務端Channel注冊到Selector上。
//Bootstrap sub-class which allows easy bootstrap of ServerChannel
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {......
}//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.
//It support method-chaining to provide an easy way to configure the AbstractBootstrap.
//When not used in a ServerBootstrap context, the #bind() methods are useful for connectionless transports such as datagram (UDP).
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {...//Create a new Channel and bind it.public ChannelFuture bind(int inetPort) {//首先根據端口號創建一個InetSocketAddress對象,然后調用重載方法bind()return bind(new InetSocketAddress(inetPort));}//Create a new Channel and bind it.public ChannelFuture bind(SocketAddress localAddress) {//驗證服務啟動需要的必要參數validate();if (localAddress == null) throw new NullPointerException("localAddress");return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));}private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();//1.初始化和注冊Channelfinal Channel channel = regFuture.channel();...doBind0(regFuture, channel, localAddress, promise);//2.綁定服務端端口...return promise;}final ChannelFuture initAndRegister() {Channel channel = null;...//1.創建服務端Channelchannel = channelFactory.newChannel();//2.初始化服務端Channelinit(channel);...//3.注冊服務端Channel,比如通過NioEventLoopGroup的register()方法進行注冊ChannelFuture regFuture = config().group().register(channel);...return regFuture;}...
}//Bootstrap sub-class which allows easy bootstrap of ServerChannel
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);...@Overridepublic final ServerBootstrapConfig config() {return config;}...
}public abstract class AbstractBootstrapConfig<B extends AbstractBootstrap<B, C>, C extends Channel> {protected final B bootstrap;...protected AbstractBootstrapConfig(B bootstrap) {this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");}//Returns the configured EventLoopGroup or null if non is configured yet.public final EventLoopGroup group() {//比如返回一個NioEventLoopGroup對象return bootstrap.group();}...
}//MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {......
}//Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {...@Overridepublic ChannelFuture register(Channel channel) {//先通過next()方法獲取一個NioEventLoop,然后通過NioEventLoop.register()方法注冊服務端Channelreturn next().register(channel);}@Overridepublic EventLoop next() {return (EventLoop) super.next();}...
}//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {private final EventExecutorChooserFactory.EventExecutorChooser chooser;...@Overridepublic EventExecutor next() {//通過線程選擇器chooser選擇一個NioEventLoopreturn chooser.next();} ...
}//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {......
}//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {...@Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}@Overridepublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");//調用AbstractUnsafe的register()方法promise.channel().unsafe().register(this, promise);return promise;}...
}//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private volatile EventLoop eventLoop;...//Unsafe implementation which sub-classes must extend and use.protected abstract class AbstractUnsafe implements Unsafe {...@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {...//綁定事件循環器,即綁定一個NioEventLoop到該Channel上AbstractChannel.this.eventLoop = eventLoop;//注冊Selector,并啟動一個NioEventLoopif (eventLoop.inEventLoop()) {register0(promise);} else {...//通過啟動這個NioEventLoop線程來調用register0()方法將這個服務端Channel注冊到Selector上//其實執行的是SingleThreadEventExecutor的execute()方法eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});...}}}...
}
(2)判斷當前線程是否是NioEventLoop線程
調用NioEventLoop的inEventLoop()方法可以判斷當前線程是否是Netty的Reactor線程,也就是NioEventLoop對應的線程實體。NioEventLoop的線程實體被創建之后,會將該線程實體保存到NioEventLoop父類的成員變量thread中。
服務端啟動、注冊服務端Channel到Selector,執行到AbstractUnsafe.register()方法中的eventLoop.inEventLoop()代碼時,會將main方法對應的主線程傳遞進來與this.thread進行比較。由于this.thread此時并未賦值,所以為空,因此inEventLoop()方法返回false,于是便會執行eventLoop.execute()代碼創建一個線程并啟動。
//SingleThreadEventLoop implementation which register the Channel's
//to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {......
}//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {......
}//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {......
}//Abstract base class for EventExecutors that want to support scheduling.
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {......
}//Abstract base class for {@link EventExecutor} implementations.
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {...@Overridepublic boolean inEventLoop() {//注冊服務端Channel時是通過主線程進行注冊的,Thread.currentThread()對應的就是main線程//調用SingleThreadEventExecutor.inEventLoop()方法return inEventLoop(Thread.currentThread());}...
}//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {private volatile Thread thread;...@Overridepublic boolean inEventLoop(Thread thread) {return thread == this.thread;//此時線程還沒創建,this.thread為null}...
}
(3)創建一個線程并啟動
AbstractUnsafe.register()方法準備將服務端Channel注冊到Selector上時,首先在判斷條件中執行eventLoop.inEventLoop()代碼發現為false,于是便執行eventLoop.execute()代碼創建一個線程并啟動它去執行注冊任務。而執行eventLoop.execute()代碼其實就是調用SingleThreadEventExecutor的execute()方法。
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {...@Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}@Overridepublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");//調用AbstractUnsafe的register()方法,并把NioEventLoop自己當作參數傳入promise.channel().unsafe().register(this, promise);return promise;}...
}//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private volatile EventLoop eventLoop;...//Unsafe implementation which sub-classes must extend and use.protected abstract class AbstractUnsafe implements Unsafe {...@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {...//綁定事件循環器,即綁定一個NioEventLoop到該Channel上AbstractChannel.this.eventLoop = eventLoop;//注冊Selector,并啟動一個NioEventLoopif (eventLoop.inEventLoop()) {register0(promise);} else {...//通過啟動這個NioEventLoop線程來調用register0()方法將這個服務端Channel注冊到Selector上//其實執行的是SingleThreadEventExecutor的execute()方法eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});...}}}...
}//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {private volatile Thread thread;//創建NioEventLoop時會通過構造方法傳入NioEventLoopGroup的線程執行器executorprivate final Executor executor;...@Overridepublic void execute(Runnable task) {if (task == null) throw new NullPointerException("task");boolean inEventLoop = inEventLoop();//判斷當前線程是否是Netty的Reactor線程if (inEventLoop) {addTask(task);} else {startThread();addTask(task);if (isShutdown() && removeTask(task)) reject();}if (!addTaskWakesUp && wakesUpForTask(task)) wakeup(inEventLoop);}private void startThread() {//判斷Reactor線程有沒有被啟動;如果沒有被啟動,則通過CAS調用doStartThread()方法啟動線程if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {doStartThread();}}}private void doStartThread() {assert thread == null;//executor.execute()方法會創建出一個FastThreadLocalThread線程來執行Runnable任務//所以在Runnable的run()方法中,Thread.currentThread()指的是這個FastThreadLocalThread線程executor.execute(new Runnable() {@Overridepublic void run() {//Thread.currentThread()指的是FastThreadLocalThread線程thread = Thread.currentThread();...SingleThreadEventExecutor.this.run();//啟動線程...}});}//具體的run()方法由子類比如NioEventLoop來實現protected abstract void run();...
}
SingleThreadEventExecutor的execute()方法的說明如下:
一.這個方法也可能會被用戶代碼使用,如ctx.executor().execute(task)。所以execute()方法里又調用inEventLoop()方法進行了一次外部線程判斷,確保執行task任務時不會遇到線程問題。
二.如果當前線程不是Netty的Reactor線程,則調用startThread()方法啟動一個Reactor線程。在startThread()方法中首先會判斷當前NioEventLoop對應的Reactor線程實體有沒有被啟動。如果沒有被啟動,則通過設置CAS成功后調用doStartThread()方法啟動線程。
三.執行doStartThread()方法時,會調用NioEventLoop的內部成員變量executor的execute()方法。executor就是線程執行器ThreadPerTaskExecutor,它的作用是每次執行Runnable任務時都會創建一個線程來執行。也就是executor.execute()方法會通過DefaultThreadFactory的newThread()方法,創建出一個FastThreadLocalThread線程來執行Runnable任務。
四.doStartThread()方法的Runnable任務會由一個FastThreadLocalThread線程來執行。在Runnable任務的run()方法里,會保存ThreadPerTaskExecutor創建出來的FastThreadLocalThread對象到SingleThreadEventExecutor的成員變量thread中,然后調用SingleThreadEventExecutor的run()方法。
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {private final EventExecutor[] children;private final EventExecutorChooserFactory.EventExecutorChooser chooser;... //Create a new instance.protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));//1.創建ThreadPerTaskExecutor線程執行器if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());//2.創建NioEventLoopchildren = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {...//創建每一個NioEventLoop時,都會調用newChild()方法給每一個NioEventLoop配置一些核心參數//傳入線程執行器executor去創建NioEventLoopchildren[i] = newChild(executor, args);}//3.創建線程選擇器chooser = chooserFactory.newChooser(children);...}protected ThreadFactory newDefaultThreadFactory() {//getClass()是獲取該方法所屬的對象類型,也就是NioEventLoopGroup類型//因為是通過NioEventLoopGroup的構造方法層層調用到這里的return new DefaultThreadFactory(getClass());}...
}public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory == null) throw new NullPointerException("threadFactory");this.threadFactory = threadFactory;}@Overridepublic void execute(Runnable command) {//調用DefaultThreadFactory的newThread()方法執行Runnable任務threadFactory.newThread(command).start();}
}//A ThreadFactory implementation with a simple naming rule.
public class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolId = new AtomicInteger();private final AtomicInteger nextId = new AtomicInteger();private final boolean daemon;private final int priority;protected final ThreadGroup threadGroup;...public DefaultThreadFactory(Class<?> poolType) {this(poolType, false, Thread.NORM_PRIORITY);}public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {//toPoolName()方法會把NioEventLoopGroup的首字母變成小寫this(toPoolName(poolType), daemon, priority);}public DefaultThreadFactory(String poolName, boolean daemon, int priority) {this(poolName, daemon, priority, System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());}public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {...//prefix用來標記線程名字的前綴prefix = poolName + '-' + poolId.incrementAndGet() + '-';this.daemon = daemon;this.priority = priority;this.threadGroup = threadGroup;}@Overridepublic Thread newThread(Runnable r) {Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());if (t.isDaemon()) {if (!daemon) t.setDaemon(false);} else {if (daemon) t.setDaemon(true);}if (t.getPriority() != priority) t.setPriority(priority);return t;}protected Thread newThread(Runnable r, String name) {return new FastThreadLocalThread(threadGroup, r, name);}...
}
NioEventLoop是如何與一個線程實體綁定的?NioEventLoop會通過線程執行器ThreadPerTaskExecutor創建一個FastThreadLocalThread,然后再將該FastThreadLocalThread線程保存到其成員變量中,從而實現與一個線程實體進行綁定。
(4)NioEventLoop的啟動總結
一.在注冊服務端Channel的過程中,主線程最終會調用AbstractUnsafe的register()方法。該方法首先會將一個NioEventLoop綁定到這個服務端Channel上,然后把實際注冊Selector的邏輯封裝成一個Runnable任務,接著調用NioEventLoop的execute()方法來執行這個Runnable任務。
二.NioEventLoop的execute()方法其實就是其父類SingleThreadEventExecutor的execute()方法,它會先判斷當前調用execute()方法的線程是不是Netty的Reactor線程,如果不是就調用startThread()方法來創建一個Reactor線程。
三.startThread()方法會通過線程執行器ThreadPerTaskExecutor的execute()方法來創建一個線程。這個線程是一個FastThreadLocalThread線程,這個線程需要執行如下邏輯:把線程保存到NioEventLoop的成員變量thread中,然后調用NioEventLoop的run()方法來啟動NioEventLoop。
NioEventLoop的啟動流程如下:
bind() -> initAndRegister() -> config().group().register() -> eventloop.execute() //入口startThread() -> doStartThread() //創建線程ThreadPerTaskExecutor.execute() //線程執行器創建FastThreadLocalThread線程thread = Thread.currentThread() //保存FastThreadLocalThread線程到NioEventLoop的成員變量中NioEventLoop.run() //啟動NioEventLoop
NioEventLoop的啟動流程說明如下:
首先bind()方法會將具體綁定端口的操作封裝成一個Runnable任務,然后調用NioEventLoop的execute()方法,接著Netty會判斷調用execute()方法的線程是否是NIO線程,如果發現不是就會調用startThread()方法開始創建線程。
創建線程是通過線程執行器ThreadPerTaskExecutor來創建的。線程執行器的作用是每執行一個任務都會創建一個線程,而且創建出來的線程就是NioEventLoop底層的一個FastThreadLocalThread線程。
創建完FastThreadLocalThread線程后會執行一個Runnable任務,該Runnable任務首先會將這個線程保存到NioEventLoop對象。保存的目的是為了判斷后續對NioEventLoop的相關執行線程是否為本身。如果不是就將封裝好的一個任務放入TaskQueue中進行串行執行,實現線程安全。該Runnable任務然后會調用NioEventLoop的run()方法,從而啟動NioEventLoop。NioEventLoop的run()方法是驅動Netty運轉的核心方法。