一、併發html
分工:如何高效地拆解任務並分配給線程java
同步:線程之間如何協做緩存
互斥:保證同一時刻只容許一個線程訪問共享資源安全
Fork/Join 框架就是一種分工模式,CountDownLatch 就是一種典型的同步方式,而可重入鎖則是一種互斥手段。網絡
二、可見性、原子性、有序性多線程
(1)可見性:緩存致使併發
(2)原子性:線程切換框架
count+=1dom
(3)有序性:編譯優化異步
三、java內存模型
(1)可見性:緩存致使-----按需禁用緩存
(2)有序性:編譯優化-----按需禁用
volatile int x=0;(該變量的讀寫,不使用cpu緩存,直接使用內存讀取或者寫入)
(3)原子性:同一時刻,只有一個線程執行,互斥。
synchronized
四、死鎖
死鎖發生的條件:
(1)互斥,共享資源x和y只能被一個線程佔用
(2)佔有且等待,線程 T1 已經取得共享資源 X,在等待共享資源 Y 的時候,不釋放共享資源 X;
破壞佔用且等待條件:一次性申請全部資源
(3)不可搶佔,其餘線程不能強行搶佔線程 T1 佔有的資源;
破壞不可強佔條件
(4)循環等待,線程 T1 等待線程 T2 佔有的資源,線程 T2 等待線程 T1 佔有的資源,就是循環等待。
破壞循環等待條件:
wait和sleep區別
1:wait釋放資源,sleep不釋放資源
2:wait須要被喚醒,sleep不須要
3:wait須要獲取到監視器,不然拋異常,sleep不須要
4:wait是object頂級父類的方法,sleep則是Thread的方法
5.CountDownLatch和CyclicBarrier:如何讓多線程步調一致?(主線程等待子線程結束)
Thread t1 = new Thread(() -> { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }); t1.start(); Thread t2 = new Thread(() -> { try { Thread.sleep(2000); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }); t2.start();
//實現等待 t1.join();
t2.join(); System.out.println("=============");
線程池
Executor executor = Executors.newFixedThreadPool(2); CountDownLatch latch = new CountDownLatch(2); executor.execute(()->{ try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }); executor.execute(()->{ try { Thread.sleep(2000); System.out.println(Thread.currentThread().getName()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }); latch.await(); System.out.println("=============");
CountDownLatch 主要用來解決一個線程等待多個線程的場景。(CountDownLatch 的計數器是不能循環利用的,也就是說一旦計數器減到 0,再有線程調用 await(),該線程會直接經過。)
CyclicBarrier ---------- A線程執行,B線程執行,A、B其中一個線程等到AB執行完成再執行(不是主線程,且是異步的)
參考:https://www.cnblogs.com/dolphin0520/p/3920397.html
6.併發容器
List、Map、Set、Queue
非線程安全:ArrayList、HashMap
7.原子類
8.線程池、Executor
ThreadPoolExecutor
線程池其實是生產者 - 消費者模式
ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
9.Future
ExecutorService executor = Executors.newFixedThreadPool(1); // 建立 Result 對象 r Result r = new Result(); r.setAAA(a); // 提交任務 Future<Result> future = executor.submit(new Task(r), r); Result fr = future.get(); // 下面等式成立 fr === r; fr.getAAA() === a; fr.getXXX() === x class Task implements Runnable{ Result r; // 經過構造函數傳入 result Task(Result r){ this.r = r; } void run() { // 能夠操做 result a = r.getAAA(); r.setXXX(x); } }
// 建立 FutureTask FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2); // 建立線程池 ExecutorService es = Executors.newCachedThreadPool(); // 提交 FutureTask es.submit(futureTask); // 獲取計算結果 Integer result = futureTask.get();
// 建立 FutureTask FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2); // 建立並啓動線程 Thread T1 = new Thread(futureTask); T1.start(); // 獲取計算結果 Integer result = futureTask.get();
// 建立任務 T2 的 FutureTask FutureTask<String> ft2 = new FutureTask<>(new T2Task()); // 建立任務 T1 的 FutureTask FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2)); // 線程 T1 執行任務 ft1 Thread T1 = new Thread(ft1); T1.start(); // 線程 T2 執行任務 ft2 Thread T2 = new Thread(ft2); T2.start(); // 等待線程 T1 執行結果 System.out.println(ft1.get()); // T1Task 須要執行的任務: // 洗水壺、燒開水、泡茶 class T1Task implements Callable<String>{ FutureTask<String> ft2; // T1 任務須要 T2 任務的 FutureTask T1Task(FutureTask<String> ft2){ this.ft2 = ft2; } @Override String call() throws Exception { System.out.println("T1: 洗水壺..."); TimeUnit.SECONDS.sleep(1); System.out.println("T1: 燒開水..."); TimeUnit.SECONDS.sleep(15); // 獲取 T2 線程的茶葉 String tf = ft2.get(); System.out.println("T1: 拿到茶葉:"+tf); System.out.println("T1: 泡茶..."); return " 上茶:" + tf; } } // T2Task 須要執行的任務: // 洗茶壺、洗茶杯、拿茶葉 class T2Task implements Callable<String> { @Override String call() throws Exception { System.out.println("T2: 洗茶壺..."); TimeUnit.SECONDS.sleep(1); System.out.println("T2: 洗茶杯..."); TimeUnit.SECONDS.sleep(2); System.out.println("T2: 拿茶葉..."); TimeUnit.SECONDS.sleep(1); return " 龍井 "; } } // 一次執行結果: T1: 洗水壺... T2: 洗茶壺... T1: 燒開水... T2: 洗茶杯... T2: 拿茶葉... T1: 拿到茶葉: 龍井 T1: 泡茶... 上茶: 龍井
10.CompletableFuture
// 任務 1:洗水壺 -> 燒開水 CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{ System.out.println("T1: 洗水壺..."); sleep(1, TimeUnit.SECONDS); System.out.println("T1: 燒開水..."); sleep(15, TimeUnit.SECONDS); }); // 任務 2:洗茶壺 -> 洗茶杯 -> 拿茶葉 CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{ System.out.println("T2: 洗茶壺..."); sleep(1, TimeUnit.SECONDS); System.out.println("T2: 洗茶杯..."); sleep(2, TimeUnit.SECONDS); System.out.println("T2: 拿茶葉..."); sleep(1, TimeUnit.SECONDS); return " 龍井 "; }); // 任務 3:任務 1 和任務 2 完成後執行:泡茶 CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf)->{ System.out.println("T1: 拿到茶葉:" + tf); System.out.println("T1: 泡茶..."); return " 上茶:" + tf; }); // 等待任務 3 執行結果 System.out.println(f3.join()); void sleep(int t, TimeUnit u) { try { u.sleep(t); }catch(InterruptedException e){} } // 一次執行結果: T1: 洗水壺... T2: 洗茶壺... T1: 燒開水... T2: 洗茶杯... T2: 拿茶葉... T1: 拿到茶葉: 龍井 T1: 泡茶... 上茶: 龍井
11.CompletionService
參考: http://blog.csdn.net/lmj623565791/article/details/27250059 https://www.cnblogs.com/hrhguanli/p/3998865.html
普通狀況下,咱們使用Runnable做爲主要的任務表示形式,但是Runnable是一種有很是大侷限的抽象,run方法中僅僅能記錄日誌,打印,或者把數據彙總入某個容器(一方面內存消耗大,還有一方面需要控制同步,效率很是大的限制),總之不能返回運行的結果;比方同一時候1000個任務去網絡上抓取數據,而後將抓取到的數據進行處理(處理方式不定),我認爲最好的方式就是提供回調接口,把處理的方式最爲回調傳進去;但是現在咱們有了更好的方式實現:CompletionService + Callable
Callable的call方法可以返回運行的結果;
CompletionService將Executor(線程池)和BlockingQueue(堵塞隊列)結合在一塊兒,同一時候使用Callable做爲任務的基本單元,整個過程就是生產者不斷把Callable任務放入堵塞對了,Executor做爲消費者不斷把任務取出來運行,並返回結果;
優點:
a、堵塞隊列防止了內存中排隊等待的任務過多,形成內存溢出(畢竟通常生產者速度比較快,比方爬蟲準備好網址和規則,就去運行了,運行起來(消費者)仍是比較慢的)
b、CompletionService可以實現,哪一個任務先運行完畢就返回,而不是按順序返回,這樣可以極大的提高效率;
package com.zhy.concurrency.completionService; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; /** * 將Executor和BlockingQueue功能融合在一塊兒,可以將Callable的任務提交給它來運行, 而後使用take()方法得到已經完畢的結果 * * @author zhy * */ public class CompletionServiceDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { /** * 內部維護11個線程的線程池 */ ExecutorService exec = Executors.newFixedThreadPool(11); /** * 容量爲10的堵塞隊列 */ final BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>( 10); //實例化CompletionService final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>( exec, queue); /** * 模擬瞬間產生10個任務,且每個任務運行時間不一致 */ for (int i = 0; i < 10; i++) { completionService.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { int ran = new Random().nextInt(1000); Thread.sleep(ran); System.out.println(Thread.currentThread().getName() + " 歇息了 " + ran); return ran; } }); } /** * 立刻輸出結果 */ for (int i = 0; i < 10; i++) { try { //誰最早運行完畢,直接返回 Future<Integer> f = completionService.take(); System.out.println(f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } exec.shutdown(); } } 輸出結果: pool-1-thread-4 歇息了 52 52 pool-1-thread-1 歇息了 59 59 pool-1-thread-10 歇息了 215 215 pool-1-thread-9 歇息了 352 352 pool-1-thread-5 歇息了 389 389 pool-1-thread-3 歇息了 589 589 pool-1-thread-2 歇息了 794 794 pool-1-thread-7 歇息了 805 805 pool-1-thread-6 歇息了 909 909 pool-1-thread-8 歇息了 987 987
2.ExecutorService.invokeAll
ExecutorService的invokeAll方法也能批量運行任務,並批量返回結果,但是呢,有個我認爲很是致命的缺點,必須等待所有的任務運行完畢後統一返回,一方面內存持有的時間長;還有一方面響應性也有必定的影響,畢竟你們都喜歡看看刷刷的運行結果輸出,而不是苦苦的等待;
package com.zhy.concurrency.executors; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestInvokeAll { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService exec = Executors.newFixedThreadPool(10); List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>(); Callable<Integer> task = null; for (int i = 0; i < 10; i++) { task = new Callable<Integer>() { @Override public Integer call() throws Exception { int ran = new Random().nextInt(1000); Thread.sleep(ran); System.out.println(Thread.currentThread().getName()+" 歇息了 " + ran ); return ran; } }; tasks.add(task); } long s = System.currentTimeMillis(); List<Future<Integer>> results = exec.invokeAll(tasks); System.out.println("運行任務消耗了 :" + (System.currentTimeMillis() - s) +"毫秒"); for (int i = 0; i < results.size(); i++) { try { System.out.println(results.get(i).get()); } catch (Exception e) { e.printStackTrace(); } } exec.shutdown(); } } 運行結果: pool-1-thread-10 歇息了 1 pool-1-thread-5 歇息了 59 pool-1-thread-6 歇息了 128 pool-1-thread-1 歇息了 146 pool-1-thread-3 歇息了 158 pool-1-thread-7 歇息了 387 pool-1-thread-9 歇息了 486 pool-1-thread-8 歇息了 606 pool-1-thread-4 歇息了 707 pool-1-thread-2 歇息了 817 運行任務消耗了 :819毫秒 146 817 158 707 59 128 387 606 486 1
12.Fork/Join 單機版本的MapReduce
分治任務
static void main(String[] args){ // 建立分治任務線程池 ForkJoinPool fjp = new ForkJoinPool(4); // 建立分治任務 Fibonacci fib = new Fibonacci(30); // 啓動分治任務 Integer result = fjp.invoke(fib); // 輸出結果 System.out.println(result); } // 遞歸任務 static class Fibonacci extends RecursiveTask<Integer>{ final int n; Fibonacci(int n){this.n = n;} protected Integer compute(){ if (n <= 1) return n; Fibonacci f1 = new Fibonacci(n - 1); // 建立子任務 f1.fork(); Fibonacci f2 = new Fibonacci(n - 2); // 等待子任務結果,併合並結果 return f2.compute() + f1.join(); } }
13.ThreadLocal