<!-- GFM-TOC -->php
<!-- GFM-TOC -->
建立後還沒有啓動。
可能正在運行,也可能正在等待 CPU 時間片。
包含了操做系統線程狀態中的 Running 和 Ready。
等待獲取一個排它鎖,若是其線程釋放了鎖就會結束此狀態。
等待其它線程顯式地喚醒,不然不會被分配 CPU 時間片。
進入方法 | 退出方法 |
---|---|
沒有設置 Timeout 參數的 Object.wait() 方法 | Object.notify() / Object.notifyAll() |
沒有設置 Timeout 參數的 Thread.join() 方法 | 被調用的線程執行完畢 |
LockSupport.park() 方法 | LockSupport.unpark(Thread) |
無需等待其它線程顯式地喚醒,在必定時間以後會被系統自動喚醒。
調用 Thread.sleep() 方法使線程進入限期等待狀態時,經常用「使一個線程睡眠」進行描述。
調用 Object.wait() 方法使線程進入限期等待或者無限期等待時,經常用「掛起一個線程」進行描述。
睡眠和掛起是用來描述行爲,而阻塞和等待用來描述狀態。
阻塞和等待的區別在於,阻塞是被動的,它是在等待獲取一個排它鎖。而等待是主動的,經過調用 Thread.sleep() 和 Object.wait() 等方法進入。
進入方法 | 退出方法 |
---|---|
Thread.sleep() 方法 | 時間結束 |
設置了 Timeout 參數的 Object.wait() 方法 | 時間結束 / Object.notify() / Object.notifyAll() |
設置了 Timeout 參數的 Thread.join() 方法 | 時間結束 / 被調用的線程執行完畢 |
LockSupport.parkNanos() 方法 | LockSupport.unpark(Thread) |
LockSupport.parkUntil() 方法 | LockSupport.unpark(Thread) |
能夠是線程結束任務以後本身結束,或者產生了異常而結束。
有三種使用線程的方法:
實現 Runnable 和 Callable 接口的類只能當作一個能夠在線程中運行的任務,不是真正意義上的線程,所以最後還須要經過 Thread 來調用。能夠說任務是經過線程驅動從而執行的。
須要實現 run() 方法。
經過 Thread 調用 start() 方法來啓動線程。
public class MyRunnable implements Runnable { public void run() { // ... } }
public static void main(String[] args) { MyRunnable instance = new MyRunnable(); Thread thread = new Thread(instance); thread.start(); }
與 Runnable 相比,Callable 能夠有返回值,返回值經過 FutureTask 進行封裝。
public class MyCallable implements Callable<Integer> { public Integer call() { return 123; } }
public static void main(String[] args) throws ExecutionException, InterruptedException { MyCallable mc = new MyCallable(); FutureTask<Integer> ft = new FutureTask<>(mc); Thread thread = new Thread(ft); thread.start(); System.out.println(ft.get()); }
一樣也是須要實現 run() 方法,由於 Thread 類也實現了 Runable 接口。
當調用 start() 方法啓動一個線程時,虛擬機會將該線程放入就緒隊列中等待被調度,當一個線程被調度時會執行該線程的 run() 方法。
public class MyThread extends Thread { public void run() { // ... } }
public static void main(String[] args) { MyThread mt = new MyThread(); mt.start(); }
實現接口會更好一些,由於:
Executor 管理多個異步任務的執行,而無需程序員顯式地管理線程的生命週期。這裏的異步是指多個任務的執行互不干擾,不須要進行同步操做。
主要有三種 Executor:
public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { executorService.execute(new MyRunnable()); } executorService.shutdown(); }
守護線程是程序運行時在後臺提供服務的線程,不屬於程序中不可或缺的部分。
當全部非守護線程結束時,程序也就終止,同時會殺死全部守護線程。
main() 屬於非守護線程。
使用 setDaemon() 方法將一個線程設置爲守護線程。
public static void main(String[] args) { Thread thread = new Thread(new MyRunnable()); thread.setDaemon(true); }
Thread.sleep(millisec) 方法會休眠當前正在執行的線程,millisec 單位爲毫秒。
sleep() 可能會拋出 InterruptedException,由於異常不能跨線程傳播回 main() 中,所以必須在本地進行處理。線程中拋出的其它異常也一樣須要在本地進行處理。
public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }
對靜態方法 Thread.yield() 的調用聲明瞭當前線程已經完成了生命週期中最重要的部分,能夠切換給其它線程來執行。該方法只是對線程調度器的一個建議,並且也只是建議具備相同優先級的其它線程能夠運行。
public void run() { Thread.yield(); }
一個線程執行完畢以後會自動結束,若是在運行過程當中發生異常也會提早結束。
經過調用一個線程的 interrupt() 來中斷該線程,若是該線程處於阻塞、限期等待或者無限期等待狀態,那麼就會拋出 InterruptedException,從而提早結束該線程。可是不能中斷 I/O 阻塞和 synchronized 鎖阻塞。
對於如下代碼,在 main() 中啓動一個線程以後再中斷它,因爲線程中調用了 Thread.sleep() 方法,所以會拋出一個 InterruptedException,從而提早結束線程,不執行以後的語句。
public class InterruptExample { private static class MyThread1 extends Thread { @Override public void run() { try { Thread.sleep(2000); System.out.println("Thread run"); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public static void main(String[] args) throws InterruptedException { Thread thread1 = new MyThread1(); thread1.start(); thread1.interrupt(); System.out.println("Main run"); }
Main run java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at InterruptExample.lambda$main$0(InterruptExample.java:5) at InterruptExample$$Lambda$1/713338599.run(Unknown Source) at java.lang.Thread.run(Thread.java:745)
若是一個線程的 run() 方法執行一個無限循環,而且沒有執行 sleep() 等會拋出 InterruptedException 的操做,那麼調用線程的 interrupt() 方法就沒法使線程提早結束。
可是調用 interrupt() 方法會設置線程的中斷標記,此時調用 interrupted() 方法會返回 true。所以能夠在循環體中使用 interrupted() 方法來判斷線程是否處於中斷狀態,從而提早結束線程。
public class InterruptExample { private static class MyThread2 extends Thread { @Override public void run() { while (!interrupted()) { // .. } System.out.println("Thread end"); } } }
public static void main(String[] args) throws InterruptedException { Thread thread2 = new MyThread2(); thread2.start(); thread2.interrupt(); }
Thread end
調用 Executor 的 shutdown() 方法會等待線程都執行完畢以後再關閉,可是若是調用的是 shutdownNow() 方法,則至關於調用每一個線程的 interrupt() 方法。
如下使用 Lambda 建立線程,至關於建立了一個匿名內部線程。
public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> { try { Thread.sleep(2000); System.out.println("Thread run"); } catch (InterruptedException e) { e.printStackTrace(); } }); executorService.shutdownNow(); System.out.println("Main run"); }
Main run java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9) at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
若是隻想中斷 Executor 中的一個線程,能夠經過使用 submit() 方法來提交一個線程,它會返回一個 Future<?> 對象,經過調用該對象的 cancel(true) 方法就能夠中斷線程。
Future<?> future = executorService.submit(() -> { // .. }); future.cancel(true);
Java 提供了兩種鎖機制來控制多個線程對共享資源的互斥訪問,第一個是 JVM 實現的 synchronized,而另外一個是 JDK 實現的 ReentrantLock。
1. 同步一個代碼塊
public void func() { synchronized (this) { // ... } }
它只做用於同一個對象,若是調用兩個對象上的同步代碼塊,就不會進行同步。
對於如下代碼,使用 ExecutorService 執行了兩個線程,因爲調用的是同一個對象的同步代碼塊,所以這兩個線程會進行同步,當一個線程進入同步語句塊時,另外一個線程就必須等待。
public class SynchronizedExample { public void func1() { synchronized (this) { for (int i = 0; i < 10; i++) { System.out.print(i + " "); } } } }
public static void main(String[] args) { SynchronizedExample e1 = new SynchronizedExample(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> e1.func1()); executorService.execute(() -> e1.func1()); }
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
對於如下代碼,兩個線程調用了不一樣對象的同步代碼塊,所以這兩個線程就不須要同步。從輸出結果能夠看出,兩個線程交叉執行。
public static void main(String[] args) { SynchronizedExample e1 = new SynchronizedExample(); SynchronizedExample e2 = new SynchronizedExample(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> e1.func1()); executorService.execute(() -> e2.func1()); }
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
2. 同步一個方法
public synchronized void func () { // ... }
它和同步代碼塊同樣,做用於同一個對象。
3. 同步一個類
public void func() { synchronized (SynchronizedExample.class) { // ... } }
做用於整個類,也就是說兩個線程調用同一個類的不一樣對象上的這種同步語句,也會進行同步。
public class SynchronizedExample { public void func2() { synchronized (SynchronizedExample.class) { for (int i = 0; i < 10; i++) { System.out.print(i + " "); } } } }
public static void main(String[] args) { SynchronizedExample e1 = new SynchronizedExample(); SynchronizedExample e2 = new SynchronizedExample(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> e1.func2()); executorService.execute(() -> e2.func2()); }
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
4. 同步一個靜態方法
public synchronized static void fun() { // ... }
做用於整個類。
ReentrantLock 是 java.util.concurrent(J.U.C)包中的鎖。
public class LockExample { private Lock lock = new ReentrantLock(); public void func() { lock.lock(); try { for (int i = 0; i < 10; i++) { System.out.print(i + " "); } } finally { lock.unlock(); // 確保釋放鎖,從而避免發生死鎖。 } } }
public static void main(String[] args) { LockExample lockExample = new LockExample(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> lockExample.func()); executorService.execute(() -> lockExample.func()); }
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
1. 鎖的實現
synchronized 是 JVM 實現的,而 ReentrantLock 是 JDK 實現的。
2. 性能
新版本 Java 對 synchronized 進行了不少優化,例如自旋鎖等,synchronized 與 ReentrantLock 大體相同。
3. 等待可中斷
當持有鎖的線程長期不釋放鎖的時候,正在等待的線程能夠選擇放棄等待,改成處理其餘事情。
ReentrantLock 可中斷,而 synchronized 不行。
4. 公平鎖
公平鎖是指多個線程在等待同一個鎖時,必須按照申請鎖的時間順序來依次得到鎖。
synchronized 中的鎖是非公平的,ReentrantLock 默認狀況下也是非公平的,可是也能夠是公平的。
5. 鎖綁定多個條件
一個 ReentrantLock 能夠同時綁定多個 Condition 對象。
除非須要使用 ReentrantLock 的高級功能,不然優先使用 synchronized。這是由於 synchronized 是 JVM 實現的一種鎖機制,JVM 原生地支持它,而 ReentrantLock 不是全部的 JDK 版本都支持。而且使用 synchronized 不用擔憂沒有釋放鎖而致使死鎖問題,由於 JVM 會確保鎖的釋放。
當多個線程能夠一塊兒工做去解決某個問題時,若是某些部分必須在其它部分以前完成,那麼就須要對線程進行協調。
在線程中調用另外一個線程的 join() 方法,會將當前線程掛起,而不是忙等待,直到目標線程結束。
對於如下代碼,雖然 b 線程先啓動,可是由於在 b 線程中調用了 a 線程的 join() 方法,b 線程會等待 a 線程結束才繼續執行,所以最後可以保證 a 線程的輸出先於 b 線程的輸出。
public class JoinExample { private class A extends Thread { @Override public void run() { System.out.println("A"); } } private class B extends Thread { private A a; B(A a) { this.a = a; } @Override public void run() { try { a.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("B"); } } public void test() { A a = new A(); B b = new B(a); b.start(); a.start(); } }
public static void main(String[] args) { JoinExample example = new JoinExample(); example.test(); }
A B
調用 wait() 使得線程等待某個條件知足,線程在等待時會被掛起,當其餘線程的運行使得這個條件知足時,其它線程會調用 notify() 或者 notifyAll() 來喚醒掛起的線程。
它們都屬於 Object 的一部分,而不屬於 Thread。
只能用在同步方法或者同步控制塊中使用,不然會在運行時拋出 IllegalMonitorStateException。
使用 wait() 掛起期間,線程會釋放鎖。這是由於,若是沒有釋放鎖,那麼其它線程就沒法進入對象的同步方法或者同步控制塊中,那麼就沒法執行 notify() 或者 notifyAll() 來喚醒掛起的線程,形成死鎖。
public class WaitNotifyExample { public synchronized void before() { System.out.println("before"); notifyAll(); } public synchronized void after() { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("after"); } }
public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); WaitNotifyExample example = new WaitNotifyExample(); executorService.execute(() -> example.after()); executorService.execute(() -> example.before()); }
before after
wait() 和 sleep() 的區別
java.util.concurrent 類庫中提供了 Condition 類來實現線程之間的協調,能夠在 Condition 上調用 await() 方法使線程等待,其它線程調用 signal() 或 signalAll() 方法喚醒等待的線程。
相比於 wait() 這種等待方式,await() 能夠指定等待的條件,所以更加靈活。
使用 Lock 來獲取一個 Condition 對象。
public class AwaitSignalExample { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void before() { lock.lock(); try { System.out.println("before"); condition.signalAll(); } finally { lock.unlock(); } } public void after() { lock.lock(); try { condition.await(); System.out.println("after"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); AwaitSignalExample example = new AwaitSignalExample(); executorService.execute(() -> example.after()); executorService.execute(() -> example.before()); }
before after
java.util.concurrent(J.U.C)大大提升了併發性能,AQS 被認爲是 J.U.C 的核心。
用來控制一個線程等待多個線程。
維護了一個計數器 cnt,每次調用 countDown() 方法會讓計數器的值減 1,減到 0 的時候,那些由於調用 await() 方法而在等待的線程就會被喚醒。
public class CountdownLatchExample { public static void main(String[] args) throws InterruptedException { final int totalThread = 10; CountDownLatch countDownLatch = new CountDownLatch(totalThread); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < totalThread; i++) { executorService.execute(() -> { System.out.print("run.."); countDownLatch.countDown(); }); } countDownLatch.await(); System.out.println("end"); executorService.shutdown(); } }
run..run..run..run..run..run..run..run..run..run..end
用來控制多個線程互相等待,只有當多個線程都到達時,這些線程纔會繼續執行。
和 CountdownLatch 類似,都是經過維護計數器來實現的。線程執行 await() 方法以後計數器會減 1,並進行等待,直到計數器爲 0,全部調用 await() 方法而在等待的線程才能繼續執行。
CyclicBarrier 和 CountdownLatch 的一個區別是,CyclicBarrier 的計數器經過調用 reset() 方法能夠循環使用,因此它才叫作循環屏障。
CyclicBarrier 有兩個構造函數,其中 parties 指示計數器的初始值,barrierAction 在全部線程都到達屏障的時候會執行一次。
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
public class CyclicBarrierExample { public static void main(String[] args) { final int totalThread = 10; CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < totalThread; i++) { executorService.execute(() -> { System.out.print("before.."); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.print("after.."); }); } executorService.shutdown(); } }
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..
Semaphore 相似於操做系統中的信號量,能夠控制對互斥資源的訪問線程數。
如下代碼模擬了對某個服務的併發請求,每次只能有 3 個客戶端同時訪問,請求總數爲 10。
public class SemaphoreExample { public static void main(String[] args) { final int clientCount = 3; final int totalRequestCount = 10; Semaphore semaphore = new Semaphore(clientCount); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < totalRequestCount; i++) { executorService.execute(()->{ try { semaphore.acquire(); System.out.print(semaphore.availablePermits() + " "); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }); } executorService.shutdown(); } }
2 1 2 2 2 2 2 1 2 2
在介紹 Callable 時咱們知道它能夠有返回值,返回值經過 Future<V> 進行封裝。FutureTask 實現了 RunnableFuture 接口,該接口繼承自 Runnable 和 Future<V> 接口,這使得 FutureTask 既能夠當作一個任務執行,也能夠有返回值。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
FutureTask 可用於異步獲取執行結果或取消執行任務的場景。當一個計算任務須要執行很長時間,那麼就能夠用 FutureTask 來封裝這個任務,主線程在完成本身的任務以後再去獲取結果。
public class FutureTaskExample { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { int result = 0; for (int i = 0; i < 100; i++) { Thread.sleep(10); result += i; } return result; } }); Thread computeThread = new Thread(futureTask); computeThread.start(); Thread otherThread = new Thread(() -> { System.out.println("other task is running..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); otherThread.start(); System.out.println(futureTask.get()); } }
other task is running... 4950
java.util.concurrent.BlockingQueue 接口有如下阻塞隊列的實現:
提供了阻塞的 take() 和 put() 方法:若是隊列爲空 take() 將阻塞,直到隊列中有內容;若是隊列爲滿 put() 將阻塞,直到隊列有空閒位置。
使用 BlockingQueue 實現生產者消費者問題
public class ProducerConsumer { private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5); private static class Producer extends Thread { @Override public void run() { try { queue.put("product"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("produce.."); } } private static class Consumer extends Thread { @Override public void run() { try { String product = queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("consume.."); } } }
public static void main(String[] args) { for (int i = 0; i < 2; i++) { Producer producer = new Producer(); producer.start(); } for (int i = 0; i < 5; i++) { Consumer consumer = new Consumer(); consumer.start(); } for (int i = 0; i < 3; i++) { Producer producer = new Producer(); producer.start(); } }
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..
主要用於並行計算中,和 MapReduce 原理相似,都是把大的計算任務拆分紅多個小任務並行計算。
public class ForkJoinExample extends RecursiveTask<Integer> { private final int threshold = 5; private int first; private int last; public ForkJoinExample(int first, int last) { this.first = first; this.last = last; } @Override protected Integer compute() { int result = 0; if (last - first <= threshold) { // 任務足夠小則直接計算 for (int i = first; i <= last; i++) { result += i; } } else { // 拆分紅小任務 int middle = first + (last - first) / 2; ForkJoinExample leftTask = new ForkJoinExample(first, middle); ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last); leftTask.fork(); rightTask.fork(); result = leftTask.join() + rightTask.join(); } return result; } }
public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinExample example = new ForkJoinExample(1, 10000); ForkJoinPool forkJoinPool = new ForkJoinPool(); Future result = forkJoinPool.submit(example); System.out.println(result.get()); }
ForkJoin 使用 ForkJoinPool 來啓動,它是一個特殊的線程池,線程數量取決於 CPU 核數。
public class ForkJoinPool extends AbstractExecutorService
ForkJoinPool 實現了工做竊取算法來提升 CPU 的利用率。每一個線程都維護了一個雙端隊列,用來存儲須要執行的任務。工做竊取算法容許空閒的線程從其它線程的雙端隊列中竊取一個任務來執行。竊取的任務必須是最晚的任務,避免和隊列所屬線程發生競爭。例以下圖中,Thread2 從 Thread1 的隊列中拿出最晚的 Task1 任務,Thread1 會拿出 Task2 來執行,這樣就避免發生競爭。可是若是隊列中只有一個任務時仍是會發生競爭。
若是多個線程對同一個共享數據進行訪問而不採起同步操做的話,那麼操做的結果是不一致的。
如下代碼演示了 1000 個線程同時對 cnt 執行自增操做,操做結束以後它的值有可能小於 1000。
public class ThreadUnsafeExample { private int cnt = 0; public void add() { cnt++; } public int get() { return cnt; } }
public static void main(String[] args) throws InterruptedException { final int threadSize = 1000; ThreadUnsafeExample example = new ThreadUnsafeExample(); final CountDownLatch countDownLatch = new CountDownLatch(threadSize); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < threadSize; i++) { executorService.execute(() -> { example.add(); countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println(example.get()); }
997
Java 內存模型試圖屏蔽各類硬件和操做系統的內存訪問差別,以實現讓 Java 程序在各類平臺下都能達到一致的內存訪問效果。
處理器上的寄存器的讀寫的速度比內存快幾個數量級,爲了解決這種速度矛盾,在它們之間加入了高速緩存。
加入高速緩存帶來了一個新的問題:緩存一致性。若是多個緩存共享同一塊主內存區域,那麼多個緩存的數據可能會不一致,須要一些協議來解決這個問題。
全部的變量都存儲在主內存中,每一個線程還有本身的工做內存,工做內存存儲在高速緩存或者寄存器中,保存了該線程使用的變量的主內存副本拷貝。
線程只能直接操做工做內存中的變量,不一樣線程之間的變量值傳遞須要經過主內存來完成。
Java 內存模型定義了 8 個操做來完成主內存和工做內存的交互操做。
Java 內存模型保證了 read、load、use、assign、store、write、lock 和 unlock 操做具備原子性,例如對一個 int 類型的變量執行 assign 賦值操做,這個操做就是原子性的。可是 Java 內存模型容許虛擬機將沒有被 volatile 修飾的 64 位數據(long,double)的讀寫操做劃分爲兩次 32 位的操做來進行,即 load、store、read 和 write 操做能夠不具有原子性。
有一個錯誤認識就是,int 等原子性的類型在多線程環境中不會出現線程安全問題。前面的線程不安全示例代碼中,cnt 屬於 int 類型變量,1000 個線程對它進行自增操做以後,獲得的值爲 997 而不是 1000。
爲了方便討論,將內存間的交互操做簡化爲 3 個:load、assign、store。
下圖演示了兩個線程同時對 cnt 進行操做,load、assign、store 這一系列操做總體上看不具有原子性,那麼在 T1 修改 cnt 而且尚未將修改後的值寫入主內存,T2 依然能夠讀入舊值。能夠看出,這兩個線程雖然執行了兩次自增運算,可是主內存中 cnt 的值最後爲 1 而不是 2。所以對 int 類型讀寫操做知足原子性只是說明 load、assign、store 這些單個操做具有原子性。
AtomicInteger 能保證多個線程修改的原子性。
使用 AtomicInteger 重寫以前線程不安全的代碼以後獲得如下線程安全實現:
public class AtomicExample { private AtomicInteger cnt = new AtomicInteger(); public void add() { cnt.incrementAndGet(); } public int get() { return cnt.get(); } }
public static void main(String[] args) throws InterruptedException { final int threadSize = 1000; AtomicExample example = new AtomicExample(); // 只修改這條語句 final CountDownLatch countDownLatch = new CountDownLatch(threadSize); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < threadSize; i++) { executorService.execute(() -> { example.add(); countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println(example.get()); }
1000
除了使用原子類以外,也可使用 synchronized 互斥鎖來保證操做的原子性。它對應的內存間交互操做爲:lock 和 unlock,在虛擬機實現上對應的字節碼指令爲 monitorenter 和 monitorexit。
public class AtomicSynchronizedExample { private int cnt = 0; public synchronized void add() { cnt++; } public synchronized int get() { return cnt; } }
public static void main(String[] args) throws InterruptedException { final int threadSize = 1000; AtomicSynchronizedExample example = new AtomicSynchronizedExample(); final CountDownLatch countDownLatch = new CountDownLatch(threadSize); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < threadSize; i++) { executorService.execute(() -> { example.add(); countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println(example.get()); }
1000
可見性指當一個線程修改了共享變量的值,其它線程可以當即得知這個修改。Java 內存模型是經過在變量修改後將新值同步回主內存,在變量讀取前從主內存刷新變量值來實現可見性的。
主要有有三種實現可見性的方式:
對前面的線程不安全示例中的 cnt 變量使用 volatile 修飾,不能解決線程不安全問題,由於 volatile 並不能保證操做的原子性。
有序性是指:在本線程內觀察,全部操做都是有序的。在一個線程觀察另外一個線程,全部操做都是無序的,無序是由於發生了指令重排序。在 Java 內存模型中,容許編譯器和處理器對指令進行重排序,重排序過程不會影響到單線程程序的執行,卻會影響到多線程併發執行的正確性。
volatile 關鍵字經過添加內存屏障的方式來禁止指令重排,即重排序時不能把後面的指令放到內存屏障以前。
也能夠經過 synchronized 來保證有序性,它保證每一個時刻只有一個線程執行同步代碼,至關因而讓線程順序執行同步代碼。
上面提到了能夠用 volatile 和 synchronized 來保證有序性。除此以外,JVM 還規定了先行發生原則,讓一個操做無需控制就能先於另外一個操做完成。
Single Thread rule
在一個線程內,在程序前面的操做先行發生於後面的操做。
Monitor Lock Rule
一個 unlock 操做先行發生於後面對同一個鎖的 lock 操做。
Volatile Variable Rule
對一個 volatile 變量的寫操做先行發生於後面對這個變量的讀操做。
Thread Start Rule
Thread 對象的 start() 方法調用先行發生於此線程的每個動做。
Thread Join Rule
Thread 對象的結束先行發生於 join() 方法返回。
Thread Interruption Rule
對線程 interrupt() 方法的調用先行發生於被中斷線程的代碼檢測到中斷事件的發生,能夠經過 interrupted() 方法檢測到是否有中斷髮生。
Finalizer Rule
一個對象的初始化完成(構造函數執行結束)先行發生於它的 finalize() 方法的開始。
Transitivity
若是操做 A 先行發生於操做 B,操做 B 先行發生於操做 C,那麼操做 A 先行發生於操做 C。
多個線程無論以何種方式訪問某個類,而且在主調代碼中不須要進行同步,都能表現正確的行爲。
線程安全有如下幾種實現方式:
不可變(Immutable)的對象必定是線程安全的,不須要再採起任何的線程安全保障措施。只要一個不可變的對象被正確地構建出來,永遠也不會看到它在多個線程之中處於不一致的狀態。多線程環境下,應當儘可能使對象成爲不可變,來知足線程安全。
不可變的類型:
對於集合類型,可使用 Collections.unmodifiableXXX() 方法來獲取一個不可變的集合。
public class ImmutableExample { public static void main(String[] args) { Map<String, Integer> map = new HashMap<>(); Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map); unmodifiableMap.put("a", 1); } }
Exception in thread "main" java.lang.UnsupportedOperationException at java.util.Collections$UnmodifiableMap.put(Collections.java:1457) at ImmutableExample.main(ImmutableExample.java:9)
Collections.unmodifiableXXX() 先對原始的集合進行拷貝,須要對集合進行修改的方法都直接拋出異常。
public V put(K key, V value) { throw new UnsupportedOperationException(); }
synchronized 和 ReentrantLock。
互斥同步最主要的問題就是線程阻塞和喚醒所帶來的性能問題,所以這種同步也稱爲阻塞同步。
互斥同步屬於一種悲觀的併發策略,老是認爲只要不去作正確的同步措施,那就確定會出現問題。不管共享數據是否真的會出現競爭,它都要進行加鎖(這裏討論的是概念模型,實際上虛擬機會優化掉很大一部分沒必要要的加鎖)、用戶態核心態轉換、維護鎖計數器和檢查是否有被阻塞的線程須要喚醒等操做。
隨着硬件指令集的發展,咱們可使用基於衝突檢測的樂觀併發策略:先進行操做,若是沒有其它線程爭用共享數據,那操做就成功了,不然採起補償措施(不斷地重試,直到成功爲止)。這種樂觀的併發策略的許多實現都不須要將線程阻塞,所以這種同步操做稱爲非阻塞同步。
樂觀鎖須要操做和衝突檢測這兩個步驟具有原子性,這裏就不能再使用互斥同步來保證了,只能靠硬件來完成。硬件支持的原子性操做最典型的是:比較並交換(Compare-and-Swap,CAS)。CAS 指令須要有 3 個操做數,分別是內存地址 V、舊的預期值 A 和新值 B。當執行操做時,只有當 V 的值等於 A,纔將 V 的值更新爲 B。
J.U.C 包裏面的整數原子類 AtomicInteger 的方法調用了 Unsafe 類的 CAS 操做。
如下代碼使用了 AtomicInteger 執行了自增的操做。
private AtomicInteger cnt = new AtomicInteger(); public void add() { cnt.incrementAndGet(); }
如下代碼是 incrementAndGet() 的源碼,它調用了 Unsafe 的 getAndAddInt() 。
public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; }
如下代碼是 getAndAddInt() 源碼,var1 指示對象內存地址,var2 指示該字段相對對象內存地址的偏移,var4 指示操做須要加的數值,這裏爲 1。經過 getIntVolatile(var1, var2) 獲得舊的預期值,經過調用 compareAndSwapInt() 來進行 CAS 比較,若是該字段內存地址中的值等於 var5,那麼就更新內存地址爲 var1+var2 的變量爲 var5+var4。
能夠看到 getAndAddInt() 在一個循環中進行,發生衝突的作法是不斷的進行重試。
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
若是一個變量初次讀取的時候是 A 值,它的值被改爲了 B,後來又被改回爲 A,那 CAS 操做就會誤認爲它歷來沒有被改變過。
J.U.C 包提供了一個帶有標記的原子引用類 AtomicStampedReference 來解決這個問題,它能夠經過控制變量值的版原本保證 CAS 的正確性。大部分狀況下 ABA 問題不會影響程序併發的正確性,若是須要解決 ABA 問題,改用傳統的互斥同步可能會比原子類更高效。
要保證線程安全,並非必定就要進行同步。若是一個方法原本就不涉及共享數據,那它天然就無須任何同步措施去保證正確性。
多個線程訪問同一個方法的局部變量時,不會出現線程安全問題,由於局部變量存儲在虛擬機棧中,屬於線程私有的。
public class StackClosedExample { public void add100() { int cnt = 0; for (int i = 0; i < 100; i++) { cnt++; } System.out.println(cnt); } }
public static void main(String[] args) { StackClosedExample example = new StackClosedExample(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> example.add100()); executorService.execute(() -> example.add100()); executorService.shutdown(); }
100 100
若是一段代碼中所須要的數據必須與其餘代碼共享,那就看看這些共享數據的代碼是否能保證在同一個線程中執行。若是能保證,咱們就能夠把共享數據的可見範圍限制在同一個線程以內,這樣,無須同步也能保證線程之間不出現數據爭用的問題。
符合這種特色的應用並很多見,大部分使用消費隊列的架構模式(如「生產者-消費者」模式)都會將產品的消費過程儘可能在一個線程中消費完。其中最重要的一個應用實例就是經典 Web 交互模型中的「一個請求對應一個服務器線程」(Thread-per-Request)的處理方式,這種處理方式的普遍應用使得不少 Web 服務端應用均可以使用線程本地存儲來解決線程安全問題。
可使用 java.lang.ThreadLocal 類來實現線程本地存儲功能。
對於如下代碼,thread1 中設置 threadLocal 爲 1,而 thread2 設置 threadLocal 爲 2。過了一段時間以後,thread1 讀取 threadLocal 依然是 1,不受 thread2 的影響。
public class ThreadLocalExample { public static void main(String[] args) { ThreadLocal threadLocal = new ThreadLocal(); Thread thread1 = new Thread(() -> { threadLocal.set(1); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(threadLocal.get()); threadLocal.remove(); }); Thread thread2 = new Thread(() -> { threadLocal.set(2); threadLocal.remove(); }); thread1.start(); thread2.start(); } }
1
爲了理解 ThreadLocal,先看如下代碼:
public class ThreadLocalExample1 { public static void main(String[] args) { ThreadLocal threadLocal1 = new ThreadLocal(); ThreadLocal threadLocal2 = new ThreadLocal(); Thread thread1 = new Thread(() -> { threadLocal1.set(1); threadLocal2.set(1); }); Thread thread2 = new Thread(() -> { threadLocal1.set(2); threadLocal2.set(2); }); thread1.start(); thread2.start(); } }
它所對應的底層結構圖爲:
每一個 Thread 都有一個 ThreadLocal.ThreadLocalMap 對象。
/* ThreadLocal values pertaining to this thread. This map is maintained * by the ThreadLocal class. */ ThreadLocal.ThreadLocalMap threadLocals = null;
當調用一個 ThreadLocal 的 set(T value) 方法時,先獲得當前線程的 ThreadLocalMap 對象,而後將 ThreadLocal->value 鍵值對插入到該 Map 中。
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); }
get() 方法相似。
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }
ThreadLocal 從理論上講並非用來解決多線程併發問題的,由於根本不存在多線程競爭。
在一些場景 (尤爲是使用線程池) 下,因爲 ThreadLocal.ThreadLocalMap 的底層數據結構致使 ThreadLocal 有內存泄漏的狀況,應該儘量在每次使用 ThreadLocal 後手動調用 remove(),以免出現 ThreadLocal 經典的內存泄漏甚至是形成自身業務混亂的風險。
這種代碼也叫作純代碼(Pure Code),能夠在代碼執行的任什麼時候刻中斷它,轉而去執行另一段代碼(包括遞歸調用它自己),而在控制權返回後,原來的程序不會出現任何錯誤。
可重入代碼有一些共同的特徵,例如不依賴存儲在堆上的數據和公用的系統資源、用到的狀態量都由參數中傳入、不調用非可重入的方法等。
這裏的鎖優化主要是指 JVM 對 synchronized 的優化。
互斥同步進入阻塞狀態的開銷都很大,應該儘可能避免。在許多應用中,共享數據的鎖定狀態只會持續很短的一段時間。自旋鎖的思想是讓一個線程在請求一個共享數據的鎖時執行忙循環(自旋)一段時間,若是在這段時間內能得到鎖,就能夠避免進入阻塞狀態。
自旋鎖雖然能避免進入阻塞狀態從而減小開銷,可是它須要進行忙循環操做佔用 CPU 時間,它只適用於共享數據的鎖定狀態很短的場景。
在 JDK 1.6 中引入了自適應的自旋鎖。自適應意味着自旋的次數再也不固定了,而是由前一次在同一個鎖上的自旋次數及鎖的擁有者的狀態來決定。
鎖消除是指對於被檢測出不可能存在競爭的共享數據的鎖進行消除。
鎖消除主要是經過逃逸分析來支持,若是堆上的共享數據不可能逃逸出去被其它線程訪問到,那麼就能夠把它們當成私有數據對待,也就能夠將它們的鎖進行消除。
對於一些看起來沒有加鎖的代碼,其實隱式的加了不少鎖。例以下面的字符串拼接代碼就隱式加了鎖:
public static String concatString(String s1, String s2, String s3) { return s1 + s2 + s3; }
String 是一個不可變的類,編譯器會對 String 的拼接自動優化。在 JDK 1.5 以前,會轉化爲 StringBuffer 對象的連續 append() 操做:
public static String concatString(String s1, String s2, String s3) { StringBuffer sb = new StringBuffer(); sb.append(s1); sb.append(s2); sb.append(s3); return sb.toString(); }
每一個 append() 方法中都有一個同步塊。虛擬機觀察變量 sb,很快就會發現它的動態做用域被限制在 concatString() 方法內部。也就是說,sb 的全部引用永遠不會逃逸到 concatString() 方法以外,其餘線程沒法訪問到它,所以能夠進行消除。
若是一系列的連續操做都對同一個對象反覆加鎖和解鎖,頻繁的加鎖操做就會致使性能損耗。
上一節的示例代碼中連續的 append() 方法就屬於這類狀況。若是虛擬機探測到由這樣的一串零碎的操做都對同一個對象加鎖,將會把加鎖的範圍擴展(粗化)到整個操做序列的外部。對於上一節的示例代碼就是擴展到第一個 append() 操做以前直至最後一個 append() 操做以後,這樣只須要加鎖一次就能夠了。
JDK 1.6 引入了偏向鎖和輕量級鎖,從而讓鎖擁有了四個狀態:無鎖狀態(unlocked)、偏向鎖狀態(biasble)、輕量級鎖狀態(lightweight locked)和重量級鎖狀態(inflated)。
如下是 HotSpot 虛擬機對象頭的內存佈局,這些數據被稱爲 Mark Word。其中 tag bits 對應了五個狀態,這些狀態在右側的 state 表格中給出。除了 marked for gc 狀態,其它四個狀態已經在前面介紹過了。
下圖左側是一個線程的虛擬機棧,其中有一部分稱爲 Lock Record 的區域,這是在輕量級鎖運行過程建立的,用於存放鎖對象的 Mark Word。而右側就是一個鎖對象,包含了 Mark Word 和其它信息。
輕量級鎖是相對於傳統的重量級鎖而言,它使用 CAS 操做來避免重量級鎖使用互斥量的開銷。對於絕大部分的鎖,在整個同步週期內都是不存在競爭的,所以也就不須要都使用互斥量進行同步,能夠先採用 CAS 操做進行同步,若是 CAS 失敗了再改用互斥量進行同步。
當嘗試獲取一個鎖對象時,若是鎖對象標記爲 0 01,說明鎖對象的鎖未鎖定(unlocked)狀態。此時虛擬機在當前線程的虛擬機棧中建立 Lock Record,而後使用 CAS 操做將對象的 Mark Word 更新爲 Lock Record 指針。若是 CAS 操做成功了,那麼線程就獲取了該對象上的鎖,而且對象的 Mark Word 的鎖標記變爲 00,表示該對象處於輕量級鎖狀態。
若是 CAS 操做失敗了,虛擬機首先會檢查對象的 Mark Word 是否指向當前線程的虛擬機棧,若是是的話說明當前線程已經擁有了這個鎖對象,那就能夠直接進入同步塊繼續執行,不然說明這個鎖對象已經被其餘線程線程搶佔了。若是有兩條以上的線程爭用同一個鎖,那輕量級鎖就再也不有效,要膨脹爲重量級鎖。
偏向鎖的思想是偏向於讓第一個獲取鎖對象的線程,這個線程在以後獲取該鎖就再也不須要進行同步操做,甚至連 CAS 操做也再也不須要。
當鎖對象第一次被線程得到的時候,進入偏向狀態,標記爲 1 01。同時使用 CAS 操做將線程 ID 記錄到 Mark Word 中,若是 CAS 操做成功,這個線程之後每次進入這個鎖相關的同步塊就不須要再進行任何同步操做。
當有另一個線程去嘗試獲取這個鎖對象時,偏向狀態就宣告結束,此時撤銷偏向(Revoke Bias)後恢復到未鎖定狀態或者輕量級鎖狀態。
以爲文章不錯的歡迎關注個人WX公衆號: 程序員喬戈裏
我是 百度後臺開發工程師,哈工大計算機本碩,專一分享技術乾貨/編程資源/求職面試/成長感悟等,關注送3000G編程資源,免費下載CSDN資源。