Fork/Join框架是Java7提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架html
在平常的業務需求中,常常出現的批量查詢,批量寫入等接口的提供,通常來講,最簡單最low的方式就是寫一個for循環來一次執行,可是當業務方對接口的性能要求較高時,就比較尷尬了java
一般能夠想到的方式是採用併發操做,首先想到能夠實現的方式就是利用線程池來作數組
一般實現方式以下併發
// 1. 建立線程池 ExecutorService executorService = new ThreadPoolExecutor(3, 5, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("biz-exec"), new ThreadPoolExecutor.CallerRunsPolicy()); // 2. 建立執行任務 List<Future<Object>> futureList = new ArrayList<>(); for(Object arg : list) { futureList.add(executorService.submit(new Callable<Object>() { @Override public Object call() throws Exception { // xxx } })); } // 3. 結果獲取 for(Future f: futureList) { Object obj = f.get(); }
用上面的這種方式並無什麼問題,咱們接下來考慮的是如何使用ForkJoin框架來實現相似的功能框架
Fork: 將大任務拆分紅若干個能夠併發執行的小任務異步
Join: 合併全部小任務的執行結果ide
ForkJoinTask
: 基本任務,使用forkjoin框架必須建立的對象,提供fork,join操做,經常使用的兩個子類性能
RecursiveAction
: 無結果返回的任務RecursiveTask
: 有返回結果的任務說明:學習
fork
: 讓task異步執行join
: 讓task同步執行,能夠獲取返回值ForkJoinPool
執行 ForkJoinTask
,測試
三中提交方式:
execute
異步,無返回結果submit
異步,有返回結果 (返回Future<T>
)invoke
同步,有返回結果 (會阻塞)結合兩個場景,給出使用姿式
實現從 start - end 的累加求和
首先是定義一個CountTask
來實現求和
首先是肯定任務分割的閥值,當 end-start
的差值大於閥值時,將任務一分爲二
public class CountTask extends RecursiveTask<Integer> { private int start; private int end; private static final int THRED_HOLD = 30; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THRED_HOLD; if (canCompute) { // 不須要拆分 for (int i = start; i <= end; i++) { sum += i; } System.out.println("thread: " + Thread.currentThread() + " start: " + start + " end: " + end); } else { int mid = (end + start) / 2; CountTask left = new CountTask(start, mid); CountTask right = new CountTask(mid + 1, end); left.fork(); right.fork(); sum = left.join() + right.join(); } return sum; } }
調用case
@Test public void testFork() throws ExecutionException, InterruptedException { int start = 0; int end = 200; CountTask task = new CountTask(start, end); ForkJoinPool pool = ForkJoinPool.commonPool(); Future<Integer> ans = pool.submit(task); int sum = ans.get(); System.out.println(sum); }
輸出結果:
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 51 end: 75 thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 101 end: 125 thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 0 end: 25 thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 126 end: 150 thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 76 end: 100 thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 151 end: 175 thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 26 end: 50 thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 176 end: 200 20100
int 數組進行排序
一樣先定義一個SortTask, 主要是爲了演示ForkJoin的使用姿式,具體的排序和合並的邏輯比較簡陋的實現了一下(這塊不是重點)
public class SortTask extends RecursiveTask<List<Integer>> { private List<Integer> list; private final static int THRESHOLD = 5; public SortTask(List<Integer> list) { this.list = list; } @Override protected List<Integer> compute() { if (list.size() < THRESHOLD) { Collections.sort(list); System.out.println("thread: " + Thread.currentThread() + " sort: " + list); return list; } int mid = list.size() >> 1; SortTask l = new SortTask(list.subList(0, mid)); SortTask r = new SortTask(list.subList(mid, list.size())); l.fork(); r.fork(); List<Integer> left = l.join(); List<Integer> right = r.join(); return merge(left, right); } private List<Integer> merge(List<Integer> left, List<Integer> right) { List<Integer> result = new ArrayList<>(left.size() + right.size()); int rightIndex = 0; for (int i = 0; i < left.size(); i++) { if (rightIndex >= right.size() || left.get(i) <= right.get(rightIndex)) { result.add(left.get(i)); } else { result.add(right.get(rightIndex++)); i -= 1; } } if (rightIndex < right.size()) { result.addAll(right.subList(rightIndex, right.size())); } return result; } }
測試case和上面基本同樣,咱們改用 invoke 替換上面的 submit
@Test public void testMerge() throws ExecutionException, InterruptedException { List<Integer> list = Arrays.asList(100, 200, 150, 123, 4512, 3414, 3123, 34, 5412, 34, 1234, 893, 213, 455, 6, 123, 23); SortTask sortTask = new SortTask(list); ForkJoinPool pool = ForkJoinPool.commonPool(); List<Integer> ans = pool.invoke(sortTask); System.out.println(ans); }
輸出結果
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [34, 3123, 3414, 4512] thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] sort: [100, 123, 150, 200] thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [34, 893, 1234, 5412] thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [213, 455] thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [6, 23, 123] [6, 23, 34, 34, 100, 123, 123, 150, 200, 213, 455, 893, 1234, 3123, 3414, 4512, 5412]
我的博客:一灰的我的博客
公衆號獲取更多: