ForkJoin 學習使用筆記

ForkJoin 學習使用筆記

Fork/Join框架是Java7提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架html

背景

在平常的業務需求中,常常出現的批量查詢,批量寫入等接口的提供,通常來講,最簡單最low的方式就是寫一個for循環來一次執行,可是當業務方對接口的性能要求較高時,就比較尷尬了java

一般能夠想到的方式是採用併發操做,首先想到能夠實現的方式就是利用線程池來作數組

一般實現方式以下併發

// 1. 建立線程池

ExecutorService executorService = new ThreadPoolExecutor(3, 5, 60,
      TimeUnit.SECONDS,
      new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("biz-exec"),
      new ThreadPoolExecutor.CallerRunsPolicy());

// 2. 建立執行任務
List<Future<Object>> futureList = new ArrayList<>();
for(Object arg : list) {
        futureList.add(executorService.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
              // xxx
            }
        }));
}

// 3. 結果獲取
for(Future f: futureList) {
    Object obj = f.get();
}

用上面的這種方式並無什麼問題,咱們接下來考慮的是如何使用ForkJoin框架來實現相似的功能框架

ForkJoin 基本知識

Fork: 將大任務拆分紅若干個能夠併發執行的小任務異步

Join: 合併全部小任務的執行結果ide

forkjoin

任務分割

ForkJoinTask : 基本任務,使用forkjoin框架必須建立的對象,提供fork,join操做,經常使用的兩個子類性能

  • RecursiveAction : 無結果返回的任務
  • RecursiveTask : 有返回結果的任務

說明:學習

  1. fork : 讓task異步執行
  2. join : 讓task同步執行,能夠獲取返回值
  3. ForkJoinTask 在不顯示使用ForkJoinPool.execute/invoke/submit()方法進行執行的狀況下,也可使用本身的fork/invoke方法進行執行

結果合併

ForkJoinPool 執行 ForkJoinTask測試

  • 任務分割出的子任務會添加到當前工做線程所維護的雙端隊列中,進入隊列的頭部。
  • 當一個工做線程的隊列裏暫時沒有任務時,它會隨機從其餘工做線程的隊列的尾部獲取一個任務

三中提交方式:

  1. execute 異步,無返回結果
  2. submit 異步,有返回結果 (返回Future<T>
  3. invoke 同步,有返回結果 (會阻塞)

使用說明

結合兩個場景,給出使用姿式

1. 累加

實現從 start - end 的累加求和

首先是定義一個CountTask 來實現求和

首先是肯定任務分割的閥值,當 end-start 的差值大於閥值時,將任務一分爲二

public class CountTask extends RecursiveTask<Integer> {

    private int start;
    private int end;

    private static final int THRED_HOLD = 30;


    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRED_HOLD;
        if (canCompute) { // 不須要拆分
            for (int i = start; i <= end; i++) {
                sum += i;
            }

            System.out.println("thread: " + Thread.currentThread() + " start: " + start + " end: " + end);
        } else {
            int mid = (end + start) / 2;
            CountTask left = new CountTask(start, mid);
            CountTask right = new CountTask(mid + 1, end);
            left.fork();
            right.fork();

            sum = left.join() + right.join();
        }
        return sum;
    }
}

調用case

@Test
public void testFork() throws ExecutionException, InterruptedException {
    int start = 0;
    int end = 200;

    CountTask task = new CountTask(start, end);
    ForkJoinPool pool = ForkJoinPool.commonPool();
    Future<Integer> ans = pool.submit(task);
    int sum = ans.get();
    System.out.println(sum);
}

輸出結果:

thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 51 end: 75
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 101 end: 125
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 0 end: 25
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 126 end: 150
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 76 end: 100
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 151 end: 175
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 26 end: 50
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 176 end: 200
20100

2. 排序

int 數組進行排序

一樣先定義一個SortTask, 主要是爲了演示ForkJoin的使用姿式,具體的排序和合並的邏輯比較簡陋的實現了一下(這塊不是重點)

public class SortTask extends RecursiveTask<List<Integer>> {

    private List<Integer> list;

    private final static int THRESHOLD = 5;

    public SortTask(List<Integer> list) {
        this.list = list;
    }

    @Override
    protected List<Integer> compute() {
        if (list.size() < THRESHOLD) {
            Collections.sort(list);

            System.out.println("thread: " + Thread.currentThread() + " sort: " + list);
            return list;
        }


        int mid = list.size() >> 1;


        SortTask l = new SortTask(list.subList(0,  mid));
        SortTask r = new SortTask(list.subList(mid, list.size()));

        l.fork();
        r.fork();

        List<Integer> left = l.join();
        List<Integer> right = r.join();

        return merge(left, right);
    }

    private List<Integer> merge(List<Integer> left, List<Integer> right) {
        List<Integer> result = new ArrayList<>(left.size() + right.size());

        int rightIndex = 0;
        for (int i = 0; i < left.size(); i++) {
            if (rightIndex >= right.size() || left.get(i) <= right.get(rightIndex)) {
                result.add(left.get(i));
            } else {
                result.add(right.get(rightIndex++));
                i -= 1;
            }
        }

        if (rightIndex < right.size()) {
            result.addAll(right.subList(rightIndex, right.size()));
        }

        return result;
    }
}

測試case和上面基本同樣,咱們改用 invoke 替換上面的 submit

@Test
public void testMerge() throws ExecutionException, InterruptedException {
    List<Integer> list = Arrays.asList(100, 200, 150, 123, 4512, 3414, 3123, 34, 5412, 34, 1234, 893, 213, 455, 6, 123, 23);
    SortTask sortTask = new SortTask(list);
    ForkJoinPool pool = ForkJoinPool.commonPool();
    List<Integer> ans = pool.invoke(sortTask);
    System.out.println(ans);
}

輸出結果

thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [34, 3123, 3414, 4512]
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] sort: [100, 123, 150, 200]
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [34, 893, 1234, 5412]
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [213, 455]
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [6, 23, 123]
[6, 23, 34, 34, 100, 123, 123, 150, 200, 213, 455, 893, 1234, 3123, 3414, 4512, 5412]

參考

其餘

我的博客:一灰的我的博客

公衆號獲取更多:

我的信息

相關文章
相關標籤/搜索