原文:慕課網實戰·高併發探索(十三):併發容器J.U.C -- 組件FutureTask、ForkJoin、BlockingQueuejava
FutureTask是J.U.C中的類,是一個可刪除的異步計算類。這個類提供了Future接口的的基本實現,使用相關方法啓動和取消計算,查詢計算是否完成,並檢索計算結果。只有在計算完成時才能使用get方法檢索結果;若是計算還沒有完成,get方法將會阻塞。一旦計算完成,計算就不能從新啓動或取消(除非使用runAndReset方法調用計算)。算法
一般實現一個線程咱們會使用繼承Thread的方式或者實現Runnable接口,這兩種方式有一個共同的缺陷就是在執行完任務以後沒法獲取執行結果。從Java1.5以後就提供了Callable與Future,這兩個接口就能夠實現獲取任務執行結果。數組
Runnable接口:代碼很是簡單,只有一個方法run緩存
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
Callable泛型接口:有泛型參數,提供了一個call方法,執行後可返回傳入的泛型參數類型的結果。安全
public interface Callable<V> { V call() throws Exception; }
Future接口提供了一系列方法用於控制線程執行計算,以下:併發
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning);//取消任務 boolean isCancelled();//是否被取消 boolean isDone();//計算是否完成 V get() throws InterruptedException, ExecutionException;//獲取計算結果,在執行過程當中任務被阻塞 V get(long timeout, TimeUnit unit)//timeout等待時間、unit時間單位 throws InterruptedException, ExecutionException, TimeoutException; }
使用方法:框架
public class FutureExample { static class MyCallable implements Callable<String> { @Override public String call() throws Exception { log.info("do something in callable"); Thread.sleep(5000); return "Done"; } } public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); Future<String> future = executorService.submit(new MyCallable());//線程池提交任務 log.info("do something in main"); Thread.sleep(1000); String result = future.get();//獲取不到一直阻塞 log.info("result:{}", result); } }
運行結果:阻塞效果
異步
Future實現了RunnableFuture接口,而RunnableFuture接口繼承了Runnable與Future接口,因此它既能夠做爲Runnable被線程中執行,又能夠做爲callable得到返回值。ide
public class FutureTask<V> implements RunnableFuture<V> { ... } public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
FutureTask支持兩種參數類型,Callable和Runnable,在使用Runnable 時,還能夠多指定一個返回結果類型。高併發
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
使用方法:
public class FutureTaskExample { public static void main(String[] args) throws Exception { FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() { @Override public String call() throws Exception { log.info("do something in callable"); Thread.sleep(5000); return "Done"; } }); new Thread(futureTask).start(); log.info("do something in main"); Thread.sleep(1000); String result = futureTask.get(); log.info("result:{}", result); } }
運行結果:
ForkJoin是Java7提供的一個並行執行任務的框架,是把大任務分割成若干個小任務,待小任務完成後將結果彙總成大任務結果的框架。主要採用的是工做竊取算法,工做竊取算法是指某個線程從其餘隊列裏竊取任務來執行。
在竊取過程當中兩個線程會訪問同一個隊列,爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般咱們會使用雙端隊列來實現工做竊取算法。被竊取任務的線程永遠從隊列的頭部拿取任務,竊取任務的線程從隊列尾部拿取任務。
侷限性:
一、任務只能使用fork和join做爲同步機制,若是使用了其餘同步機制,當他們在同步操做時,工做線程就不能執行其餘任務了。好比在fork框架使任務進入了睡眠,那麼在睡眠期間內在執行這個任務的線程將不會執行其餘任務了。
二、咱們所拆分的任務不該該去執行IO操做,如讀和寫數據文件。
三、任務不能拋出檢查異常。必須經過必要的代碼來處理他們。
框架核心:
核心有兩個類:ForkJoinPool | ForkJoinTask
ForkJoinPool:負責來作實現,包括工做竊取算法、管理工做線程和提供關於任務的狀態以及他們的執行信息。
ForkJoinTask:提供在任務中執行fork和join的機制。
使用方式:(模擬加和運算)
@Slf4j public class ForkJoinTaskExample extends RecursiveTask<Integer> { public static final int threshold = 2;//設定不大於兩個數相加就直接for循環,不適用框架 private int start; private int end; public ForkJoinTaskExample(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; //若是任務足夠小就計算任務 boolean canCompute = (end - start) <= threshold; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { // 若是任務大於閾值,就分裂成兩個子任務計算(分裂算法,可依狀況調優) int middle = (start + end) / 2; ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle); ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end); // 執行子任務 leftTask.fork(); rightTask.fork(); // 等待任務執行結束合併其結果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); // 合併子任務 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkjoinPool = new ForkJoinPool(); //生成一個計算任務,計算1+2+3+4...100 ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100); //執行一個任務 Future<Integer> result = forkjoinPool.submit(task); try { log.info("result:{}", result.get()); } catch (Exception e) { log.error("exception", e); } } }
主要應用場景:生產者消費者模型,是線程安全的
阻塞狀況:
一、當隊列滿了進行入隊操做
二、當隊列空了的時候進行出隊列操做
四套方法:
BlockingQueue提供了四套方法,分別來進行插入、移除、檢查。每套方法在不能馬上執行時都有不一樣的反應。
Throws Exceptions :若是不能當即執行就拋出異常。
Special Value:若是不能當即執行就返回一個特殊的值。
Blocks:若是不能當即執行就阻塞
Times Out:若是不能當即執行就阻塞一段時間,若是過了設定時間尚未被執行,則返回一個值
實現類:
ArrayBlockingQueue:它是一個有界的阻塞隊列,內部實現是數組,初始化時指定容量大小,一旦指定大小就不能再變。採用FIFO方式存儲元素。
DelayQueue:阻塞內部元素,內部元素必須實現Delayed接口,Delayed接口又繼承了Comparable接口,緣由在於DelayQueue內部元素須要排序,通常狀況按過時時間優先級排序。
public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
DalayQueue內部採用PriorityQueue與ReentrantLock實現。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); ... }
LinkedBlockingQueue:大小配置可選,若是初始化時指定了大小,那麼它就是有邊界的。不指定就無邊界(最大整型值)。內部實現是鏈表,採用FIFO形式保存數據。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE);//不指定大小,無邊界採用默認值,最大整型值 }
PriorityBlockingQueue:帶優先級的阻塞隊列。無邊界隊列,容許插入null。插入的對象必須實現Comparator接口,隊列優先級的排序規則就是按照咱們對Comparable接口的實現來指定的。咱們能夠從PriorityBlockingQueue中獲取一個迭代器,但這個迭代器並不保證能按照優先級的順序進行迭代。
public boolean add(E e) {//添加方法 return offer(e); } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator;//必須實現Comparator接口 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
SynchronusQueue:只能插入一個元素,同步隊列,無界非緩存隊列,不存儲元素。