併發一般是用於提升運行在單處理器
上的程序的性能。在單 CPU 機器上使用多任務的程序在任意時刻只在執行一項工做。html
併發編程使得一個程序能夠被劃分爲多個分離的、獨立的任務。一個線程就是在進程中的一個單一的順序控制流。java的線程機制是搶佔式。
java
線程的好處是提供了輕量級的執行上下文切換,只改變了程序的執行序列和局部變量。程序員
多線程的主要缺陷:<!-- java編程思想 -->算法
- 等待共享資源的時候性能下降。
- 須要處理線程的額外 CPU 花費。
- 糟糕的程序設計致使沒必要要的複雜度。
- 有可能產生一些病態行爲,若餓死、競爭、死鎖和活鎖。
- 不一樣平臺致使的不同。
源來:數據庫
當程序運行,JVM會爲每個線程分配一個獨立的緩存用於提升執行效率,每個線程都在本身獨立的緩存中操做各自的數據。一個線程在緩衝中對數據進行修改,寫入到主存後,其餘線程沒法得知數據已被更改,仍在操做緩存中已過期的數據,爲了解決這個問題,提供了volatile關鍵字,實現內存可見,一旦主存數據被修改,便導致其餘線程緩存數據行無效,強制前往主存獲取新數據。編程
Example:內存不可見,致使主線程沒法結束。api
class ThreadDemo implements Runnable { //添加volatile關鍵字可實現內存可見性 public volatile boolean flag = false; public boolean flag = Boolean.false; @Override public void run() { try { Thread.sleep(200); } catch (InterruptedException e) { } flag = Boolean.true; System.out.println("ThreadDemo over"); } public boolean isFlag() { return flag; } } public class TestVolatile { public static void main(String[] args) { ThreadDemo demo = new ThreadDemo(); new Thread(demo).start(); while (true) { if (demo.flag || demo.isFlag()) { System.out.println("Main over"); break; } } } }/*output:打印ThreadDemo over,主線程持續循環*/
做用:數組
當多個線程操做共享數據時,保證內存中的數據可見性。採用底層的內存柵欄,及時的將緩存中修改的數據刷新到主存中,並致使其餘線程所緩存的數據無效,使得這些線程必須去主存中獲取修改的數據。緩存
優缺點:安全
Example:使用volatile修飾,number自增問題。
class ThreadDemo implements Runnable { public volatile int number = 0; @Override public void run() { try { Thread.sleep(200); } catch (Exception e) { } System.out.print(getIncrementNumber() + " "); } public int getIncrementNumber() { return ++number; } } public class TestAtomic { public static void main(String[] args) { ThreadDemo demo = new ThreadDemo(); for (int i = 0; i < 10; i++) { new Thread(demo).start(); } } }/*output: 1 5 4 7 3 9 2 1 8 6 */
// ++number底層原理思想 int temp = number; // ① number = number + 1; // ② temp = number; // ③ return temp; // ④
由 ++number 可知,返回的是 temp 中存儲的值,且自增是一個多步操做,當多個線程調用 incrementNumber方法時,方法去主存中獲取 number 值放入 temp 中,根據 CPU 時間片切換,當 A 線程完成了 ③ 操做時,時間片到了被中斷,A 線程開始執行 ① 時不幸被中斷,接着 A 獲取到了CPU執行權,繼續執行完成 ④ 操做更新了主存中的值,緊接着 B 線程開始執行,可是 B 線程 temp中存儲的值已通過時了。注意:自增操做爲四步,只有在第四步的時候纔會刷新主存的值,而不是number = number + 1 操做就反映到主存中去。
如圖所示:
源來:
volatile只能保證內存可見性,對多步操做的變量,沒法保證其原子性,爲了解決這個問題,提供了原子變量。
做用:
原子變量既含有volatile的內存可見性,又提供了對變量原子性操做的支持,採用底層硬件對併發操做共享數據的 CAS(Compare-And-Swap)算法,保證數據的原子性。
提供的原子類:
類 | 描述 |
---|---|
AtomicBoolean | 一個 boolean 值能夠用原子更新。 |
AtomicInteger | 可能原子更新的 int 值。 |
AtomicIntegerArray | 一個 int 數組,其中元素能夠原子更新。 |
AtomicIntegerFieldUpdater<T> | 基於反射的實用程序,能夠對指定類的指定的 volatile int 字段進行原子更新。 |
AtomicLong | 一個 long 值能夠用原子更新。 |
AtomicLongArray | 能夠 long 地更新元素的 long 數組。 |
AtomicLongFieldUpdater<T> | 基於反射的實用程序,能夠對指定類的指定的 volatile long 字段進行原子更新。 |
AtomicMarkableReference<V> | AtomicMarkableReference 維護一個對象引用以及能夠原子更新的標記位。 |
AtomicReference<V> | 能夠原子更新的對象引用。 |
AtomicReferenceArray<E> | 能夠以原子方式更新元素的對象引用數組。 |
AtomicReferenceFieldUpdater<T,V> | 一種基於反射的實用程序,能夠對指定類的指定的 volatile volatile引用原子更新。 |
AtomicStampedReference<V> | AtomicStampedReference 維護對象引用以及能夠原子更新的整數「印記」。 |
DoubleAccumulator | 一個或多個變量一塊兒維護使用提供的功能更新的運行的值 double 。 |
DoubleAdder | 一個或多個變量一塊兒保持初始爲零 double 和。 |
LongAccumulator | 一個或多個變量,它們一塊兒保持運行 long 使用所提供的功能更新值。 |
LongAdder | 一個或多個變量一塊兒保持初始爲零 long 總和。 |
CAS算法:
CAS(Compare-And-Swap)是底層硬件對於原子操做的一種算法,其包含了三個操做數:內存值(V),預估值(A),更新值(B)。當且僅當 V == A 時, 執行 V = B 操做;不然不執行任何結果。這裏須要注意,A 和 B 兩個操做數是原子性的,同一時刻只能有一個線程進行AB操做。
優缺點:
HashMap 與 HashTable簡述
HashMap是線程不安全的,而HashTable是線程安全的,由於HashTable所維護的Hash表存在着獨佔鎖,當多個線程併發訪問時,只能有一個線程可進行操做,可是對於複合操做時,HashTable仍然存在線程安全問題,不使用HashTable的主要緣由仍是效率低下。
// 功能:不包含obj,則添加 if (!hashTable.contains(obj)) { // 複合操做,執行此處時線程中斷,obj被其餘線程添加至容器中,此處繼續執行將致使重複添加 hashTable.put(obj); } 可知上述兩個操做須要 「原子性」,爲了達到效果,還不是得對代碼塊進行同步
ConcurrentHashMap
採用鎖分段機制,分爲 16 個段(併發級別),每個段下有一張表,該表採用鏈表結構連接着各個元素,每一個段都使用獨立的鎖。當多個線程併發操做的時候,根據各自的級別不一樣,操做不一樣的段,多個線程並行操做,明顯提升了效率,其次還提供了複合操做的諸多方法。注:jdk1.8由原來的數組+單向鏈表結構轉換成數據+單向鏈表+紅黑樹結構。
ConcurrentSkipListMap和ConcurrentSkipListSet
有序的哈希表,經過跳錶實現,不容許null做爲鍵或值。ConcurrentSkipListMap詳解
CopyOnWriteArrayList 和 CopyOnWriteArraySet
對collection進行寫入操做時,將致使建立整個底層數組的副本,而源數組將保留在原地,使得複製的數組在被修改時,讀取操做能夠安全的執行。當修改完成時,一個原子性的操做將把心的數組換人,使得新的讀取操做能夠看到新的修改。<!--Java編程思想-->
好處之一是當多個迭代器同時遍歷和修改列表時,不會拋出ConcurrentModificationException。
小結:
源由:
當一個修房子的 A 線程正在執行,須要磚頭時,開啓了一個線程 B 去拉磚頭,此時 A 線程須要等待 B 線程的結果後才能繼續執行時,可是線程之間都是並行操做的,爲了解決這個問題,提供了CountDownLatch。
做用:
一個同步輔助類,爲了保證執行某些操做時,「全部準備事項都已就緒」,僅當某些操做執行完畢後,才能執行後續的代碼塊,不然一直等待。
CountDownLatch中存在一個鎖計數器,若是鎖計數器不爲 0 的話,它會阻塞任何一個調用 await() 方法的線程。也就是說,當一個線程調用 await() 方法時,若是鎖計數器不等於 0,那麼就會一直等待鎖計數器爲 0 的那一刻,這樣就解決了須要等待其餘線程執行完畢才執行的需求。
Example:
class ThreadDemo implements Runnable { private CountDownLatch latch = null; public ThreadDemo(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { System.out.println("execute over"); } finally { latch.countDown(); // 必須保證計數器減一 } } } public class TestCountDownLatch { public static void main(String[] args) { final int count = 10; final CountDownLatch latch = new CountDownLatch(count); ThreadDemo demo = new ThreadDemo(latch); for (int i = 0; i < count; ++i) { new Thread(demo).start(); } try { latch.await(); // 等待計數器爲 0 System.out.println("其餘線程結束,繼續往下執行..."); } catch (InterruptedException e) { e.printStackTrace(); } } }/**output: execute over ... 其餘線程結束,繼續往下執行... */
細節:
源由:
當開啓一個線程執行運算時,可能會須要該線程的計算結果,以前的 implements Runnable
和 extends Thread
的 run() 方法並無提供能夠返回的功能,所以提供了 Callable接口。 Callable 的運行結果, 須要使用 FutureTask 類來接受。
Example:
class ThreadDemo implements Callable<Integer> { private Integer cycleValue; public ThreadDemo(Integer cycleValue) { this.cycleValue = cycleValue; } @Override public Integer call() throws Exception { int result = 0; for (int i=0; i<cycleValue; ++i) { result += i; } return result; } } public class TestCallable { public static void main(String[] args) throws Exception { ThreadDemo demo = new ThreadDemo(Integer.MAX_VALUE); // 使用FutureTask接受結果 FutureTask<Integer> task = new FutureTask<>(demo); new Thread(task).start(); Integer result = task.get(); // 等待計算結果返回, 閉鎖 System.out.println(result); } }/*output:1073741825 */
Lock:在進行性能測試時,使用Lock一般會比使用synchronized要高效許多,而且synchronized的開銷變化範圍很大,而Lock相對穩定。只有在性能調優時才使用Lock對象。<!--Java編程思想-->
Condition: 替代了 Object 監視器方法的使用,描述了可能會與鎖有關的條件標量,相比 Object 的 notifyAll()
,Condition 的 signalAll()
更安全。Condition 實質上被綁定到一個鎖上,使用newCondition() 方法爲 Lock 實例獲取 Condition。
Lock和Condition對象只有在困難的多線程問題中才是必須的。<!--Java編程思想-->
synchonized與Lock的區別:
synchonized | Lock |
---|---|
隱式鎖 | 顯示鎖 |
JVM底層實現,由JVM維護 | 由程序員手動維護 |
靈活控制(也有風險) |
「虛假喚醒」:當一個線程A在等待時,被另外一個線程喚醒,被喚醒的線程不必定知足了可繼續向下執行的條件,若是被喚醒的線程未知足條件,而又向下執行了,那麼稱這個現象爲 「虛假喚醒」。
// 安全的方式,保證退出等待循環前,必定能知足條件 while (條件) { wait(); }
Example:生產消費者<!--參考Java編程思想 P712-->
// 產品car class Car { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private boolean available = false; // false:無貨;true有貨 public void put(){ lock.lock(); try { while (available) { // 有貨等待 condition.await(); } System.out.println(Thread.currentThread().getName() + "put(): 進貨"); available = true; condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void get() { lock.lock(); try { while (!available) { // 無貨等待 condition.await(); } System.out.println(Thread.currentThread().getName() + "get():出貨"); available = false; condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } // 消費者 class Consume implements Runnable { private Car car; public Consume(Car car) { this.car = car; } @Override public void run() { for (int i=0; i<TestProduceAndConsume.LOOP_SIZE; ++i) { car.get(); try { Thread.sleep(100); } catch (InterruptedException e) { } } } } // 生產者 class Produce implements Runnable { private Car car; public Produce(Car car) { this.car = car; } @Override public void run() { for (int i=0; i<TestProduceAndConsume.LOOP_SIZE; i++) { car.put(); } } } public class TestProduceAndConsume { public static final int LOOP_SIZE = 10; public static void main(String[] args) { Car car = new Car(); for (int i=0; i<5; ++i) { Consume consume = new Consume(car); Produce produce = new Produce(car); new Thread(consume, i + "--").start(); new Thread(produce, i + "--").start(); } } }
每個 對lock()
的調用都必須緊跟着一個 try-finally 子句,用以保證能夠在任何狀況下都能釋放鎖,任務在調用await()
、signal()
、signalAll()
以前,必須擁有鎖。
lock.lock(); try { ... // 業務代碼 } finally { lock.unlock(); }
源由:
上述講解的鎖都是讀寫一把鎖,不管是讀或寫,都是一把鎖解決,當多線程訪問數據時,若發生了一千次操做,其中的寫操做只執行了一次,數據的更新率很是低,那麼每次進行讀操做時,都要加鎖讀取」不會更改的「數據,顯然是沒必要要的開銷,所以出現了 ReadWriteLock 讀寫鎖,該對象提供讀鎖和寫鎖。
做用:
ReadWriteLock 維護了一對相關的鎖,一個用於只讀操做,另外一個用於寫入操做。只要沒有 write寫入操做,那麼多個線程能夠同時進行持有讀鎖。而寫入鎖是獨佔的,當執行寫操做時,其餘線程不可寫,也不可讀。
性能的提高取決於讀寫操做期間讀取數據相對於修改數據的頻率,若是讀取操做遠遠大於寫入操做時,便能加強併發性。
Example:
class Demo { private int value = 0; private ReadWriteLock lock = new ReentrantReadWriteLock(); public void read() { lock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + " : " + value); } finally { lock.readLock().unlock(); } } public void write(int value) { lock.writeLock().lock(); try { this.value = value; System.out.println("write(" + value + ")"); } finally { lock.writeLock().unlock(); } } } class ReadLock implements Runnable { private Demo demo = null; public ReadLock(Demo demo) { this.demo = demo; } @Override public void run() { for (int i=0; i<20; ++i) { demo.read(); try { Thread.sleep(320); } catch (InterruptedException e) { } } } } class WriteLock implements Runnable { private Demo demo = null; public WriteLock(Demo demo) { this.demo = demo; } @Override public void run() { for (int i=0; i<10; ++i) { demo.write(i); try { Thread.sleep(200); } catch (InterruptedException e) { } } } } public class TestReadWriteLock { public static void main(String[] args) { Demo demo = new Demo(); ReadLock readLock = new ReadLock(demo); WriteLock writeLock = new WriteLock(demo); for (int i=0; i<3; ++i) { new Thread(readLock, i + "--").start(); } new Thread(writeLock).start(); } }/**output: 0-- : 0 1-- : 0 2-- : 0 write(0) write(1) 1-- : 1 2-- : 1 0-- : 1 write(2) write(3) 1-- : 3 0-- : 3 ... */
源來:
在傳統操做中(如鏈接數據庫),當咱們須要使用一個線程的時候,就 直接建立一個線程,線程完畢後被垃圾收集器回收。每一次須要線程的時候,不斷的建立與銷燬,大大增長了資源的開銷。
做用:
線程池維護着一個線程隊列,該隊列中保存着全部等待着的線程,避免了重複的建立與銷燬而帶來的開銷。
體系結構:
Execuotr:負責線程的使用與調度的根接口。 |- ExecutorService:線程池的主要接口。 |- ForkJoinPool:採用分而治之技術將任務分解。 |- ThreadPoolExecutor:線程池的實現類。 |- ScheduledExecutorService:負責線程調度的子接口。 |- ScheduledThreadPoolExecutor:負責線程池的調度。繼承ThreadPoolExecutor並實現ScheduledExecutorService接口
Executors 工具類API描述:
方法 | 描述 |
---|---|
ExecutorService newFixedThreadPool(int nThreads) | 建立一個可重用固定數量的無界隊列線程池。使用了有限的線程集來執行所提交的全部任務。建立的時候能夠一次性預先進行代價高昂的線程分配。 |
ExecutorService newWorkStealingPool(int parallelism) | 建立一個維護足夠的線程以支持給定的parallelism並行級別的線程池。 |
ExecutorService newSingleThreadExecutor() | 建立一個使用單個線程運行的無界隊列的執行程序。 |
ExecutorService newCachedThreadPool() | 建立一個根據須要建立新線程的線程池,當有可用線程時將從新使用之前構造的線程。 |
ScheduledExecutorService newSingleThreadScheduledExecutor() | 建立一個單線程執行器,能夠調度命令在給定的延遲以後運行,或按期執行。 |
ScheduledExecutorService newScheduledThreadPool(int corePoolSize) | 建立一個線程池,能夠調度命令在給定的延遲以後運行,或按期執行。 |
ThreadFactory privilegedThreadFactory() | 返回一個用於建立與當前線程具備相同權限的新線程的線程工廠。 |
補充:
ExecutorService.shutdown():防止新任務被提交,並繼續運行被調用以前所提交的全部任務,待任務都完成後退出。
CachedThreadPoo在程序執行過程當中一般會建立與所需數量相同的線程,而後在它回收舊線程時中止建立新線程,是Executor的首選。僅當這個出現問題時,才需切換 FixedThreadPool。
SingleThreadExecutor: 相似於線程數量爲 1 的FixedThreadPool,但它提供了不會存在兩個及以上的線程被併發調用的併發。
Example:線程池
public class TestThreadPool { public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(2); for (int i = 0; i < 10; ++i) { Future<String> future = pool.submit(new Callable<String>() { @Override public String call() throws Exception { return Thread.currentThread().getName(); } }); String threadName = future.get(); System.out.println(threadName); } pool.shutdown(); // 拒絕新任務並等待正在執行的線程完成當前任務後關閉。 } }/**output: pool-1-thread-1 pool-1-thread-2 pool-1-thread-1 pool-1-thread-2 ... */
Example:線程調度
public class TestThreadPool { public static void main(String[] args) throws Exception { ScheduledExecutorService pool = Executors.newScheduledThreadPool(2); for (int i = 0; i < 5; ++i) { ScheduledFuture<String> future = pool.schedule(new Callable<String>() { @Override public String call() throws Exception { return Thread.currentThread().getName() + " : " + Instant.now(); } }, 1, TimeUnit.SECONDS); // 延遲執行單位爲 1秒的任務 String result = future.get(); System.out.println(result); } pool.shutdown(); } }/**output: pool-1-thread-1 : 2019-03-18T12:10:31.260Z pool-1-thread-1 : 2019-03-18T12:10:32.381Z pool-1-thread-2 : 2019-03-18T12:10:33.382Z pool-1-thread-1 : 2019-03-18T12:10:34.383Z pool-1-thread-2 : 2019-03-18T12:10:35.387Z */
<span style="color: red">注意:若沒有執行 shutdown()
方法,則線程會一直等待而不中止。</span>
源由:
在一個線程隊列中,假如隊頭的線程因爲某種緣由致使了阻塞,那麼在該隊列中的後繼線程須要等待隊頭線程結束,只要隊頭一直阻塞,這個隊列中的全部線程都將等待。此時,可能其餘線程隊列都已經完成了任務而空閒,這種狀況下,就大大減小了吞吐量。
ForkJoin的「工做竊取」模式:
當執行一個新任務時,採用分而治之的思想,將其分解成更小的任務執行,並將分解的任務加入到線程隊列中,當某一個線程隊列沒有任務時,會隨機從其餘線程隊列中「偷取」一個任務,放入本身的隊列中執行。
Example:
// 求次方: value爲底,size爲次方數 class CountPower extends RecursiveTask<Long> { private static final long serialVersionUID = 1L; public Long value = 0L; public int size = 0; public static final Long CRITICAL = 10L; // 閾值 public CountPower(Long value, int size) { this.value = value; this.size = size; } @Override protected Long compute() { // 當要開方的此時 小於 閾值,則計算 (視爲最小的任務單元) if(size <= CRITICAL) { Long sum = 1L; for (int i=0; i<size; ++i) { sum *= value; } return sum; } else { int mid = size / 2; // 拆分任務,並壓入線程隊列 CountPower leftPower = new CountPower(value, mid); leftPower.fork(); CountPower rightPower = new CountPower(value, size - mid); rightPower.fork(); // 將當前兩個任務返回的執行結果再相乘 return leftPower.join() * rightPower.join(); } } } public class TestForkJoinPool { public static void main(String[] args) throws Exception { ForkJoinPool pool = new ForkJoinPool(); CountPower task = new CountPower(2L, 11); Long result = pool.invoke(task); System.out.println(result); } }/**output: 2048*/
根據分而治之的思想進行分解,須要一個結束遞歸的條件,該條件內的代碼就是被分解的最小單元。使用fork()在當前任務正在運行的池中異步執行此任務,即將該任務壓入線程隊列。調用
join()`返回計算結果。RecursiveTask是有返回值的task,RecursiveAction則是沒有返回值的。