如下是本篇文章的大綱java
synchronized是java內置的關鍵字,它提供了一種獨佔的加鎖方式。synchronized的獲取和釋放鎖由JVM實現,用戶不須要顯示的釋放鎖,很是方便。然而synchronized也有必定的侷限性,例如:node
JDK1.5以後發佈,加入了Doug Lea實現的concurrent包。包內提供了Lock類,用來提供更多擴展的加鎖功能。Lock彌補了synchronized的侷限,提供了更加細粒度的加鎖功能。編程
Lock api以下api
void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition();
其中最經常使用的就是lock和unlock操做了。由於使用lock時,須要手動的釋放鎖,因此須要使用try..catch來包住業務代碼,而且在finally中釋放鎖。典型使用以下安全
private Lock lock = new ReentrantLock(); public void test(){ lock.lock(); try{ doSomeThing(); }catch (Exception e){ // ignored }finally { lock.unlock(); } }
AbstractQueuedSynchronizer簡稱AQS,是一個用於構建鎖和同步容器的框架。事實上concurrent包內許多類都是基於AQS構建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解決了在實現同步容器時設計的大量細節問題。併發
AQS使用一個FIFO的隊列表示排隊等待鎖的線程,隊列頭節點稱做「哨兵節點」或者「啞節點」,它不與任何線程關聯。其餘的節點與等待線程關聯,每一個節點維護一個等待狀態waitStatus。如圖框架
AQS中還有一個表示狀態的字段state,例如ReentrantLocky用它表示線程重入鎖的次數,Semaphore用它表示剩餘的許可數量,FutureTask用它表示任務的狀態。對state變量值的更新都採用CAS操做保證更新操做的原子性。ui
AbstractQueuedSynchronizer繼承了AbstractOwnableSynchronizer,這個類只有一個變量:exclusiveOwnerThread,表示當前佔用該鎖的線程,而且提供了相應的get,set方法。this
理解AQS能夠幫助咱們更好的理解JCU包中的同步容器。spa
ReentrantLock是Lock的默認實現之一。那麼lock()和unlock()是怎麼實現的呢?首先咱們要弄清楚幾個概念
ReentrantLock提供了兩個構造器,分別是
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
默認構造器初始化爲NonfairSync對象,即非公平鎖,而帶參數的構造器能夠指定使用公平鎖和非公平鎖。由lock()和unlock的源碼能夠看到,它們只是分別調用了sync對象的lock()和release(1)方法。
Sync是ReentrantLock的內部類,它的結構以下
能夠看到Sync擴展了AbstractQueuedSynchronizer。
咱們從源代碼出發,分析非公平鎖獲取鎖和釋放鎖的過程。
lock()源碼以下
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
首先用一個CAS操做,判斷state是不是0(表示當前鎖未被佔用),若是是0則把它置爲1,而且設置當前線程爲該鎖的獨佔線程,表示獲取鎖成功。當多個線程同時嘗試佔用同一個鎖時,CAS操做只能保證一個線程操做成功,剩下的只能乖乖的去排隊啦。
「非公平」即體如今這裏,若是佔用鎖的線程剛釋放鎖,state置爲0,而排隊等待鎖的線程還未喚醒時,新來的線程就直接搶佔了該鎖,那麼就「插隊」了。
若當前有三個線程去競爭鎖,假設線程A的CAS操做成功了,拿到了鎖開開心心的返回了,那麼線程B和C則設置state失敗,走到了else裏面。咱們往下看acquire。
acquire(arg)
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
代碼很是簡潔,可是背後的邏輯卻很是複雜,可見Doug Lea大神的編程功力。
1. 第一步。嘗試去獲取鎖。若是嘗試獲取鎖成功,方法直接返回。
tryAcquire(arg)
final boolean nonfairTryAcquire(int acquires) { //獲取當前線程 final Thread current = Thread.currentThread(); //獲取state變量值 int c = getState(); if (c == 0) { //沒有線程佔用鎖 if (compareAndSetState(0, acquires)) { //佔用鎖成功,設置獨佔線程爲當前線程 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //當前線程已經佔用該鎖 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 更新state值爲新的重入次數 setState(nextc); return true; } //獲取鎖失敗 return false; }
非公平鎖tryAcquire的流程是:檢查state字段,若爲0,表示鎖未被佔用,那麼嘗試佔用,若不爲0,檢查當前鎖是否被本身佔用,若被本身佔用,則更新state字段,表示重入鎖的次數。若是以上兩點都沒有成功,則獲取鎖失敗,返回false。
2. 第二步,入隊。因爲上文中提到線程A已經佔用了鎖,因此B和C執行tryAcquire失敗,而且入等待隊列。若是線程A拿着鎖死死不放,那麼B和C就會被掛起。
先看下入隊的過程。
先看addWaiter(Node.EXCLUSIVE)
/** * 將新節點和當前線程關聯而且入隊列 * @param mode 獨佔/共享 * @return 新節點 */ private Node addWaiter(Node mode) { //初始化節點,設置關聯線程和模式(獨佔 or 共享) Node node = new Node(Thread.currentThread(), mode); // 獲取尾節點引用 Node pred = tail; // 尾節點不爲空,說明隊列已經初始化過 if (pred != null) { node.prev = pred; // 設置新節點爲尾節點 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 尾節點爲空,說明隊列還未初始化,須要初始化head節點併入隊新節點 enq(node); return node; }
B、C線程同時嘗試入隊列,因爲隊列還沒有初始化,tail==null,故至少會有一個線程會走到enq(node)。咱們假設同時走到了enq(node)裏。
/** * 初始化隊列而且入隊新節點 */ private Node enq(final Node node) { //開始自旋 for (;;) { Node t = tail; if (t == null) { // Must initialize // 若是tail爲空,則新建一個head節點,而且tail指向head if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // tail不爲空,將新節點入隊 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
這裏體現了經典的自旋+CAS組合來實現非阻塞的原子操做。因爲compareAndSetHead的實現使用了unsafe類提供的CAS操做,因此只有一個線程會建立head節點成功。假設線程B成功,以後B、C開始第二輪循環,此時tail已經不爲空,兩個線程都走到else裏面。假設B線程compareAndSetTail成功,那麼B就能夠返回了,C因爲入隊失敗還須要第三輪循環。最終全部線程均可以成功入隊。
當B、C入等待隊列後,此時AQS隊列以下:
3. 第三步,掛起。B和C相繼執行acquireQueued(final Node node, int arg)。這個方法讓已經入隊的線程嘗試獲取鎖,若失敗則會被掛起。
/** * 已經入隊的線程嘗試獲取鎖 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //標記是否成功獲取鎖 try { boolean interrupted = false; //標記線程是否被中斷過 for (;;) { final Node p = node.predecessor(); //獲取前驅節點 //若是前驅是head,即該結點已成老二,那麼便有資格去嘗試獲取鎖 if (p == head && tryAcquire(arg)) { setHead(node); // 獲取成功,將當前節點設置爲head節點 p.next = null; // 原head節點出隊,在某個時間點被GC回收 failed = false; //獲取成功 return interrupted; //返回是否被中斷過 } // 判斷獲取失敗後是否能夠掛起,若能夠則掛起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 線程若被中斷,設置interrupted爲true interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
code裏的註釋已經很清晰的說明了acquireQueued的執行流程。假設B和C在競爭鎖的過程當中A一直持有鎖,那麼它們的tryAcquire操做都會失敗,所以會走到第2個if語句中。咱們再看下shouldParkAfterFailedAcquire和parkAndCheckInterrupt都作了哪些事吧。
/** * 判斷當前線程獲取鎖失敗以後是否須要掛起. */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前驅節點的狀態 int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 前驅節點狀態爲signal,返回true return true; // 前驅節點狀態爲CANCELLED if (ws > 0) { // 從隊尾向前尋找第一個狀態不爲CANCELLED的節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 將前驅節點的狀態設置爲SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * 掛起當前線程,返回線程中斷狀態並重置 */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
線程入隊後可以掛起的前提是,它的前驅節點的狀態爲SIGNAL,它的含義是「Hi,前面的兄弟,若是你獲取鎖而且出隊後,記得把我喚醒!」。因此shouldParkAfterFailedAcquire會先判斷當前節點的前驅是否狀態符合要求,若符合則返回true,而後調用parkAndCheckInterrupt,將本身掛起。若是不符合,再看前驅節點是否>0(CANCELLED),如果那麼向前遍歷直到找到第一個符合要求的前驅,若不是則將前驅節點的狀態設置爲SIGNAL。
整個流程中,若是前驅結點的狀態不是SIGNAL,那麼本身就不能安心掛起,須要去找個安心的掛起點,同時能夠再嘗試下看有沒有機會去嘗試競爭鎖。
最終隊列可能會以下圖所示
線程B和C都已經入隊,而且都被掛起。當線程A釋放鎖的時候,就會去喚醒線程B去獲取鎖啦。
unlock相對於lock就簡單不少。源碼以下
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
若是理解了加鎖的過程,那麼解鎖看起來就容易多了。流程大體爲先嚐試釋放鎖,若釋放成功,那麼查看頭結點的狀態是否爲SIGNAL,若是是則喚醒頭結點的下個節點關聯的線程,若是釋放失敗那麼返回false表示解鎖失敗。這裏咱們也發現了,每次都只喚起頭結點的下一個節點關聯的線程。
最後咱們再看下tryRelease的執行過程
/** * 釋放當前線程佔用的鎖 * @param releases * @return 是否釋放成功 */ protected final boolean tryRelease(int releases) { // 計算釋放後state值 int c = getState() - releases; // 若是不是當前線程佔用鎖,那麼拋出異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 鎖被重入次數爲0,表示釋放成功 free = true; // 清空獨佔線程 setExclusiveOwnerThread(null); } // 更新state值 setState(c); return free; }
這裏入參爲1。tryRelease的過程爲:當前釋放鎖的線程若不持有鎖,則拋出異常。若持有鎖,計算釋放後的state值是否爲0,若爲0表示鎖已經被成功釋放,而且則清空獨佔線程,最後更新state值,返回free。
用一張流程圖總結一下非公平鎖的獲取鎖的過程。
公平鎖和非公平鎖不一樣之處在於,公平鎖在獲取鎖的時候,不會先去檢查state狀態,而是直接執行aqcuire(1),這裏再也不贅述。
在ReetrantLock的tryLock(long timeout, TimeUnit unit) 提供了超時獲取鎖的功能。它的語義是在指定的時間內若是獲取到鎖就返回true,獲取不到則返回false。這種機制避免了線程無限期的等待鎖釋放。那麼超時的功能是怎麼實現的呢?咱們仍是用非公平鎖爲例來一探究竟。
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
仍是調用了內部類裏面的方法。咱們繼續向前探究
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
這裏的語義是:若是線程被中斷了,那麼直接拋出InterruptedException。若是未中斷,先嚐試獲取鎖,獲取成功就直接返回,獲取失敗則進入doAcquireNanos。tryAcquire咱們已經看過,這裏重點看一下doAcquireNanos作了什麼。
/** * 在有限的時間內去競爭鎖 * @return 是否獲取成功 */ private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 起始時間 long lastTime = System.nanoTime(); // 線程入隊 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 又是自旋! for (;;) { // 獲取前驅節點 final Node p = node.predecessor(); // 若是前驅是頭節點而且佔用鎖成功,則將當前節點變成頭結點 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 若是已經超時,返回false if (nanosTimeout <= 0) return false; // 超時時間未到,且須要掛起 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) // 阻塞當前線程直到超時時間到期 LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); // 更新nanosTimeout nanosTimeout -= now - lastTime; lastTime = now; if (Thread.interrupted()) //相應中斷 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
doAcquireNanos的流程簡述爲:線程先入等待隊列,而後開始自旋,嘗試獲取鎖,獲取成功就返回,失敗則在隊列裏找一個安全點把本身掛起直到超時時間過時。這裏爲何還須要循環呢?由於當前線程節點的前驅狀態可能不是SIGNAL,那麼在當前這一輪循環中線程不會被掛起,而後更新超時時間,開始新一輪的嘗試
ReentrantLock的核心功能講解差很少落下帷幕,理解AQS,就很容易理解ReentrantLock的實現原理。文中慘雜着筆者的我的理解,若有不正之處,還望指正。
做者:mayday芋頭