ForkJoin框架之CompletableFuture

前言

近期做者對響應式編程愈加感興趣,在內部分享"JAVA9-12"新特性過程當中,有兩處特性讓做者深感興趣:
1.JAVA9中的JEP266對併發編程工具的更新,包含發佈訂閱框架Flow和CompletableFuture增強,其中發佈訂閱框架以java.base模塊下的java.util.concurrent.Flow及其中的幾個內部類/接口爲組成部分,它們的名稱和做用以下,摘自JAVA12的Flow api文檔。java

clipboard.png
2.JAVA9中孵化,JAVA11中標準化的HttpClient,在以前分享的JAVA9-12新特性一文中曾引用摘自網絡的HttpClient代碼片斷:
片斷1:web

HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create(uri))
    .build();

return client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
    .thenApply(HttpResponse::body);
}

片斷2:spring

HttpClient client = HttpClient.newHttpClient();
List<String> urls = List.of("http://www.baidu.com","http://www.alibaba.com/","http://www.tencent.com");
List<HttpRequest> requests = urls.stream()
    .map(url -> HttpRequest.newBuilder(URI.create(url)))
    .map(reqBuilder -> reqBuilder.build())
    .collect(Collectors.toList());

List<CompletableFuture<HttpResponse<String>>> futures = requests.stream()
    .map(request -> client.sendAsync(request, HttpResponse.BodyHandlers.ofString()))
    .collect(Collectors.toList());
futures.stream()
    .forEach(e -> e.whenComplete((resp,err) -> {
        if(err != null){
            err.printStackTrace();
        }else{
            System.out.println(resp.body());
            System.out.println(resp.statusCode());
        }
    }));
CompletableFuture.allOf(futures
    .toArray(CompletableFuture<?>[]::new))
    .join();
}

在片斷1中,thenApply方法是CompletableFuture的成員,client.sendAsync返回的是一個CompletableFuture。這兩段代碼很好閱讀,甚至說猜出其中的意義。片斷2能夠說對於做者目前的書寫習慣是一個全面的顛覆,顯然咱們能夠預先定義響應行爲,而行爲的執行時間則由前一個階段的實際完成時間決定。片斷2中的whenComplete方法很好理解,最後一行用allOf生成一個相似樹的依賴結構,在當前方法中等待全部CompletableFuture執行完成。編程

簡單看這兩段代碼,響應式編程的魅力可見一斑,甚至能夠說是美妙趣橫生。
那麼,做爲JAVA9中額外照顧加強,HttpClient賴以實現的CompletableFuture,它是何方神聖呢?api

CompletionStage接口

CompletionStage是什麼?不妨賣個關子先。
做者目前使用的JDK版本爲8,儘管它不包含9以後的加強,萬幸CompletionStage是從JDK8引入,所以足以用以瞭解這一偉大神器了。近期做者在公司使用的一些開源框架中,發現至處間接對它的使用:
1.持久化框架Redission。它內部使用一個RedissonExecutorService(實現ScheduledExecutorService)和PromiseDelegator(實現CompletionStage,而CompletableFuture一樣也實現了CompletionStage)來異步地執行task。
2.apollo配置中心。它提供了配置變動的異步通知機制,而這依賴於spring web-mvc提供的DeferredResult,而在異步處理return value時,DeferredResult的setResult一樣也是相應的CompletionStage執行。網絡

//例:阿波羅NotificationControllerV2拉取通知接口
@GetMapping
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>     pollNotification(
  @RequestParam(value = "appId") String appId,
  @RequestParam(value = "cluster") String cluster,
  @RequestParam(value = "notifications") String notificationsAsString,
  @RequestParam(value = "dataCenter", required = false) String dataCenter,
  @RequestParam(value = "ip", required = false) String clientIp) {
List<ApolloConfigNotification> notifications = null;
//省略無關代碼
//DeferredResultWrapper是apollo做者包裝的spring DeferredResult
DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();
//省略無關代碼
if (!CollectionUtils.isEmpty(newNotifications)) {
  deferredResultWrapper.setResult(newNotifications);
} else {
  deferredResultWrapper
      .onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
 
  deferredResultWrapper.onCompletion(() -> {
    //unregister all keys
    for (String key : watchedKeys) {
      deferredResults.remove(key, deferredResultWrapper);
    }
    logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
  });
 
  //省略
return deferredResultWrapper.getResult();
}

在spring的CompletionStageReturnValueHandler的handleReturnValue()方法中,以下異步地處理響應結果:併發

@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
        ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
 
    if (returnValue == null) {
        mavContainer.setRequestHandled(true);
        return;
    }
 
    final DeferredResult<Object> deferredResult = new DeferredResult<Object>();
    WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
 
    @SuppressWarnings("unchecked")
    CompletionStage<Object> future = (CompletionStage<Object>) returnValue;
    future.thenAccept(new Consumer<Object>() {
        @Override
        public void accept(Object result) {
            //在一個CompletionStage完成後執行此方法,爲defferedResult設值
            deferredResult.setResult(result);
        }
    });
    future.exceptionally(new Function<Throwable, Object>() {
        @Override
        public Object apply(Throwable ex) {
            //在一個CompletionStage執行出錯後執行此方法,爲deferredResult設值
            deferredResult.setErrorResult(ex);
            return null;
        }
    });
}

以上代碼的future.thenAccept與future.exceptionally只是規定了兩種狀況下程序接下來的運行行爲,相應的代碼不是當即執行,而是等到相應的行爲發生了纔去執行。很明顯,同步式編程寫流程,響應式編程彷佛就是在寫行爲。
顯然,只要熟悉了CompletionStage的api,以上的代碼就絕對簡單了,好了,開胃菜已上完,接下來介紹CompletionStage。
CompletionStage其實很好理解,按照官方定義,它表示一個可能異步運行的「階段」,在該階段內要執行相應的行爲,而這些運算會在另外一個CompletionStage完成後開始,它自身完成後又可觸發另外一個依賴的CompletionStage。mvc

clipboard.png

在CompletionStage中這些方法都可用來定義一個行爲,行爲的執行方式可參考方法名和入參,這與java8中的stream api持一樣的風格。行爲參數能夠是Consumer,Function,Runnable。包含accept的方法,參數會有一個Consumer,它會消費上一或多個CompletionStage的結果;包含run的方法,參數會有一個Runnable,它的運行不須要前面CompletionStage的執行結果;包含apply的方法,參數會包含Function,該function通常之前一或幾階段的返回值爲入參,以自身的執行結果做爲當前CompletionStage的結果。
CompletionStage和實現類ComletableFuture的方法名中也會包含either/all/any等簡單的單詞,和上述的含義相組合,不難理解。
以如下三個接口爲例說明:app

1.public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);

接口會返回一個CompletionStage,該stage僅在當前stage和參數中的other正常完成後纔會執行參數中的action。框架

2.public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);

接口會返回一個CompletionStage,該stage會在當前stage或參數中的other正常執行完畢後異步執行參數中的函數fn,而fn的參數就是前面執行完畢的stage的結果,fn的返回值將是被返回的stage的結果。

3.public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);

接口會返回一個CompletionStage,它會在當前stage和參數中的other正常執行完畢後執行,以這兩個stage的結果做爲參數,在參數executor線程池中執行action函數,由於它是一個消費者,所以沒有返回值。
接口的其餘方法邏輯相似,再也不綴述。

CompletableFuture源碼

