本系列研究總結高併發下的幾種同步鎖的使用以及之間的區別,分別是:ReentrantLock、CountDownLatch、CyclicBarrier、Phaser、ReadWriteLock、StampedLock、Semaphore、Exchanger、LockSupport。因爲博客園對博客字數的要求限制,會分爲三個系列:html
高併發之ReentrantLock、CountDownLatch、CyclicBarrierjava
高併發之Phaser、ReadWriteLock、StampedLocknode
高併發之Semaphore、Exchanger、LockSupport算法
ReentrantLock重入鎖,是實現Lock接口的一個類,也是在實際編程中使用頻率很高的一個鎖,支持重入性,表示可以對共享資源可以重複加鎖,即當前線程獲取該鎖再次獲取不會被阻塞。在java關鍵字synchronized隱式支持重入性(關於synchronized能夠看這篇文章),synchronized經過獲取自增,釋放自減的方式實現重入。與此同時,ReentrantLock還支持公平鎖和非公平鎖兩種方式。那麼,要想完徹底全的弄懂ReentrantLock的話,主要也就是ReentrantLock同步語義的學習:1. 重入性的實現原理;2. 公平鎖和非公平鎖。shell
要想支持重入性,就要解決兩個問題:
1. 在線程獲取鎖的時候,若是已經獲取鎖的線程是當前線程的話則直接再次獲取成功;
2. 因爲鎖會被獲取n次,那麼只有鎖在被釋放一樣的n次以後,該鎖纔算是徹底釋放成功。編程
咱們知道,同步組件主要是經過重寫AQS的幾個protected方法來表達本身的同步語義。針對第一個問題,咱們來看看ReentrantLock是怎樣實現的,以非公平鎖爲例,判斷當前線程可否得到鎖爲例,核心方法爲nonfairTryAcquire:多線程
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //1. 若是該鎖未被任何線程佔有,該鎖能被當前線程獲取 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //2.若被佔有,檢查佔有線程是不是當前線程 else if (current == getExclusiveOwnerThread()) { // 3. 再次獲取,計數加一 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
爲了支持重入性,在第二步增長了處理邏輯,若是該鎖已經被線程所佔有了,會繼續檢查佔有線程是否爲當前線程,若是是的話,同步狀態加1返回true,表示能夠再次獲取成功。每次從新獲取都會對同步狀態進行加一的操做,那麼釋放的時候處理思路是怎樣的了?(依然仍是以非公平鎖爲例)核心方法爲tryRelease:併發
protected final boolean tryRelease(int releases) { //1. 同步狀態減1 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //2. 只有當同步狀態爲0時,鎖成功被釋放,返回true free = true; setExclusiveOwnerThread(null); } // 3. 鎖未被徹底釋放,返回false setState(c); return free; }
須要注意的是,重入鎖的釋放必須得等到同步狀態爲0時鎖纔算成功釋放,不然鎖仍未釋放。若是鎖被獲取n次,釋放了n-1次,該鎖未徹底釋放返回false,只有被釋放n次纔算成功釋放,返回true。到如今咱們能夠理清ReentrantLock重入性的實現了,也就是理解了同步語義的第一條。dom
ReentrantLock支持兩種鎖:公平鎖和非公平鎖。何謂公平性,是針對獲取鎖而言的,若是一個鎖是公平的,那麼鎖的獲取順序就應該符合請求上的絕對時間順序,知足FIFO。ReentrantLock的構造方法無參時是構造非公平鎖,源碼爲:jvm
public ReentrantLock() { sync = new NonfairSync(); }
另外還提供了另一種方式,可傳入一個boolean值,true時爲公平鎖,false時爲非公平鎖,源碼爲:
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
在上面非公平鎖獲取時(nonfairTryAcquire方法)只是簡單的獲取了一下當前狀態作了一些邏輯處理,並無考慮到當前同步隊列中線程等待的狀況。咱們來看看公平鎖的處理邏輯是怎樣的,核心方法爲:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
這段代碼的邏輯與nonfairTryAcquire基本上一致,惟一的不一樣在於增長了hasQueuedPredecessors的邏輯判斷,方法名就可知道該方法用來判斷當前節點在同步隊列中是否有前驅節點的判斷,若是有前驅節點說明有線程比當前線程更早的請求資源,根據公平性,當前線程請求資源失敗。若是當前節點沒有前驅節點的話,再纔有作後面的邏輯判斷的必要性。公平鎖每次都是從同步隊列中的第一個節點獲取到鎖,而非公平性鎖則不必定,有可能剛釋放鎖的線程能再次獲取到鎖。
CountDownLatch,這個詞語是由Count Down、Latch兩部分組成,意思是倒數門閂,爲了形象記憶,能夠理解爲一個門上有不少個門閂,只有全部門閂都打開,也就是爲0時才能夠經過這個門。
建立對象時須要在構造CountDownLatch中傳入一個整數n,設置門閂的個數, 在這個整數「倒數」到0以前,線程須要等待在門口,而這個「倒數」過程則是由各個執行線程驅動的,每一個線程執行完一個任務「倒數」一次(門閂減一)。總結來講,CountDownLatch的做用就是等待其餘的線程都執行完任務,必要時能夠對各個任務的執行結果進行彙總,而後線程才繼續往下執行。
CountDownLatch主要有兩個方法:countDown()和await()。countDown()方法用於使計數器減一,其通常是執行任務的線程調用,await()方法則使調用該方法的線程處於等待狀態,其通常是主線程調用。這裏須要注意的是,countDown()方法並無規定一個線程只能調用一次,當同一個線程調用屢次countDown()方法時,每次都會使計數器減一;另外,await()方法也並無規定只能有一個線程執行該方法,若是多個線程同時執行await()方法,那麼這幾個線程都將處於等待狀態,而且以共享模式享有同一個鎖。
public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(5); Service service = new Service(latch); Runnable task = () -> service.exec(); for (int i = 0; i < 5; i++) { Thread thread = new Thread(task); thread.start(); } System.out.println("main thread await. "); latch.await(); System.out.println("main thread finishes await. "); } }
其中的Service類以下:
public class Service { private CountDownLatch latch; //須要將countdownLatch傳遞進來 public Service(CountDownLatch latch) { this.latch = latch; } public void exec() { try { System.out.println(Thread.currentThread().getName() + " execute task. "); sleep(2); System.out.println(Thread.currentThread().getName() + " finished task. "); } finally { //放在finally中是爲了異常也能執行,不至於主線程死鎖! latch.countDown(); } } private void sleep(int seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { e.printStackTrace(); } } }
首先聲明瞭一個CountDownLatch對象,而且由主線程建立了5個線程,分別執行任務,在每一個任務中,當前線程會休眠2秒。在啓動線程以後,主線程調用了CountDownLatch.await()方法,此時,主線程將在此處等待建立的5個線程執行完任務以後才繼續往下執行。
以下是執行結果:
main thread await. Thread-1 execute task. Thread-3 execute task. Thread-0 execute task. Thread-2 execute task. Thread-4 execute task. Thread-2 finished task. Thread-3 finished task. Thread-1 finished task. Thread-0 finished task. Thread-4 finished task. main thread finishes await.
從輸出結果能夠看出,主線程先啓動了五個線程,而後主線程進入等待狀態,當這五個線程都執行完任務以後主線程才結束了等待。上述代碼中須要注意的是,在執行任務的線程中,使用了try...finally結構,該結構能夠保證建立的線程發生異常時CountDownLatch.countDown()方法也會執行,也就保證了主線程不會一直處於等待狀態。
CountDownLatch很是適合於對任務進行拆分,使其並行執行,好比某個任務執行2s,其對數據的請求能夠分爲五個部分,那麼就能夠將這個任務拆分爲5個子任務,分別交由五個線程執行,執行完成以後再由主線程進行彙總,此時,總的執行時間將決定於執行最慢的任務,平均來看,仍是大大減小了總的執行時間。
另一種比較合適使用CountDownLatch的地方是使用某些外部連接請求數據的時候,好比圖片,由於咱們使用的圖片服務只提供了獲取單個圖片的功能,而每次獲取圖片的時間不等,通常都須要1.5s~2s。當咱們須要批量獲取圖片的時候,好比列表頁須要展現一系列的圖片,若是使用單個線程順序獲取,那麼等待時間將會極長,此時咱們就可使用CountDownLatch對獲取圖片的操做進行拆分,並行的獲取圖片,這樣也就縮短了總的獲取時間。
CountDownLatch是基於AbstractQueuedSynchronizer實現的,在AbstractQueuedSynchronizer中維護了一個volatile類型的整數state,volatile能夠保證多線程環境下該變量的修改對每一個線程均可見,而且因爲該屬性爲整型,於是對該變量的修改也是原子的。建立一個CountDownLatch對象時,所傳入的整數n就會賦值給state屬性,當countDown()方法調用時,該線程就會嘗試對state減一,而調用await()方法時,當前線程就會判斷state屬性是否爲0,若是爲0,則繼續往下執行,若是不爲0,則使當前線程進入等待狀態,直到某個線程將state屬性置爲0,其就會喚醒在await()方法中等待的線程。
public void countDown() { sync.releaseShared(1); }
這裏sync也即一個繼承了AbstractQueuedSynchronizer的類實例,該類是CountDownLatch的一個內部類,
其聲明以下:
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); // 獲取當前state屬性的值 if (c == 0) // 若是state爲0,則說明當前計數器已經計數完成,直接返回 return false; int nextc = c-1; if (compareAndSetState(c, nextc)) // 使用CAS算法對state進行設置 return nextc == 0; // 設置成功後返回當前是否爲最後一個設置state的線程 } } }
這裏tryReleaseShared(int)方法即對state屬性進行減一操做的代碼。能夠看到,CAS也即compare and set的縮寫,jvm會保證該方法的原子性,其會比較state是否爲c,若是是則將其設置爲nextc(自減1),若是state不爲c,則說明有另外的線程在getState()方法和compareAndSetState()方法調用之間對state進行了設置,當前線程也就沒有成功設置state屬性的值,其會進入下一次循環中,如此往復,直至其成功設置state屬性的值,即countDown()方法調用成功。
在countDown()方法中調用的sync.releaseShared(1)調用時實際仍是調用的tryReleaseShared(int)方法,
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
能夠看到,在執行sync.releaseShared(1)方法時,其在調用tryReleaseShared(int)方法時會在無限for循環中設置state屬性的值,設置成功以後其會根據設置的返回值(此時state已經自減了一),即當前線程是否爲將state屬性設置爲0的線程,來判斷是否執行if塊中的代碼。doReleaseShared()方法主要做用是喚醒調用了await()方法的線程。須要注意的是,若是有多個線程調用了await()方法,這些線程都是以共享的方式等待在await()方法處的,試想,若是以獨佔的方式等待,那麼當計數器減小至零時,就只有一個線程會被喚醒執行await()以後的代碼,這顯然不符合邏輯。
private void doReleaseShared() { for (;;) { Node h = head; // 記錄等待隊列中的頭結點的線程 if (h != null && h != tail) { // 頭結點不爲空,且頭結點不等於尾節點 int ws = h.waitStatus; if (ws == Node.SIGNAL) { // SIGNAL狀態表示當前節點正在等待被喚醒 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 清除當前節點的等待狀態 continue; unparkSuccessor(h); // 喚醒當前節點的下一個節點 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) // 若是h仍是指向頭結點,說明前面這段代碼執行過程當中沒有其餘線程對頭結點進行過處理 break; } }
在doReleaseShared()方法中(始終注意當前方法是最後一個執行countDown()方法的線程執行的),首先判斷頭結點不爲空,且不爲尾節點,說明等待隊列中有等待喚醒的線程,這裏須要說明的是,在等待隊列中,頭節點中並無保存正在等待的線程,其只是一個空的Node對象,真正等待的線程是從頭節點的下一個節點開始存放的,於是會有對頭結點是否等於尾節點的判斷。在判斷等待隊列中有正在等待的線程以後,其會清除頭結點的狀態信息,而且調用unparkSuccessor(Node)方法喚醒頭結點的下一個節點,使其繼續往下執行。
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 清除當前節點的等待狀態 Node s = node.next; if (s == null || s.waitStatus > 0) { // s的等待狀態大於0說明該節點中的線程已經被外部取消等待了 s = null; // 從隊列尾部往前遍歷,找到最後一個處於等待狀態的節點,用s記錄下來 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); // 喚醒離傳入節點最近的處於等待狀態的節點線程 }
能夠看到,unparkSuccessor(Node)方法的做用是喚醒離傳入節點最近的一個處於等待狀態的線程,使其繼續往下執行。前面咱們講到過,等待隊列中的線程可能有多個,而調用countDown()方法的線程只喚醒了一個處於等待狀態的線程,這裏剩下的等待線程是如何被喚醒的呢?其實這些線程是被當前喚醒的線程喚醒的。具體的咱們能夠看看await()方法的具體執行過程。
以下是await()方法的代碼:
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
await()方法實際仍是調用了Sync對象的方法acquireSharedInterruptibly(int)方法,以下是該方法的具體實現:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
能夠看到acquireSharedInterruptibly(int)方法判斷當前線程是否須要以共享狀態獲取執行權限,這裏tryAcquireShared(int)方法是AbstractQueuedSynchronizer中的一個模板方法,其具體實如今前面的Sync類中,能夠看到,其主要是判斷state是否爲零,若是爲零則返回1,表示當前線程不須要進行權限獲取,可直接執行後續代碼,返回-1則表示當前線程須要進行共享權限。具體的獲取執行權限的代碼在doAcquireSharedInterruptibly(int)方法中。
以下是該方法的具體實現:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); // 使用當前線程建立一個共享模式的節點 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); // 獲取當前節點的前一個節點 if (p == head) { // 判斷前一個節點是否爲頭結點 int r = tryAcquireShared(arg); // 查看當前線程是否獲取到了執行權限 if (r >= 0) { // 大於0表示獲取了執行權限 setHeadAndPropagate(node, r); // 將當前節點設置爲頭結點,而且喚醒後面處於等待狀態的節點 p.next = null; // help GC failed = false; return; } } // 走到這一步說明沒有獲取到執行權限,就使當前線程進入「擱置」狀態 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
在doAcquireSharedInterruptibly(int)方法中,首先使用當前線程建立一個共享模式的節點。而後在一個for循環中判斷當前線程是否獲取到執行權限,若是有(r >= 0判斷)則將當前節點設置爲頭節點,而且喚醒後續處於共享模式的節點;若是沒有,則對調用shouldParkAfterFailedAcquire(Node, Node)和parkAndCheckInterrupt()方法使當前線程處於「擱置」狀態,該「擱置」狀態是由操做系統進行的,這樣能夠避免該線程無限循環而獲取不到執行權限,形成資源浪費,這裏也就是線程處於等待狀態的位置,也就是說當線程被阻塞的時候就是阻塞在這個位置。當有多個線程調用await()方法而進入等待狀態時,這幾個線程都將等待在此處。這裏回過頭來看前面將的countDown()方法,其會喚醒處於等待隊列中離頭節點最近的一個處於等待狀態的線程,也就是說該線程被喚醒以後會繼續從這個位置開始往下執行,此時執行到tryAcquireShared(int)方法時,發現r大於0(由於state已經被置爲0了),該線程就會調用setHeadAndPropagate(Node, int)方法,而且退出當前循環,也就開始執行awat()方法以後的代碼。
這裏咱們看看setHeadAndPropagate(Node, int)方法的具體實現:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); // 將當前節點設置爲頭節點 // 檢查喚醒過程是否須要往下傳遞,而且檢查頭結點的等待狀態 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 若是下一個節點是嘗試以共享狀態獲取獲取執行權限的節點,則將其喚醒 doReleaseShared(); } }
setHeadAndPropagate(Node, int)方法主要做用是設置當前節點爲頭結點,而且將喚醒工做往下傳遞,在傳遞的過程當中,其會判斷被傳遞的節點是不是以共享模式嘗試獲取執行權限的,若是不是,則傳遞到該節點處爲止(通常狀況下,等待隊列中都只會都是處於共享模式或者處於獨佔模式的節點)。也就是說,頭結點會依次喚醒後續處於共享狀態的節點,這也就是共享鎖與獨佔鎖的實現方式。這裏doReleaseShared()方法也就是咱們前面講到的會將離頭結點最近的一個處於等待狀態的節點喚醒的方法。
CyclicBarrier也叫同步屏障,在JDK1.5被引入,可讓一組線程達到一個屏障時被阻塞,直到最後一個線程達到屏障時,因此被阻塞的線程才能繼續執行。 CyclicBarrier比如一扇門,默認狀況下關閉狀態,堵住了線程執行的道路,直到全部線程都就位,門纔打開,讓全部線程一塊兒經過。
使用方法
能夠想象飛虎隊(American Volunteer Group,AVG)要執行某項任務,須要等全部人到齊以後才能開始行動
class AVG implements Runnable { private CyclicBarrier cyclicBarrier; private String name; public AVG(CyclicBarrier cyclicBarrier, String name) { this.cyclicBarrier = cyclicBarrier; this.name = name; } @Override public void run() { System.out.println(name + "就位"); try { cyclicBarrier.await(); Random random =new Random(); double time = random.nextDouble() + 9; System.out.println(name + ": "+ time); } catch (Exception e) { } } }
就位:
class Ready { private CyclicBarrier cyclicBarrier = new CyclicBarrier(8); public void start() { List<Athlete> avgList = new ArrayList<>(); athleteList.add(new AVG(cyclicBarrier,"何永道")); athleteList.add(new AVG(cyclicBarrier,"約翰·理查德·羅西")); athleteList.add(new AVG(cyclicBarrier,"查爾斯·龐德")); athleteList.add(new AVG(cyclicBarrier,"羅伯特·尼爾")); athleteList.add(new AVG(cyclicBarrier,"羅伯特·桑德爾")); athleteList.add(new AVG(cyclicBarrier,"法蘭克·史基爾")); athleteList.add(new AVG(cyclicBarrier,"約翰·牛柯克")); athleteList.add(new AVG(cyclicBarrier,"大衛·李·希爾")); Executor executor = Executors.newFixedThreadPool(8); for (AVG avgmember : avgList) { executor.execute(avgmember); } } }
CyclicBarrier實現主要基於ReentrantLock
public class CyclicBarrier { private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); ...省略後面代碼 }
其中Generation用來控制屏障的循環使用,若是generation.broken爲true的話,說明這個屏障已經損壞,當某個線程await的時候,直接拋出異常。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //利用ReentrantLock加鎖 lock.lock(); try { final Generation g = generation; //若是已經損壞,則拋出BrokenBarrierException異常 if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //即便沒有被中斷,也將完成等待,所以該中斷被視爲「屬於」後續執行。 Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
解釋
從新生成Generation對象
恢復count值
舉例子:CD:司機在等人坐滿了纔開車,阻塞主體是外部線程。 CB:人在等其餘人來了再上車,阻塞主體是多個線程。