目錄java
ReentrantLock是基於同步器AbstractQueuedSynchronizer(AQS)實現的獨佔式重入鎖,支持公平鎖、非公平鎖(默認是非公平鎖)、申請鎖可響應中斷以及限時獲取鎖等高級功能,分析ReentrantLock就離不開同步器AQS,關係圖以下:node
在AQS中實現瞭如何獲取鎖和釋放鎖的模板方法,重入鎖ReentrantLock實現時經過內部類繼承Sync同步器AbstractQueuedSynchronizer。並調用同步器提供的模板方法,而這些模板方法將會調用ReentrantLock重寫的方法,這是典型的模板方法設計模式。AQS實現同步器功能離不開三大基礎組件:設計模式
AQS中使用了一個int型的volatile變量來表示同步狀態,線程在嘗試獲取鎖的時候,就回去比較同步器同步狀態state是否爲0,爲0,那麼線程就拿到了鎖並改變同步狀態;不爲0,說明有其餘線程拿到了鎖。AQS中提供瞭如下三個方法來訪問或修改同步狀態:數據結構
//AQS成員變量,同步狀態 private volatile int state; //獲取當前同步狀態 protected final int getState() { return state; } //設置當前同步狀態 protected final void setState(int newState) { state = newState; } //使用CAS設置當前狀態,該方法可以保證狀態設置的原子性 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
當有多個線程競爭獲取鎖時,只有一個線程能獲取到鎖,那麼這些沒有獲取到鎖的線程就須要等待,等到線程把鎖釋放了再喚醒等待線程去獲取鎖,爲了實現等待-喚醒機制,AQS提供了基於CLH隊列(Craig, Landin,Hagersten)實現的等待隊列,是一個先入先出的雙向隊列。同步隊列是一個非阻塞的 FIFO 隊列。也就是說往裏面插入或移除一個節點的時候,在併發條件下不會阻塞,而是經過自旋鎖和CAS保證節點插入和移除的原子性。
併發
AQS中的內部類Node是構建同步隊列和等待隊列(後面介紹Condition再介紹)的基礎節點類,Node類部分源碼以下:函數
static final class Node { //等待狀態 volatile int waitStatus; //前驅結點 volatile Node prev; //後繼節點 volatile Node next; //等待獲取鎖的線程 volatile Thread thread; //condition隊列的後繼節點 Node nextWaiter; }
關於節點Node的waitStatus,它反映的是節點中線程的等待狀態,有以下取值:ui
從關係圖能夠看出,ReentrantLock實現了Lock接口,內部類Sync是AQS的子類,Sync有兩個子類FairSync(公平鎖)和NonFairSync(非公平鎖)。ReentrantLock只有一個成員變量sync,經過構造函數初始化,能夠看到經過默認的構造函數構造的ReentrantLock是非公平鎖。this
private final Sync sync; public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
ReentrantLock獲取鎖方法以下:線程
public void lock() { sync.lock(); }
公平鎖調用的是FairSync的lock方法:設計
final void lock() { acquire(1); }
acquire方法是AQS實現的方法,介紹一下參數的1的意思:AQS規定同步狀態state,想要得到鎖就去改變同步狀態,就是把同步狀態加1。acquire方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
獲取鎖的過程:
公平鎖嘗試獲取,在FairSync裏實現,獲取同步狀態成功返回true,不然返回false
protected final boolean tryAcquire(int acquires) { //獲取當前線程 final Thread current = Thread.currentThread(); //獲取同步狀態 int c = getState(); //同步狀態爲0,沒有其餘線程佔據鎖 if (c == 0) { //檢測同步隊列沒有其餘線程等待(確保公平性),若是沒有獲取鎖就以CAS方式嘗試改變同步狀態 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //設置鎖的擁有者爲當前線程 setExclusiveOwnerThread(current); return true; } } //同步狀態不爲0,檢測是不是當前線程擁有鎖 else if (current == getExclusiveOwnerThread()) { //當前線程擁有鎖,直接更新同步狀態,重入鎖 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
public final boolean hasQueuedPredecessors() { //同步隊列尾節點 Node t = tail; //同步隊列頭節點 Node h = head; Node s; //h!=t 頭節點和尾節點不一樣,說明同步隊列不爲空 //同步隊列不爲空,檢測下一個等待獲取鎖的線程(h.next.thread)是否是當前線程 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; }
addWaiter在AQS中實現,以當前線程構成節點加入到同步隊列末尾,並返回這個節點Node。
private Node addWaiter(Node mode) { //以當前線程和給定模式構成節點Node Node node = new Node(Thread.currentThread(), mode); // 同步隊列不爲空,以CAS方式把當前線程加入到隊列末尾 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //隊列爲空,創建同步隊列,再把當前線程加入同步隊列 enq(node); return node; }
private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
若是當前線程的節點的前驅結點,就去嘗試獲取同步狀態,若是不是或者獲取失敗根據waitStatus對同步隊列進行清理:把waitStatus爲CANCELLED從同步隊列清除,修改錯誤的waitStatus,而後把線程堵塞,返回當前線程是否被中斷。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //當前節點的前驅結點 final Node p = node.predecessor(); //前驅結點是head頭節點,嘗試獲取同步狀態 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
ReentrantLock公平鎖與非公平鎖的釋放機制是同樣的,釋放鎖方法以下:
public void unlock() { sync.release(1); }
unlock方法調用的release方法是在AQS中實現的,這裏的1相似於acquire(1),適用於用來設置同步狀態的,釋放鎖時會把同步狀態減1。release方法會先調用tryRelease來嘗試釋放當前線程鎖持有的鎖。成功的話,則喚醒後繼等待線程,並返回true。不然,直接返回false
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease嘗試獲取鎖,當同步狀態爲0時清空佔據鎖的線程,返回true;若是同步狀態不爲0返回false,由於ReentrantLock是重入鎖,只有完全釋放tryRelease纔會返回true。
protected final boolean tryRelease(int releases) { // c是本次釋放鎖以後的同步狀態 int c = getState() - releases; //當前線程不是鎖的擁有者,拋出IllegalMonitorStateException異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //若是「鎖」已經被當前線程完全釋放,則設置「鎖」的持有者爲null,即鎖是可獲取狀態。 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
當前線程釋放鎖成功的話,會喚醒當前線程的後繼線程。從aquireQueued方法能夠看出,一旦頭結點的後繼結點被喚醒,那麼後繼結點就嘗試去獲取鎖,若是獲取成功就將頭結點設置爲自身,並將前一個頭節點清空。
private void unparkSuccessor(Node node) { // 獲取當前線程(要釋放鎖)的等待狀態 int ws = node.waitStatus; if (ws < 0) //設置爲初始狀態 compareAndSetWaitStatus(node, ws, 0); //同步隊列頭節點的下一個等待節點 Node s = node.next; //等待節點無效,從同步節點尾部開始遍歷找到有效的等待節點 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //喚醒等待節點的線程 if (s != null) LockSupport.unpark(s.thread); }
NonfairSync類中lock()實現,首先嚐試用CAS更改同步狀態,若是成功,把當前線程設置爲獨佔鎖的擁有者;而後調用acquire(1)方法。
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
acquire方法除了tryAcquire是由AQS的子類實現的,其餘方法都是在AQS類實現的,tryAcquire的實現機制不一樣體現了公平鎖與非公平鎖的不一樣。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
ReentrantLock中的NonfairSync的tryAcquire方法,調用了nonfairTryAcquire方法
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
非公平鎖的嘗試獲取鎖時,若是同步狀態爲0,即沒有其餘線程獲取到鎖,當前線程直接以CAS方式改變同步狀態,不會去同步隊列找是否有其餘線程早於當前線程等在同步隊列中,效率較高。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //同步狀態爲0,嘗試以CAS方式改變同步狀態 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //重入鎖 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
本文介紹了ReentrantLock基於AQS同步器實現的公平鎖和非公平鎖的獲取和釋放,基於CAS改變同步狀態是得到獨佔鎖的基礎,爲了不多個線程同時對進行競爭,在AQS中維護了FIFO的同步隊列,當獨佔鎖釋放時,AQS同步器調度同步隊列隊首等待節點的線程去獲取鎖,有效避免了海量競爭獨佔鎖形成資源的浪費,是一個很是巧妙的方法。