首先咱們來看看future和promise接口總體設計java
最頂層的future是jdk的,第二個是netty自定義的future,兩個同名,繼承關係promise
看看jdk的future接口異步
public interface Future<V> { // 取消任務 boolean cancel(boolean mayInterruptIfRunning); // 任務是否取消 boolean isCancelled(); // 任務是否完成 boolean isDone(); // 阻塞的獲取執行的結果 V get() throws InterruptedException, ExecutionException; // 在必定時間內-超時-阻塞的獲取執行的結果 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
瞅瞅netty的future接口async
public interface Future<V> extends java.util.concurrent.Future<V> { // 是否成功 boolean isSuccess(); // 是否取消 boolean isCancellable(); Throwable cause(); // 添加listener進行回調 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 阻塞的等待任務執行,若是失敗則拋出失敗緣由的異常 Future<V> sync() throws InterruptedException; // 不響應中斷等待異常 Future<V> syncUninterruptibly(); // 阻塞等待任務執行,失敗不拋異常 Future<V> await() throws InterruptedException; Future<V> awaitUninterruptibly(); boolean await(long timeout, TimeUnit unit) throws InterruptedException; boolean await(long timeoutMillis) throws InterruptedException; boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeoutMillis); // 立刻獲取到任務的結果,不阻塞,而jdk的future是阻塞的 V getNow(); // 取消任務執行,若是取消成功,任務會由於 CancellationException 異常而致使失敗 // 也就是 isSuccess()==false,同時上面的 cause() 方法返回 CancellationException 的實例。 // mayInterruptIfRunning 說的是:是否對正在執行該任務的線程進行中斷(這樣才能中止該任務的執行), // 彷佛 Netty 中 Future 接口的各個實現類,都沒有使用這個參數 @Override boolean cancel(boolean mayInterruptIfRunning); }
netty的future在jdk的基礎上擴展了它須要的方法,sync和await的區別咱們放到下面看實現類的時候說ide
同時咱們也能夠看到,這個future接口跟io操做是無關的函數
接下來咱們看看ChannelFuture接口,接口註釋上寫的很清楚,咱們來看看ui
* The result of an asynchronous {@link Channel} I/O operation.
* <p>
* All I/O operations in Netty are asynchronous. It means any I/O calls will
* return immediately with no guarantee that the requested I/O operation has
* been completed at the end of the call. Instead, you will be returned with
* a {@link ChannelFuture} instance which gives you the information about the
* result or status of the I/O operation.
* <p>
* A {@link ChannelFuture} is either <em>uncompleted</em> or <em>completed</em>.
* When an I/O operation begins, a new future object is created. The new future
* is uncompleted initially - it is neither succeeded, failed, nor cancelled
* because the I/O operation is not finished yet. If the I/O operation is
* finished either successfully, with failure, or by cancellation, the future is
* marked as completed with more specific information, such as the cause of the
* failure. Please note that even failure and cancellation belong to the
* completed state.
全部io操做都是異步的,一個io操做的調用會當即返回一個帶有結果或者狀態的io實例。
io操做要麼是未完成的,要麼是完成的。當它開始時,future會被建立,一開始是未完成的,未完成的時候沒有成功、失敗或者取消狀態
當它是完成的時候,能夠是失敗或者取消的,失敗或者取消緣由會被附加到future上。
* <pre> * +---------------------------+ * | Completed successfully | * +---------------------------+ * +----> isDone() = true | * +--------------------------+ | | isSuccess() = true | * | Uncompleted | | +===========================+ * +--------------------------+ | | Completed with failure | * | isDone() = false | | +---------------------------+ * | isSuccess() = false |----+----> isDone() = true | * | isCancelled() = false | | | cause() = non-null | * | cause() = null | | +===========================+ * +--------------------------+ | | Completed by cancellation | * | +---------------------------+ * +----> isDone() = true | * | isCancelled() = true | * +---------------------------+ * </pre>
上面那個狀態遷移圖很清楚了,在兩種過程的時候會有什麼狀態,咱們看看接口this
public interface ChannelFuture extends Future<Void> { // 返回future關聯的channel Channel channel(); // 重寫下面幾個方法,修改返回值爲channelfuture @Override ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture sync() throws InterruptedException; @Override ChannelFuture syncUninterruptibly(); @Override ChannelFuture await() throws InterruptedException; @Override ChannelFuture awaitUninterruptibly(); /** * Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the * following methods: * <ul> * <li>{@link #addListener(GenericFutureListener)}</li> * <li>{@link #addListeners(GenericFutureListener[])}</li> * <li>{@link #await()}</li> * <li>{@link #await(long, TimeUnit)} ()}</li> * <li>{@link #await(long)} ()}</li> * <li>{@link #awaitUninterruptibly()}</li> * <li>{@link #sync()}</li> * <li>{@link #syncUninterruptibly()}</li> * </ul> 標記該future是void的,使不能使用上面的方法 */ boolean isVoid(); }
netty實際上是強烈建議直接經過添加監聽器的方式來獲取io操做結果,或者進行後續操做的,ChannelFuture能夠增長或者刪除一個多個 GenericFutureListener,它定義以下spa
public interface GenericFutureListener<F extends Future<?>> extends EventListener { void operationComplete(F future) throws Exception; }
執行完後會回調 operationComplete方法線程
注意一點,不要在ChannelHandler中調用ChannelFuture的await方法,會致使死鎖。這是由於發起io操做後,由io線程負責異步通知發起io操做的用戶線程,若是io線程和用戶線程是同一個的話,就會致使io線程等待本身通知操做完成,這就會致使死鎖,本身掛死本身。
咱們繼續看promise接口,
public interface Promise<V> extends Future<V> { // 標記該future成功及設置結果,並通知全部listener // 若是失敗的話拋異常 Promise<V> setSuccess(V result); // 和setsuccess同樣,只是失敗的話返回false boolean trySuccess(V result); // 標記future失敗,而後通知listener Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); // 標記該future 不可被取消 boolean setUncancellable(); // 下面跟ChannelFuture同樣,都是覆蓋重寫方法 @Override Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> await() throws InterruptedException; @Override Promise<V> awaitUninterruptibly(); @Override Promise<V> sync() throws InterruptedException; @Override Promise<V> syncUninterruptibly(); }
Promise是可寫的future,Future自己並無寫操做相關的接口,netty經過Promise對其進行擴展,用於設置io操做的結果。Promise 實例內部是一個任務,任務的執行每每是異步的,一般是一個線程池來處理任務。Promise 提供的 setSuccess(V result) 或 setFailure(Throwable t) 未來會被某個執行任務的線程在執行完成之後調用,同時那個線程在調用 setSuccess(result) 或 setFailure(t) 後會回調 listeners 的回調函數(固然,回調的具體內容不必定要由執行任務的線程本身來執行,它能夠建立新的線程來執行,也能夠將回調任務提交到某個線程池來執行)。並且,一旦 setSuccess(...) 或 setFailure(...) 後,那些 await() 或 sync() 的線程就會從等待中返回。
接下來咱們看看ChannelPromise
public interface ChannelPromise extends ChannelFuture, Promise<Void> { @Override Channel channel();
@Override ChannelPromise setSuccess(Void result); ChannelPromise setSuccess(); boolean trySuccess(); @Override ChannelPromise setFailure(Throwable cause);
@Override ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override ChannelPromise sync() throws InterruptedException; @Override ChannelPromise syncUninterruptibly(); @Override ChannelPromise await() throws InterruptedException; @Override ChannelPromise awaitUninterruptibly(); /** * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself. */ ChannelPromise unvoid(); }
看方法其實很清楚,基本都是覆寫綜合了ChannelFuture和Promise接口的,就返回值變了,看看咱們一開始的類繼承圖,ChannelPromise 接口同時繼承了 ChannelFuture 和 Promise,最終繼承的都是Future接口,接下來咱們看看具體的實現類DefaultPromise吧
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { // 爲了後面操做成功後經過cas來保存結果到result字段 @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result"); // result爲null的時候默認值 private static final Object SUCCESS = new Object(); // 操做成功後cas比對的值 private static final Object UNCANCELLABLE = new Object(); // 保存執行的結果 private volatile Object result; // 線程執行器 private final EventExecutor executor; // 監聽者 private Object listeners; /** * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll(). */ // 等待這個 promise 的線程數(調用sync()/await()進行等待的線程數量) private short waiters; /** * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the * executor changes. */ // 是否喚醒正在等待線程,用於防止重複執行喚醒,否則會重複執行 listeners 的回調方法 private boolean notifyingListeners; .... }
屬性看完了,咱們能夠看看它主要的方法
@Override public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this); } @Override public boolean trySuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return true; } return false; } @Override public Promise<V> setFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this, cause); } @Override public boolean tryFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners(); return true; } return false; }
set和try的區別就是返回值不同而已,咱們看看底層的方法 setSuccess0
private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); } private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; }
就是經過cas來把objResult保存到result屬性上,而後Notify其餘線程。其餘方法都差很少,能夠比對看看
咱們再看個await方法
public Promise<V> await() throws InterruptedException { if (isDone()) { return this; } if (Thread.interrupted()) { throw new InterruptedException(toString()); } checkDeadLock(); synchronized (this) { while (!isDone()) { incWaiters(); try { wait(); } finally { decWaiters(); } } } return this; }
若是當前Promise已被設置,則返回;若是碰到線程中斷則響應中斷;檢查死鎖,因爲在IO線程中調用Promise的await方法或者sync方法會致使死鎖,前面說過的,因此須要檢驗保護,斷定當前線程是不是io線程;同步鎖定當前Promise對象,循環斷定是否設置完成,使用循環是避免僞喚醒,防止線程 被意外喚醒致使功能異常。
接下來咱們順便也看下sync方法
@Override public Promise<V> sync() throws InterruptedException { await(); rethrowIfFailed(); return this; }
首先調用await方法,而後看是否須要拋出異常,若是任務失敗的話就從新拋出異常,這也是兩方法區別了。
DefaultChannelPromise實現咱們就不看了,基本都是基於DefaultPromise的,只是返回值都是 ChannelPromise而已。
下面咱們來寫個例子吧
public class ChannelPromiseExample extends Thread{ private static final Object object = new Object(); public static void main(String[] args) { final DefaultEventExecutor executor = new DefaultEventExecutor(); final Promise<Integer> promise = executor.newPromise(); // 任務seccess或者failure來回調operationComplete 方法 promise.addListener(new GenericFutureListener<Future<? super Integer>>() { @Override public void operationComplete(Future<? super Integer> future) throws Exception { System.out.println(Thread.currentThread().getName() + " 第一個監聽器"); if (future.isSuccess()) { System.out.println("任務成功,result:" + future.get()); } else { System.out.println("任務失敗,result:" + future.cause()); } } }).addListener(new GenericFutureListener<Future<? super Integer>>() { @Override public void operationComplete(Future<? super Integer> future) throws Exception { System.out.println(Thread.currentThread().getName() + " 第二個監聽器"); } }); // 提交任務 executor.execute(new Runnable() { @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } //能夠設置成功或者失敗 //promise.setSuccess(1); promise.setFailure(new Throwable("FAILURE")); } }); try { System.out.println("promise wait begin"); //promise.sync(); promise.await(); System.out.println("promise wait end"); } catch (InterruptedException e) { e.printStackTrace(); } finally { executor.shutdown(); } } }
能夠體會下await和sync的區別