netty源碼死磕9 html
Future Promise 模式詳解java
因爲Netty中的Handler 處理都是異步IO操做,結果是未知的。編程
Netty繼承和擴展了JDK Future的API,定義了自身的Future系列類型,實現異步操做結果的獲取和監控。promise
其中,最爲重要的是ChannelFuture 。網絡
代碼以下:異步
public interface ChannelFuture extends Future<Void> { //... Channel channel(); @Override ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture sync() throws InterruptedException; //... }
之因此命名爲ChannelFuture,表示跟Channel的操做有關。socket
ChannelFuture用於獲取Channel相關的操做結果,添加事件監聽器,取消IO操做,同步等待。ide
java.util.concurrent.Future是Java提供的接口,提供了對異步操做的簡單幹預。oop
Future接口定義了isDone()、isCancellable(),用來判斷異步執行狀態。Future接口的get方法,能夠用來獲取結果。get方法首先會判斷任務是否執行完成,若是完成就返回結果,不然阻塞線程,直到任務完成。學習
Netty官方文檔直接說明——Netty的網絡操做都是異步的,Netty源碼上大量使用了Future/Promise模式。
若是用戶操做調用了sync或者await方法,會在對應的future對象上阻塞用戶線程,例如future.channel().closeFuture().sync()。
Netty 的Future 接口,在繼承了java.util.concurrent.Future的基礎上,增長了一系列監聽器方法,好比addListener()、removeListener() 等等。Netty強烈建議,經過添加監聽器的方式獲取IO結果,而不是經過JDK Future的同步等待的方式去獲取IO結果。
Netty擴展了Java的Future,增長了監聽器Listener接口,經過監聽器可讓異步執行更加有效率,不須要經過get來等待異步執行結束,而是經過監聽器回調來精確地控制異步執行結束的時間點。
這一點,正好是Netty在Future模式的最主要的改進。
public interface Future<V> extends java.util.concurrent.Future<V> { boolean isSuccess(); boolean isCancellable(); Throwable cause(); Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> sync() throws InterruptedException; boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeoutMillis); V getNow(); boolean cancel(boolean mayInterruptIfRunning); }
Netty的出站和入站操做,都是異步的。
以最爲經典的NIO出站操做——write出站爲例,說一下ChannelFuture的使用。
代碼以下:
ChannelFuture future = ctx.channel().write(msg); future.addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { // write操做完成後的回調代碼 } });
在write操做調用後,Netty並無完成對Java NIO底層鏈接的寫入操做,出站操做是異步執行的。
若是須要獲取IO結果,可使用回調的方式。
使用ChannelFuture的異步完成後的回調,須要搭配使用另外的一個接口ChannelFutureListener ,他從父接口哪裏繼承了一個被回調到的operationComplete操做完成的方法。
ChannelFutureListener 的父親接口是GenericFutureListener 接口。
定義以下:
public interface GenericFutureListener <F extends Future<?>> extends EventListener { void operationComplete(F future) throws Exception; }
異步操做完成後的回調代碼,放在operationComplete方法中的實現中,就能夠了。
Netty的Future,只是增長了監聽器。整個異步的狀態,是不能進行設置和修改的。
換句話說,Future是隻讀的,是不能夠寫的。
因而,Netty的 Promise接口擴展了Netty的Future接口,它表示一種可寫的Future,就是能夠設置異步執行的結果。
部分源碼以下:
public interface Promise<V> extends Future<V> { Promise<V> setSuccess(V result); Promise<V> setFailure(Throwable cause); boolean setUncancellable(); //.... }
而ChannelPromise接口,則繼承擴展了Promise和ChannelFuture。因此,ChannelPromise既綁定了Channel,又具有了監聽器的功能,還能夠設置IO操做的結果,是Netty實際編程使用的最多的接口。
在AbstratChannel的代碼中,至關多的IO操做,都會返回ChannelPromise類型實例做爲調用的返回值。 經過這個返回值,客戶程序能夠用於讀取IO操做的結果,執行IO操做真正完成後的回調。
在AbstractChannel中,定義了幾個對Channel的異步狀態進行監控的Promise和Future成員,用於監控Channel的鏈接是否成功,鏈接是否關閉。
源碼以下:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { //鏈接成功的監控 private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null); //鏈接關閉的監控 private final CloseFuture closeFuture = new CloseFuture(this); //... }
通常狀況下,在應用程序中使用以下的代碼:
// Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync();
通常來講,編寫以上代碼的都是在Main線程中用來啓動ServerBootStrap的,因此Main線程會被阻塞,保證服務端Channel的正常運行。
上面的代碼中,channel.closeFuture()不作任何操做,只是簡單的返回channel對象中的closeFuture對象。而CloseFuture的sync方法,會將當前線程阻塞在CloseFuture上。
那麼,f.channel().closeFuture().sync() 實際是如何工做的呢?
CloseFuture繼承了DefaultPromise的sync同步方法。
DefaultPromise的代碼以下:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { private volatile Object result; //... @Override public Promise<V> sync() throws InterruptedException { await(); //... } @Override public Promise<V> await() throws InterruptedException { //... synchronized (this) { while (!isDone()) { incWaiters(); try { wait(); //阻塞了,死等 } finally { decWaiters(); } } } return this; } //... }
從源碼能夠看出,sync方法,調用了await方法。
在await方法中,CloseFuture 使用java 基礎的synchronized 方法進行線程同步;而且,使用CloseFuture.wait / notify 這組來自Object根類中的古老方法進行線程之間的等待和喚醒。
在await方法,不斷的自旋,判斷當前的 CloseFuture 實例的結果是否已經完成,若是沒有完成 !isDone() ,就不斷的等待。一直到 isDone() 的值爲true。
isDone() 的源碼以下:
@Override public boolean isDone() { return isDone0(result); } private static boolean isDone0(Object result) { return result != null && result != UNCANCELLABLE; }
CloseFuture的 isDone() 的條件是否可以知足,和Channel的close 關閉鏈接的出站操做有關。
下一步,咱們來看 isDone() 的條件,如何纔可以知足?
在Netty中,close 關閉鏈接的操做,屬於全部的出站操做的一種。關於Netty出站處理的流程,在前面的文字中,已經很是詳細的介紹了。這裏再也不贅述,只是簡單的列出一個流程圖。
close 關閉鏈接的出站操做,其流程以下圖所示:
一溜兒下來,最終會落地到unsafe.doClose 方法。
看看unsafe.doClose,是如何與CloseFuture的 isDone() 的條件進行關聯的。
unsafe.doClose 方法中,設置了CloseFuture 的result值。
unsafe.doClose 源碼以下:
protected abstract class AbstractUnsafe implements Unsafe { private void close(final ChannelPromise promise,…) { //… try { // Close the channel doClose0(promise); } finally { // Fail all the queued messages. outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); } //…… } } private void doClose0(ChannelPromise promise) { try { doClose(); closeFuture.setClosed(); safeSetSuccess(promise); } catch (Throwable t) { closeFuture.setClosed(); safeSetFailure(promise, t); } } //…… }
在closeFuture.setClosed() 設置關閉的結果的過程當中,主要完成如下三個工做:
1 設置result的值
2 notifyAll,喚醒在本Promise上等待的線程
3 回調listener
closeFuture.setClosed()的主要源碼以下:
boolean setClosed() { return super.trySuccess(); } @Override //這個定義在父類中 public boolean trySuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return true; } return false; } private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); }
上面的 notifyListeners()調用,就是用來喚醒了等待在closeFuture 實例對象上的等待線程。
到了這裏,終於鬆了一口氣了。
以前經過 f.channel().closeFuture().sync() 同步操做,阻塞在哪兒的Main線程,終於經過channel.close() 方法,給喚醒了。
在上面的源碼中,最終觸發future對象的notify動做的線程,都是eventLoop線程(Reactor線程)。
通常狀況下,Channel的出站和入站操做,也都是在eventLoop線程的輪詢任務中完成的。
例如由於不管是用戶直接關閉channel,或者eventLoop的輪詢狀態關閉channel,都會在eventLoop的線程內完成notify動做。notify那些經過sync操做,正在等待CloseFuture的哪些阻塞線程。
因此不要在Reactor線程內調用future對象的sync或者await方法。若是在Reactor線程進行sync或者await,會有可能引發死鎖。
爲何呢?
在Reactor線程進行sync時,會進入等待狀態,等待Future(DefaultPromise)的 isDone 的條件知足。經過前面的例子,咱們已經看到了,而Future的isDone的條件,又須要Reactor線程的出站或者入站操做來知足。這是,Reactor線程既然已經處於等待狀態,怎麼可能再進行其餘的出站或者入站操做呢?至關於本身等本身,這就是典型的死鎖。
在實際開發中,因爲應用程序代碼都是編寫在自定義的channelHandler處理器中,而channelHandler是在eventLoop線程(Reactor線程)內執行的。因此,不能在channelHandler中調用Future(DefaultPromise)的sync或者await兩個同步方法。
正確的作法是:經過給Future(DefaultPromise) 增長listeners監聽器 的方式,來干預異步操做的過程,處理異步操做的結果。
這樣,能夠避免使用Future帶來的死鎖。
瘋狂創客圈 Netty 死磕系列 10多篇深度文章: 【博客園 總入口】 QQ羣:104131248