編寫併發程序 Inversion

作完了 scala parallel 課程做業後,以爲 scala 寫併發程序的便捷性是 java 永遠都追不上的。scala 的Future 和 Promise,java 裏 Future 和 CompleteFuture 實現了相似的功能,可是使用的便捷性還差的很遠,java.util.Future 自己 API 較少,不支持鏈式操做。CompleteFuture 豐富了 Future 的 API,可是也很差用。java

 

這裏用 Scala parallel 學到的東西計算 Inversion. Inversion 叫作逆序對,它的 nlogn 算法的思想是在 merge sort 的 Merge 階段計算逆序對的個數。先列出單線程解法。算法

 

public static long sort(List<Integer> numbers, int left, int right) {併發

  if(left >= right) return 0L;app

  if(left + 1 == right) return 0L;函數

  int mid = (right - left) / 2 + left;性能

 

  long leftInversion = sort(numbers, left, mid);線程

  long rightInversion = sort(numbers, mid, right);scala

  long mergeInversion = merge(numbers, left, mid, right);xml

  return (leftInversion + rightInversion + mergeInversion);排序

}

public static long merge(List<Integer> numbers, int left, int mid, int right) {

  List<Integer> buf = new ArrayList<>();

  int leftCursor = left, rightCursor = mid;

  long inversion = 0;

  

  while(leftCursor < mid && rightCursor < right) {

    if(numbers.get(leftCursor) <= numbers.get(rightCursor)) buf.add(numbers.get(leftCursor ++));

    else {

      buf.add(numbers.get(rightCursor ++));

      inversion += (mid - leftCursor);

    }

    while(leftCursor < mid) buf.add(numbers.get(leftCursor ++));

    while(rightCursor < right) buf.add(numbers.get(rightCursor ++));

    for(int i = 0; i < (right - left); i ++) numbers.set(i+left, buf.get(i));

    return inversion;

  }

}

 

作 benchmark 必定要注意同一段程序要 run 多遍,以最後一遍的運行時間爲準,由於預熱階段包括對內存的填充,線程的建立等等。

在個人 4 核 i7 mac 上跑了三輪,10萬數字的 inversion, 時間分別是 100ms, 70ms, 40ms.

 

而後是並行解法。並行解法使用了 ForkJoinPool,別的 threadPool 也是同樣的,可是性能上是否有區別就不知道了。

 

爲了不每次執行任務都要建立 ForkJoinTask, 先寫一個 wrapper.

 

public abstract class TaskScheduler {

  public abstract <T> ForkJoinTask<T> schedule(Function<Void, T> func);

}

 

public class DefaultTaskScheduler extends TaskScheduler {

  public <T> ForkJoinTask<T> schedule(Function<Void, T> func) {

    ForkJoinTask<T> task = new ForkJoinTask<T>() {

      protected T compute() { return func.apply(null); }

    };

    ForkJoinCom.pool.execute(task);

    return task;

  }

}

有了這個 Wrapper 之後,就能夠經過 schedule 函數直接把運算邏輯變成 ForkJoinTask。

merge 是順序執行的,寫不出它的並行實現,可是 sort 函數是分而治之算法,每次把 List 劃分爲不相交的兩段,能夠並行的對這兩段排序。

 

public static long parSort(List<Integer> nums, int left, int right, int threshold) {

  if(right - left <= threshold) return Inversion.sort(nums, left, right);

  int mid = (right - left) /2 + left;

 

  ForkJoinTask<Long> leftTask = ForkJoinCom.scheduler.schedule(Void -> parSort(nums, left, mid, threshold));

  ForkJoinTask<Long> rightTask = ForkJoinCom.scheduler.schedule(Void -> parSort(nums, mid, right, threshold));

  long leftInversions = leftTask.join();

  long rightInversions = rightTask.join();

 

  long mergeInversions = Inversion.merge(numbers, left, mid, right);

  return leftInversions + rightInversions + mergeInversions;

}

到這裏,並行解法就算寫完了,可是性能提高的並不明顯。嘗試調整 threshold, 調整 ForkJoinPool 的線程數目,效果依然不明顯。回憶 scala 做業題里老師給出的實現,忽然想到,當 leftTask, rightTask 正在執行的時候,當前線程只是傻等着,什麼都沒幹,這是對 CPU 資源的浪費。照着這個思路稍微修改了下 parSort 方法:

 

ForkJoinTask<Long> leftTask = ForkJonCom.scheduler.schedule(Void -> parSort(nums, left, mid, threshold));
//     ForkJoinTask<Long> rightTask = ForkJonCom.scheduler.schedule(Void -> parSort(nums, mid, right, threshold));
  
long rightInversion = parSort(nums, mid, right, threshold);
long leftInversion = leftTask.join(); 
 
通過修改,parSort 的第三輪成績是 16ms, 比單線程算法快了一倍多。
相關文章
相關標籤/搜索