Doug Lea是JDK中concurrent工具包的做者,這位大神是誰能夠自行google。java
本文淺析ReentrantLock(可重入鎖)的原理node
Lock接口定義了這幾個方法:安全
Condition
,Condition之後會分析;ReentrantLock實現了Lock接口,ReentrantLock中有一個重要的成員變量,同步器
sync繼承了AbstractQueuedSynchronizer
簡稱AQS
,咱們先介紹AQS
;多線程
AQS用一個隊列(結構是一個FIFO隊列)來管理同步狀態,當線程獲取同步狀態失敗時,會將當前線程包裝成一個Node
放入隊列,當前線程進入阻塞狀態;當同步狀態釋放時,會從隊列去出線程獲取同步狀態。app
AQS裏定義了head、tail、state,他們都是volatile修飾的,head指向隊列的第一個元素,tail指向隊列的最後一個元素,state表示了同步狀態,這個狀態很是重要,在ReentrantLock中,state爲0的時候表明鎖被釋放,state爲1時表明鎖已經被佔用;工具
看下面代碼:oop
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } }
這一段靜態初始化代碼初始了state、head、tail等變量的在內存中的偏移量;Unsafe
類是sun.misc下的類,不屬於java標準。Unsafe
讓java能夠像C語言同樣操做內存指針,其中就提供了CAS
的一些原子操做和park、unpark
對線程掛起與恢復的操做;關於CAS
是concurrent工具包的基礎,之後會單獨介紹,其主要做用就是在硬件級別提供了compareAndSwap
的功能,從而實現了比較和交換的原子性操做。ui
AQS還有一個內部類叫Node,它將線程封裝,利用prev和next能夠將Node串連成雙向鏈表,這就是一開始說的FIFO的結構;this
ReentrantLock提供了公平鎖和非公平鎖,咱們這裏從非公平鎖分析AQS的應用;
Lock調用lock()方法時調用了AQS的lock()方法,咱們來看這個非公平鎖NonfairSync
的lock方法:google
final void lock() { //首先調用CAS搶佔同步狀態state,若是成功則將當前線程設置爲同步器的獨佔線程, //這也是非公平的體現,由於新來的線程沒有立刻加入隊列尾部,而是先嚐試搶佔同步狀態。 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //搶佔同步狀態失敗,調用AQS的acquire acquire(1); }
瞄一眼acquire方法:
public final void acquire(int arg) { //在這裏仍是先試着搶佔一下同步狀態 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire調用的是NonfairSync
的實現,而後又調用了Sync
的nonfairTryAcquire方法:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //和以前同樣,利用CAS搶佔同步狀態,成功則設置當前線程爲獨佔線程而且返回true if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //若是當前線程已是獨佔線程,即當前線程已經得到了同步狀態則將同步狀態state加1, //這裏是可重入鎖的體現 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //沒有搶佔到同步狀態返回false return false; }
再看addWaiter方法:
private Node addWaiter(Node mode) { //新建一個Node,封裝了當前線程和模式,這裏傳入的是獨佔模式Node.EXCLUSIVE Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //若是tail不爲空就不須要初始化node隊列了 if (pred != null) { //將node做爲隊列最後一個元素入列 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; //返回新建的node return node; } } //若是tail爲空則表示node隊列尚未初始化,此時初始化隊列 enq(node); return node; }
瞄一眼enq方法:
private Node enq(final Node node) { //無限loop直到CAS成功,其餘地方也大量使用了無限loop for (;;) { Node t = tail; if (t == null) { // Must initialize //隊列尾部爲空,必須初始化,head初始化爲一個空node,不包含線程,tail = head if (compareAndSetHead(new Node())) tail = head; } else { //隊列已經初始化,將當前node加在列尾 node.prev = t; //將當前node設置爲tail,CAS操做,enqueue安全 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
拿到新建的node後傳給acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //標記是否中斷狀態 boolean interrupted = false; for (;;) { //拿到當前node的前驅 final Node p = node.predecessor(); //若是前驅正好爲head,即當前線程在列首,立刻tryAcquire搶佔同步狀態 if (p == head && tryAcquire(arg)) { //搶佔成功後,將當前節點的thread、prev清空做爲head setHead(node); p.next = null; // help GC 原來的head等待GC回收 failed = false; return interrupted; } //沒有搶佔成功後,判斷是否要park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
瞄一眼shouldParkAfterFailedAcquire方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //若是前驅node的狀態爲SIGNAL,說明當前node能夠park /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { //若是前驅的狀態大於0說明前驅node的thread已經被取消 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { //從前驅node開始,將取消的node移出隊列 //當前節點以前的節點不會變化,因此這裏能夠更新prev,並且沒必要用CAS來更新。 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //前驅node狀態等於0或者爲PROPAGATE(之後會介紹) //將前驅node狀態設置爲SIGNAL,返回false,表示當前node暫不須要park, //能夠再嘗試一下搶佔同步狀態 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
看一下parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() { //阻塞當前線程 LockSupport.park(this); //返回當前線程是否設置中斷標誌,並清空中斷標誌 return Thread.interrupted(); }
這裏解釋一下爲何要保存一下中斷標誌:中斷會喚醒被park的阻塞線程,但被park的阻塞線程不會響應中斷,因此這裏保存一下中斷狀態並返回,若是狀態爲true說明發生過中斷,會補發一次中斷,即調用interrupt()方法
在acquireQueued中發生異常時執行cancelAcquire:
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; //清空node的線程 node.thread = null; // Skip cancelled predecessors //移除被取消的前繼node,這裏只移動了node的prev,沒有改變next Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. //獲取前繼node的後繼node Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. //設置當前node等待狀態爲取消,其餘線程檢測到取消狀態會移除它們 node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { //若是當前node爲tail,將前驅node設置爲tail(CAS) //設置前驅node(即如今的tail)的後繼爲null(CAS) //此時,若是中間有取消的node,將沒有引用指向它,將被GC回收 compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { //若是當前node既不是head也不是tail,設置前繼node的後繼爲當前node後繼 Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //喚醒當前node後繼 unparkSuccessor(node); } //當前node的next設置爲本身 //注意如今當前node的後繼的prev還指向當前node,因此當前node還未被刪除,prev是在移除取消節點時更新的 //這裏就是爲何在前面要從後往前找可換新的node緣由了,next會致使死循環 node.next = node; // help GC } }
畫圖描述解析一下cancelAcquire:
首先看如何跳過取消的前驅
這時,前驅被取消的node並無被移出隊列,前驅的前驅的next還指向前驅;
若是當前node是tail的狀況:
這時,沒有任何引用指向當前node;
若是當前node既不是tail也不是head:
這時,當前node的前驅的next指向當前node的後繼,當前node的next指向本身,pre都沒有更新;
若是當前node是head的後繼:
這時,只是簡單的將當前node的next指向本身;
到這裏,當線程搶佔同步狀態的時候,會進入FIFO隊列等待同步狀態被釋放。在unlock()方法中調用了同步器的release方法;看一下release方法:
public final boolean release(int arg) { //判斷是否釋放同步狀態成功 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //若是head不爲null,且head的等待狀態不爲0, //喚醒後繼node的線程 unparkSuccessor(h); return true; } return false; }
再來看一下tryRelease方法(在Sync類中實現):
protected final boolean tryRelease(int releases) { int c = getState() - releases; //當前thread不是獨佔模式的那個線程,拋出異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //若是同步狀態state爲0,釋放成功,將獨佔線程設置爲null free = true; setExclusiveOwnerThread(null); } //更新同步狀態state setState(c); return free; }
繼續看unparkSuccessor(喚醒後繼node的tread)方法:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) //head的等待狀態爲負數,設置head的等待狀態爲0 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { //若是head的後繼node不存在或者後繼node等待狀態大於0(即取消) //從尾部往當前node迭代找到等待狀態爲負數的node,unpark //由於會有取消的節點 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
介紹完ReentrantLock後,咱們大致瞭解了AQS的工做原理。AQS主要就是使用了同步狀態和隊列實現了鎖的功能。有了CAS這個基礎,AQS才能發揮做用,使得在enqueue、dequeque、節點取消和異常時可以保證隊列在多線程下的完整性。