java.util.concurrent(J.U.C)大大提升了併發性能,AQS 被認爲是 J.U.C 的核心。html
用來控制一個或者多個線程等待多個線程。java
維護了一個計數器 cnt,每次調用 countDown() 方法會讓計數器的值減 1,減到 0 的時候,那些由於調用 await() 方法而在等待的線程就會被喚醒。git
public class CountdownLatchExample {
public static void main(String[] args) throws InterruptedException {
final int totalThread = 10;
CountDownLatch countDownLatch = new CountDownLatch(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {// 建立 totalThread 條線程
System.out.print("run..");
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("end");
executorService.shutdown();
}
}
複製代碼
run..run..run..run..run..run..run..run..run..run..end
複製代碼
用來控制多個線程互相等待,只有當多個線程都到達時,這些線程纔會繼續執行。程序員
和 CountdownLatch 類似,都是經過維護計數器來實現的。線程執行 await() 方法以後計數器會減 1,並進行等待,直到計數器爲 0,全部調用 await() 方法而在等待的線程才能繼續執行。github
CyclicBarrier 和 CountdownLatch 的一個區別是,CyclicBarrier 的計數器經過調用 reset() 方法能夠循環使用,因此它才叫作循環屏障。算法
CyclicBarrier 有兩個構造函數,其中 parties 指示計數器的初始值,barrierAction 在全部線程都到達屏障的時候會執行一次。緩存
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
複製代碼
public class CyclicBarrierExample {
public static void main(String[] args) {
final int totalThread = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("before..");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.print("after..");
});
}
executorService.shutdown();
}
}
複製代碼
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..
複製代碼
Semaphore 相似於操做系統中的信號量,能夠控制對互斥資源的訪問線程數。安全
如下代碼模擬了對某個服務的併發請求,每次只能有 3 個客戶端同時訪問,請求總數爲 10。bash
public class SemaphoreExample {
public static void main(String[] args) {
final int clientCount = 3;
final int totalRequestCount = 10;
Semaphore semaphore = new Semaphore(clientCount);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
executorService.execute(()->{
try {
semaphore.acquire();
System.out.print(semaphore.availablePermits() + " ");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
});
}
executorService.shutdown();
}
}
複製代碼
2 1 2 2 2 2 2 1 2 2
複製代碼
在介紹 Callable 時咱們知道它能夠有返回值,返回值經過 Future 進行封裝。FutureTask 實現了 RunnableFuture 接口,該接口繼承自 Runnable 和 Future 接口,這使得 FutureTask 既能夠當作一個任務執行,也能夠有返回值。服務器
public class FutureTask<V> implements RunnableFuture<V> 複製代碼
public interface RunnableFuture<V> extends Runnable, Future<V> 複製代碼
FutureTask 可用於異步獲取執行結果或取消執行任務的場景。當一個計算任務須要執行很長時間,那麼就能夠用 FutureTask 來封裝這個任務,主線程在完成本身的任務以後再去獲取結果。
public class FutureTaskExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = 0; i < 100; i++) {
Thread.sleep(10);
result += i;
}
return result;
}
});
Thread computeThread = new Thread(futureTask);
computeThread.start();
Thread otherThread = new Thread(() -> {
System.out.println("other task is running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
otherThread.start();
System.out.println(futureTask.get());
}
}
複製代碼
other task is running...
4950
複製代碼
java.util.concurrent.BlockingQueue 接口有如下阻塞隊列的實現:
提供了阻塞的 take() 和 put() 方法:若是隊列爲空 take() 將阻塞,直到隊列中有內容;若是隊列爲滿 put() 將阻塞,直到隊列有空閒位置。
使用 BlockingQueue 實現生產者消費者問題
public class ProducerConsumer {
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
private static class Producer extends Thread {
@Override
public void run() {
try {
queue.put("product");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("produce..");
}
}
private static class Consumer extends Thread {
@Override
public void run() {
try {
String product = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("consume..");
}
}
}
複製代碼
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
Producer producer = new Producer();
producer.start();
}
for (int i = 0; i < 5; i++) {
Consumer consumer = new Consumer();
consumer.start();
}
for (int i = 0; i < 3; i++) {
Producer producer = new Producer();
producer.start();
}
}
複製代碼
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..
複製代碼
主要用於並行計算中,和 MapReduce 原理相似,都是把大的計算任務拆分紅多個小任務並行計算。
public class ForkJoinExample extends RecursiveTask<Integer> {
private final int threshold = 5;
private int first;
private int last;
public ForkJoinExample(int first, int last) {
this.first = first;
this.last = last;
}
@Override
protected Integer compute() {
int result = 0;
if (last - first <= threshold) {
// 任務足夠小則直接計算
for (int i = first; i <= last; i++) {
result += i;
}
} else {
// 拆分紅小任務
int middle = first + (last - first) / 2;
ForkJoinExample leftTask = new ForkJoinExample(first, middle);
ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
leftTask.fork();
rightTask.fork();
result = leftTask.join() + rightTask.join();
}
return result;
}
}
複製代碼
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinExample example = new ForkJoinExample(1, 10000);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future result = forkJoinPool.submit(example);
System.out.println(result.get());
}
複製代碼
ForkJoin 使用 ForkJoinPool 來啓動,它是一個特殊的線程池,線程數量取決於 CPU 核數。
public class ForkJoinPool extends AbstractExecutorService 複製代碼
ForkJoinPool 實現了工做竊取算法來提升 CPU 的利用率。每一個線程都維護了一個雙端隊列(在線性表的兩端進行插入和刪除),用來存儲須要執行的任務。工做竊取算法容許空閒的線程從其它線程的雙端隊列中竊取一個任務來執行。竊取的任務必須是最晚的任務,避免和隊列所屬線程發生競爭。例以下圖中,Thread2 從 Thread1 的隊列中拿出最晚的 Task1 任務,Thread1 會拿出 Task2 來執行,這樣就避免發生競爭。可是若是隊列中只有一個任務時仍是會發生競爭。
若是多個線程對同一個共享數據進行訪問而不採起同步操做的話,那麼操做的結果是不一致的。
如下代碼演示了 1000 個線程同時對 cnt 執行自增操做,操做結束以後它的值有可能小於 1000。
public class ThreadUnsafeExample {
private int cnt = 0;
public void add() {
cnt++;
}
public int get() {
return cnt;
}
}
複製代碼
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
ThreadUnsafeExample example = new ThreadUnsafeExample();
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
}
複製代碼
997
複製代碼
Java 內存模型試圖屏蔽各類硬件和操做系統的內存訪問差別,以實現讓 Java 程序在各類平臺下都能達到一致的內存訪問效果。
處理器上的寄存器的讀寫的速度比內存快幾個數量級,爲了解決這種速度矛盾,在它們之間加入了高速緩存。
加入高速緩存帶來了一個新的問題:緩存一致性。若是多個緩存共享同一塊主內存區域,那麼多個緩存的數據可能會不一致,須要一些協議來解決這個問題。
全部的變量都存儲在主內存中,每一個線程還有本身的工做內存,工做內存存儲在高速緩存或者寄存器中,保存了該線程使用的變量的主內存副本拷貝。線程只能直接操做工做內存中的變量,不一樣線程之間的變量值傳遞須要經過主內存來完成。
Java 內存模型定義了 8 個操做來完成主內存和工做內存的交互操做。
代表此操做是不可分割的,不可中斷,要所有執行,要麼所有不執行。
可見性指當一個線程修改了共享變量的值,其它線程可以當即得知這個修改。
主要有三種實現可見性的方式:
對前面的線程不安全示例中的 cnt 變量使用 volatile 修飾,不能解決線程不安全問題,由於 volatile 並不能保證操做的原子性。
有序性是指:在本線程內觀察,全部操做都是有序的。在一個線程觀察另外一個線程,全部操做都是無序的,無序是由於發生了指令重排序。
volatile 關鍵字經過添加內存屏障的方式來禁止指令重排,即重排序時不能把後面的指令放到內存屏障以前。
也能夠經過 synchronized 來保證有序性,它保證每一個時刻只有一個線程執行同步代碼,至關因而讓線程順序執行同步代碼。
上面提到了能夠用 volatile 和 synchronized 來保證有序性。除此以外,JVM 還規定了先行發生原則,讓一個操做無需控制就能先於另外一個操做完成。
在一個線程內,在程序前面的操做先行發生於後面的操做。
一個 unlock 操做先行發生於後面對同一個鎖的 lock 操做。
對一個 volatile 變量的寫操做先行發生於後面對這個變量的讀操做。
Thread 對象的 start() 方法調用先行發生於此線程的每個動做。
Thread 對象的結束先行發生於 join() 方法返回。
對線程 interrupt() 方法的調用先行發生於被中斷線程的代碼檢測到中斷事件的發生,能夠經過 interrupted() 方法檢測到是否有中斷髮生。
一個對象的初始化完成(構造函數執行結束)先行發生於它的 finalize() 方法的開始。
finalize()是Object中的方法,當垃圾回收器將要回收對象所佔內存以前被調用,即當一個對象被虛擬機宣告死亡時會先調用它finalize()方法,讓此對象處理它生前的最後事情(這個對象能夠趁這個時機掙脫死亡的命運)。
若是操做 A 先行發生於操做 B,操做 B 先行發生於操做 C,那麼操做 A 先行發生於操做 C。
多個線程無論以何種方式訪問某個類,而且在主調代碼中不須要進行同步,都能表現正確的行爲。
線程安全有如下幾種實現方式:
不可變(Immutable)的對象必定是線程安全的,不須要再採起任何的線程安全保障措施。只要一個不可變的對象被正確地構建出來,永遠也不會看到它在多個線程之中處於不一致的狀態。多線程環境下,應當儘可能使對象成爲不可變,來知足線程安全。
不可變的類型:
對於集合類型,可使用 Collections.unmodifiableXXX() 方法來獲取一個不可變的集合。
public class ImmutableExample {
public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map);
unmodifiableMap.put("a", 1);
}
}
複製代碼
Exception in thread "main" java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at ImmutableExample.main(ImmutableExample.java:9)
複製代碼
Collections.unmodifiableXXX() 先對原始的集合進行拷貝,須要對集合進行修改的方法都直接拋出異常。
public V put(K key, V value) {
throw new UnsupportedOperationException();
}
複製代碼
synchronized 和 ReentrantLock。
互斥同步最主要的問題就是線程阻塞和喚醒所帶來的性能問題,所以這種同步也稱爲阻塞同步。
互斥同步屬於一種悲觀的併發策略,老是認爲只要不去作正確的同步措施,那就確定會出現問題。不管共享數據是否真的會出現競爭,它都要進行加鎖(這裏討論的是概念模型,實際上虛擬機會優化掉很大一部分沒必要要的加鎖)、用戶態核心態轉換、維護鎖計數器和檢查是否有被阻塞的線程須要喚醒等操做。
隨着硬件指令集的發展,咱們可使用基於衝突檢測的樂觀併發策略:先進行操做,若是沒有其它線程爭用共享數據,那操做就成功了,不然採起補償措施(不斷地重試,直到成功爲止)。這種樂觀的併發策略的許多實現都不須要將線程阻塞,所以這種同步操做稱爲非阻塞同步。
CAS 指令須要有 3 個操做數,分別是內存地址 V、舊的預期值 A 和新值 B。當執行操做時,只有當 V 的值等於 A,纔將 V 的值更新爲 B。
CAS的語義是「我認爲V的值應該爲A,若是是,那麼將V的值更新爲B,不然不修改並告訴V的值實際爲多少」,CAS是項 樂觀鎖 技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知此次競爭中失敗,並能夠再次嘗試。
J.U.C 包裏面的整數原子類 AtomicInteger 的方法調用了 Unsafe 類的 CAS 操做。
如下代碼使用了 AtomicInteger 執行了自增的操做。
private AtomicInteger cnt = new AtomicInteger();
public void add() {
cnt.incrementAndGet();
}
複製代碼
如下代碼是 incrementAndGet() 的源碼,它調用了 Unsafe 的 getAndAddInt() 。
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
複製代碼
如下代碼是 getAndAddInt() 源碼,var1 指示對象內存地址,var2 指示該字段相對對象內存地址的偏移,var4 指示操做須要加的數值,這裏爲 1。經過 getIntVolatile(var1, var2) 獲得舊的預期值,經過調用 compareAndSwapInt() 來進行 CAS 比較,若是該字段內存地址中的值等於 var5,那麼就更新內存地址爲 var1+var2 的變量爲 var5+var4。
能夠看到 getAndAddInt() 在一個循環中進行,發生衝突的作法是不斷的進行重試。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
複製代碼
若是一個變量初次讀取的時候是 A 值,它的值被改爲了 B,後來又被改回爲 A,那 CAS 操做就會誤認爲它歷來沒有被改變過。
J.U.C 包提供了一個帶有標記的原子引用類 AtomicStampedReference 來解決這個問題,它能夠經過控制變量值的版原本保證 CAS 的正確性。大部分狀況下 ABA 問題不會影響程序併發的正確性,若是須要解決 ABA 問題,改用傳統的互斥同步可能會比原子類更高效。
要保證線程安全,並非必定就要進行同步。若是一個方法原本就不涉及共享數據,那它天然就無須任何同步措施去保證正確性。
多個線程訪問同一個方法的局部變量時,不會出現線程安全問題,由於局部變量存儲在虛擬機棧中,屬於線程私有的。
public class StackClosedExample {
public void add100() {
int cnt = 0;
for (int i = 0; i < 100; i++) {
cnt++;
}
System.out.println(cnt);
}
}
複製代碼
public static void main(String[] args) {
StackClosedExample example = new StackClosedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> example.add100());
executorService.execute(() -> example.add100());
executorService.shutdown();
}
複製代碼
100
100
複製代碼
若是一段代碼中所須要的數據必須與其餘代碼共享,那就看看這些共享數據的代碼是否能保證在同一個線程中執行。若是能保證,咱們就能夠把共享數據的可見範圍限制在同一個線程以內,這樣,無須同步也能保證線程之間不出現數據爭用的問題。
符合這種特色的應用並很多見,大部分使用消費隊列的架構模式(如「生產者-消費者」模式)都會將產品的消費過程儘可能在一個線程中消費完。其中最重要的一個應用實例就是經典 Web 交互模型中的「一個請求對應一個服務器線程」(Thread-per-Request)的處理方式,這種處理方式的普遍應用使得不少 Web 服務端應用均可以使用線程本地存儲來解決線程安全問題。
可使用 java.lang.ThreadLocal 類來實現線程本地存儲功能。
對於如下代碼,thread1 中設置 threadLocal 爲 1,而 thread2 設置 threadLocal 爲 2。過了一段時間以後,thread1 讀取 threadLocal 依然是 1,不受 thread2 的影響。
public class ThreadLocalExample {
public static void main(String[] args) {
ThreadLocal threadLocal = new ThreadLocal();
Thread thread1 = new Thread(() -> {
threadLocal.set(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadLocal.get());
threadLocal.remove();
});
Thread thread2 = new Thread(() -> {
threadLocal.set(2);
threadLocal.remove();
});
thread1.start();
thread2.start();
}
}
複製代碼
1
複製代碼
這種代碼也叫作純代碼(Pure Code),能夠在代碼執行的任什麼時候刻中斷它,轉而去執行另一段代碼(包括遞歸調用它自己),而在控制權返回後,原來的程序不會出現任何錯誤。
可重入代碼有一些共同的特徵,例如不依賴存儲在堆上的數據和公用的系統資源、用到的狀態量都由參數中傳入、不調用非可重入的方法等。
這裏的鎖優化主要是指 JVM 對 synchronized 的優化。
互斥同步進入阻塞狀態的開銷都很大,應該儘可能避免。在許多應用中,共享數據的鎖定狀態只會持續很短的一段時間。自旋鎖的思想是讓一個線程在請求一個共享數據的鎖時執行忙循環(自旋)一段時間,若是在這段時間內能得到鎖,就能夠避免進入阻塞狀態。
忙循環: 忙循環就是程序員用循環讓一個線程等待,不像傳統方法wait(), sleep() 或 yield() 它們都放棄了CPU控制,而忙循環不會放棄CPU,它就是在運行一個空循環。這麼作的目的是爲了保留CPU緩存。
自旋鎖雖然能避免進入阻塞狀態從而減小開銷,可是它須要進行忙循環操做佔用 CPU 時間,它只適用於共享數據的鎖定狀態很短的場景。
在 JDK 1.6 中引入了自適應的自旋鎖。自適應意味着自旋的次數再也不固定了,而是由前一次在同一個鎖上的自旋次數及鎖的擁有者的狀態來決定。
鎖消除是指對於被檢測出不可能存在競爭的共享數據的鎖進行消除。
鎖消除主要是經過逃逸分析來支持,若是堆上的共享數據不可能逃逸出去被其它線程訪問到,那麼就能夠把它們當成私有數據對待,也就能夠將它們的鎖進行消除。
對於一些看起來沒有加鎖的代碼,其實隱式的加了不少鎖。例以下面的字符串拼接代碼就隱式加了鎖:
public static String concatString(String s1, String s2, String s3) {
return s1 + s2 + s3;
}
複製代碼
String 是一個不可變的類,編譯器會對 String 的拼接自動優化。在 JDK 1.5 以前,會轉化爲 StringBuffer 對象的連續 append() 操做:
public static String concatString(String s1, String s2, String s3) {
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
sb.append(s3);
return sb.toString();
}
複製代碼
每一個 append() 方法中都有一個同步塊。虛擬機觀察變量 sb,很快就會發現它的動態做用域被限制在 concatString() 方法內部。也就是說,sb 的全部引用永遠不會逃逸到 concatString() 方法以外,其餘線程沒法訪問到它,所以能夠進行消除。
若是一系列的連續操做都對同一個對象反覆加鎖和解鎖,頻繁的加鎖操做就會致使性能損耗。
上一節的示例代碼中連續的 append() 方法就屬於這類狀況。若是虛擬機探測到由這樣的一串零碎的操做都對同一個對象加鎖,將會把加鎖的範圍擴展(粗化)到整個操做序列的外部。對於上一節的示例代碼就是擴展到第一個 append() 操做以前直至最後一個 append() 操做以後,這樣只須要加鎖一次就能夠了。
輕量級鎖不是爲了代替重量級鎖,它的本意是在沒有多線程競爭的前提下,減小傳統的重量級鎖使用操做系統互斥量產生的性能消耗
偏向鎖的思想是偏向於讓第一個獲取鎖對象的線程,這個線程在以後獲取該鎖就再也不須要進行同步操做,甚至連 CAS 操做也再也不須要。
本文參考聊聊併發(八)——Fork/Join 框架介紹、線程通訊、Threads and Locks、Threads and Locks、CS-Notes、Java內存模型 之三個特性、輕量級鎖、、thread state java