Java ForkJoin 解析[精品短文]

本文主要想了解兩個地方:如何竊取任務、task如何等待(join) 代碼基於 OpenJDK 12git

竊取算法(work-stealing)

ForkJoin-Paper-DougLea中能夠看出:github

  • 每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應。
  • 隊列使用的是雙端隊列,支持LIFO、FIFO。
  • 子任務會被放到線程(不必定是當前線程)的隊列中。
  • 工做線程按照LIFO的順序處理本身隊列中數據。
  • 當一個工做線程處理完本身隊列中數據的時候,會隨機挑選一個工做線程,並「竊取」的該工做線程隊列隊尾的task。

到了這裏就能夠知道,竊取任務從其餘線程隊列的尾部竊取的了。算法

竊取算法優缺點

工做竊取算法的優勢:充分利用線程進行並行計算,減小了線程間的競爭。bash

工做竊取算法的缺點:在某些狀況下仍是存在競爭,好比雙端隊列裏只有一個任務時。而且該算法會消耗了更多的系統資源,好比建立多個線程和多個雙端隊列。微信

Task 等待(join)

Join方法的主要做用是阻塞當前線程並等待獲取結果。具體代碼以下:less

public final V join() {
    int s;
    if (((s = doJoin()) & ABNORMAL) != 0)
        reportException(s);
    return getRawResult();
}
複製代碼

首先,它調用了doJoin()方法,經過doJoin()方法獲得當前任務的狀態來判斷返回什麼結果,任務狀態有4種:已完成(NORMAL)、被取消(CANCELLED)、信號(SIGNAL)和出現異常(EXCEPTIONAL)。ui

  • 若是任務狀態是已完成,則直接返回任務結果。
  • 若是任務狀態是被取消,則直接拋出CancellationException。
  • 若是任務狀態是拋出異常,則直接拋出對應的異常。

讓咱們再來分析一下doJoin()方法的實現代碼:this

/**
 * Implementation for join, get, quietlyJoin. Directly handles
 * only cases of already-completed, external wait, and
 * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
 *
 * @return status upon completion
 */
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return 
        //已完成,返回status
    	(s = status) < 0 ? s :
    	//未完成,若是當前線程是ForkJoinWorkerThread,從該線程中取出workQueue,並嘗試將
        //當前task出隊而後執行,執行的結果是完成則返回狀態,不然使用當線程池所在的ForkJoinPool的awaitJoin方法等待
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) :
        //當前線程不是ForkJoinWorkerThread,調用externalAwaitDone方法
        //externalAwaitDone: Blocks a non-worker-thread until completion.
        externalAwaitDone();
}
/**
 * Pops the given task only if it is at the current top.
 */
final boolean tryUnpush(ForkJoinTask<?> task) {
    boolean popped = false;
    int s, cap; ForkJoinTask<?>[] a;
    if ((a = array) != null && (cap = a.length) > 0 &&
        (s = top) != base &&
        (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null)))
        TOP.setOpaque(this, s);
    return popped;
}
/**
 * Primary execution method for stolen tasks. Unless done, calls
 * exec and records status if completed, but doesn't wait for * completion otherwise. * * @return status on exit from this method */ final int doExec() { int s; boolean completed; // 僅未完成的任務會運行,其餘狀況會忽略. if ((s = status) >= 0) { try { //exec是abstract方法 //調用ForkJoinTask子類中exec completed = exec(); } catch (Throwable rex) { completed = false; s = setExceptionalCompletion(rex); } if (completed) s = setDone(); } return s; } 複製代碼

在doJoin()方法裏,首先經過查看任務的狀態,看任務是否已經執行完成,若是執行完成,則直接返回任務狀態;若是沒有執行完,則從任務隊列中取出任務並執行。若是任務順利執行完成,則設置任務狀態爲NORMAL,若是出現異常,則記錄異常,並將任務狀態設置爲EXCEPTIONAL。spa

我的微信公衆號:線程

我的github:

github.com/jiankunking

我的博客:

jiankunking.com

相關文章
相關標籤/搜索