使用 synchronized
來作同步處理時,鎖的獲取和釋放都是隱式的,實現的原理是經過編譯後加上不一樣的機器指令來實現。java
而 ReentrantLock
就是一個普通的類,它是基於 AQS(AbstractQueuedSynchronizer)
來實現的。node
是一個重入鎖:一個線程得到了鎖以後仍然能夠反覆的加鎖,不會出現本身阻塞本身的狀況。git
AQS
是Java
併發包裏實現鎖、同步的一個重要的基礎框架。github
ReentrantLock 分爲公平鎖和非公平鎖,能夠經過構造方法來指定具體類型:併發
//默認非公平鎖 public ReentrantLock() { sync = new NonfairSync(); } //公平鎖 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
默認通常使用非公平鎖,它的效率和吞吐量都比公平鎖高的多(後面會分析具體緣由)。框架
一般的使用方式以下:ide
private ReentrantLock lock = new ReentrantLock(); public void run() { lock.lock(); try { //do bussiness } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
首先看下獲取鎖的過程:ui
public void lock() { sync.lock(); }
能夠看到是使用 sync
的方法,而這個方法是一個抽象方法,具體是由其子類(FairSync
)來實現的,如下是公平鎖的實現:this
final void lock() { acquire(1); } //AbstractQueuedSynchronizer 中的 acquire() public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
第一步是嘗試獲取鎖(tryAcquire(arg)
),這個也是由其子類實現:線程
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
首先會判斷 AQS
中的 state
是否等於 0,0 表示目前沒有其餘線程得到鎖,當前線程就能夠嘗試獲取鎖。
注意:嘗試以前會利用 hasQueuedPredecessors()
方法來判斷 AQS 的隊列中中是否有其餘線程,若是有則不會嘗試獲取鎖(這是公平鎖特有的狀況)。
若是隊列中沒有線程就利用 CAS 來將 AQS 中的 state 修改成1,也就是獲取鎖,獲取成功則將當前線程置爲得到鎖的獨佔線程(setExclusiveOwnerThread(current)
)。
若是 state
大於 0 時,說明鎖已經被獲取了,則須要判斷獲取鎖的線程是否爲當前線程(ReentrantLock
支持重入),是則須要將 state + 1
,並將值更新。
若是 tryAcquire(arg)
獲取鎖失敗,則須要用 addWaiter(Node.EXCLUSIVE)
將當前線程寫入隊列中。
寫入以前須要將當前線程包裝爲一個 Node
對象(addWaiter(Node.EXCLUSIVE)
)。
AQS 中的隊列是由 Node 節點組成的雙向鏈表實現的。
包裝代碼:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
首先判斷隊列是否爲空,不爲空時則將封裝好的 Node
利用 CAS
寫入隊尾,若是出現併發寫入失敗就須要調用 enq(node);
來寫入了。
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
這個處理邏輯就至關於自旋
加上 CAS
保證必定能寫入隊列。
寫入隊列以後須要將當前線程掛起(利用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
):
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
首先會根據 node.predecessor()
獲取到上一個節點是否爲頭節點,若是是則嘗試獲取一次鎖,獲取成功就萬事大吉了。
若是不是頭節點,或者獲取鎖失敗,則會根據上一個節點的 waitStatus
狀態來處理(shouldParkAfterFailedAcquire(p, node)
)。
waitStatus
用於記錄當前節點的狀態,如節點取消、節點等待等。
shouldParkAfterFailedAcquire(p, node)
返回當前線程是否須要掛起,若是須要則調用 parkAndCheckInterrupt()
:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
他是利用 LockSupport
的 part
方法來掛起當前線程的,直到被喚醒。
公平鎖與非公平鎖的差別主要在獲取鎖:
公平鎖就至關於買票,後來的人須要排到隊尾依次買票,不能插隊。
而非公平鎖則沒有這些規則,是搶佔模式,每來一我的不會去管隊列如何,直接嘗試獲取鎖。
非公平鎖:
final void lock() { //直接嘗試獲取鎖 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
公平鎖:
final void lock() { acquire(1); }
還要一個重要的區別是在嘗試獲取鎖時tryAcquire(arg)
,非公平鎖是不須要判斷隊列中是否還有其餘線程,也是直接嘗試獲取鎖:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //沒有 !hasQueuedPredecessors() 判斷 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平鎖和非公平鎖的釋放流程都是同樣的:
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //喚醒被掛起的線程 unparkSuccessor(h); return true; } return false; } //嘗試釋放鎖 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
首先會判斷當前線程是否爲得到鎖的線程,因爲是重入鎖因此須要將 state
減到 0 才認爲徹底釋放鎖。
釋放以後須要調用 unparkSuccessor(h)
來喚醒被掛起的線程。
因爲公平鎖須要關心隊列的狀況,得按照隊列裏的前後順序來獲取鎖(會形成大量的線程上下文切換),而非公平鎖則沒有這個限制。
因此也就能解釋非公平鎖的效率會被公平鎖更高。
原文連接 https://github.com/crossoverJie/JCSprout/blob/master/MD/ReentrantLock.md