JDK7引入了Fork/Join框架,所謂Fork/Join框架,我的解釋:Fork分解任務成獨立的子任務,用多線程去執行這些子任務,Join合併子任務的結果。這樣就能使用多線程的方式來執行一個任務。數組
JDK7引入的Fork/Join有三個核心類:多線程
ForkJoinPool,執行任務的線程池併發
ForkJoinWorkerThread,執行任務的工做線程框架
ForkJoinTask,一個用於ForkJoinPool的任務抽象類。this
由於ForkJoinTask比較複雜,抽象方法比較多,平常使用時通常不會繼承ForkJoinTask來實現自定義的任務,而是繼承ForkJoinTask的兩個子類:spa
RecursiveTask:子任務帶返回結果時使用線程
RecursiveAction:子任務不帶返回結果時使用code
對於Fork/Join框架的原理,Doug Lea的文章:A Java Fork/Join Framework對象
在看了網上的不少例子以後,發如今自定義任務類實現compute方法的邏輯通常是這樣的:blog
if 任務足夠小 直接返回結果 else 分割成N個子任務 依次調用每一個子任務的fork方法執行子任務 依次調用每一個子任務的join方法合併執行結果
而執行該自定義任務的調用的則是ForkJoinPool的execute方法,所以首先來看的就是ForkJoinPool的execute方法,看看和普通線程池執行任務有什麼不一樣:
public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); forkOrSubmit(task); }
所以forkOrSubmit是真正執行ForkJoinTask的方法:
private <T> void forkOrSubmit(ForkJoinTask<T> task) { ForkJoinWorkerThread w; Thread t = Thread.currentThread(); if (shutdown) throw new RejectedExecutionException(); if ((t instanceof ForkJoinWorkerThread) && (w = (ForkJoinWorkerThread)t).pool == this) w.pushTask(task); else // 正常執行的時候是主線程調用的,所以關注addSubmission addSubmission(task); }
那麼咱們首先要關注的是addSubmission方法,發覺所作的事情和普通線程池很相似,就是把任務加入到隊列中,不一樣的是直接使用Unsafe操做內存來添加任務對象
private void addSubmission(ForkJoinTask<?> t) { final ReentrantLock lock = this.submissionLock; lock.lock(); try { // 隊列只是普通的數組而不是普通線程池的BlockingQueue, // 喚醒worker線程的工做由下面的signalWork來完成 // 使用Unsafe進行內存操做,把任務放置在數組中 ForkJoinTask<?>[] q; int s, m; if ((q = submissionQueue) != null) { long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE; UNSAFE.putOrderedObject(q, u, t); queueTop = s + 1; if (s - queueBase == m) // 數組已滿,爲數組擴容 growSubmissionQueue(); } } finally { lock.unlock(); } // 通知有新任務來了:兩種操做,有空閒線程則喚醒該線程 // 不然若是能夠新建worker線程則爲這個任務新建worker線程 // 若是不能夠就返回了,等到有空閒線程來執行這個任務 signalWork(); }
接下來要弄清楚就是在compute中fork時,按道理來講這個動做是和主任務在同一個線程中執行,fork是若是把子任務變成多線程執行的:
public final ForkJoinTask<V> fork() { ((ForkJoinWorkerThread) Thread.currentThread()) .pushTask(this); return this; }
在上面分析forkOrSubmit的時候一樣見到了ForkJoinWorkerThread的pushTask方法調用,那麼來看這個方法:
final void pushTask(ForkJoinTask<?> t) { // 代碼的基本邏輯和ForkJoinPool的addSubmission方法基本一致 // 都是把任務加入了任務隊列中,這裏是加入到ForkJoinWorkerThread // 內置的任務隊列中 ForkJoinTask<?>[] q; int s, m; if ((q = queue) != null) { // ignore if queue removed long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE; UNSAFE.putOrderedObject(q, u, t); queueTop = s + 1; // or use putOrderedInt // 這裏不太明白 if ((s -= queueBase) <= 2) pool.signalWork(); else if (s == m) growQueue(); } }
看到這裏一會兒陷入了僵局,爲何ForkJoinWorkerThread要內建一個隊列呢,並且若是子任務仍舊在同一個線程內的話,何以實現併發執行子任務呢?下一篇文章繼續。