上一節簡述了CompletionStage接口的函數定義,做爲官方提供的實現類,CompletableFuture實現了有關的全部接口,它的做者依舊是我等膜拜的道格大神,下面來具體分析CompletableFuture的實現。

類簽名:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
從簽名信息來看,CompletableFuture實現了Future和CompletionStage接口,這意味着它即知足CompletableStage的階段執行,也提供了Future中獲取該執行結果的方法。

首先來當作員變量和核心函數:

volatile Object result;       // 當前對象的結果或者一個異常包裝對象AltResult,關於AltResult下面再看
volatile Completion stack;    // 任務棧,Completion後面再述。
 
final boolean internalComplete(Object r) {
//使用cas原子操做,將本來爲null的result置爲r,全部調用者都保證r不是null,所以只有第一次才能返回true。
    return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
}
 
final boolean casStack(Completion cmp, Completion val) {
//嘗試用cas原子操做將當前stack的值從cmp換爲val。
    return UNSAFE.compareAndSwapObject(this, STACK, cmp, val);
}
//其中STACK,RESULT就是上面stack和result的句柄,這點和其餘juc中的工具慣例相同
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long STACK;
private static final long NEXT;
static {
    try {
        final sun.misc.Unsafe u;
        UNSAFE = u = sun.misc.Unsafe.getUnsafe();
        Class<?> k = CompletableFuture.class;
        RESULT = u.objectFieldOffset(k.getDeclaredField("result"));
        STACK = u.objectFieldOffset(k.getDeclaredField("stack"));
        NEXT = u.objectFieldOffset
            (Completion.class.getDeclaredField("next"));
    } catch (Exception x) {
        throw new Error(x);
    }
}

stack的類型爲Completion,爲了方便理解,在介紹Completion類以前,先看幾個聲明在CompletableFuture的常量

static final int SYNC   =  0;//同步
static final int ASYNC  =  1;//異步
static final int NESTED = -1;//嵌套

再來看Completion類的結構

//繼承ForkJoinTask,實現Runnable,以及簽名接口AsynchronousCompletionTask
abstract static class Completion extends ForkJoinTask<Void>  implements Runnable, AsynchronousCompletionTask {
    volatile Completion next;      // 指向下一個Completion
 
    //當被觸發時,執行completion動做,若是存在須要傳遞的行爲,
    //返回一個表明該行爲的CompletableFuture  
    //參數只能是上面提到的SYNC,ASYNC,NESTED,後面留意它的正負。  
    abstract CompletableFuture<?> tryFire(int mode);
 
    //若是當前completion依舊是可觸發的,則返回true,這會在清理任務棧時使用. 
    abstract boolean isLive();
    //繼承自Runnable,直接調用tryFile,參數爲1
    public final void run() { tryFire(ASYNC); }
    //繼承自ForkJoinTask,直接調用tryFile,參數爲1,返回true
    public final boolean exec() { tryFire(ASYNC); return true; }
    //繼承自ForkJoinTask,直接返回null
    public final Void getRawResult() { return null; }
    //繼承自ForkJoinTask,空方法。
    public final void setRawResult(Void v) {}
}

上面列舉了內部類Completion的所有代碼,它繼承並實現了ForkJoinTask和Runnable中的抽象方法,同時聲明瞭tryFire這個抽象方法供子類實現。由於繼承了ForkJoinTask,這意味着Completion也是一個任務,且它可能在ForkJoinPool中執行。關於Completion和它的子類後面詳述。先來繼續看核心函數和成員實現。

/** 嘗試將一個任務壓棧,成功返回true */
final boolean tryPushStack(Completion c) {
    Completion h = stack;
    lazySetNext(c, h);//把當前的棧設置爲c的next
    //嘗試把當前棧(h)更新爲新值(c)
    return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}
//lazySetNext定義
static void lazySetNext(Completion c, Completion next) {
    UNSAFE.putOrderedObject(c, NEXT, next);
}

方法tryPushStack的流程很簡單,先調用lazySetNext將當前棧設置爲參數的next,這樣達到了棧的後入爲頂層的目的,而後試圖將頂部元素設置爲新壓入棧的c。

/** 不加鎖將任務壓棧,使用cas加自旋的方式,這也是道格大神的經典. */
final void pushStack(Completion c) {
    do {} while (!tryPushStack(c));
}

接下來是一些對輸出結果編碼的代碼。

//內部類,用於對null和異常進行包裝,從而保證對result進行cas只有一次成功。
static final class AltResult { // See above
    final Throwable ex;        // null only for NIL
    AltResult(Throwable x) { this.ex = x; }
}
 
/** 空值用一個ex爲null的AltResult表示 */
static final AltResult NIL = new AltResult(null);
 
/** 使用上面的NIL完成任務,若任務已經被完成過,返回false */
final boolean completeNull() {
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       NIL);
}
 
/** 對空值進行編碼,使用NIL */
final Object encodeValue(T t) {
    return (t == null) ? NIL : t;
}
 
/** 使用t完成當前任務,t是null時使用NIL做爲結果,不然使用t */
final boolean completeValue(T t) {
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       (t == null) ? NIL : t);
}
 
//對異常進行編碼,返回一個AltResult,其值ex取決於參數x,
//若x爲CompletionException則直接用x賦值ex,
//不然用CoimpletionException包一層。 
static AltResult encodeThrowable(Throwable x) {
    return new AltResult((x instanceof CompletionException) ? x :
                         new CompletionException(x));
}
 
/** 使用參數提供的異常的編碼結果完成任務,若result已非空,返回false */
final boolean completeThrowable(Throwable x) {
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       encodeThrowable(x));
}
 
// 若是x非CompletionException,將它包裹成CompletionException返回。
//若是不是,則判斷,若r是AltResult且其ex就是參數x的值,則將r返回。  
// 不然將x包裹成AltResult返回。
static Object encodeThrowable(Throwable x, Object r) {
    if (!(x instanceof CompletionException))
        x = new CompletionException(x);
    else if (r instanceof AltResult && x == ((AltResult)r).ex)
        return r;
    return new AltResult(x);
}
 
// 給定一個Throwble x,一個Object r,使用上面的方法編碼的結果來嘗試完成。  
final boolean completeThrowable(Throwable x, Object r) {
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       encodeThrowable(x, r));
}
 
//若是x不是null,使用上面的encodeThrowable對x編碼的結果返回,不然若t是空,  
// 返回NIL,不然返回t。
Object encodeOutcome(T t, Throwable x) {
    return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x);
}
 
 
static Object encodeRelay(Object r) {
    Throwable x;
    //對非空參數r進行判斷。
    //若r是AltResult且具有非空的ex,且ex並非CompletionException類型,
    //將ex包裝成CompletionException,幷包裹成AltResult返回。
    //其餘狀況直接返回r。
    return (((r instanceof AltResult) &&
             (x = ((AltResult)r).ex) != null &&
             !(x instanceof CompletionException)) ?
            new AltResult(new CompletionException(x)) : r);
}
 
 
final boolean completeRelay(Object r) {
//這段代碼的邏輯和上一個方法聯合去看,當前未完成的狀況下,嘗試使用參數r完成。
//若是r是異常,嘗試將它包裝成CompletionException並外包一層AltResult。
//用這個AltResult完成。
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       encodeRelay(r));
}

CompletableFuture本質也是一個Future,所以也會支持異步的阻塞的result獲取。由於在完成這個future時,爲了便於處理和維護,使用了編碼的結果,固在讀取結果時,也要對結果進行解碼。

