文章目錄
![在這裏插入圖片描述](http://static.javashuo.com/static/loading.gif)
ForkJoin
1.Fork/Join流程:
ForkJoin是一種分治的思想,在1.7中引入JDK中。現實生活中的快排,隊排,MapReduce都是思想的 實現,意思是在必要的狀況下,將一個大任務,進行拆分(fork) 成若干個子任務(拆到不能再拆,這裏就是指咱們制定的拆分的臨界值),再將一個個小任務的結果進行join彙總。
java
2. 工做竊取模式
從上述Fork/Join框架的描述能夠看出,咱們須要一些線程來執行Fork出的任務,在實際中,若是每次都建立新的線程執行任務,對系統資源的開銷會很大,因此Fork/Join框架利用了線程池來調度任務。web
另外,這裏能夠思考一個問題,既然由線程池調度,根據咱們以前學習普通/計劃線程池的經驗,必然存在兩個要素:算法
工做線程
任務隊列數據庫
通常的線程池只有一個任務隊列,可是對於Fork/Join框架來講,因爲Fork出的各個子任務實際上是平行關係,爲了提升效率,減小線程競爭,應該將這些平行的任務放到中去,如上不一樣的隊列圖中,大任務分解成三個子任務:子任務一、子任務2,那麼就建立兩個任務隊列,而後再建立3個工做線程與隊列一一對應。segmentfault
那麼爲何須要使用工做竊取算法呢?假如咱們須要作一個比較大的任務,咱們能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,因而把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應,好比A線程負責處理A隊列裏的任務。可是有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其餘線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
知足這一需求的任務隊列其實就是JUC框架中介紹過的雙端阻塞隊列 LinkedBlockingDeque
。數組
工做竊取算法的優勢是充分利用線程進行並行計算,並減小了線程間的競爭,其缺點是在某些狀況下仍是存在競爭,好比雙端隊列裏只有一個任務時。而且消耗了更多的系統資源,好比建立多個線程和多個雙端隊列。而且在進行RR跟上下文切換也會耗時的,因此不必定是多線程就必定 比單線程速度快。彈性而定,看任務量。多線程
3. demo演示
ForkJoin
有兩種繼承方式,RecursiveTask
有返回值,RecursiveAction
無返回值
任務需求:假設有個很是大的long[]數組,經過FJ框架求解數組全部元素的和。
任務類定義,由於須要返回結果,因此繼承RecursiveTask
,並覆寫compute
方法。任務的fork經過ForkJoinTask
的fork方法執行,join方法方法用於等待任務執行後返回:框架
public class ForkJoinWork extends RecursiveTask<Long> { private Long start;//起始值 private Long end;//結束值 public static final Long critical = 100000L;//臨界值 public ForkJoinWork(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { // return null; //判斷是不是拆分完畢 Long lenth = end - start; //起始值差值 if (lenth <= critical) { //若是拆分完畢就相加 Long sum = 0L; for (Long i = start; i <= end; i++) { sum += i; } return sum; } else { //沒有拆分完畢就開始拆分 Long middle = (end + start) / 2;//計算的兩個值的中間值 ForkJoinWork right = new ForkJoinWork(start, middle); right.fork();//拆分,並壓入線程隊列 ForkJoinWork left = new ForkJoinWork(middle + 1, end); left.fork();//拆分,並壓入線程隊列 //合併 return right.join() + left.join(); } } }
測試:less
public class ForkJoinWorkTest { @Test public void test() { //ForkJoin實現 long l = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool();//實現ForkJoin 就必須有ForkJoinPool的支持 ForkJoinTask<Long> task = new ForkJoinWork(0L, 10000000000L);//參數爲起始值與結束值 Long invoke = forkJoinPool.invoke(task); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + invoke + " time: " + (l1 - l)); //invoke = -5340232216128654848 time: 56418 //ForkJoinWork forkJoinWork = new ForkJoinWork(0L, 10000000000L); } @Test public void test2() { //普通線程實現 Long x = 0L; Long y = 10000000000L; long l = System.currentTimeMillis(); for (Long i = 0L; i <= y; i++) { x += i; } long l1 = System.currentTimeMillis(); System.out.println("invoke = " + x + " time: " + (l1 - l)); //invoke = -5340232216128654848 time: 64069 } @Test public void test3() { //Java 8 並行流的實現 long l = System.currentTimeMillis(); long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, Long::sum); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + reduce + " time: " + (l1 - l)); //invoke = -5340232216128654848 time: 2152 } }
結論:Java 8 就爲咱們提供了一個並行流來實現ForkJoin實現的功能。能夠看到並行流比本身實現ForkJoin還要快。dom
Java 8 中將並行流進行了優化,咱們能夠很容易的對數據進行並行流的操做,Stream API能夠聲明性的經過parallel()與sequential()在並行流與串行流中隨意切換!
核心組件
F/J框架的實現很是複雜,內部大量運用了位操做和無鎖算法,撇開這些實現細節不談,該框架主要涉及三大核心組件:ForkJoinPool(線程池)、ForkJoinTask(任務)、ForkJoinWorkerThread(工做線程),外加WorkQueue(任務隊列):
- ForkJoinPool:ExecutorService的實現類,負責工做線程的管理、任務隊列的維護,以及控制整個任務調度流程;
- ForkJoinTask:Future接口的實現類,fork是其核心方法,用於分解任務並異步執行;而join方法在任務結果計算完畢以後纔會運行,用來合併或返回計算結果;
- ForkJoinWorkerThread:Thread的子類,做爲線程池中的工做線程(Worker)執行任務;
- WorkQueue:任務隊列,用於保存任務;
ForkJoinPool
ForkJoinPool做爲Executors框架的一員,從外部看與其它線程池並無什麼區別,僅僅是ExecutorService的一個實現類:
ForkJoinPool的主要工做以下:
- 接受外部任務的提交(外部調用ForkJoinPool的invoke/execute/submit方法提交任務);
- 接受ForkJoinTask自身fork出的子任務的提交;
- 任務隊列數組(WorkQueue[])的初始化和管理;
工做線程(Worker)的建立/管理。
注意:ForkJoinPool提供了3類外部提交任務的方法:invoke
、execute
、submit
,它們的主要區別在於任務的執行方式上。
- 經過invoke方法提交的任務,調用線程直到任務執行完成纔會返回,也就是說這是一個同步方法,且有返回結果;
- 經過execute方法提交的任務,調用線程會當即返回,也就是說這是一個異步方法,且沒有返回結果;
- 經過submit方法提交的任務,調用線程會當即返回,也就是說這是一個=異步=方法,且有==返回結果(返回Future實現類,能夠經過get獲取結果)。
注意:ForkJoinPool支持兩種模式:
同步模式(默認方式)
異步模式
這裏的同步/異步並不是指F/J框架自己是採用同步模式仍是採用異步模式工做,而是指其中的工做線程的工做方式。在F/J框架中,每一個工做線程(Worker)都有一個屬於本身的任務隊列(WorkQueue),這是一個底層採用數組實現的雙向隊列。
同步是指:對於工做線程(Worker)自身隊列中的任務,採用後進先出(LIFO)的方式執行;異步是指:對於工做線程(Worker)自身隊列中的任務,採用先進先出(FIFO)的方式執行
ForkJoinTask
從Fork/Join框架的描述上來看,「任務」必需要知足必定的條件:
支持Fork,即任務自身的分解
支持Join,即任務結果的合併
所以,J.U.C提供了一個抽象類——ForkJoinTask,來做爲該類Fork/Join任務的抽象定義
ForkJoinTask實現了Future接口,是一個異步任務,咱們在使用Fork/Join框架時,通常須要使用線程池來調度任務,線程池內部調度的其實都是ForkJoinTask任務(即便提交的是一個Runnable或Callable任務,也會被適配成ForkJoinTask)。
除了ForkJoinTask,Fork/Join框架還提供了兩個它的抽象實現,咱們在自定義ForkJoin任務時,通常繼承這兩個類:
RecursiveAction:表示具備返回結果的ForkJoin任務
RecursiveTask:表示沒有返回結果的ForkJoin任務
ForkJoinWorkerThread
Fork/Join框架中,每一個工做線程(Worker)都有一個本身的任務隊列(WorkerQueue), 因此須要對通常的Thread作些特性化處理,J.U.C提供了ForkJoinWorkerThread類做爲ForkJoinPool中的工做線程:
public class ForkJoinWorkerThread extends Thread { final ForkJoinPool pool; // 該工做線程歸屬的線程池 final ForkJoinPool.WorkQueue workQueue; // 對應的任務隊列 protected ForkJoinWorkerThread(ForkJoinPool pool) { super("aForkJoinWorkerThread"); // 指定工做線程名稱 this.pool = pool; this.workQueue = pool.registerWorker(this); } // ... }
ForkJoinWorkerThread 在構造過程當中,會。同時,它會經過ForkJoinPool的registerWorker方保存所屬線程池信息和與本身綁定的任務隊列信息法將本身註冊到線程池中。
WorkQueue
任務隊列(WorkQueue)是ForkJoinPool與其它線程池區別最大的地方,在ForkJoinPool內部,維護着一個WorkQueue[]數組,它會在外部首次提交任務)時進行初始化:
volatile WorkQueue[] workQueues; // main registry
當經過線程池的外部方法(submit、invoke、execute)提交任務時,若是WorkQueue[]沒有初始化,則會進行初始化;而後根據數組大小和線程隨機數(ThreadLocalRandom.probe)等信息,計算出任務隊列所在的數組索引(這個索引必定是偶數),若是索引處沒有任務隊列,則初始化一個,再將任務入隊。也就是說,經過外部方法提交的任務必定是在偶數隊列,沒有綁定工做線程。
WorkQueue做爲ForkJoinPool的內部類,表示一個雙端隊列雙端隊列,既能夠做爲棧使用(LIFO),也能夠做爲隊列使用(FIFO)。ForkJoinPool的「工做竊取」正是利用了這個特色,當工做線程從本身的隊列中獲取任務時,默認老是以棧操做(LIFO)的方式從棧頂取任務;當工做線程嘗試竊取其它任務隊列中的任務時,則是FIFO的方式。
線程池中的每一個工做線程(ForkJoinWorkerThread)都有一個本身的任務隊列(WorkQueue),工做線程優先處理自身隊列中的任務(LIFO或FIFO順序,由線程池構造時的參數 mode 決定),自身隊列爲空時,以FIFO的順序隨機竊取其它隊列中的任務。
F/J框架的核心來自於它的工做竊取及調度策略,能夠總結爲如下幾點:
- 每一個Worker線程利用它本身的任務隊列維護可執行任務;
- 任務隊列是一種雙端隊列,支持LIFO的push和pop操做,也支持FIFO的take操做;
- 任務fork的子任務,只會push到它所在線程(調用fork方法的線程)的隊列;
- 工做線程既可使用LIFO經過pop處理本身隊列中的任務,也能夠FIFO經過poll處理本身隊列中的任務,具體取決於構造線程池時的asyncMode參數;
- 當工做線程本身隊列中沒有待處理任務時,它嘗試去隨機讀取(竊取)其它任務隊列的base端的任務;
- 當線程進入join操做,它也會去處理其它工做線程的隊列中的任務(本身的已經處理完了),直到目標任務完成(經過isDone方法);
- 當一個工做線程沒有任務了,而且嘗試從其它隊列竊取也失敗了,它讓出資源(經過使用yields, sleeps或者其它優先級調整)而且隨後會再次激活,直到全部工做線程都空閒了——此時,它們都阻塞在等待另外一個頂層線程的調用。
CountDownLatch
CountDownLatch是一個很是實用的多線程控制工具類,能夠簡單聯想到下課倒計時一塊兒開飯,百米賽跑一塊兒跑。經常使用的就下面幾個方法:
CountDownLatch(int count) //實例化一個倒計數器,count指定計數個數 countDown() // 計數減一 await() //等待,當計數減到0時,全部線程並行執行
CountDownLatch在咱們工做的多個場景被使用,算是用的很頻繁的了,好比咱們的API接口響應時間被要求在200ms之內,可是若是一個接口內部依賴多個三方/外部服務,那串行調用接口的RT必然好久,因此我的用的最多的是接口RT優化場景,內部服務並行調用。
對於倒計數器,一種典型的場景就是火箭發射。在火箭發射前,爲了保證萬無一失,每每還要進行各項設備、儀器的檢測。只有等到全部的檢查完畢後,引擎才能點火。那麼在檢測環節固然是多個檢測項能夠的。同時進行代碼:
/** * @Description: 倒計時器示例:火箭發射 */ public class CountDownLatchDemo implements Runnable{ static final CountDownLatch latch = new CountDownLatch(10); static final CountDownLatchDemo demo = new CountDownLatchDemo(); @Override public void run() { // 模擬檢查任務 try { Thread.sleep(new Random().nextInt(10) * 1000); System.out.println("檢查完畢"); } catch (InterruptedException e) { e.printStackTrace(); } finally { //計數減一 //放在finally避免任務執行過程出現異常,致使countDown()不能被執行 latch.countDown(); } } public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newFixedThreadPool(10); for (int i=0; i<10; i++){ exec.submit(demo); } // 等待檢查 latch.await(); // 外部主線程main 方法來等待下面運行!!! // 發射火箭 System.out.println("Fire!"); // 關閉線程池 exec.shutdown(); } }
上述代碼中咱們先生成了一個CountDownLatch
實例。計數數量爲10,這表示須要有10個線程來完成任務,等待在CountDownLatch
上的線程才能繼續執行。latch.countDown()
; 方法做用是通知CountDownLatch
有一個線程已經準備完畢,倒計數器能夠減一了。atch.await()
方法要求主線程等待全部10個檢查任務所有準備好才一塊兒並行執行。
latch.countDown()
的調用不必定非要開啓線程執行,即便你在主線程中下面這樣寫效果也是同樣。
for (int i = 0; i < 10; i++) { countDownLatch.countDown(); }
CyclicBarrier
這個類的中文意思是循環柵欄。大概的意思就是一個可循環利用的屏障。
它的做用就是會讓全部線程都等待完成後纔會繼續下一步行動。
舉個例子,就像生活中咱們會約朋友們到某個餐廳一塊兒吃飯,有些朋友可能會早到,有些朋友可能會晚到,可是這個餐廳規定必須等到全部人到齊以後纔會讓咱們進去。這裏的朋友們就是各個線程,餐廳就是 CyclicBarrier。
構造方法
public CyclicBarrier(int parties) public CyclicBarrier(int parties, Runnable barrierAction)
parties 是參與線程的個數
第二個構造方法有一個 Runnable 參數,這個參數的意思是到達線程最後一個要作的任務
重要方法:
public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
線程調用 await() 表示本身已經到達柵欄
BrokenBarrierException 表示柵欄已經被破壞,破壞的緣由多是其中一個線程 await() 時被中斷或者超時
demo:一個線程組的線程須要等待全部線程完成任務後再繼續執行下一次任務
public class CyclicBarrierTest { public static void main(String[] args) { //定義一個計數器,當計數器的值累加到30,輸出"放行" CyclicBarrier cyclicBarrier = new CyclicBarrier(30,()->{ System.out.println("放行"); }); for (int i = 1; i <= 90; i++) { final int temp = i; new Thread(()->{ System.out.println("-->"+temp); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
上面的結果會出現3次放行哦。
CyclicBarrier 與 CountDownLatch 區別
CountDownLatch 是一次性的,CyclicBarrier 是可循環利用的
CountDownLatch 參與的線程的職責是不同的,有的在倒計時,有的在等待倒計時結束。CyclicBarrier 參與的線程職責是同樣的。
CountDownLatch 作減法計算,count=0,喚醒阻塞線程,CyclicBarrier 作加法計算,count=屏障值(parties),喚醒阻塞線程。
最重要:CountDownLatch的放行由第三者控制,CyclicBarrier是由一組線程自己來控制的, CountDownLatch放行條件>=線程數。CyclicBarrier放行條件=線程數。
Semaphore
用途:控制同時訪問某個特定資源的線程數據,用來流量控制。
一個超市只能容納5我的購物,其他人排隊。
public class SemaphoreTest { public static void main(String[] args) { //同時只能進5我的 Semaphore semaphore = new Semaphore(5); for (int i = 0; i < 15; i++) { new Thread(() -> { try { //得到許可 semaphore.acquire(); // 已經進店人+1 System.out.println(Thread.currentThread().getName() + "進店購物"); TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName() + "出店"); } catch (InterruptedException e) { e.printStackTrace(); } finally { //釋放許可 semaphore.release(); //已經進店人 -1 } }, String.valueOf(i)).start(); } } }
實現數據庫鏈接池
數據庫鏈接實現:
public class SqlConnectImpl implements Connection{ /*拿一個數據庫鏈接*/ public static final Connection fetchConnection(){ return new SqlConnectImpl(); } }
鏈接池的實現:
public class DBPoolSemaphore { private final static int POOL_SIZE = 10; private final Semaphore useful, useless;//useful表示可用的數據庫鏈接,useless表示已用的數據庫鏈接 public DBPoolSemaphore() { this.useful = new Semaphore(POOL_SIZE); this.useless = new Semaphore(0); } //存放數據庫鏈接的容器 private static LinkedList<Connection> pool = new LinkedList<Connection>(); //初始化池 static { for (int i = 0; i < POOL_SIZE; i++) { pool.addLast(SqlConnectImpl.fetchConnection()); } } /*歸還鏈接*/ public void returnConnect(Connection connection) throws InterruptedException { if (connection != null) { System.out.println("當前有" + useful.getQueueLength() + "個線程等待數據庫鏈接!!" + "可用鏈接數:" + useful.availablePermits()); useless.acquire();// 可用鏈接 +1 synchronized (pool) { pool.addLast(connection); } useful.release(); // 已用鏈接 -1 } } /*從池子拿鏈接*/ public Connection takeConnect() throws InterruptedException { useful.acquire(); // 可用鏈接-1 Connection conn; synchronized (pool) { conn = pool.removeFirst(); } useless.release(); // 以用鏈接+1 return conn; } }
測試代碼:
public class AppTest { private static DBPoolSemaphore dbPool = new DBPoolSemaphore(); //業務線程 private static class BusiThread extends Thread { @Override public void run() { Random r = new Random();//讓每一個線程持有鏈接的時間不同 long start = System.currentTimeMillis(); try { Connection connect = dbPool.takeConnect(); System.out.println("Thread_" + Thread.currentThread().getId() + "_獲取數據庫鏈接共耗時【" + (System.currentTimeMillis() - start) + "】ms."); SleepTools.ms(100 + r.nextInt(100));//模擬業務操做,線程持有鏈接查詢數據 System.out.println("查詢數據完成,歸還鏈接!"); dbPool.returnConnect(connect); } catch (InterruptedException e) { } } } public static void main(String[] args) { for (int i = 0; i < 50; i++) { Thread thread = new BusiThread(); thread.start(); } } }
Exchange
兩個線程間的數據交換,侷限性比較大。Exchange
是 阻塞形式的,兩個線程要都到達執行Exchange
函數纔會交換。
public class UseExchange { private static final Exchanger<Set<String>> exchange = new Exchanger<Set<String>>(); public static void main(String[] args) { //第一個線程 new Thread(new Runnable() { @Override public void run() { Set<String> setA = new HashSet<String>();//存放數據的容器 try { setA.add("liu"); setA.add("Liu"); setA.add("LIU"); setA = exchange.exchange(setA);//交換set /*處理交換後的數據*/ } catch (InterruptedException e) { } } }).start(); //第二個線程 new Thread(new Runnable() { @Override public void run() { Set<String> setB = new HashSet<String>();//存放數據的容器 try { setB.add("jin"); setB.add("Jie"); setB.add("JIN"); setB = exchange.exchange(setB);//交換set /*處理交換後的數據*/ } catch (InterruptedException e) { } } }).start(); } }
Callable,Future,FutureTask
這三個組合使用,通常咱們能夠將耗時任務用子線程去執行,同時執行咱們本身的主線程任務。主線程執行任務完畢後再調Future.get()來得到子線程任務。
說明:
Callable有返回值可拋出異常,其中返回值有Future得到。
Future 得到返回值。
FutureTask實現Future跟Runnable。1.7前用AQS實現的,1.8之後再也不是。
Future主要函數功能:
- isDone,結束,正常仍是異常結束,或者本身取消,都返回true;
- isCancelled 任務完成前被取消,返回true;
- cancel(boolean):
- 任務還沒開始,返回false
- 任務已經啓動,cancel(true)
- 中斷正在運行的任務,中斷成功,返回true
- cancel(false),不會去中斷已經運行的任務
- 任務已經結束,返回false
demo:
public class UseFuture { /*實現Callable接口,容許有返回值*/ private static class UseCallable implements Callable<Integer> { private int sum; @Override public Integer call() throws Exception { System.out.println("Callable子線程開始計算"); Thread.sleep(2000); for (int i = 0; i < 5000; i++) { sum = sum + i; } System.out.println("Callable子線程計算完成,結果=" + sum); return sum; } } public static void main(String[] args) throws InterruptedException, ExecutionException { UseCallable useCallable = new UseCallable(); FutureTask<Integer> futureTask = new FutureTask<Integer>(useCallable); new Thread(futureTask).start(); Random r = new Random(); SleepTools.second(1); if (r.nextBoolean()) { // 方法調用返回下一個僞均勻分佈的boolean值 System.out.println("Get UseCallable result = " + futureTask.get()); } else { System.out.println("中斷計算"); futureTask.cancel(true); } } }
參考
本文同步分享在 博客「SoWhat1412」(CSDN)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。