阿里巴巴2021版JDK源碼筆記(2月第三版).pdfjava
連接:https://pan.baidu.com/s/1XhVcfbGTpU83snOZVu8AXg
提取碼:l3gynode
當一個線程調用 object.lock()拿到鎖,進入互斥區後,再次調用object.lock(), 仍然能夠拿到該鎖(不然會死鎖)數組
lock.java安全
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
Sync是一個抽象類,它有兩個子類FairSync與NonfairSync,分別 對應公平鎖和非公平鎖多線程
Sync的父類AbstractQueuedSynchronizer常常被稱做隊列同步器 (AQS),這個類很是關鍵ide
AbstractOwnableSynchronizer具備阻塞線程的做用,爲了實現一把具備阻塞和喚醒功能的鎖,須要一下核心要素:函數
針對1,2ui
針對3this
在Unsafe類中,提供了阻塞或喚醒線程的一對操做原語,也就是park/unpark線程
LockSupport對其作了簡單的封裝
public class LockSupport { public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); } public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } }
在當前線程中調用park(),該線程就會被阻塞;在另一個線 程中,調用unpark(Thread t),傳入一個被阻塞的線程,就能夠喚醒阻塞在park()地方的線程
尤爲是 unpark(Thread t),它實現了一個線程對另一個線程 的「精準喚醒」
針對4
在AQS中利用雙向鏈表和CAS實現了一個阻塞隊列。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { static final class Node { volatile Node prev; volatile Node next; volatile Thread thread; } private transient volatile Node head; private transient volatile Node tail; }
FairSync 公平鎖
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//等於0,資源空閒,能夠拿到鎖 if (!hasQueuedPredecessors() && //判斷是否存在等待隊列或者當前線程是不是隊頭 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {//被鎖了,可是當前線程就是已經獲取鎖了(重入鎖),state+1 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
NonfairSync 非公平鎖
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平鎖和非公平鎖的區別:
公平鎖就多了這塊代碼 !hasQueuedPredecessors()
,看源碼
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
這裏其實就是判斷當前線程是否能夠被公平的執行(隊列爲空,或者當前在隊頭的時候表示到當前線程處理了)
AQS類中,有嘗試拿鎖的方法
public final void acquire(int arg) { if (!tryAcquire(arg) && //這裏嘗試去拿鎖,沒有拿到鎖才執行下一個條件 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //將當前線程入隊進入等待 //addWaiter就是將線程加入到隊列中, //acquireQueued該線程被阻塞。在該函數返回 的一刻,就是拿到鎖的那一刻,也就是被喚醒的那一刻,此時會刪除隊列的第一個元素(head指針前移1個節點) selfInterrupt(); } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//這裏會阻塞住,直到拿到鎖 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
unlock不區分公平仍是非公平
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; //只有鎖的擁有者才能夠釋放鎖 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //這裏須要考慮重入鎖 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
release()裏面作了兩件事:tryRelease(..)函數釋放鎖;unparkSuccessor(..)函數喚醒隊列中的後繼者。
當parkAndCheckInterrupt()返回true的時候,說明有其餘線程發送中斷信號,直接拋出InterruptedException,跳出for循環,整個函數返回。
tryLock()實現基於調用非公平鎖的tryAcquire(..),對state進行CAS操做,若是操做成功就拿到鎖;若是操做不成功則直接返回false,也不阻塞
和互斥鎖相比,讀寫鎖(ReentrantReadWriteLock)就是讀線程 和讀線程之間能夠不用互斥了。在正式介紹原理以前,先看一下相關類的繼承體系。
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
當使用 ReadWriteLock 的時候,並非直接使用,而是得到其內部的讀鎖和寫鎖,而後分別調用lock/unlock。
public static void main(String[] args) { ReadWriteLock rwlock = new ReentrantReadWriteLock(); Lock rlock = rwlock.readLock(); rlock.lock(); rlock.unlock(); Lock wlock = rwlock.writeLock(); wlock.lock(); wlock.unlock(); }
從表面來看,ReadLock和WriteLock是兩把鎖,實際上它只是同一 把鎖的兩個視圖而已
兩個視圖: 能夠理解爲是一把鎖,線程分紅兩類:讀線程和寫線程。讀線程和讀線程之間不互斥(能夠同時拿到這把鎖),讀線程和寫線程互斥,寫線程和寫線程也互斥。
readerLock和writerLock實際共 用同一個sync對象
public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
同互斥鎖同樣,讀寫鎖也是用state變量來表示鎖狀態的。只是state變量在這裏的含義和互斥鎖徹底不一樣
是把 state 變量拆成兩半,低16位,用來記錄寫鎖,高16位,用來「讀」鎖。但同一 時間既然只能有一個線程寫,爲何還須要16位呢?這是由於一個寫 線程可能屢次重入
abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int sharedCount(int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } }
爲何要把一個int類型變量拆成兩半, 而不是用兩個int型變量分別表示讀鎖和寫鎖的狀態呢?這是由於沒法 用一次CAS 同時操做兩個int變量,因此用了一個int型的高16位和低16位分別表示讀鎖和寫鎖的狀態。
當state=0時,說明既沒有線程持有讀鎖,也沒有線程持有寫鎖; 當state!=0時,要麼有線程持有讀鎖,要麼有線程持有寫鎖,二者不 能同時成立,由於讀和寫互斥。
ReentrantReadWriteLock的兩個內部類ReadLock和WriteLock中,是如何使用state變量的
acquire/release、acquireShared/releaseShared 是AQS裏面的 兩對模板方法。互斥鎖和讀寫鎖的寫鎖都是基於acquire/release模板 方法來實現的。讀寫鎖的讀鎖是基於acquireShared/releaseShared這對模板方法來實現的
將讀/寫、公平/非公平進行排列組合,就有4種組合
對於公平,比較容易理解,不管是讀鎖,仍是寫鎖,只要隊列中 有其餘線程在排隊(排隊等讀鎖,或者排隊等寫鎖),就不能直接去搶鎖,要排在隊列尾部。
對於非公平,讀鎖和寫鎖的實現策略略有差別。先說寫鎖,寫線 程能搶鎖,前提是state=0,只有在沒有其餘線程持有讀鎖或寫鎖的情 況下,它纔有機會去搶鎖。或者state!=0,但那個持有寫鎖的線程是 它本身,再次重入。寫線程是非公平的,就是無論三七二十一就去搶,即一直返回false。
由於讀線程和讀線程是不互斥的,假設當前線程被讀線程持有,而後其餘讀線程還非公平地一直去搶,可能致使寫線程永遠拿不到鎖,所 以對於讀線程的非公平,要作一些「約束」
當發現隊列的第1個元素 是寫線程的時候,讀線程也要阻塞一下,不能「肆無忌憚」地直接去搶
寫鎖是排他鎖,實現策略相似於互斥鎖,重寫了tryAcquire/tryRelease方法。
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc);//寫鎖是排他的 return free; }
讀鎖是共享鎖,重寫了 tryAcquireShared/tryReleaseShared 方法,其實現策略和排他鎖有很大的差別。
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && //寫鎖被某線程持有,且不是本身,讀鎖確定拿不到,直接返回 getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() &&//公平和非公平的差別 r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {//高位讀鎖+1 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
由於讀鎖是共享鎖,多個線程會同時持有讀鎖,因此對讀鎖的釋 放不能直接減1,而是須要經過一個for循環+CAS操做不斷重試。這是tryReleaseShared和tryRelease的根本差別所在。
Condition自己也是一個接口,其功能和wait/notify相似
public interface Condition { void await() throws InterruptedException; void signal(); void signalAll(); }
在講多線程基礎的時候,強調wait()/notify()必須和synchronized一塊兒使用,Condition也是如此,必須和Lock一塊兒使用。所以,在Lock的接口中,有一個與Condition相關的接口:
public interface Lock { Condition newCondition(); }
爲一個用數組實現的阻塞 隊列,執行put(..)操做的時候,隊列滿了,生成者線程被阻塞;執行take()操做的時候,隊列爲空,消費者線程被阻塞。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //核心就是一把鎖,兩個條件 final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } }
使用很簡潔,避免了 wait/notify 的生成者通知生 成者、消費者通知消費者的問題。
由於Condition必須和Lock一塊兒使用,因此Condition的實現也是Lock的一部分
public final void await() throws InterruptedException { if (Thread.interrupted())//正要執行await操做,收到了中斷信號,拋出異常 throw new InterruptedException(); Node node = addConditionWaiter();//加入condition等待隊列 long savedState = fullyRelease(node);//阻塞在condition以前必須釋放鎖,不然會釋放鎖 int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this);//本身阻塞本身 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//從新拿鎖 interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
與await()不一樣,awaitUninterruptibly()不會響應中斷,其 函數的定義中不會有中斷異常拋出,下面分析其實現和await()的區別
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); long savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted())//從park中醒來,收到中斷,不退出,繼續執行循環 interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
能夠看出,總體代碼和 await()相似,區別在於收到異常後,不會拋出異常,而是繼續執行while循環。
public final void signal() { if (!isHeldExclusively())//只有持有鎖的隊列才能夠調用signal throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) {//喚醒隊列的第一個線程 do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node);//先把Node放入互斥鎖的同步隊列中,再調用下面的unpark方法 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
JDK8引入
StampedLock引入了「樂觀讀」策略,讀的時候不加讀鎖,讀出來發現數據被修改 了,再升級爲「悲觀讀」,至關於下降了「讀」的地位,把搶鎖的天平往「寫」的一方傾斜了一下,避免寫線程被餓死。
public class Point { private double x, y; private final StampedLock s1 = new StampedLock(); void move(double deltaX, double deltaY) { //多個線程調用,修改x,y的值 long stamp = s1.writeLock(); try { x = deltaX; y = deltaY; } finally { s1.unlock(stamp); } } double distanceFromOrigin() { long stamp = s1.tryOptimisticRead();//使用樂觀鎖 double currentX = x, currentY = y; if (!s1.validate(stamp)) { /** * 上面這三行關鍵代碼對順序很是敏感,不能有重排序。 因 * 爲 state 變量已是volatile,因此能夠禁止重排序,但stamp並 不是volatile的。 * 爲此,在validate(stamp)函數裏面插入內存屏 障。 */ stamp = s1.readLock();//升級悲觀鎖 try { currentX = x; currentY = y; } finally { s1.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } }
StampedLock是一個讀寫鎖,所以也會像讀寫鎖那樣,把一 個state變量分紅兩半,分別表示讀鎖和寫鎖的狀態。同時,它還須要 一個數據的version。但正如前面所說,一次CAS沒有辦法操做兩個變 量,因此這個state變量自己同時也表示了數據的version。下面先分析state變量。
同ReadWriteLock同樣,StampedLock也要進行悲觀的讀鎖和寫鎖 操做。不過,它不是基於AQS實現的,而是內部從新實現了一個阻塞隊列
public class StampedLock implements java.io.Serializable { static final class WNode { volatile WNode prev; volatile WNode next; volatile WNode cowait; // list of linked readers volatile Thread thread; // non-null while possibly parked volatile int status; // 0, WAITING, or CANCELLED final int mode; // RMODE or WMODE WNode(int m, WNode p) { mode = m; prev = p; } } }
這個阻塞隊列和 AQS 裏面的很像。剛開始的時候,whead=wtail=NULL,而後初始化,建一個空節點,whead和wtail都指向這個空節 點,以後往裏面加入一個個讀線程或寫線程節點。但基於這個阻塞隊 列實現的鎖的調度策略和AQS很不同,也就是「自旋」。在AQS裏 面,當一個線程CAS state失敗以後,會當即加入阻塞隊列,而且進入 阻塞狀態。但在StampedLock中,CAS state失敗以後,會不斷自旋, 自旋足夠多的次數以後,若是還拿不到鎖,才進入阻塞狀態。爲此, 根據CPU的核數,定義了自旋次數的常量值。若是是單核的CPU,確定不能自旋,在多核狀況下,才採用自旋策略。