Netty源碼分析第二章: NioEventLoophtml
第四節: NioEventLoop線程的啓動promise
以前的小節咱們學習了NioEventLoop的建立以及線程分配器的初始化, 那麼NioEventLoop是如何開啓的呢, 咱們這一小節繼續學習服務器
NioEventLoop的開啓方法在其父類SingleThreadEventExecutor中的execute(Runnable task)方法中, 咱們跟到這個方法:異步
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //判斷當前線程是否是eventLoop線程
boolean inEventLoop = inEventLoop(); //若是是eventLoop線程
if (inEventLoop) { addTask(task); } else { //不是eventLoop線程啓動線程
startThread(); //添加task
addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
這個方法傳入一個Runnble對象, 也就是一個任務ide
首先boolean inEventLoop = inEventLoop()方法會判斷是否是NioEventLoop線程
oop
跟進 inEventLoop()方法:源碼分析
@Override public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }
這裏inEventLoop(Thread.currentThread())方法傳入了當前線程對象, 這個方法會調用當前類的inEventLoop(Thread thread)方法
學習
跟進inEventLoop(Thread thread)方法:優化
@Override public boolean inEventLoop(Thread thread) { return thread == this.thread; }
咱們看到判斷的依據是當前線程對象是否是NioEventLoop綁定的線程對象, 這裏咱們會想到開啓線程確定會爲NioEventLoop綁定一個線程對象, 若是判斷當前線程對象不是當前NioEventLoop綁定的線程對象, 說明執行此方法的線程不是當前NioEventLoop線程, 那麼這個線程如何初始化的, 後面咱們會講到, 咱們繼續看execute(Runnable task)方法:this
若是是NioEventLoop線程, 則會經過addTask(task)添加任務, 經過NioEventLoop異步執行, 那麼這個task是何時執行的, 一樣後面會講到
跟一下addTask(task):
protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //若是添加不成功
if (!offerTask(task)) { reject(task); } }
這裏offerTask(task)表明添加一個task, 跟進去:
final boolean offerTask(Runnable task) { if (isShutdown()) { reject(); } //往taskQ中添加一個task
return taskQueue.offer(task); }
咱們看到taskQueue.offer(task)將一個task添加到任務隊列, 而這個任務隊列taskQueue就是咱們NioEventLoop初始化的時候與NioEventLoop惟一綁定的任務隊列
回顧一下初始構造方法:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }
在這裏經過 taskQueue = newTaskQueue(this.maxPendingTasks) 建立了taskQueue
回到execute(Runnable task)方法中, 咱們繼續往下看:
若是不是NioEventLoop線程咱們經過startThread()開啓一個NioEventLoop線程
跟到startThread()以前, 咱們先繼續往下走:
開啓NioEventLoop線程以後, 又經過addTask(task)往taskQueue添加任務
最後咱們注意有這麼一段代碼:
if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); }
addTaskWakesUp表明添加task以後, NioEventLoop的select()操做是否是要喚醒, 這個屬性是在初始化NioEventLoop的時候傳入的, 你們能夠回顧下, 默認是false, 這裏!addTaskWakesUp就是須要喚醒, wakesUpForTask(task)與addTaskWakesUp意義相同, 默認是true, 能夠看代碼:
protected boolean wakesUpForTask(Runnable task) { return true; }
這裏恆爲true, 因此這段代碼就是添加task時須要經過wakeup(inEventLoop)喚醒, 這樣NioEventLoop在作select()操做時若是正在阻塞則馬上喚醒, 而後執行任務隊列的task
回到execute(Runnable task)方法中咱們跟進開啓線程的startThread()方法中:
private void startThread() { //判斷線程是否啓動, 未啓動則啓動
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { //當前線程未啓動, 則啓動
doStartThread(); } } }
前面的判斷是判斷當前NioEventLoop線程是否啓動, 若是未啓動, 則經過doStartThread()方法啓動, 咱們第一次執行execute(Runnable task)線程是未啓動的, 因此會執行doStartThread(), 後續該線程則不會再執行doStartThread()方法
咱們跟進doStartThread()方法中:
private void doStartThread() { assert thread == null; //線程執行器執行線程(全部的eventLoop共用一個線程執行器)
executor.execute(new Runnable() { @Override public void run() { //將當前線程複製給屬性
thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { //開始輪詢
SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { //代碼省略
} } }); }
咱們重點關注executor.execute()這個方法, 其中executor就是咱們建立NioEventLoop的線程器, execute()就是開啓一個線程
回顧下execute()方法:
public void execute(Runnable command) { //起一個線程
threadFactory.newThread(command).start(); }
咱們看到經過線程工廠開啓一個線程, 因爲前面的小節已經剖析, 這裏再也不贅述
開啓線程則執行Runnble類中的run()方法, 咱們看到在run()方法裏經過 thread = Thread.currentThread() 將新開啓的線程對象賦值NioEventLoop的thread的屬性, 這樣就能夠經過線程對象的判斷, 來肯定是否是NioEventLoop線程了
後面咱們看到 SingleThreadEventExecutor.this.run() , 這裏this, 就是當前NioEventLoop對象, 而這裏的run()方法, 就是NioEventLoop中的run()方法, 在這個run()方法中, 真正開始了selector的輪詢工做, 對於run()方法的詳細剖析, 咱們會在以後的小節中進行
剛纔咱們剖析了NioEventLoop的啓動方法, 那麼根據咱們的分析, 就是第一次調用NioEventLoop的execute(Runnable task)方法的時候, 則會開啓NioEventLoop線程, 以後的調用只是往taskQueue中添加任務, 那麼第一次是何時開啓的呢?這裏咱們要回顧上一章講過的內容
上一章中咱們講過在AbstractServerBootstrap中有個initAndRegister()方法, 這個方法主要用於channel的初始化和註冊, 其中註冊的代碼爲:
ChannelFuture regFuture = config().group().register(channel);
其中group()咱們剖析過是Boss線程的group, 咱們剖析過其中的register(channel)方法:
public ChannelFuture register(Channel channel) { return next().register(channel); }
首先跟到next()方法:
public EventLoop next() { return (EventLoop) super.next(); }
首先調用了其父類MultithreadEventExecutorGroup的next方法, 跟進去:
public EventExecutor next() { return chooser.next(); }
這裏chooser, 就是初始化NioEventLoopGroup的線程選擇器, 爲此分配了不一樣的策略, 這裏再也不贅述, 經過這個方法, 返回一個NioEventLoop線程
回到MultithreadEventLoopGroup類的register()方法中, next().register(channel)表明分配後的NioEventLoop的register()方法, 這裏會調用NioEventLoop的父類SingleThreadEventLoop類中的register()方法
跟到SingleThreadEventLoop類中的register()方法:
public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }
DefaultChannelPromise是一個監聽器, 它會跟隨channel的讀寫進行監聽, 綁定傳入的channel和NioEventLoop, 有關Promise後面的章節會講到
這裏咱們繼續跟進register(new DefaultChannelPromise(channel, this))
public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
unsafe()方法返回建立channel初始化的unsafe()對象, 若是是NioSeverSocketChannel, 則綁定NioMessageUnsafe對象, 上一小節進行剖析過這裏再也不贅述
最終這個unsafe對象會調用到AbstractChannel的內部類AbstractUnsafe中的register()方法, 這裏register(), 不管是客戶端channel和服務器channel都會經過這個一個register註冊, 在之後的客戶端接入章節中咱們會看到
這裏咱們繼續看register方法:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { //代碼省略 //全部的複製操做, 都交給eventLoop處理(1)
AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { //作實際主註冊(2)
register0(promise); } }); } catch (Throwable t) { //代碼省略
} } }
這裏咱們上一小節分析過, 再也不陌生, 這裏只分析有關NioEventLoop相關的內容
咱們首先看到 AbstractChannel.this.eventLoop = eventLoop , 獲取當前channel的NioEventLoop, 經過上一章的學習, 咱們知道每一個channel建立的時候會綁定一個NioEventLoop
這裏經過eventLoop.inEventLoop()判斷當前線程是不是NioEventLoop線程, inEventLoop()方法在前面的小節剖析過, 這裏再也不贅述
若是是NioEventLoop線程則經過register0(promise)方法作實際的註冊, 可是咱們第一次執行註冊方法的時候, 若是是服務器channel是則是由server的用戶線程執行的, 若是是客戶端channel, 則是由Boss線程執行的, 因此走到這裏均不是當前channel的NioEventLoop的線程, 因而會走到下面的eventLoop.execute()方法中
eventLoop.execute()上一小節剖析過, 就是將task添加到taskQueue中而且開啓器NioEventLoop線程, 因此, 在這裏就開啓了NioEventLoop線程, 有關開啓步驟, 能夠經過上一小節內容進行回顧
這裏注意一點, 有的資料會講第一次開啓NioEventLoop線程是在AbstractBootstrap的doBind0(regFuture, channel, localAddress, promise)方法中開啓的, 我的通過debug和分析, 實際上並非那樣的, 但願你們不要被誤導
簡單看下doBind0(regFuture, channel, localAddress, promise)方法:
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { //綁定端口
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
這裏雖然調用了eventLoop的execute()方法, 可是eventLoop線程在註冊期間已經啓動, 因此這裏不會重複啓動, 只會將任務添加到taskQueue中
其實這裏咱們也可以看出, 其實綁定端口的相關操做, 一樣是也是eventLoop線程中執行的