前幾篇文章分析了線程池的原理,接下來研究鎖的方面。顯式鎖ReentrantLock和同步工具類的實現基礎都是AQS,因此合起來一齊研究。node
AQS便是AbstractQueuedSynchronizer,一個用來構建鎖和同步工具的框架,包括經常使用的ReentrantLock、CountDownLatch、Semaphore等。併發
AQS沒有鎖之類的概念,它有個state變量,是個int類型,在不一樣場合有着不一樣含義。本文研究的是鎖,爲了好理解,姑且先把state當成鎖。框架
AQS圍繞state提供兩種基本操做「獲取」和「釋放」,有條雙向隊列存放阻塞的等待線程,並提供一系列判斷和處理方法,簡單說幾點:工具
至於線程是否能夠得到state,如何釋放state,就不是AQS關心的了,要由子類具體實現。性能
直接分析AQS的代碼會比較難明白,因此結合子類ReentrantLock來分析。AQS的功能能夠分爲獨佔和共享,ReentrantLock實現了獨佔功能,是本文分析的目標。ui
Lock lock = new ReentranLock(); lock.lock(); try{ //do something }finally{ lock.unlock(); }
ReentrantLock實現了Lock接口,加鎖和解鎖都須要顯式寫出,注意必定要在適當時候unlock。this
和synchronized相比,ReentrantLock用起來會複雜一些。在基本的加鎖和解鎖上,二者是同樣的,因此無特殊狀況下,推薦使用synchronized。ReentrantLock的優點在於它更靈活、更強大,除了常規的lock()、unlock()以外,還有lockInterruptibly()、tryLock()方法,支持中斷、超時。spa
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
ReentrantLock的內部類Sync繼承了AQS,分爲公平鎖FairSync和非公平鎖NonfairSync。線程
ReentrantLock默認使用非公平鎖是基於性能考慮,公平鎖爲了保證線程規規矩矩地排隊,須要增長阻塞和喚醒的時間開銷。若是直接插隊獲取非公平鎖,跳過了對隊列的處理,速度會更快。指針
final void lock() { acquire(1);} public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
先來看公平鎖的實現,lock方法很簡單的一句話調用AQS的acquire方法:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
噢,AQS的tryAcquire不能直接調用,由於是否獲取鎖成功是由子類決定的,直接看ReentrantLock的tryAcquire的實現。
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
獲取鎖成功分爲兩種狀況,第一個if判斷AQS的state是否等於0,表示鎖沒有人佔有。接着,hasQueuedPredecessors判斷隊列是否有排在前面的線程在等待鎖,沒有的話調用compareAndSetState使用cas的方式修改state,傳入的acquires寫死是1。最後線程獲取鎖成功,setExclusiveOwnerThread將線程記錄爲獨佔鎖的線程。
第二個if判斷當前線程是否爲獨佔鎖的線程,由於ReentrantLock是可重入的,線程能夠不停地lock來增長state的值,對應地須要unlock來解鎖,直到state爲零。
若是最後獲取鎖失敗,下一步須要將線程加入到等待隊列。
AQS內部有一條雙向的隊列存放等待線程,節點是Node對象。每一個Node維護了線程、先後Node的指針和等待狀態等參數。
線程在加入隊列以前,須要包裝進Node,調用方法是addWaiter:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
每一個Node須要標記是獨佔的仍是共享的,由傳入的mode決定,ReentrantLock天然是使用獨佔模式Node.EXCLUSIVE。
建立好Node後,若是隊列不爲空,使用cas的方式將Node加入到隊列尾。注意,這裏只執行了一次修改操做,而且可能由於併發的緣由失敗。所以修改失敗的狀況和隊列爲空的狀況,須要進入enq。
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
enq是個死循環,保證Node必定能插入隊列。注意到,當隊列爲空時,會先爲頭節點建立一個空的Node,由於頭節點表明獲取了鎖的線程,如今尚未,因此先空着。
線程加入隊列後,下一步是調用acquireQueued阻塞線程。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //1 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //2 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
標記1是線程喚醒後嘗試獲取鎖的過程。若是前一個節點正好是head,表示本身排在第一位,能夠立刻調用tryAcquire嘗試。若是獲取成功就簡單了,直接修改本身爲head。這步是實現公平鎖的核心,保證釋放鎖時,由下個排隊線程獲取鎖。(看到線程解鎖時,再看回這裏啦)
標記2是線程獲取鎖失敗的處理。這個時候,線程可能等着下一次獲取,也可能不想要了,Node變量waitState描述了線程的等待狀態,一共四種狀況:
static final int CANCELLED = 1; //取消 static final int SIGNAL = -1; //下個節點須要被喚醒 static final int CONDITION = -2; //線程在等待條件觸發 static final int PROPAGATE = -3; //(共享鎖)狀態須要向後傳播
shouldParkAfterFailedAcquire傳入當前節點和前節點,根據前節點的狀態,判斷線程是否須要阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
若是線程須要阻塞,由parkAndCheckInterrupt方法進行操做。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
parkAndCheckInterrupt使用了LockSupport,和cas同樣,最終使用UNSAFE調用Native方法實現線程阻塞(之後有機會就分析下LockSupport的原理,park和unpark方法做用相似於wait和notify)。最後返回線程喚醒後的中斷狀態,關於中斷,後文會分析。
到這裏總結一下獲取鎖的過程:線程去競爭一個鎖,可能成功也可能失敗。成功就直接持有資源,不須要進入隊列;失敗的話進入隊列阻塞,等待喚醒後再嘗試競爭鎖。
經過上面詳細的獲取鎖過程分析,釋放鎖過程大概能夠猜到:頭節點是獲取鎖的線程,先移出隊列,再通知後面的節點獲取鎖。
public void unlock() { sync.release(1); }
ReentrantLock的unlock方法很簡單地調用了AQS的release:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
和lock的tryAcquire同樣,unlock的tryRelease一樣由ReentrantLock實現:
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
由於鎖是能夠重入的,因此每次lock會讓state加1,對應地每次unlock要讓state減1,直到爲0時將獨佔線程變量設置爲空,返回標記是否完全釋放鎖。
最後,調用unparkSuccessor將頭節點的下個節點喚醒:
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
尋找下個待喚醒的線程是從隊列尾向前查詢的,找到線程後調用LockSupport的unpark方法喚醒線程。被喚醒的線程從新執行acquireQueued裏的循環,就是上文關於acquireQueued標記1部分,線程從新嘗試獲取鎖。
static void selfInterrupt() { Thread.currentThread().interrupt(); }
在acquire裏還有最後一句代碼調用了selfInterrupt,功能很簡單,對當前線程產生一箇中斷請求。
爲何要這樣操做呢?由於LockSupport.park阻塞線程後,有兩種可能被喚醒。
第一種狀況,前節點是頭節點,釋放鎖後,會調用LockSupport.unpark喚醒當前線程。整個過程沒有涉及到中斷,最終acquireQueued返回false時,不須要調用selfInterrupt。
第二種狀況,LockSupport.park支持響應中斷請求,可以被其餘線程經過interrupt()喚醒。但這種喚醒並無用,由於線程前面可能還有等待線程,在acquireQueued的循環裏,線程會再次被阻塞。parkAndCheckInterrupt返回的是Thread.interrupted(),不只返回中斷狀態,還會清除中斷狀態,保證阻塞線程忽略中斷。最終acquireQueued返回true時,真正的中斷狀態已經被清除,須要調用selfInterrupt維持中斷狀態。
所以普通的lock方法並不能被其餘線程中斷,ReentrantLock是能夠支持中斷,須要使用lockInterruptibly。
二者的邏輯基本同樣,不一樣之處是parkAndCheckInterrupt返回true時,lockInterruptibly直接throw new InterruptedException()。
分析完公平鎖的實現,還剩下非公平鎖,主要區別是獲取鎖的過程不一樣。
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
在NonfairSync的lock方法裏,第一步直接嘗試將state修改成1,很明顯,這是搶先獲取鎖的過程。若是修改state失敗,則和公平鎖同樣,調用acquire。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
nonfairTryAcquire和tryAcquire乍一看幾乎同樣,差別只是缺乏調用hasQueuedPredecessors。這點體驗出公平鎖和非公平鎖的不一樣,公平鎖會關注隊列裏排隊的狀況,老老實實按照FIFO的次序;非公平鎖只要有機會就搶佔,才無論排隊的事。
從ReentrantLock的實現完整分析了AQS的獨佔功能,總的來說並不複雜。別忘了AQS還有共享功能,下一篇是--分析CountDownLatch的實現原理。
做者:展翅而飛 連接:https://www.jianshu.com/p/fe027772e156 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。