原文:慕課網高併發實戰(七)- J.U.C之AQShtml
在【併發編程】【JDK源碼】AQS (AbstractQueuedSynchronizer)(1/2)中簡要介紹了AQS的概念和基本原理,下面繼續對AQS進行分析。java
一、首先 AQS內部維護了一個CLH隊列,來管理鎖。
二、線程嘗試獲取鎖,若是獲取失敗,則將等待信息等包裝成一個Node結點,加入到同步隊列Sync queue裏。
三、不斷從新嘗試獲取鎖(當前結點爲head的直接後繼纔會 嘗試),若是獲取失敗,則會阻塞本身,直到被喚醒。
四、當持有鎖的線程釋放鎖的時候,會喚醒隊列中的後繼線程。數據庫
下面幾個主要同步組件:編程
CountDownLatch
Semaphore
CyclicBarrier安全ReentrantLock
Condition
FutureTask數據結構
同步阻塞類,能夠完成阻塞線程的功能
多線程
程序執行須要等待某個條件完成後,才能進行後面的操做。好比父任務等待全部子任務都完成的時候,再繼續往下進行。
實例一併發
@Slf4j public class CountDownLatchExample1 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { // 爲防止出現異常,放在finally更保險一些 countDownLatch.countDown(); } }); } countDownLatch.await(); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); Thread.sleep(100); } }
實例二
好比有多個線程完成一個任務,可是這個任務只想給他一個指定的時間,超過這個任務就不繼續等待了。完成多少算多少。框架
@Slf4j public class CountDownLatchExample2 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; // 放在這裏沒有用的,由於這時候仍是在主線程中阻塞,阻塞完之後纔開始執行下面的await // Thread.sleep(1); exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } // 等待指定的時間 參數1:等待時間 參數2:時間單位 countDownLatch.await(10, TimeUnit.MILLISECONDS); log.info("finish"); // 並非第一時間內銷燬掉全部線程,而是先讓正在執行線程執行完 exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); } }
僅能提供有限訪問的資源:好比數據庫的鏈接數最大隻有20,而上層的併發數遠遠大於20,這時候若是不作限制,可能會因爲沒法獲取鏈接而致使併發異常,這時候可使用Semaphore來進行控制,當信號量設置爲1的時候,就和單線程很類似了。高併發
實例一:每次獲取1個許可
@Slf4j public class SemaphoreExample1 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(); // 獲取一個許可 test(threadNum); semaphore.release(); // 釋放一個許可 } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
實例2:一次性獲取多個許可
@Slf4j public class SemaphoreExample2 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(3); // 獲取多個許可 test(threadNum); semaphore.release(3); // 釋放多個許可 } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
實例三:併發很高,想要超過容許的併發數以後,就拋棄
@Slf4j public class SemaphoreExample3 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try{ if (semaphore.tryAcquire()) { // 嘗試獲取一個許可 // 本例中只有一個三個線程能夠執行到這裏 test(threadNum); semaphore.release(); // 釋放一個許可 } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
下面是Semaphore的方法列表
嘗試獲取獲取許可的時候等一段時間
嘗試獲取獲取許可的次數以及超時時間均可以設置
@Slf4j public class SemaphoreExample4 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 嘗試獲取一個許可 test(threadNum); semaphore.release(); // 釋放一個許可 } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
同步輔助類,容許一組線程相互等待,知道全部線程都準備就緒後,才能繼續操做,當某個線程調用了await方法以後,就會進入等待狀態,並將計數器-1,直到全部線程調用await方法使計數器爲0,才能夠繼續執行,因爲計數器能夠重複使用,因此咱們又叫他循環屏障。
使用場景
多線程計算數據,最後合併計算結果的應用場景,好比用Excel保存了用戶的銀行流水,每一頁保存了一個用戶近一年的每一筆銀行流水,如今須要統計用戶的日均銀行流水,這時候咱們就能夠用多線程處理每一頁裏的銀行流水,都執行完之後,獲得每個頁的日均銀行流水,以後經過CyclicBarrier的action,利用這些線程的計算結果,計算出整個excel的日均流水。
CyclicBarrier與CountDownLatch區別
一、CyclicBarrier能夠重複使用(使用reset方法),CountDownLatch只能用一次
二、CountDownLatch主要用於實現一個或n個線程須要等待其餘線程完成某項操做以後,才能繼續往下執行,描述的是一個或n個線程等待其餘線程的關係,而CyclicBarrier是多個線程相互等待,知道知足條件之後再一塊兒往下執行。描述的是多個線程相互等待的場景
實例一:能夠設置等待時間
@Slf4j public class CyclicBarrierExample1 { // 1.給定一個值,說明有多少個線程同步等待 private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; // 延遲1秒,方便觀察 Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); // 2.使用await方法進行等待 barrier.await(); log.info("{} continue", threadNum); } }
實例二
@Slf4j public class CyclicBarrierExample2 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); try { // 因爲狀態可能會改變,因此會拋出BarrierException異常,若是想繼續往下執行,須要加上try-catch barrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { log.warn("BarrierException", e); } log.info("{} continue", threadNum); } }
實例三
@Slf4j public class CyclicBarrierExample3 { private static CyclicBarrier barrier = new CyclicBarrier(5, () -> { // 當線程所有到達屏障時,優先執行這裏的runable log.info("callback is running"); }); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } }
Java一共分爲兩類鎖,一類是由synchornized修飾的鎖,還有一種是JUC裏提供的鎖,核心就是ReentrantLock
synchornized與ReentrantLock的區別對比:
對比維度 | synchornized | ReentrantLock |
---|---|---|
可重入性(進入鎖的時候計數器自增1) | 可重入 | 可重入 |
鎖的實現 | JVM實現,很難操做源碼,獲得實現 | JDK實現 |
性能 | 在引入輕量級鎖後性能大大提高,建議均可以選擇的時候選擇synchornized | - |
功能區別 | 方便簡潔,由編譯器負責加鎖和釋放鎖 | 手工操做 |
粗粒度,不靈活 | 細粒度,可靈活控制 | - |
能否指定公平所 | 不能夠 | 能夠 |
能否放棄鎖 | 不能夠 | 能夠 |
ReentrantLock獨有的功能
能夠指定是公平鎖仍是非公平鎖
提供了一個Condition類,能夠分組喚醒須要喚醒的線程
提供可以中斷等待鎖的線程的機制,lock.lockInterruptibly()
ReentrantLock實現:自旋鎖,循環調用CAS操做來實現加鎖,避免了使線程進入內核態的阻塞狀態。想辦法組織線程進入內核態的阻塞狀態,是咱們分析和理解鎖的關鍵鑰匙。
基本用法
@Slf4j @ThreadSafe public class LockExample2 { // 請求總數 public static int clientTotal = 5000; // 同時併發執行的線程數 public static int threadTotal = 200; public static int count = 0; private final static Lock lock = new ReentrantLock(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private static void add() { lock.lock(); try { count++; } finally { lock.unlock(); } } }
源碼分析
默認使用非公平鎖,能夠傳入true和false來使用公平所仍是非公平鎖。
tryLock,能夠設置等待時間,或者直接返回
在沒有任何讀寫鎖的時候才能取得寫入的鎖,可用於實現悲觀讀取,讀多寫少的場景下可能會出現線程飢餓。
@Slf4j public class LockExample3 { private final Map<String, Data> map = new TreeMap<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock readLock = lock.readLock(); private final Lock writeLock = lock.writeLock(); public Data get(String key) { readLock.lock(); try { return map.get(key); } finally { readLock.unlock(); } } public Set<String> getAllKeys() { readLock.lock(); try { return map.keySet(); } finally { readLock.unlock(); } } // 在沒有任何讀寫鎖的時候才能夠進行寫入操做 public Data put(String key, Data value) { writeLock.lock(); try { return map.put(key, value); } finally { readLock.unlock(); } } class Data { } }
StempedLock控制鎖有三種形式,分別是寫,讀,和樂觀讀,重點在樂觀鎖。一個StempedLock,狀態是由版本和模式兩個部分組成;鎖獲取的方法返回的是一個數字做爲票據(Stempe),他用相應的鎖狀態來表示並控制相關的訪問,數字0表示沒有寫鎖被受權訪問;在讀鎖上分爲悲觀讀和樂觀讀;
樂觀讀:若是讀的操做不少,寫操做不多的狀況下,咱們能夠樂觀的認爲,讀寫同時發生的概率很小,所以不悲觀的使用讀取鎖定很小,程序能夠在查看相關的狀態以後,判斷有沒有寫操做的變動,再採起相應的措施,這一小小的改進,能夠大大提高執行效率。
源碼案例解釋
import java.util.concurrent.locks.StampedLock; public class LockExample4 { class Point { private double x, y; private final StampedLock sl = new StampedLock(); void move(double deltaX, double deltaY) { // an exclusively locked method long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } //下面看看樂觀讀鎖案例 double distanceFromOrigin() { // A read-only method long stamp = sl.tryOptimisticRead(); //得到一個樂觀讀鎖 double currentX = x, currentY = y; //將兩個字段讀入本地局部變量 if (!sl.validate(stamp)) { //檢查發出樂觀讀鎖後同時是否有其餘寫鎖發生? stamp = sl.readLock(); //若是沒有,咱們再次得到一個讀悲觀鎖 try { currentX = x; // 將兩個字段讀入本地局部變量 currentY = y; // 將兩個字段讀入本地局部變量 } finally { sl.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } //下面是悲觀讀鎖案例 void moveIfAtOrigin(double newX, double newY) { // upgrade // Could instead start with optimistic, not read mode long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { //循環,檢查當前狀態是否符合 long ws = sl.tryConvertToWriteLock(stamp); //將讀鎖轉爲寫鎖 if (ws != 0L) { //這是確認轉爲寫鎖是否成功 stamp = ws; //若是成功 替換票據 x = newX; //進行狀態改變 y = newY; //進行狀態改變 break; } else { //若是不能成功轉換爲寫鎖 sl.unlockRead(stamp); //咱們顯式釋放讀鎖 stamp = sl.writeLock(); //顯式直接進行寫鎖 而後再經過循環再試 } } } finally { sl.unlock(stamp); //釋放讀鎖或寫鎖 } } } }
簡單使用
@Slf4j @ThreadSafe public class LockExample5 { // 請求總數 public static int clientTotal = 5000; // 同時併發執行的線程數 public static int threadTotal = 200; public static int count = 0; private final static StampedLock lock = new StampedLock(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private static void add() { // 會返回一個stamp的值 long stamp = lock.writeLock(); try { count++; } finally { //釋放的時候要釋放 lock.unlock(stamp); } } }
總結關於鎖的幾個類:
synchronized:JVM實現,不但能夠經過一些監控工具監控,並且在出現未知異常的時候JVM也會自動幫咱們釋放鎖
ReentrantLock、ReentrantRead/WriteLock、StempedLock 他們都是對象層面的鎖定,要想保證鎖必定被釋放,要放到finally裏面,纔會更安全一些;StempedLock對性能有很大的改進,特別是在讀線程愈來愈多的狀況下,StempedLock有一個複雜的API。要注意使用
如何使用:
1.在只有少許競爭者的時候,synchronized是一個很好的鎖的實現
2.競爭者很多,可是增加量是能夠競爭的,ReentrantLock是一個很好的鎖的實現(適合本身的纔是最好的,不是越高級越好)
@Slf4j public class LockExample6 { public static void main(String[] args) { ReentrantLock reentrantLock = new ReentrantLock(); // 從reentrantLock實例裏獲取了condition Condition condition = reentrantLock.newCondition(); new Thread(() -> { try { // 線程1調用了lock方法,加入到了AQS的等待隊裏裏面去 reentrantLock.lock(); log.info("wait signal"); // 1 等待信號 // 調用await方法後,從AQS隊列裏移除了,進入到了condition隊列裏面去,等待一個信號 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("get signal"); // 4 獲得信號 // 線程1釋放鎖 reentrantLock.unlock(); }).start(); new Thread(() -> { // 線程1await釋放鎖之後,這裏就獲取了鎖,加入到了AQS等待隊列中 reentrantLock.lock(); log.info("get lock"); // 2 獲取鎖 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //調用signalAll發送信號的方法,Condition節點的線程1節點元素被取出,放在了AQS等待隊列裏(注意並無被喚醒) condition.signalAll(); log.info("send signal ~ "); // 3 發送信號 // 線程2釋放鎖,這時候AQS隊列中只剩下線程1,線程1開始執行 reentrantLock.unlock(); }).start(); } }