#0 系列目錄#java
#1 什麼是同步器# 多線程併發的執行,之間經過某種 共享 狀態來同步,只有當狀態知足 xxxx 條件,才能觸發線程執行 xxxx 。
這個共同的語義能夠稱之爲同步器
。能夠認爲全部的鎖機制均可以基於同步器定製來實現的
。node
而juc(java.util.concurrent)裏的思想是 將這些場景抽象出來的語義經過統一的同步框架來支持。juc 裏全部的這些鎖機制都是基於 AQS ( AbstractQueuedSynchronizer )框架上構建的
。下面簡單介紹下 AQS( AbstractQueuedSynchronizer )。windows
咱們來看下java.util.concurrent.locks大體結構:多線程
上圖中,LOCK的實現類其實都是構建在AbstractQueuedSynchronizer上
,爲什麼圖中沒有用UML線表示呢,這是每一個Lock實現類都持有本身內部類Sync的實例,而這個Sync就是繼承AbstractQueuedSynchronizer(AQS)
。爲什麼要實現不一樣的Sync呢?這和每種Lock用途相關。另外還有AQS的State機制
。下文會舉例說明不一樣同步器內的Sync與state實現。併發
#2 AQS框架如何構建同步器# ##2.1 同步器的基本功能## 一個同步器至少須要包含兩個功能:框架
根據做者論文, aqs 同步機制同時考慮了以下需求:ui
獨佔鎖和共享鎖兩種機制
。若是須要取消,須要支持中斷
。若是有超時要求,應該支持超時後中斷的機制
。##2.2 同步狀態的獲取與釋放## AQS實現了一個同步器的基本結構,下面以獨佔鎖與共享鎖分
開討論,來講明AQS怎樣實現獲取、釋放同步狀態。this
###2.2.1 獨佔模式### 獨佔獲取: tryAcquire 自己不會阻塞線程,若是返回 true 成功就繼續,若是返回 false 那麼就阻塞線程並加入阻塞隊列。操作系統
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//獲取失敗,則加入等待隊列 selfInterrupt(); }
獨佔且可中斷模式獲取:支持中斷取消.net
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
獨佔且支持超時模式獲取: 帶有超時時間,若是通過超時時間則會退出。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
獨佔模式釋放:釋放成功會喚醒後續節點
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
###2.2.2 共享模式### 共享模式獲取
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
可中斷模式共享獲取
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
共享模式帶定時獲取
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
共享鎖釋放
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
注意以上框架只定義了一個同步器的基本結構框架,基本方法裏依賴的 tryAcquire 、 tryRelease 、tryAcquireShared 、 tryReleaseShared 四個方法在 AQS 裏沒有實現
,這四個方法不會涉及線程阻塞,而是由各自不一樣的使用場景根據狀況來定製:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
從以上源碼能夠看出AQS實現基本的功能: AQS雖然實現了acquire,和release方法是可能阻塞的,可是裏面調用的tryAcquire和tryRelease是由子類來定製的且是不阻塞的
。能夠認爲同步狀態的維護、獲取、釋放動做是由子類實現的功能,而動做成功與否的後續行爲時有AQS框架來實現
。
##2.3 狀態獲取、釋放成功或失敗的後續行爲:線程的阻塞、喚醒機制## 有別於wait和notiry。這裏利用 jdk1.5 開始提供的 LockSupport.park() 和 LockSupport.unpark() 的本地方法實現,實現線程的阻塞和喚醒
。
獲得鎖的線程禁用(park)和喚醒(unpark),也是直接native實現(這幾個native方法的實現代碼在hotspot\src\share\vm\prims\unsafe.cpp文件中,可是關鍵代碼park的最終實現是和操做系統相關的,好比windows下實現是在os_windows.cpp中,有興趣的同窗能夠下載jdk源碼查看)。喚醒一個被park()線程主要手段包括如下幾種:
以被park()線程爲參數
的unpark(Thread thread)。中斷被park()線程
,如waiters.peek().interrupt();waiters爲存儲線程對象的隊列。park()方法返回並不會報告究竟是上訴哪一種返回,因此返回後最好檢查下線程狀態,如:
LockSupport.park(); // 禁用當前線程 if(Thread.interrupted){ //doSomething }
AbstractQueuedSynchronizer(AQS)對於這點實現得至關巧妙,以下所示:
private void doAcquireSharedInterruptibly(int arg)throwsInterruptedException { final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return; } } //parkAndCheckInterrupt()會返回park住的線程在被unpark後的線程狀態,若是線程中斷,跳出循環。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // 只有線程被interrupt後纔會走到這裏 cancelAcquire(node); throw new InterruptedException(); } //在park()住的線程被unpark()後,第一時間返回當前線程是否被打斷 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
##2.4 線程阻塞隊列的維護## 阻塞線程節點隊列 CHL Node queue 。
根據論文裏描述, AQS 裏將阻塞線程封裝到一個內部類 Node 裏。並維護一個 CHL Node FIFO 隊列
。 CHL隊列是一個非阻塞的 FIFO 隊列
,也就是說往裏面插入或移除一個節點的時候,在併發條件下不會阻塞,而是經過自旋鎖和 CAS 保證節點插入和移除的原子性
。實現無鎖且快速的插入。CHL隊列對應代碼以下:
/** * CHL頭節點 */ private transient volatile Node head; /** * CHL尾節點 */ private transient volatile Node tail;
Node節點是對Thread的一個封裝,結構大概以下:
static final class Node { /** 表明線程已經被取消*/ static final int CANCELLED = 1; /** 表明後續節點須要喚醒 */ static final int SIGNAL = -1; /** 表明線程在等待某一條件/ static final int CONDITION = -2; /** 標記是共享模式*/ static final Node SHARED = new Node(); /** 標記是獨佔模式*/ static final Node EXCLUSIVE = null; /** * 狀態位 ,分別可使CANCELLED、SINGNAL、CONDITION、0 */ volatile int waitStatus; /** * 前置節點 */ volatile Node prev; /** * 後續節點 */ volatile Node next; /** * 節點表明的線程 */ volatile Thread thread; /** *鏈接到等待condition的下一個節點 */ Node nextWaiter; }
##2.5 小結## 從源碼能夠看出AQS實現基本的功能:
AQS雖然實現了acquire,和release方法,可是裏面調用的tryAcquire和tryRelease是由子類來定製的。能夠認爲同步狀態的維護、獲取、釋放動做是由子類實現的功能,而動做成功與否的後續行爲時有AQS框架來實現,還有如下一些私有方法,用於輔助完成以上的功能:
final boolean acquireQueued(final Node node, int arg) :申請隊列 private Node enq(final Node node) : 入隊 private Node addWaiter(Node mode) :以mode建立建立節點,並加入到隊列 private void unparkSuccessor(Node node) : 喚醒節點的後續節點,若是存在的話。 private void doReleaseShared() :釋放共享鎖 private void setHeadAndPropagate(Node node, int propagate):設置頭,而且若是是共享模式且propagate大於0,則喚醒後續節點。 private void cancelAcquire(Node node) :取消正在獲取的節點 private static void selfInterrupt() :自我中斷 private final boolean parkAndCheckInterrupt() : park 並判斷線程是否中斷
#3 AQS在各同步器內的Sync與State實現# ##3.1 什麼是state機制## 提供 volatile 變量 state; 用於同步線程之間的共享狀態。經過 CAS 和 volatile 保證其原子性和可見性
。對應源碼裏的定義:
/** * 同步狀態 */ private volatile int state; /** *cas */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
##3.2 不一樣實現類的Sync與State## 基於AQS構建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,這些Synchronizer實際上最基本的東西就是原子狀態的獲取和釋放,只是條件不同而已
。
###3.2.1 ReentrantLock### 須要記錄當前線程獲取原子狀態的次數,若是次數爲零,那麼就說明這個線程放棄了鎖(也有可能其餘線程佔據着鎖從而須要等待),若是次數大於1,也就是得到了重進入的效果
,而其餘線程只能被park住,直到這個線程重進入鎖次數變成0而釋放原子狀態。如下爲ReetranLock的FairSync的tryAcquire實現代碼解析。
//公平獲取鎖 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //若是當前重進入數爲0,說明有機會取得鎖 if (c == 0) { //若是是第一個等待者,而且設置重進入數成功,那麼當前線程得到鎖 if (isFirst(current) && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //若是當前線程自己就持有鎖,那麼疊加劇進入數,而且繼續得到鎖 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //以上條件都不知足,那麼線程進入等待隊列。 return false; }
###3.2.2 Semaphore### 則是要記錄當前還有多少次許可可使用,到0,就須要等待
,也就實現併發量的控制,Semaphore一開始設置許可數爲1,實際上就是一把互斥鎖。如下爲Semaphore的FairSync實現:
protected int tryAcquireShared(int acquires) { Thread current = Thread.currentThread(); for (;;) { Thread first = getFirstQueuedThread(); //若是當前等待隊列的第一個線程不是當前線程,那麼就返回-1表示當前線程須要等待 if (first != null && first != current) return -1; //若是當前隊列沒有等待者,或者當前線程就是等待隊列第一個等待者,那麼先取得semaphore還有幾個許可證,而且減去當前線程須要的許可證獲得剩下的值 int available = getState(); int remaining = available - acquires; //若是remining<0,那麼反饋給AQS當前線程須要等待,若是remaining>0,而且設置availble成功設置成剩餘數,那麼返回剩餘值(>0),也就告知AQS當前線程拿到許可,能夠繼續執行。 if (remaining < 0 ||compareAndSetState(available, remaining)) return remaining; } }
###3.2.3 CountDownLatch### 閉鎖則要保持其狀態,在這個狀態到達終止態以前,全部線程都會被park住
,閉鎖能夠設定初始值,這個值的含義就是這個閉鎖須要被countDown()幾回,由於每次CountDown是sync.releaseShared(1)
,而一開始初始值爲10的話,那麼這個閉鎖須要被countDown()十次,纔可以將這個初始值減到0,從而釋放原子狀態,讓等待的全部線程經過。
//await時候執行,只查看當前須要countDown數量減爲0了,若是爲0,說明能夠繼續執行,不然須要park住,等待countDown次數足夠,而且unpark全部等待線程 public int tryAcquireShared(int acquires) { return getState() == 0? 1 : -1; } //countDown 時候執行,若是當前countDown數量爲0,說明沒有線程await,直接返回false而不須要喚醒park住線程,若是不爲0,獲得剩下須要 countDown的數量而且compareAndSet,最終返回剩下的countDown數量是否爲0,供AQS斷定是否釋放全部await線程。 public boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
###3.2.4 FutureTask### 須要記錄任務的執行狀態,當調用其實例的get方法時,內部類Sync會去調用AQS的acquireSharedInterruptibly()方法
,而這個方法會反向調用Sync實現的tryAcquireShared()方法,即讓具體實現類決定是否讓當前線程繼續仍是park
,而FutureTask的tryAcquireShared方法所作的惟一事情就是檢查狀態,若是是RUNNING狀態那麼讓當前線程park
。而跑任務的線程會在任務結束時調用FutureTask 實例的set方法(與等待線程持相同的實例),設定執行結果,而且經過unpark喚醒正在等待的線程,返回結果
。
//get時待用,只檢查當前任務是否完成或者被Cancel,若是未完成而且沒有被cancel,那麼告訴AQS當前線程須要進入等待隊列而且park住 protected int tryAcquireShared(int ignore) { return innerIsDone()? 1 : -1; } //斷定任務是否完成或者被Cancel boolean innerIsDone() { return ranOrCancelled(getState()) && runner == null; } //get時調用,對於CANCEL與其餘異常進行拋錯 V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { if (!tryAcquireSharedNanos(0,nanosTimeout)) throw new TimeoutException(); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; } //任務的執行線程執行完畢調用(set(V v)) void innerSet(V v) { for (;;) { int s = getState(); //若是線程任務已經執行完畢,那麼直接返回(多線程執行任務?) if (s == RAN) return; //若是被CANCEL了,那麼釋放等待線程,而且會拋錯 if (s == CANCELLED) { releaseShared(0); return; } //若是成功設定任務狀態爲已完成,那麼設定結果,unpark等待線程(調用get()方法而阻塞的線程),以及後續清理工做(通常由FutrueTask的子類實現) if (compareAndSetState(s, RAN)) { result = v; releaseShared(0); done(); return; } } }
以上4個AQS的使用是比較典型,然而有個問題就是這些狀態存在哪裏呢?而且是能夠計數的。從以上4個example,咱們能夠很快獲得答案,AQS提供給了子類一個int state屬性
。而且暴露給子類getState()和setState()兩個方法(protected)。這樣就爲上述狀態解決了存儲問題,RetrantLock能夠將這個state用於存儲當前線程的重進入次數
,Semaphore能夠用這個state存儲許可數
,CountDownLatch則能夠存儲須要被countDown的次數
,而Future則能夠存儲當前任務的執行狀態(RUNING,RAN,CANCELL)
。其餘的Synchronizer存儲他們的一些狀態。
AQS留給實現者的方法主要有5個方法,其中tryAcquire,tryRelease和isHeldExclusively三個方法爲須要獨佔形式獲取的synchronizer實現的
,好比線程獨佔ReetranLock的Sync,而tryAcquireShared和tryReleasedShared爲須要共享形式獲取的synchronizer實現
。
ReentrantLock內部Sync類實現的是tryAcquire,tryRelease, isHeldExclusively三個方法
(由於獲取鎖的公平性問題,tryAcquire由繼承該Sync類的內部類FairSync和NonfairSync實現
);Semaphore內部類Sync則實現了tryAcquireShared和tryReleasedShared
(與CountDownLatch類似,由於公平性問題,tryAcquireShared由其內部類FairSync和NonfairSync實現
)。CountDownLatch內部類Sync實現了tryAcquireShared和tryReleasedShared
。FutureTask內部類Sync也實現了tryAcquireShared和tryReleasedShared
。