/**  * 供future.get()使用。  */
private static <T> T reportGet(Object r)
    throws InterruptedException, ExecutionException {
    if (r == null)
    //參數r表明一個CompletableFuture的result,由於它會對異常和null進行編碼。
    //故null能夠視爲get的中間被擾動的結果。
        throw new InterruptedException();
    if (r instanceof AltResult) {
        Throwable x, cause;
        //這一段很簡單,是AltResult,ex是空返回空。
        if ((x = ((AltResult)r).ex) == null)
            return null;
             
        if (x instanceof CancellationException)
        //ex是取消異常,轉換後拋出。
            throw (CancellationException)x;
        if ((x instanceof CompletionException) &&
            (cause = x.getCause()) != null)
            //異常是包裝異常CompletionException,取出被包裝的異常拋出。
            x = cause;
        throw new ExecutionException(x);
    }
    //result不是null也不能體現異常,強轉返回。
    @SuppressWarnings("unchecked") T t = (T) r;
    return t;
}
 
//reportJoin方法相對簡單,由於join操做會一直等待,r能保證非空。  
//對於非AltResult類型的r直接強轉返回,AltResult類型的處理與  
//reportGet相似,可是不解CompletionException,直接拋出。  
//此方法拋出的異常均不受檢。 
private static <T> T reportJoin(Object r) {
    if (r instanceof AltResult) {
        Throwable x;
        if ((x = ((AltResult)r).ex) == null)
            return null;
        if (x instanceof CancellationException)
            throw (CancellationException)x;
        if (x instanceof CompletionException)
            throw (CompletionException)x;
        throw new CompletionException(x);
    }
    @SuppressWarnings("unchecked") T t = (T) r;
    return t;
}

相應的get和join方法實現。

public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}
public T join() {
    Object r;
    return reportJoin((r = result) == null ? waitingGet(false) : r);
}

能夠看出,get和join方法分別先調用reportGet,reportJoin,若獲得的空結果,會繼續調用waitingGet方法,只是參數分別爲true和false,waitingGet方法的實現須要先了解剩餘的核心函數以及Completion子類,稍後再看。

一些與異步操做的準備:

/**  * 標識是異步方法產生的任務的接口,對於異步行爲的監控,debug,追蹤會頗有用。  
*     在jdk8的CompletableFuture實現中,它有三個直接實現類,AsyncRun,  
*     AsyncSupply以及前面提到過的Completion。  
*/
public static interface AsynchronousCompletionTask {
}
//判斷是否使用ForkJoinPool的common線程池,在ForkJoinTask中持有該線程池的引用。
//判斷規則是可用cpu核數大於1.
private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);
 
//異步線程池,根據上述判斷,決定使用commonPool仍是ThreadPerTaskExecutor,  
// 後者是一個對每個任務都新建一個線程的low逼線程池。 
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
 
/** low逼線程池源碼,沒什麼可說的 */
static final class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) { new Thread(r).start(); }
}
 
 
static Executor screenExecutor(Executor e) {
    if (!useCommonPool && e == ForkJoinPool.commonPool())
    //判斷參數執行器(線程池的父接口,通常會傳入線程池)是否須要屏蔽,
    //若是參數就是ForkJoinPool.commonPool()而且經前面的系統判斷
    //useCommonPool爲false,則強制使用asyncPool。
        return asyncPool;
    if (e == null) throw new NullPointerException();
//非空且經過驗證,返回參數e
return e;

}

爲異步作的這些準備很好理解,屏蔽不合理的線程池使用,在用戶提供的線程池,commonPool和ThreadPerTaskExecutor之中擇一,在後續的操做中須要使用它們。
還有兩個重要的核心函數,是道格大神的神做。

final void postComplete() {
     
    CompletableFuture<?> f = this; Completion h;//初始化f爲this
    while ((h = f.stack) != null ||//1,f的棧非空
           (f != this && (h = (f = this).stack) != null)) {//2 f的棧爲空且不是this,重置
        CompletableFuture<?> d; Completion t;
        if (f.casStack(h, t = h.next)) {//3 h出棧
            if (t != null) {//4 出棧的h不是最後一個元素,最後一個元素直接執行7便可,減小一次循環cas競態
                if (f != this) {//f不是this
                    pushStack(h);//5 將f剛出棧的h(頂)入this的棧(頂)
                    continue;
                }
                h.next = null;    //6 detach 幫助gc
            }
            //tryFire參數爲NESTED,即-1,這是它惟一一次使用。
            f = (d = h.tryFire(NESTED)) == null ? this : d;//7 f棧的最後一個元素或者就是this棧中的元素
        }
    }
}

這寥寥數行代碼的含金量不可小覷。它應在將要完成時調用,很明顯,它會將當前CompletableFuture的棧以及傳遞依賴的其餘CompletableFuture的棧清空。爲了便於解釋,在相應的代碼上打出了編號,下面詳細分析。

調用該方法,首先進入1,此時f是當前CompletableFuture,h是它的stack,知足不爲空的判斷,進入3.

到達3時,將棧頂Completion h出棧,通常除非併發多個線程對同一個CompletableFuture調用postComplete,不然必定會成功併到達4。若出現多個線程調用,cas失敗,則從新循環。

到達4後,若發現f的棧已空,則直接進入7,不然判斷f是否爲當前CompletableFuture,如果,則進行6,取消h和t的關聯,若不是則進入5,將h(f中剛剛移除的棧頂)壓入當前Completable的棧並從新循環。

顯然,只要處理當前CompletableFuture的棧,就必定會執行7,只要處理的是另外一個CompletableFuture的棧,就會將其出棧,而後壓入當前CompletableFuture的棧。

在7處,會嘗試執行棧頂的Completion的tryFile方法,它會返回一個可能爲null的CompletableFuture,若非空,則賦給f,不然將this賦給f。

因此這段方法的真實執行流程:當前CompletableFuture的棧中元素逐個出棧並tryFile,發現新的CompletableFuture,將它的元素反向壓入本CompletableFuture的棧,壓入結束後,繼續對棧中元素逐個出棧並tryFire,發現非空CompletableFuture則繼續上述過程。直到本CompletableFuture的棧中再也不有元素(此時tryFire返回的CompletableFuture棧也是空的)爲止。

膜拜道格大神的同時,順便點一下,這彷佛是一種避免遞歸的方式。只不過tryFire返回的CompletableFuture中的棧元素將會反向執行。

