上一篇文件淺析了Netty中的事件驅動過程,這篇主要寫一下異步相關的東東。bootstrap
首先,什麼是異步了?併發
異步的概念和同步相對。當一個異步過程調用發出後,調用者不能馬上獲得結果。實際處理這個調用的部件在完成後,經過狀態、通知和回調來通知調用者。異步
異步的好處是不會形成阻塞,在高併發情形下會更穩定和更高的吞吐量。函數
說到Netty中的異步,就不得不提ChannelFuture。Netty中的IO操做是異步的,包括bind、write、connect等操做會簡單的返回一個ChannelFuture,調用者並不能馬上得到結果。高併發
當future對象剛剛建立時,處於非完成狀態。能夠經過isDone()方法來判斷當前操做是否完成。經過isSuccess()判斷已完成的當前操做是否成功,getCause()來獲取已完成的當前操做失敗的緣由,isCancelled()來判斷已完成的當前操做是否被取消。this
調用者能夠經過返回的ChannelFuture來獲取操做執行的狀態,註冊監聽函數來執行完成後的操做。spa
其實同步的阻塞和異步的非阻塞能夠直接經過代碼看出:code
這是一段阻塞的代碼:對象
printTime("開始connect: "); // Start the connection attempt. ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); // Wait until the connection is closed or the connection attempt fails. future.getChannel().getCloseFuture().awaitUninterruptibly(); printTime("connect結束: "); // Shut down thread pools to exit. bootstrap.releaseExternalResources();
這段代碼的輸出結果是:blog
開始connect: 2013-07-17 14:45:28
connect結束: 2013-07-17 14:45:29
很明顯的能夠看出,connect操做致使整段代碼阻塞了大概1秒。
如下這段是異步非阻塞的代碼:
printTime("開始connect: "); // Start the connection attempt. ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); future.addListener(new ChannelFutureListener() { public void operationComplete(final ChannelFuture future) throws Exception { printTime("connect結束: "); } }); printTime("異步時間: "); // Shut down thread pools to exit. bootstrap.releaseExternalResources();
輸出結果是:
開始connect: 2013-07-17 14:50:09 異步時間: 2013-07-17 14:50:09 connect結束: 2013-07-17 14:50:09
能夠明顯的看出,在異步模式下,上面這段代碼沒有阻塞,在執行connect操做後直接執行到printTime("異步時間: "),隨後connect完成,future的監聽函數輸出connect操做完成。
關於同步的阻塞和異步的非阻塞能夠打一個很簡單的比方,A向B打電話,通知B作一件事。
在同步模式下,A告訴B作什麼什麼事,而後A依然拿着電話,等待B作完,才能夠作下一件事;
在異步模式下,A告訴B作什麼什麼事,A掛電話,作本身的事。B作完後,打電話通知A作完了。
如上面代碼所顯示的,ChannelFuture同時提供了阻塞和非阻塞方法,接下來就簡單的分析一下各自是怎麼實現的。
阻塞方法是await系列,這些方法要當心翼翼的使用,不能夠在handler內調用這些方法,不然會形成死鎖。
public ChannelFuture awaitUninterruptibly() { boolean interrupted = false; synchronized (this) { //循環等待到完成 while (!done) { checkDeadLock(); waiters++; try { wait(); } catch (InterruptedException e) { //不容許中斷 interrupted = true; } finally { waiters--; } } } if (interrupted) { Thread.currentThread().interrupt(); } return this; }
一個標誌位,一個while循環,代碼簡潔明瞭。
非阻塞則是添加監聽類ChannelFutureListener,經過覆蓋ChannelFutureListener的operationComplete執行業務邏輯。
public void addListener(final ChannelFutureListener listener) { if (listener == null) { throw new NullPointerException("listener"); } boolean notifyNow = false; synchronized (this) { if (done) { notifyNow = true; } else { if (firstListener == null) { //listener鏈表頭 firstListener = listener; } else { if (otherListeners == null) { otherListeners = new ArrayList<ChannelFutureListener>(1); } //添加到listener鏈表中,以便操做完成後遍歷操做 otherListeners.add(listener); } ...... if (notifyNow) { //通知listener進行處理 notifyListener(listener); } }
而後當操做完成後直接遍歷listener鏈表,把每一個listener取出來執行。以setSuccess爲例,以下:
public boolean setSuccess() { synchronized (this) { // Allow only once. if (done) { return false; } done = true; //喚醒全部等待 if (waiters > 0) { notifyAll(); } } //通知全部listener notifyListeners(); return true; }
private void notifyListeners() { if (firstListener != null) { //執行listener表頭 notifyListener(firstListener); firstListener = null; //挨個執行其他的listener if (otherListeners != null) { for (ChannelFutureListener l: otherListeners) { notifyListener(l); } otherListeners = null; } } }
其實這部分代碼的邏輯很簡單,就是註冊回調函數,當操做完成後自動調用回調函數,就達到了異步的效果。