<!-- create time: 2016-03-14 09:49:12 -->java
本節主要結合排序的實例來演示多線程執行任務的流程,主要使用了線程池
ExecutorService
, 閉鎖Future
, 完成服務CompletionService
以及最常見的冒泡排序算法
ExecutorService 線程池
private static ExecutorService executorService = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("int-sort"), new ThreadPoolExecutor.CallerRunsPolicy());
上面是建立一個線程池的實例,其中幾個參數分別爲:(來自jdk)

```java
/**
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed. This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
```

- Future

> 就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果

```java
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
```
CompletionService
如果向Executor提交了一組計算任務,並且希望在計算完成後得到結果,那麼可以保留與每一個任務關聯的Future,然後反覆使用get方法,同時將參數timeout指定爲0,從而通過輪詢來判斷任務是否完成。這種方法雖然可行,但卻有些繁瑣。幸運的是,還有一種更好的方法:完成服務CompletionService
import io.netty.util.concurrent.DefaultThreadFactory; import org.junit.Test; import java.util.concurrent.*; /** * Created by yihui on 16/3/11. */ public class RunnableTest { private static ExecutorService executorService = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("sort-calculate"), new ThreadPoolExecutor.CallerRunsPolicy()); /** * 隨機生成一些數 * * @param size * @return */ private int[] genNums(final int size) { int[] num = new int[size]; for (int i = 0; i < size; i++) { num[i] = (int) (Math.random() * 1230); } return num; } // 冒泡排序 private int[] sort(int[] num, int size) { if (size <= 1) { return num; } int tmp; for (int i = 0; i < size; i++) { for (int j = i + 1; j < size; j++) { if (num[i] > num[j]) { tmp = num[i]; num[i] = num[j]; num[j] = tmp; } } } return num; } // 合併兩個排序數組 public int[] merge(int[] ans, int[] sub) { if (ans == null) { return sub; } int ansSize = ans.length; int subSize = sub.length; int[] result = new int[subSize + ansSize]; for (int i =0, ansIndex=0, subIndex=0; i < ansSize + subSize; i ++) { if (subIndex >= subSize) { result[i] = ans[ansIndex ++]; continue; } if (ansIndex >= ansSize) { result[i] = sub[subIndex ++]; continue; } if (ans[ansIndex] < sub[subIndex]) { result[i] = ans[ansIndex ++]; } else { result[i] = sub[subIndex ++]; } } return result; } public int[] calculate(int[] numbers, int size) { CompletionService<int[]> completionService = new ExecutorCompletionService<int[]>(executorService); if (size <= 50) { return this.sort(numbers, size); } // 將數組分割,50個做爲一組,進行排序 int subNum = (size - 1) / 50 + 1; for (int i = 0; i < subNum; i++) { int len = 50; if (i == subNum - 1) { len = size - 50 * i; } final int[] subNumbers = new int[len]; System.arraycopy(numbers, i * 50 + 0, subNumbers, 0, len); final int finalLen = len; Callable<int[]> runnable = new Callable<int[]>() { @Override public int[] call() throws Exception { return sort(subNumbers, finalLen); } }; 
completionService.submit(runnable); } int[] ans = null; // 開始對提交的排序任務的結果進行合併 try{ for (int i = 0; i < subNum; i ++) { // get and remove the result Future<int[]> f = completionService.take(); int[] tmp = f.get(); ans = this.merge(ans, tmp); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return ans; } // 輸出數組 private void print(int[] num, int size, boolean newLine) { for (int i = 0; i < size; i++) { System.out.print(num[i] + ","); } if (newLine) { System.out.println(); } } @Test public void tt() { int size = 250; int[] numbers = this.genNums(size); int[] numbers2 = new int[size]; System.arraycopy(numbers, 0, numbers2, 0, size); long start = System.nanoTime(); this.sort(numbers, size); long end = System.nanoTime(); this.print(numbers, size, true); System.out.println("Cost is : " + (end - start) / 1000); this.print(numbers2, size, true); start = System.nanoTime(); int[] ans = this.calculate(numbers2, size); end = System.nanoTime(); this.print(ans, size, true); System.out.println("cost is : " + (end - start) / 1000); } // 用於測試排序算法,以及合併算法的正確性 @Test public void test() { int size = 10; int[] numbers = this.genNums(size); int[] ans1 = this.sort(numbers, size); this.print(ans1, size, true); size += 5; int[] numbers2 = this.genNums(size); int[] ans2 = this.sort(numbers2, size); this.print(ans2, size, true); int[] ans = this.merge(ans1, ans2); this.print(ans, 25, true); } }
針對上面的實例,我們重點需要關注的對象集中在 calculate
方法中
執行流程:
completionQueue
隊列中

```java
// 提交task
completionService.submit(new Callable<int[]>() {
    @Override
    public int[] call() throws Exception {
        return sort(subNumbers, finalLen);
    }
});

// 獲取結果
for (...) {
    // take 表示從阻塞隊列中獲取並移除Future === get以後remove掉
    Future<int[]> future = completionService.take();
    int[] ans = future.get();
}
```