/* 遍歷棧並去除死亡任務/

final void cleanStack() {
    for (Completion p = null, q = stack; q != null;) {//初始條件,q指向null時終止。
        Completion s = q.next;//循環內第一行,q永遠指向棧頂,s永遠指向棧頂第二個元素或者null
        if (q.isLive()) {//a只要q存活,就將p指向q,並將q指向s
            p = q;
            q = s;
        }
        else if (p == null) {//b q不存活,p是null,兩種可能,從未見到存活的節點,或執行過最後的重啓
            casStack(q, s);/將q出棧
            q = stack;//變量q從新指向新的棧頂。
        }
        else {
            p.next = s;//q已死亡,且當前已經找到過存活的元素。p指向q的下一個元素s,從而將q出棧
            if (p.isLive())//c判斷p是否存活,而p只能是null或者最近一個存活的Completion
                q = s;//6.q前進
            else {//4
                p = null;  //d 從新將p置null並將q指向當前的棧,重啓循環。
                q = stack;
            }
        }
    }
}

爲了讓這段代碼的說明更加清晰,不妨舉個簡單的例子說明。

假定當前CompletableFuture的棧中有1-9個元素,其中14568在調用cleanStack方法時已死亡,在執行過程當中,也出現方法執行過程當中出現死亡的狀態。
進入循環,p爲null,q指向1,知足循環條件,開始第一輪循環。
第一輪循環進入後,s指向2,p爲null,q指向1,是個死亡對象,所以在第一個判斷條件a處未能經過,b判斷條件爲真,q被移除,循環結束,此時p爲null,q指向2,棧變爲2-9.
第二輪循環進入,s指向3,p爲null,q指向2,是個存活對象,進入a,循環結束,p指向2,q指向3。棧依舊爲2-9.
第三輪循環進入,s指向4,p爲2,q指向3,是存活對象,進入a,循環結束,p指向3,q指向4,棧保持2-9不變。
第四輪循環進入,s指向5,p爲3,q指向4,是個死亡對象,p非空且存活,進入c,則p保持爲3,3的next指向5,q指向5.循環結束,棧變爲2356789.
第五輪循環進入,s指向6,p指向3,q指向5,是個死亡對象,p非空且存活,進入c,p保持爲3,3的next指向6,q指向6,循環結束,棧變爲236789.
第六輪循環進入,s指向7,p指向3,q指向6,是個死亡對象,假定此時3死亡,則3的next指向7,進入d分支,p爲null,q爲2,棧爲23789.
第七輪循環進入,s指向3,p爲null,q指向2,是個存活對象,p指向2,q指向3,棧依舊爲23789.
第八輪循環進入,s指向4,p指向2,q指向3,是個死亡對象,p非空且存活,進入c,則p保持爲2,q指向7,3的next指向7,棧變2789.
第九輪進入,s指向8,p指向2,q指向7,是個存活對象,進入a分支,p變爲7,q變爲8,棧保持2789.假定此步以後2死亡,但此時p已經指向7.
第十輪進入,s指向9,p指向7,q指向8,是個死亡對象,p當前指向7且存活,因此儘管2不存活,仍舊進入分支c,p保持爲7,q指向9,7的next指向9.棧爲279.
第十一輪,s爲null,p指向7,q指向9,是個存活對象,則進入a分支,p變爲9,q變爲null,棧保持279.
因q爲null,循環終止。棧通過清理只剩下279三個元素,其中2由於巧合而死亡且未被清理。

下面回到Completion,Completion是一個抽象類,前面已經簡單展現它的源碼,它的子類以下:

clipboard.png
能夠看到有三個直接子類,CoCompletion,Signaller和UniCompletion。UniCompletion又有若干子類,它們分別做爲一些CompletionStage中聲明方法的實現工具,很明顯,道格大神在此處大量使用了策略模式。
先來簡單看一下CoCompletion的實現:

static final class CoCompletion extends Completion {
    //CoCompletion徹底委託給base執行。
    BiCompletion<?,?,?> base;
    CoCompletion(BiCompletion<?,?,?> base) { this.base = base; }
    final CompletableFuture<?> tryFire(int mode) {
        BiCompletion<?,?,?> c; CompletableFuture<?> d;
        if ((c = base) == null || (d = c.tryFire(mode)) == null)
            //base未指定,或base的tryFire返回null,則返回null。
            return null;
        base = null; // 解除關聯,再isLive判斷爲死亡。
        //返回的d就是base的tryFire返回的非空CompletableFuture
        return d;
    }
    final boolean isLive() {
        BiCompletion<?,?,?> c;
        //存活標準,base非空且base的dep非空。
        return (c = base) != null && c.dep != null;
    }
}

CoCompletion雖然是Completion的直接子類,但它依賴了BiCompletion,且BiCompletion是UniCompletion的直接子類,先來看UniCompletion.

abstract static class UniCompletion<T,V> extends Completion {
    Executor executor;//用來執行任務的執行器                
    CompletableFuture<V> dep; //要完成的依賴CompletableFuture
    CompletableFuture<T> src; //做爲行爲源的CompletableFuture
 
    UniCompletion(Executor executor, CompletableFuture<V> dep,
                  CompletableFuture<T> src) {
        this.executor = executor; this.dep = dep; this.src = src;
    }
 
     
    final boolean claim() {
        Executor e = executor;
        if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {//1
            //compareAndSetForkJoinTaskTag是ForkJoinTask的方法,利用cas,保證任何一種狀況下,該行爲只能執行一次。
            if (e == null)
                //不精確的說法是同步調用返回true,異步調用返回false,而後在線程池中執行。
                //有e表明要在執行器中執行,儘管大多數狀況下e都是線程池實例,會異步運行任務。但對於Executor來講,徹底能夠實如今同一個線程執行。
                return true;//2.
            //對於3這行代碼,道格大神註釋就寫了個disable,爲此我翻了大量代碼,發現根本過不了上面cas這一關,因此我的有兩個理解:
            //1.對於當前Completion而言,它的線程池只能用來作一次事,在claim以後當即置空,儘管此時尚未執行action,也不容許當前Completion使用它作別的事了。
            //2.減小了一個指向該線程池的引用,線程池也有被gc的時候吧。就算不gc,關閉虛擬機或者dump的時候也能少作點事。
            executor = null; // 3.
            e.execute(this);//使用該線程池異步執行,回憶上面Completion的聲明,它實現了runnable,在run方法中tryFire(ASYNC),參數ASYNC是正數。
        }
        return false;
    }
 
    final boolean isLive() { return dep != null; }
}

儘管UniCompletion自己代碼很少,可是有關代碼卻很繞,後面會從CompletableFuture調用開始說明一個完整的工做流,做者原本有幾回都已經十分艱難的「肯定發現問題」,寫出了「問題」,但最終仍是在描述過程當中啓動大腦自我否認,不得不佩服道格大神強大的邏輯和大腦。

很明顯,UniCompletion是一個能夠擁有執行器的Completion,它是兩個操做的結合,dep爲要最終執行的依賴操做,src爲來源CompletableFuture,tryFire沒有默認實現,它的子類分別根據不一樣狀況實現了該方法,實現的方式依舊是優雅的策略模式。

claim方法要在執行action前調用,若claim方法返回false,則不能調用action,原則上要保證action只執行一次。

claim的意思是聲稱,開個玩笑,在美劇行屍走肉第四季,有一夥武裝分子解決爲了解決內部分配問題的提出了一個辦法,對任何事物只看誰先喊一句」claimed「,表明」我要了「。調用claim方法和稍後運行action的動做發生在一個線程,所以須要該線程嘗試去claim這個action,claim成功則執行,claim不成功則不執行。

但在提供Executor的前提下,claim除了聲明之外,還會直接在方法內使用該executor執行tryFire,間接地執行action,並返回false,避免調用者也執行action,由於有cas的效果,屢次claim只有第一次可能返回true。

接下來看BiCompletion,它也是一個抽象類,不一樣在於它有兩個源,也就是它的成員dep要等到另外兩個成員CompletableFuture(src,snd)完成,具體的依賴關係要看子類實現。

abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {
    CompletableFuture<U> snd; // 第二個源action
    BiCompletion(Executor executor, CompletableFuture<V> dep,
                 CompletableFuture<T> src, CompletableFuture<U> snd) {
        super(executor, dep, src); this.snd = snd;
    }
}

BiCompletion有多個實現類,看名稱能夠看到Apply,Accept,Run等字眼,前面已經討論過相應的語義。

clipboard.png

以OrApply爲例

static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> {
    Function<? super T,? extends V> fn;
    OrApply(Executor executor, CompletableFuture<V> dep,
            CompletableFuture<T> src,
            CompletableFuture<U> snd,
            Function<? super T,? extends V> fn) {
        //構造函數,多傳一個函數,該函數就是dep對應的action。
        super(executor, dep, src, snd); this.fn = fn;
    }
    //tryFire父類沒有實現
    final CompletableFuture<V> tryFire(int mode) {
        CompletableFuture<V> d;
        CompletableFuture<T> a;
        CompletableFuture<U> b;
        if ((d = dep) == null ||//沒有dep,則沒有相應的依賴行爲,已經執行過的dep會是null。
            //執行orApply返回false,則返回null。最後一個參數僅當mode是ASYNC(只有它大於1)時會是this
            !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this))
            //到此多是運行過,或者不滿執行fn的條件,返回null。
            return null;
        //前面dep不是null,執行orApply也成功了,則解除引用關聯,下次運行會直接返回null,也不影響gc。
        //回憶前面看過的核心函數postComplete,會對CompletableFuture中棧上的全部Completion進行tryFire,
        //返回非null則進行相似遞歸的操做,很明顯,在調用postComplete
        //方法時,dep爲null會返回一個null,避免了再次tryFire。
        dep = null; src = null; snd = null; fn = null;
        //正常運行結束,調用dep的postFire並返回。
        return d.postFire(a, b, mode);
    }
}

orApply方法定義在CompletionFuture。前面沒有敘述。它不是做者稱爲的」核心函數「(即各類Completion都能使用到)。

final <R,S extends R> boolean orApply(CompletableFuture<R> a,
                                      CompletableFuture<S> b,
                                      Function<? super R, ? extends T> f,
                                      OrApply<R,S,T> c) {
    Object r; Throwable x;
    if (a == null || b == null ||
        //爲r賦值用於後續的計算,由於是or,r優先取第一個,第一個源action未完成的狀況下再取第二個。
        ((r = a.result) == null && (r = b.result) == null) || f == null)
        //首先檢測兩個源action,若a和b均未完成,則說明依賴dep不可被執行,返回false。
        return false;
    //僅噹噹前(dep)未完成(result爲null)時,可進行完成工做。
    tryComplete: if (result == null) {
        try {
            //前面說過,c不爲null說明是異步執行,須要先去嘗試claim這個action。
            if (c != null && !c.claim())
                //異步且claim不成功,返回false。
                return false;
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    //若是r表示異常,調用completeThrowable核心函數並結束代碼塊,直接返回true。
                    completeThrowable(x, r);
                    break tryComplete;
                }
                //第一個非空的action(a或b)結果表明異常,但ex是null,則將r置爲null並返回true。
                r = null;
            }
            //r不表明異常結果,直接強轉,用該結果做爲action的參數,執行action,用結果做爲當前的result。出現異常則進入catch塊。
            @SuppressWarnings("unchecked") R rr = (R) r;
            completeValue(f.apply(rr));
        } catch (Throwable ex) {
            //上述代碼出現異常,調用completeThrowable完成dep(this)
            completeThrowable(ex);
        }
    }
    return true;
}

正常運行結束還會調用dep的postFire,它也位於CompletableFuture中,但它只供 BiCompletion在tryFire成功以後纔可以使用,該方法源碼以下:

final CompletableFuture<T> postFire(CompletableFuture<?> a,
                                    CompletableFuture<?> b, int mode) {
    //對於ab兩個源,先處理b,後處理a
    if (b != null && b.stack != null) {
        //b存在且b的棧還有元素
        if (mode < 0 || b.result == null)
            //當爲NESTED(只有它的值是-1)時,或者b沒有結果時,對b進行清棧。調用postFire意味着d執行tryFire成功,
            //即d得到告終果,而這前提是ab之一已執行成功(orApply的含義),因此ab多是其一完成。
            b.cleanStack();
        else
            //非NESTED,則對b進行postComplete,該方法內部又會對b的棧上的每個Completion執行tryFire,並且用NESTED模式。
            b.postComplete();
    }
    //接下來對a直接進行postFire,並沿用mode。
    return postFire(a, mode);
}

對a進行postComplete的方法以下:

final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
    if (a != null && a.stack != null) {
        //棧非空
        if (mode < 0 || a.result == null)
        //相似上面的邏輯,是NESTED模式時或者a未完成時,對a進行清棧,不然對a執行postComplete.
            a.cleanStack();
        else
            a.postComplete();
    }
    //處理a以後,處理當前(即dep)
    if (result != null && stack != null) {
        //有結果且棧非空
        if (mode < 0)
            //NESTED模式,直接返回this。
            return this;
        else
            //非NESTED模式,執行postComplete,其中會對d的棧中全部Completion進行tryFire(NESTED),
            //並在每個tryFire返回的CompletableFuture逆棧執行同同樣操做,參見上面的源碼。
            postComplete();
    }
    return null;
}

以上是所有與OrApply的實現有關的源碼,下面來看一看OrApply的應用,再簡單梳理一下流程。

在CompletableFuture中有三個有關的方法:

clipboard.png

能夠看到三個方法的簽名和調用信息,這三個方法均是實現自CompletionStage。關於方法的字意和大體邏輯的推測方法前面已分析。

public <U> CompletableFuture<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    //直接調用orApplyStage,不指定線程池。
    return orApplyStage(null, other, fn);
}
 
public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    //調用orApplyStage方法,外部不提供線程池,使用asyncPool,關於asyncPool前面已分析。
    return orApplyStage(asyncPool, other, fn);
}
 
public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn,
    Executor executor) {
    //調用orApplyStage方法,但對外面傳入的線程池進行屏蔽,條件符合則使用,不符合則更換,屏蔽原則前面已分析。
    return orApplyStage(screenExecutor(executor), other, fn);
}

可見三個方法均使用了orApplyStage方法,只是在參數上有所不一樣。再來看orApplyStage方法。

private <U extends T,V> CompletableFuture<V> orApplyStage(
    Executor e, CompletionStage<U> o,
    Function<? super T, ? extends V> f) {
    CompletableFuture<U> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        //要執行的函數未提供,或者參數o轉換的CompletableFuture也是null,則拋出空指針。
        throw new NullPointerException();
    //新建了一個dep,後面將它返回,故直接調用實現自CompletionStage的方法不用考慮返回空的問題,能夠鏈式調用。
    CompletableFuture<V> d = new CompletableFuture<V>();
    //若是指定了線程池,直接進入if。未指定線程池,首先嚐試調用orApply方法,並以this和b做參數。
    //前面分析過,若條件知足,即this和b有一個是完成態,則會當即執行f,結果或異常做爲d的結果。
    //d.orApply的最後一個參數是null(c),說明是同步操做,不會進行c.claim操做。
    if (e != null || !d.orApply(this, b, f, null)) {
        //指定了線程池,或者嘗試d.orApply條件不知足,轉爲異步。
        //構建OrApply對象壓入Completion棧。
        OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
        orpush(b, c);
        //壓棧後再次嘗試同步調用一次tryFire,前面分析過,tryFire成功會最終調用相應的cleanStack,postComplete等操做,
        //將死亡的Completion(各子類有不一樣的斷定,CoCompletion斷定base是null,有些判斷dep是null,而完成通常會把dep置null)
        //從棧上移除。
        c.tryFire(SYNC);
    }
    return d;
}
 
 
public CompletableFuture<T> toCompletableFuture() {
    //直接返回this
    return this;
}
final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
    if (c != null) {
        //循環條件,b不存在或未完成且同時當前CompletableFuture未完成。有任何一個完成則終止,若無完成,則執行下面的代碼將任務入this和b的棧。
        while ((b == null || b.result == null) && result == null) {
            //將c壓入當前CompletableFuture棧並退出循環。
            if (tryPushStack(c)) {
                if (b != null && b != this && b.result == null) {
                    //存在b,b不是當前,b未完成時。嘗試將c封裝成CoCompletion並壓入b的棧,前面說過
                    //這個壓入b棧的q徹底依賴於c,並使用c的運行結果。
                    Completion q = new CoCompletion(c);
                    //內循環,參數外循環說明。
                    while (result == null && b.result == null &&
                           !b.tryPushStack(q))
                        lazySetNext(q, null); // clear on failure
                }
                break;
            }
            //到此說明c壓入當前棧失敗,則將c的next恢復爲null。
            lazySetNext(c, null); // clear on failure
        }
    }
}

簡單梳理OrApply這一條線的流程,其餘線邏輯相似。

當使用Completable的applyToEitherAsync/applyToEither時,將進入這一條線的代碼執行,CompletableFuture在初步驗參後,會封裝一個d用於表示結果的CompletableFuture,稍後將會用它做爲返回值。隨後根據入參不一樣而進入不停的邏輯。

同步的狀況,即未提供Executor,首先就嘗試調用它的d.uniApply方法,若此時當前CompletableFuture或參數中的另外一個stage已完成,則用完成的結果直接執行用戶指定的action並對d的結果進行賦值,並進一步完成d的後續清棧和postComplete(1);若此時當前的Completable或另外一個stage未完成,則不知足執行action的條件,將當前Completable做爲第一個source,另外一個stage做爲第二個source,封裝成一個OrApply並壓當前CompletableFuture和另外一個stage的棧(2),隨後當即以同步方式調用它的tryFire(1)。

異步的狀況,直接封裝OrApply對象,將由線程池間接調用tryFire(3),進一步調用orApply方法,由於是異步,即便知足了前面的條件(ab之一正常或異常完成),依舊須要進行claim,claim失敗則不會執行action。claim成功,執行action出現異常,則用異常來完成這個action。

以上三種狀況最終都會執行action,標註了(1)和(3)是很明確的兩種狀況。

任何一個CompletableFuture完成後,都會根據mode進行後續處理,其實儘管每一個Completion都具有一個next指針,但每個Completion的完成均不依賴於棧中的其餘Completion,僅在cleanStack,壓棧,postComplete使用了該棧的結構。如今來回答前面分析時發現的兩個問題。

1.當前CompletableFuture在完成後,執行postComplete,會將它自身的棧中completion出棧並執行action,若要產生新的CompletableFuture,則將它的棧反向壓入自身的棧,而後重複執行出棧-執行的操做。反向壓棧有問題嗎?答案是沒有。由於棧中的每個Completion在執行上互不影響,它們的順序隻影響到cleanStack和postComplete的處理順序。CompletableFuture和它的棧元素產生的CompletableFuture彼此間有順序要求,但對同一個CompletableFuture的棧內的Completion元素彼此間沒有順序要求,決定他們順序的是對源CompletionFuture調用orApply,thenApply等等方法的順序,後續運行也徹底獨立。只不過在源CompletableFuture進行postComplete時,執行的順序將會與本來的」先來後到「相反。

2.cleanStack到一半,p指向的Completion依舊存活,位於p以上的Completion已執行完畢,那麼不會從新開始循環,p以前的死亡Completion會留在棧中。這也是爲何前面使用OrApply來解釋這個問題的緣由,由於極可能就不存在這個問題。根據前面的源碼,僅有postComplete觸發的tryFire會使用NESTED(-1)模式,只有NESTED模式下,或者源CompletableFuture的result爲null(未完成)的狀況下執行postFire纔會進入到cleanStack,不然會進入postComplete,後者會將全部元素出棧並執行存活元素,顯然不存在要考慮存活的問題。而只有or且爲BiCompletion的狀況下,纔可能出現兩個源之一實際並未完成,這樣在非NESTED模式下調用cleanStack方法。

可見2的問題是存在的。但它對於總體的運行結果是無影響的,後續該source執行完畢,調用自身的postComplete時,將已死亡的Completion出棧並tryFire,會發現諸如」dep=null"等狀況,直接返回null,則postComplete方法中的f會保持指向this並繼續迭代下一個棧元素。

目前關於2中提到的cleanStack的調用只出如今UniCompletion成功後調用postFire時依賴模式和result運行。其實還有一種狀況,就是前面提了一次的,屬於future接口的get方法,以及相似的join方法。

前面提到,get和join方法都會在獲取不到結果是按條件輪循watingGet方法,下面來看waitingGet方法。

private Object waitingGet(boolean interruptible) {
    Signaller q = null;//信號器
    boolean queued = false;//是否入隊
    int spins = -1;//自旋次數
    Object r;//結果引用
    //循環條件是隻等待result,內部有根據擾動決定的break
    while ((r = result) == null) {
        //自旋次數只有第一次進來是負值,後續只能是0或其餘正數。
        if (spins < 0)
        //自旋次數,多處理器下初始化爲16,不然爲0,即不自旋。設置值後這次循環結束。
            spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                1 << 8 : 0;
        //第二次循環時纔會判斷自旋次數。只要spins大於0就繼續循環,直到達到0爲止再執行下面的else代碼。
        else if (spins > 0) {
            //僅當下一個種子數不小於0時,減少一次自旋次數。nextSecondarySeed是Thread類中使用@Contended註解標識的變量,
            //這與傳說中的僞共享有關。
            if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                --spins;
        }
        //中止自旋後的第一輪循環,result依舊是null,則對q進行初始化,關於Signaller後續再講。
        else if (q == null)
            q = new Signaller(interruptible, 0L, 0L);
        //初始化q後的下一輪循環(中止自旋後的第二輪),queued是false,將上一輪循環初始化的q壓入棧。
        else if (!queued)
            queued = tryPushStack(q);
        //中止自旋後的若干次循環(上一步可能壓棧失敗,則下一輪自旋會再次壓棧,直到成功)後,判斷是否可擾動。
        else if (interruptible && q.interruptControl < 0) {
            //擾動信號匹配,將q的有關字段所有置空,順帶清一下棧,返回null。
            q.thread = null;
            //這個清棧的過程,細看上面的解釋還有有關的源碼,可能會發出一個疑問,cleanStack只能清除isLive判斷false的Completion,
            //但目前的實現,基本上都只能在dep爲null,base爲null等僅當dep執行完成的狀況發生,而dep完成的狀況是當前CompletableFuture的
            //result不是null,而方法運行到此,很明顯result必然是null,那麼還有必要清棧嗎?
            //答案是必要的,首先未來也許能出現存活或死亡狀態與source的result無關的Completion,那麼此處清一下棧也是幫助後面的工做。
            //其次,剛纔壓入棧的q在thread指向null時即已死亡,它也必需要進行清除。
            cleanStack();
            return null;
        }
        else if (q.thread != null && result == null) {
            //q關聯的線程存在,即q存活,且依舊沒有執行完畢,使用ForkJoinPool的阻塞管理機制,q的策略進行阻塞。
            try {
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ie) {
                //阻塞是能夠擾動的,此時會將q的擾動控制信號設置爲-1,則下一次循環時將可能進入上一個else if。
                q.interruptControl = -1;
            }
        }
    }
    //前面的循環沒有break,能執行到此,只有result得到非null值的狀況。
    if (q != null) {
        //若q不是null,說明沒有在自旋階段獲取到result,須要對它進行禁用。
        q.thread = null;
        if (q.interruptControl < 0) {
            if (interruptible)
                //可擾動且有擾動信號,則說明擾動後未能進入上面帶有cleanStack的那個else if,
                //多是剛好在此次循環開始時獲取到了非空result,從而退出循環,也多是參數interruptible爲假,
                //在外部擾動了當前線程後,依舊等到了result。
                //只要發生了擾動,就將結果置null,外面調用者若是是join,能夠報出擾動。
                r = null; // report interruption
            else
                //若是不可擾動,則中斷當前線程(建立q的線程)。
                Thread.currentThread().interrupt();
        }
    }
    //當前future已經有結果,進行postComplete邏輯並返回r。
    postComplete();
    return r;
}

根據該方法的註釋,waitingGet方法只會有兩個結果,null(可擾動而且擾動了)和原始的result。而get方法可擾動,也便可返回null,join方法不可擾動,只能等待結束或拋出異常。

waitingGet方法中出現了第三個也是最後一個Completion的直接子類Signaller,前面沒有對它進行介紹,不過它也只使用在此處,所以能夠一併介紹。

static final class Signaller extends Completion
    implements ForkJoinPool.ManagedBlocker {
    long nanos;                    // 計時的狀況下,要等待的時間。
    final long deadline;           // 計時的狀況下指定不爲0的值
    volatile int interruptControl; // 大於0表明可擾動,小於0表明已擾動。
    volatile Thread thread;//持有的線程
 
    Signaller(boolean interruptible, long nanos, long deadline) {
        this.thread = Thread.currentThread();
        this.interruptControl = interruptible ? 1 : 0;//不可擾動,賦0
        this.nanos = nanos;
        this.deadline = deadline;
    }
    final CompletableFuture<?> tryFire(int ignore) {//ignore無用
        Thread w; //Signaller自持有建立者線程,tryFire只是單純喚醒建立它的線程。
        if ((w = thread) != null) {
            thread = null;//釋放引用
            LockSupport.unpark(w);//解除停頓。
        }
        //返回null,當action已執行並進行postComlete調用時,f依舊指向當前CompletableFuture引用並解除停頓。
        return null;
    }
    public boolean isReleasable() {
        //線程是空,容許釋放。這多是某一次調用本方法或tryFire方法形成。
        if (thread == null)
            return true;
        if (Thread.interrupted()) {
            //若是調用isReleasable方法的線程被擾動了,則置擾動信號爲-1
            int i = interruptControl;
            interruptControl = -1;
            if (i > 0)
            //原擾動信號是」可擾動「,則是本次調用置爲」已擾動「,返回true。
                return true;
        }
        //未定時(deadline是0)的狀況只能在上面釋放,定時的狀況,本次計算nanos(deadline-System.nanoTime())
        //或上次計算的nanos不大於0時,說明能夠釋放。
        if (deadline != 0L &&
            (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
            //只要可釋放,將建立者線程的引用釋放。下次調用直接返回true,線程運行結束銷燬後可被gc回收。
            thread = null;
            return true;
        }
        //仍持有建立者線程,調用此方法的線程未擾動或當前擾動不是第一次,未定時或不知足定時設置的一概返回false。
        return false;
    }
    public boolean block() {
        //block方法
        if (isReleasable())
            //判斷可釋放,直接return true。
            return true;
        //判斷deadline是0,說明不計時,默認park。
        else if (deadline == 0L)
            LockSupport.park(this);
        else if (nanos > 0L)
            //計時狀況,park指定nanos。
            LockSupport.parkNanos(this, nanos);
        //睡醒後再次返回isReleasable的結果。
        return isReleasable();
    }
    //建立者線程引用被釋放即表明死亡。
    final boolean isLive() { return thread != null; }
}

Signaller是一個Completion的直接子類,同時實現了ForkJoinPool的內部接口ManagedBlocker,這使得它能夠在當ForkJoinPool出現大量線程阻塞堆積時避免飢餓。
Signaller的做用是持有和釋放一個線程,並提供相應的阻塞策略。
前面提到的waitingGet方法建立了一個Signaller(interruptible, 0L, 0L),相似的,能夠看到timedGet方法使用Signaller(true, nanos, d == 0L ? 1L : d)來進行阻塞的管理,管理的方法依賴ForkJoinPool內部的

ForkJoinPool.managedBlock(q)來實現,而這用到了被Signaller實現的ForkJoinPool.ManagedBlocker,managedBlock方法源碼以下。

//ForkJoinPool的managedBlock方法。
public static void managedBlock(ManagedBlocker blocker)
    throws InterruptedException {
    ForkJoinPool p;
    ForkJoinWorkerThread wt;
    Thread t = Thread.currentThread();//調用此方法的線程,即前面的Signaller的建立者線程。
    if ((t instanceof ForkJoinWorkerThread) &&
        (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
        //調用managedBlock方法的線程是ForkJoinWorkerThread,則它可運行在ForkJoinPool中。此處要求內部持有pool的引用。
        WorkQueue w = wt.workQueue;
        //循環,只要判斷blocker(即Signaller)不可釋放。
        while (!blocker.isReleasable()) {
            //嘗試用ForkJoinPool對當前線程的工做隊列進行補償。
            //tryCompensate方法會嘗試減小活躍數並可能建立或釋放一個準備阻塞的worker線程,
            //它會在發生競態,髒數據,鬆弛或池終止時返回false。
            //關於ForkJoinPool的詳情單獨準備文章。
            if (p.tryCompensate(w)) {
                 
                try {
                    //補償成功,不停地對線程池嘗試先isReleasable再block,任何一個方法返回true則終止循環。
                    do {} while (!blocker.isReleasable() &&
                                 !blocker.block());
                } finally {
                    //出現任何異常,或循環終止時,控制信號加上一個活躍數單元,由於前面經過補償纔會進入循環,已減小了一個單元。
                    U.getAndAddLong(p, CTL, AC_UNIT);
                }
                break;
            }
        }
    }
    else {
        //當前線程不是ForkJoinWorkerThread或不持有ForkJoinPool的引用。連續先嚐試isReleasable再嘗試block,直到有一者返回true爲止。
        do {} while (!blocker.isReleasable() &&
                     !blocker.block());
    }
}

關於ForkJoinPool本文不作額外介紹,只列舉這一個方法,到此爲止,對於CompletableFuture的主要接口(繼承自CompletionStage)和實現已經描述完畢(其實只過了一個特殊案例的接口,可是前面提到過,其餘接口的邏輯和實現方式相似,無非就是run,active,apply的更換,或either,both,then,when等,有上面的基礎,再憑藉規則推測語義,源碼並不難理解。

CompletableFuture還有一些獨立聲明的公有方法,源碼也有些很是值得借鑑的地方,如allOf,anyOf兩個方法。

//anyOf方法,返回一個CompletableFuture對象,任何一個cfs列表中的成員進入完成態(正常完成或異常),則它也一併完成,結果一致。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    //直接調用orTree
    return orTree(cfs, 0, cfs.length - 1);
}
//allOf方法,當全部cfs列表中的成員進入完成態後完成(使用空結果),或有任何一個列表成員異常完成時完成(使用同一個異常)。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    //直接調用andTree
    return andTree(cfs, 0, cfs.length - 1);
}
static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
                                       int lo, int hi) {
    //聲明一個後續返回的dep
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (lo > hi) //驗參
        d.result = NIL;
    else {
        CompletableFuture<?> a, b;
        //折半驗證參數並歸併。每相鄰的兩個成員會在一個遞歸中生成另外一個'd',
        //總量奇數的最後一個單獨表示這個d。
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  andTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  andTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        //調用d.biRelay的中繼方法嘗試完成。
        if (!d.biRelay(a, b)) {
            //不知足完成條件,生成一箇中繼並壓棧,再次嘗試同步完成。若不知足條件,ab任何一個完成後都會再間接調用它的tryFire。
            BiRelay<?,?> c = new BiRelay<>(d, a, b);
            a.bipush(b, c);//除非ab均完成,不然bipush要進ab二者的棧。
            c.tryFire(SYNC);
        }
    }
    return d;
}
//biRelay方法,有前面的基礎,很簡單,只要ab之一任何一個未完成則返回false,都完成且dep未完成則進入相應的正常異常完成策略,
//不論dep是否已完成,只要ab均已完成,則返回true
boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
    Object r, s; Throwable x;
    if (a == null || (r = a.result) == null ||
        b == null || (s = b.result) == null)
        return false;
    //biRelay是嘗試根據兩個CompletableFuture完成dep,由於三個complete*方法均已作到原子性,也沒有action要執行,所以它不須要claim。
    if (result == null) {
        if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
            completeThrowable(x, r);
        else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
            completeThrowable(x, s);
        else
            //正常狀況,用null完成。
            completeNull();
    }
    return true;
}
 
 
//壓入棧的BiRelay
static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And
    BiRelay(CompletableFuture<Void> dep,
            CompletableFuture<T> src,
            CompletableFuture<U> snd) {
        super(null, dep, src, snd);
    }
    final CompletableFuture<Void> tryFire(int mode) {
        CompletableFuture<Void> d;
        CompletableFuture<T> a;
        CompletableFuture<U> b;
        if ((d = dep) == null || !d.biRelay(a = src, b = snd))
            //已經完成過,或者未完成,本次也不能完成,返回一個null
            return null;
        //BiRelay經過BiCompletion間接繼承了UniCompletion,所以dep取null表明死亡。
        //這樣也能規避錯誤的tryFire,如當它已被完成,持有的dep引用置null,當d進行postFire的postComplete時會保持f=this並持續出棧
        //dep未完成時清棧也能有效移除已完成的任務。
        src = null; snd = null; dep = null;
        return d.postFire(a, b, mode);
    }
}
//orTree相似上面的andTree,有一個完成或異常,就用它的結果或異常做爲返回的CompletableFuture的結果或異常。
static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
                                        int lo, int hi) {
    CompletableFuture<Object> d = new CompletableFuture<Object>();
    if (lo <= hi) {
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        //同上
        if ((a = (lo == mid ? cfs[lo] :
                  orTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  orTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        //同上,下面簡述orRelay和OrRelay
        if (!d.orRelay(a, b)) {
            OrRelay<?,?> c = new OrRelay<>(d, a, b);
            //除非ab任何一個已完成,不然orpush要進棧,且只進一個棧。
            a.orpush(b, c);
            c.tryFire(SYNC);
        }
    }
    return d;
}
//很明顯,orRelay就是兩個CompletableFuture的或關係中繼者。
final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
    Object r;
    if (a == null || b == null ||
        ((r = a.result) == null && (r = b.result) == null))
        return false;
    //只要ab有一個有結果,當即進行完成工做。
    if (result == null)
        //前面已介紹過completeRelay函數,r能夠是異常。
        completeRelay(r);
    //只要ab有一個完成或異常,即返回true
    return true;
}
static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or
    OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src,
            CompletableFuture<U> snd) {
        super(null, dep, src, snd);
    }
    final CompletableFuture<Object> tryFire(int mode) {
        CompletableFuture<Object> d;
        CompletableFuture<T> a;
        CompletableFuture<U> b;
        if ((d = dep) == null || !d.orRelay(a = src, b = snd))
            return null;
        //置空的理由同上。
        src = null; snd = null; dep = null;
        return d.postFire(a, b, mode);
    }
}

後語

到此爲止,關於CompletableFuture在JDK8的源碼解析完成。

每一次讀道格大神的代碼時都會很痛苦,可是當痛苦變成豁然開朗時,接下來的是舒爽。

毫無疑問,道格大神的實現一如既往的優雅和巧妙,甚至不能用技術來形容,而應該用」藝術「來形容。

前面解釋過選擇CompletableFuture,由於在現有的若干框架中發現了與它的父接口CompletionStage有關,而CompletableFuture是官方給出的實現。同時,它也是響應式編程裏的典型,在jdk出品的HttpClient中也有與之結合的部分。我相信前面的簡單例子加上源碼分析,再加上對api的稍加理解,很快就能夠上手響應式編程。

簡單回顧:

1.CompletableFuture實現了CompletionStage接口的一系列由either,apply,run,then等關鍵字組成的方法,能夠鏈式調用。

2.CompletableFuture能夠不去鏈式調用,每一個CompletableFuture都可以單獨調用多個1中提到的方法,結果是當即嘗試執行這些方法,若執行條件不知足,也會自身維護一個棧的結構,棧中的各方法用單向鏈表的形式鏈接,在清棧操做或完成後置處理時按這個順序或局部逆序fire,但任什麼時候候執行時彼此之間不影響。

3.調用1中的方法並返回新的CompletableFuture的CompletableFuture在源碼的註釋中稱爲source,即源,生成的即d或dep,做者花了好久去理解這兩個概念。源的完成通常爲dep的基本條件,dep能夠依賴一源或多源的完成,這裏的完成包含正常result或異常result。

4.CompletableFuture對null結果和異常結果進行了編碼,詳情見上面的分析。

5.CompletableFuture基本無鎖,get或join結果除外,但該鎖阻塞的是查詢線程,而不是用來計算和完成任務的線程,這些線程在CompletableFuture的代碼中無鎖,固然,能夠在相應的棧元素中使用鎖。

6.CompletableFuture中的棧以Completion爲實現,它是ForkJoinTask,能夠在ForkJoinPool中運行,它有各類派生類。各派生類能夠實現任務的」生死「判斷,」fire操做「,以及對有執行器(線程池或同步的執行器)時的邏輯處理。

7.經過CompletableFuture指定的action最多能有三種調用時機,但鑑於這太實現具體,未來隨時可能變成四種五種,沒有太大的參考價值。首先在action被聲明時,若沒有指定線程池,會進行一次調用嘗試(1);若提供了線程池或(1)的嘗試失敗(通常是source未完成),則將生成的dep,源,action封裝成一個Completion實例併入棧,入棧成功後進行第二次嘗試,在此次嘗試中,未提供執行器的會同步嘗試一次fire(2);提供了執行器(線程池是一種異步的執行器)的,須要先claim,並只能claim一次,在claim方法中異步地執行,且這種狀況下claim必定會失敗,但會將執行器引用釋放,下次claim由於cas的緣由也必定會失敗,這樣保證了僅一次執行(3)。

一些api能夠有一個線程池的成員變量,它能夠在一開始在入口用線程池異步執行(靜態方法爲主,如 public static CompletableFuture<Void> runAsync(Runnable runnable) 這種入口方法,它負責第一個CompletableFuture的生成。

相關文章
相關標籤/搜索