java
我: 能夠。 一、AQS(AbstractQueuedSynchronizer)即隊列同步器,它是構建鎖或者其餘同步組件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC併發包的 做者Doug Lea指望它成爲實現大部分同步需求的基礎,然而如他所料,AQS是JUC併發包的核心基礎組件。 二、AQS解決了在實現同步器時涉及的大量細節問題,例如獲取同步狀態、FIFO同步隊列。基於AQS來構建同步器能夠帶來不少好處。它不只可以極大的減小實現工做,並且也沒必要處理在多個位置上發生的競爭問題。在基於AQS構建的同步器中,只能在一個時刻發生阻塞,從而下降上下文切換的開銷,提升了吞吐量。同時在設計AQS時充分考慮了可伸縮性。所以JUC中,全部基於AQS構建的同步器都可以得到這個優點。 三、AQS的主要使用方式是繼承,子類經過繼承同步器,並實現它的抽象方法來管理同步狀態。AQS使用一個int類型的成員變量state來表示同步狀態: 一、當state>0時,表示已經獲取了鎖。 二、當state=0時,表示釋放了鎖。 它提供了三個方法,來對同步狀態state進行操做,而且AQS能夠確保對state的操做時安全的: getState(); setState(int newState); compareAndSetState(int expect, int update); 四、另外,AQS經過內置的FIFO同步隊列來完成資源獲取線程的排隊工做。 (1)若是當前線程獲取同步狀態(鎖)失敗時,AQS則會將當前線程以及等待狀態等信息構造一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程。 (2)當同步狀態(鎖)釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。node
面試官:你說下AQS都提供了哪些有用的方法?web
我:AQS主要提供了以下方法(API): 一、getState():返回同步狀態的值。 二、setState(int newState):設置當前同步狀態。 三、compareAndSetState(int expect, int update):使用CAS設置當前狀態,該方法能保證狀態設置的原子性。 四、tryAcquire(int arg):獨佔式獲取同步狀態,獲取同步狀態成功後,其餘線程須要等待該線程釋放同步狀態才能獲取同步狀態。 五、tryRelease(int arg):獨佔式釋放同步狀態。 六、tryAcquireShared(int arg):共享式獲取同步狀態,返回值若是大於等於0,表示獲取成功,不然獲取失敗。 七、tryReleaseShared(int arg):共享式釋放同步狀態。 八、isHeldExclusively():當前同步器是否在獨佔式模式下被線程佔用,通常該方法表示是否被當前線程所獨佔。 九、acquire(int arg):獨佔式獲取同步狀態。若是當前線程獲取同步狀態成功,則由該方法返回,不然將會進入同步隊列等待。該方法將會調用可重寫的tryAcquire(int arg)方法。 十、acquireInterruptibly(int arg):與acquire(int arg)相同,可是該方法響應中斷。當前線程爲獲取到同步狀態而進入到同步隊列中,若是當前線程被中斷,則該方法拋出 InterruptedException異常並返回。 十一、tryAcquireNanos(int arg, long nanos):超時獲取同步狀態。若是當前線程在 nanos 時間內沒有獲取到同步狀態,那麼將會返回 false ,已經獲取則返回 true 。 十二、acquireShared(int arg):共享式獲取同步狀態,若是當前線程未獲取到同步狀態,將會進入同步隊列等待,與獨佔式的主要區別是在同一時刻能夠有多個線程獲取到同步狀態; 1三、acquireSharedInterruptibly(int arg):共享式獲取同步狀態,響應中斷。 1四、tryAcquireSharedNanos(int arg, long nanosTimeout):共享式獲取同步狀態,增長超時限制。 1五、release(int arg):獨佔式釋放同步狀態,該方法會在釋放同步狀態以後,將同步隊列中第一個節點包含的線程喚醒。 1六、releaseShared(int arg):共享式釋放同步狀態。面試
面試官:你剛纔提到AQS的的內部維護了一個FIFO隊列,你能講下這個隊列嗎?設計模式
我:該隊列就是CLH隊列。CLH隊列是一個FIFO雙向隊列(學過數據結構的都應該瞭解),AQS依賴它來完成同步狀態的管理。 一、當前線程若是獲取同步狀態失敗時,AQS會將當前線程已等待狀態等信息構形成一個節點(Node)將其加入到CLH同步隊列中,同時會阻塞當前線程。 二、當同步狀態釋放時,會把首節點喚醒(公平鎖),先進的先出,使其再次嘗試獲取同步狀態。緩存
我:在CLH同步隊列中,一個節點(Node)表示一個線程,它保存着線程的引用(thread)、狀態(waitStatus)、前驅節點(prev)、後繼節點(next)。其定義以下: Node是AbstractQueuedSynchronizer的靜態內部類:安全
static final class Node { // 共享 static final Node SHARED = new Node(); // 獨佔 static final Node EXCLUSIVE = null; /** * 由於超時或者中斷,節點會被設置爲取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變爲其餘狀態 */ static final int CANCELLED = 1; /** * 後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行 */ static final int SIGNAL = -1; /** * 節點在等待隊列中,節點線程等待在Condition上,當其餘線程對Condition調用了signal()後,該節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中 */ static final int CONDITION = -2; /** * 表示下一次共享式同步狀態獲取,將會無條件地傳播下去 */ static final int PROPAGATE = -3; /** 等待狀態 */ volatile int waitStatus; /** 前驅節點,當節點添加到同步隊列時被設置(尾部添加) */ volatile Node prev; /** 後繼節點 */ volatile Node next; /** 等待隊列中的後續節點。若是當前節點是共享的,那麼字段將是一個 SHARED 常量,也就是說節點類型(獨佔和共享)和等待隊列中的後續節點共用同一個字段 */ Node nextWaiter; /** 獲取同步狀態的線程 */ volatile Thread thread; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
說明: 一、waitStatus字段:等待狀態,用來控制線程的阻塞和喚醒,有5種狀態:INITAL/CANCELLED/SINGAL/CONDITION/PROPAGATE。 二、thread字段:Node節點對應的線程Thread。 三、nextWaiter字段:Node節點獲取同步狀態的模型。tryAcquire(int args)和tryAcquireShared(int args)方法,分別爲獨佔式和共享式獲取同步狀態。在獲取失敗時, 它們都會調用addWaiter(Node mode)方法入隊。而nextWaiter就是用來表示哪一種模式: SHARED:枚舉共享模式,EXCLUSIVE:枚舉獨佔模式。 四、predecessor()方法:獲取Node節點的前一個Node節點。在方法內部,Node p = prev的本地拷貝是爲了不併髮狀況下,prev判斷完==null時,剛好被修改,從而保證線程安全。
面試官:那CLH隊列是怎麼進行入隊和出隊操做的呢數據結構
我: 學習數據結構的咱們都知道,CLH入隊很簡單,以下圖所示。 一、tail指向新節點。 二、新節點的rev指向當前最後的節點。 三、當前最後一個節點的next指向入隊的節點。多線程
實際上,入隊邏輯實現的addWaiter(Node)方法,須要考慮併發狀況。它經過CAS方式,來保證正確的添加Node。代碼以下:併發
1: private Node addWaiter(Node mode) { 2: // 新建節點 3: Node node = new Node(Thread.currentThread(), mode); 4: // 記錄原尾節點 5: Node pred = tail; 6: // 快速嘗試,添加新節點爲尾節點 7: if (pred != null) { 8: // 設置新 Node 節點的尾節點爲原尾節點 9: node.prev = pred; 10: // CAS 設置新的尾節點 11: if (compareAndSetTail(pred, node)) { 12: // 成功,原尾節點的下一個節點爲新節點 13: pred.next = node; 14: return node; 15: } 16: } 17: // 失敗,屢次嘗試,直到成功 18: enq(node); 19: return node; 20: }
出隊:CLH同步隊列遵循FIFO,首節點的線程釋放同步狀態後,將會喚醒它的下一個節點(Node.next)。然後繼節點將會在獲取同步狀態成功時,將本身設置爲首節點(head)。 這個過程很是簡單,head執行該節點並斷開原首節點的next和當前節點的prev便可。注意:在這個過程當中是不須要使用CAS來保證的,由於只有一個線程,可以成功獲取到同步狀態。 setHead(Node node)方法,實現上述的出隊邏輯,以下圖所示:
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
面試官:你能跟我說下AQS是怎麼獲取和釋放同步狀態的呢?
我:前面說到,AQS的設計模式是模板方法模式,子類經過繼承的方式,實現它的抽象方法來管理同步狀態,AQS提供了大量模板方法實現同步,主要分爲三類:獨佔式獲取和釋放同步狀態、共享式獲取和釋放同步狀態、查詢同步隊列中的等待線程狀況。 一、獨佔式 :同一時刻僅有一個線程持有同步狀態。 獨佔式獲取同步狀態
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
一、tryAcquire:嘗試獲取鎖,獲取成功則設置鎖狀態並返回true,不然返回false。該方法自定義同步組件本身實現,該方法必需要保證線程安全的獲取同步狀態。 二、addWaiter:若是tryAcquire返回false(獲取同步狀態失敗),則調用該方法將當前線程加入到CLH同步隊列尾部。 三、acquireQueued:當前線程會根據公平性原則來進行自旋,直至獲取鎖爲止。 四、selfInterrupt:產生一箇中斷 五、下面看下acquireQueued方法:這個方法爲一個自旋的過程,當前線程(Node)進入同步隊列後,就會進入一個自旋的過程,每一個節點都會自省的觀察,當條件知足獲取到同步狀態後,就能夠退出自旋的過程。從下面代碼中能夠看到,當前線程會一直嘗試獲取同步狀態,固然前提是隻有其前驅節點爲頭結點纔可以嘗試獲取同步狀態,理由:保持FIFO同步隊列原則。頭節點釋放同步狀態後,將會喚醒其後繼節點,後繼節點被喚醒後須要檢查本身是否爲頭節點。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //中斷標誌 boolean interrupted = false; /* * 自旋過程,其實就是一個死循環而已 */ for (;;) { //當前線程的前驅節點 final Node p = node.predecessor(); //當前線程的前驅節點是頭結點,且同步狀態成功 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //獲取失敗,線程等待--具體後面介紹 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
還有獨佔式獲取響應中斷和獨佔式超時獲取的方法,這裏就不詳細描述了。 獨佔式釋放同步狀態 當線程獲取同步狀態後,執行完相應邏輯後就須要釋放同步狀態。AQS提供了release(int arg)方法釋放同步狀態:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
該方法一樣是調用tryRelease(int arg)方法來釋放同步狀態,釋放成功後,會調用unparkSuccessor(Node node)方法喚醒後繼節點。 總結:在AQS內部維護了一個FIFO同步隊列,當線程獲取同步狀態失敗後,則會加入到這個CLH隊列的隊尾並一直保持着自旋。在CLH同步隊列中的線程在自旋時會判斷其前驅節點是否爲首節點, 若是是首節點就不斷嘗試獲取同步狀態,獲取成功就退出CLH同步隊列。當線程執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。 二、共享式 共享式與獨佔式最主要區別在於同一時刻獨佔式只能有一個線程獲取同步狀態,而共享式在同一時刻能夠有多個線程獲取同步狀態。例如讀操做能夠多個線程同時讀,寫操做同一時刻只能有一個線程寫。 共享式獲取同步狀態 AQS提供acquireShared(int arg)方法共享式獲取同步狀態:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) //獲取失敗,自旋獲取同步狀態 doAcquireShared(arg); }
上述方法是先調用tryAcquireShared(int arg)方法嘗試獲取同步狀態,獲取失敗就調用doAcquireShared(int arg)自旋獲取同步狀態。
private void doAcquireShared(int arg) { /共享式節點 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //前驅節點 final Node p = node.predecessor(); //若是其前驅節點,獲取同步狀態 if (p == head) { //嘗試獲取同步 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
addWaiter(Node.SHARED)這裏先把當前線程加入CLH同步隊列的隊尾,而後循環(自旋)嘗試獲取同步狀態:node.predecessor()表示當前節點的前驅結點,if (p ==head)若是前驅結點 是首節點的話,則調用tryAcquireShared(int args)方法嘗試獲取同步狀態,獲取成功(r >=0)就退出循環(自旋),在退出前喚醒下一個等待的節點(也就是設置下一個節點的前驅節點爲首節點)。 共享式釋放同步狀態
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
面試官:從你上面列舉的源碼裏,我看到shouldParkAfterFailedAcquire這個方法,貌似是用來阻塞線程的,你能說下AQS是怎麼阻塞和喚醒線程的嗎?
我:在上面講到,在線程獲取同步狀態時失敗後,則加入CLH同步隊列,經過自旋方式不斷獲取同步狀態,可是在自旋過程當中則須要判斷當前線程是否須要阻塞。在獲取同步狀態 失敗後,線程並非立刻阻塞,須要檢查該線程的狀態,檢查方法爲shouldParkAfterFailedAcquire(Node pre, Node node),該方法主要靠前驅結點判斷當前線程是否應該被阻塞。 例如上面源碼的一部分:
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前驅節點 int ws = pred.waitStatus; //狀態爲signal,表示當前線程處於等待狀態,直接放回true if (ws == Node.SIGNAL) return true; //前驅節點狀態 > 0 ,則爲Cancelled,代表該節點已經超時或者被中斷了,須要從同步隊列中取消 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } //前驅節點狀態爲Condition、propagate else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
這段代碼主要檢查當前線程是否須要被阻塞,具體規則以下: 一、若是當前線程的前驅節點狀態爲SINNAL,表示當前線程須要被阻塞,調用unpark()方法喚醒,直接返回true,當前線程阻塞。 二、若是當前線程的前驅節點狀態爲CANCELLED(ws >0),則表示該線程的前驅節點已經等待超時或者被中斷了,則須要從CLH隊列中將該前驅節點刪除掉,直到回溯到前驅節點狀態<=0,返回false。 三、若是前驅節點非SINNAL,非CANCELLED,則經過CAS的方式將其前驅節點設置爲SINNAL,返回false。若是shouldParkAfterFailedAcquire(Node pre, Node node)方法返回false,則調用parkAndCheckInterrupt()方法阻塞當前線程。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
parkAndCheckInterrupt()方法主要是把當前線程掛起,從而阻塞住線程的調用棧,同時返回當前線程的中斷狀態。其內部則是調用LOckSupport工具類的park()方法來阻塞。
面試官:下面咱們聊聊AQS在JUC裏的應用-ReentrantLock吧。你能說下對ReentrantLock的瞭解嗎?
我:ReentrantLock是可重入鎖,是一種遞歸無阻塞的同步機制。它能夠等同於synchronized的使用,可是提供了比synchronized更強大、靈活的機制,能夠減小死鎖的機率。 一、ReentrantLock將由最近成功獲取鎖定而且還沒釋放該鎖定的線程所擁有。若是鎖定沒有被線程佔有,調用lock的線程將成功獲取鎖。若是當前線程已經擁有該鎖,再調用lock()方法會當即返回。 二、Reentrant提供了公平鎖和非公平鎖的選擇。構造方法接收一個可選的公平參數(true表示公平鎖,不然爲非公平鎖)。公平鎖和非公平鎖的區別在於公平鎖的鎖獲取是有順序的。可是公平鎖的效率每每沒有非公平鎖高,在多線程訪問的狀況下,公平鎖表現出較低的吞吐量。 下面來看看源碼吧 一、lock方法
public void lock() { sync.lock(); }
Sync爲ReentrantLock裏的一個內部類,它繼承AQS,它有兩個子類:公平鎖FairSync和非公平鎖NonFairSync。ReenTrantLock裏面大部分的功能都是委託給Sync來實現的,同時Sync 定義了lock()抽象方法由其子類來實現,默認實現了nonfairTryAcquire(int acquires)方法。 咱們來看非公平鎖的lock方法
final void lock() { //嘗試獲取鎖 if (compareAndSetState(0,1)) setExclusiveOwnerThread(Thread.currentThread()); else //獲取失敗,調用AQS的acquire(int arg)方法 acquire(1); }
首先嚐試獲取鎖,若是獲取成功,設置鎖被當前線程獨佔。若是獲取失敗,則調用acquire(1),該方法定義在AQS中,以下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
這裏先調用tryAcquire(int arg),在AQS中講過,這個方法須要同步組件本身實現。在NonfairSync中的實現見下:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //獲取同步狀態 int c = getState(); //state==0 表示該鎖處於空閒狀態 if (c == 0) { //用CAS方式佔用該鎖 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //判斷鎖持有的線程是否爲當前線程 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //成功得到鎖的線程再次得到鎖,增長同步狀態 setState(nextc); return true; } return false; }
二、釋放鎖unlock方法
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { //減掉releases int c = getState() - releases; //若是釋放的不是持有鎖的線程,拋異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //state == 0 表示已經釋放徹底了,其餘線程能夠獲取同步狀態了 if (c == 0) { free = true; setExclusiveOwnerThread(null); } //從新設置同步狀態的值 setState(c); return free; }
unlock()內部使用Sync的release(int arg)釋放鎖,release(int arg)是在AQS中定義的。tryRelease也是須要同步組件本身實現。若是釋放成功,再判斷首節點後面還有等待同步狀態的線程,則調用unparkSuccessor(Node node)方法喚醒下一個線程。
面試官:那麼ReentrantLock和Synchronized有什麼異同呢?
我:首先他們確定具備相同的功能和內存語義。不一樣之處在於如下幾點: 一、與synchronized相比,ReentrantLock提供了更多更加全面的功能,具有更強的擴展性。例如時間鎖等候,可中斷鎖等候和鎖投票。 二、ReentrantLock還提供了條件Condition,對線程的等待喚醒操做更加詳細靈活,因此在多個條件變量和高度競爭鎖的地方,ReentrantLock更加適合。 三、ReentrantLock提供了可輪詢的鎖請求,它會嘗試去獲取鎖,若是成功則繼續,不然等到下次運行時處理,而synchronized則一旦進入鎖請求要麼成功要麼阻塞, 因此相對於synchronized來講,ReentrantLock會不容易死鎖些。 四、ReentrantLock支持更加靈活的同步代碼塊,可是使用synchronized時,只能在一個synchronized塊結構中獲取和釋放。 五、ReentrantLock支持中斷處理,且性能相對好一些。
面試官:那你用過讀寫鎖ReentrantReadWirteLock嗎?能介紹下嗎?
我:直接來看源碼吧。ReentrantReadWrite的前面幾行很簡單,看下Sync類,先看下Sync的全部屬性:
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; /* 下面這塊就是將state一分爲二,高16位用於共享模式,低16位用於獨佔模式。 */ static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** 取c的高16位值,表明讀鎖的獲取次數 */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** 取c的低16位值,表明寫鎖的衝入次數,由於寫鎖是獨佔模式 */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } /** 這個靜態內部類的實例用來記錄每一個線程持有的讀鎖數量(讀鎖重入) */ static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); } /** ThreadLocal的子類 */ static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } /** 組合使用上面兩個類,用一個ThreadLocal來記錄當前線程持有的讀鎖數量 */ private transient ThreadLocalHoldCounter readHolds; /** 用於緩存,記錄「最後一個獲取讀鎖的線程」的讀鎖重入次數,因此無論哪一個線程獲取到讀鎖後,就把這個值佔用,這樣 就不用到ThreadLocal中查詢map了。在獲取-釋放讀鎖的這段時間,若是沒有其餘線程獲取讀鎖的話,此緩存能夠幫助提升性能。 */ private transient HoldCounter cachedHoldCounter; /** 第一個獲取讀鎖的線程(而且其未獲取讀鎖),以及它持有的讀鎖數量。 */ private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { //初始化readHolds這個ThreadLocal屬性 readHolds = new ThreadLocalHoldCounter(); //爲了保證readHolds內存可見性 setState(getState()); // ensures visibility of readHolds }
ReentrantReadWriteLock與ReentrantLock同樣,其鎖主體依然是Sync,它的讀鎖、寫鎖都是依靠Sync來實現的,因此ReentrantReadWriteLock實際上只有一個鎖,只是在獲取讀取鎖 和寫入鎖的方式上不同。它的讀寫鎖其實就是兩個類ReadLock和WriteLock。在ReentrantLock中使用一個int類型的state來表示同步狀態,該值表示鎖被一個線程重複獲取的次數。可是讀寫鎖ReentrantReadWriteLock內部維護着一對鎖,須要用一個變量維護多種狀態,因此讀寫鎖採用「按位切割使用」的方式來維護這個變量,將其切分爲兩部分,高16位表示讀,低16位表示寫。分割以後,經過位運算肯定讀鎖和寫鎖的狀態。假如當前同步狀態爲S,那麼寫狀態=S & 0x0000FFFF(將高16位抹去),讀狀態=S >>>16(無符號補0右移16位)。
我:下面看下寫鎖的獲取。寫鎖就是一個支持可重入的排他鎖。
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); //當前鎖個數 int c = getState(); //寫鎖 int w = exclusiveCount(c); if (c != 0) { // c !=0 && w == 0表示存在讀鎖,當前線程不是已經獲取寫鎖的線程 if (w == 0 || current != getExclusiveOwnerThread()) return false; //超出最大範圍 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 嘗試獲取寫鎖 setState(c + acquires); return true; } //是否須要阻塞 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //設置獲取鎖的線程爲當前線程 setExclusiveOwnerThread(current); return true; }
我:下面看下寫鎖的釋放
protected final boolean tryRelease(int releases) { //若是釋放的線程不是鎖的持有者,拋異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //同步狀態更新 int nextc = getState() - releases; //若寫鎖的新線程數爲0,則將鎖的持有者設置爲null boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
我:下面看下讀鎖的獲取
public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { //當前線程 Thread current = Thread.currentThread(); //鎖的個數 int c = getState(); //計算寫鎖,若是存在寫鎖且鎖的持有者不是當前線程,直接返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //計算讀鎖 int r = sharedCount(c); //readerShouldBlock:讀鎖是否須要等待(公平鎖原則),且小於最大線程數,且CAS設置讀取鎖狀態成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { 若是鎖沒有被任何線程獲取,那麼當前線程就是第一個獲取讀鎖的線程 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } //若是獲取讀鎖的線程爲第一次獲取讀鎖的線程,則 firstReaderHoldCount重入數+1 else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
我:下面看下讀鎖的釋放
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { //當前線程 Thread current = Thread.currentThread(); //若是想要釋放鎖的線程爲第一個獲取鎖的線程 if (firstReader == current) { // 僅獲取了一次,則須要將firstReader設置爲Null,不然-1 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } //獲取rh對象,並更新「當前線程獲取鎖的信息」 else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //CAS更新同步狀態 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
我:在讀鎖獲取鎖和釋放鎖的過程當中,能夠看到一個變量rh(HoldCounter),該變量在讀鎖中很重要。爲了更好理解HoldCounter,咱們暫且認爲它不是一個鎖的機率,而至關於一個計數器。一次共享鎖的操做就至關於在該計數器的操做。獲取共享鎖,則該計數器+1,釋放共享鎖,該計數器-1。只有當線程獲取共享鎖後才能對共享鎖進行釋放、重入操做,因此HoldCounter的做用就是當前線程持有共享鎖的數量,這個數量必須與線程綁定在一塊兒,不然操做其餘線程鎖就會拋出異常。
/** HoldConter定義比較簡單,就是一個計數器count和線程id兩個變量。 */ static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); } /** 經過ThreadLocal,HoldCounter就能夠與線程綁定了,故而,HoldCounter應該就是綁定線程上的一個計數器,而ThreadLocalHoldCounter則是線程綁定的ThreadLocal。 */ static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
面試官:你能說下線程之間的同步除了Object的wait/notify,還有其餘什麼方法嗎?
我:還有Condition。Lock提供了條件Condition,對線程的等待喚醒操做更加詳細和靈活。Condition是一種廣義上的條件隊列。他爲線程提供了一種更爲靈活的等待/通知模式, 線程在調用await方法後執行掛起操做,直到線程等待的某個條件爲真時纔會被喚醒。Condition必需要配合鎖一塊兒使用,由於對共享狀態變量的訪問發生在多線程環境下。一個Condition的實例必須與一個Lock綁定,所以Condition通常都是做爲Lock的內部實現。一張圖對比Condition與Object的監視器方法。
面試官:Condition提供了哪些方法來阻塞和喚醒線程?
我:Condition提供了一系列的方法來阻塞和喚醒線程: 一、await():形成當前線程在接到信號或被中斷以前一直處於等待狀態。 二、awiat(long time, TimeUnit unit):形成當前線程在接到信號、被中斷或到達執行等待時間之間一直處於等待狀態。 三、awaitNanos(long nanosTimeout):形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態。返回值表示剩餘時間,若是在 nanosTimeout以前喚醒,那麼返回值=nanosTimeout-消耗時間,若是返回值<=0,則能夠認定它已經超時了。 四、awaitUninterruptibly():形成當前線程在接到信號以前一直處於等待狀態。(該方法對中斷不敏感)。 五、awaitUntil(Date deadline):形成當前線程在接到信號、被中斷或到達指定最後期限以前一直處於等待狀態。若是沒有到指定時間就被通知,則返回true, 不然表示到了指定時間,返回false。 六、signal():喚醒一個等待線程。該線程從等待方法返回前必須得到與Condition相關的鎖。 七、signalAll():喚醒全部等待線程。可以從等待方法返回的線程必須得到與Condition相關的鎖。
面試官:Condition是怎麼實現線程的阻塞和喚醒的?(原理)
我:先看下源碼.獲取一個Condition必須經過Lock的newCondition方法,該方法定義在接口Lock下,返回的結果是綁定到此Lock實例的新Condition實例。 Condition爲一個接口,僅有一個實現類ConditionObject。ConditionObject又是AQS的一個內部類。 在ReentrantLock中
public Condition newCondition() { return sync.newCondition(); }
在Sync中
final ConditionObject newCondition() { return new ConditionObject(); }
在AbstractQueuedSynchronizer中:
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; private transient Node firstWaiter; private transient Node lastWaiter; public ConditionObject() { }
我接着說: AQS等待隊列與Condition隊列是兩個相互獨立的隊列。 一、await()就是在當前線程持有鎖的基礎上釋放鎖資源,並新建Condition節點加入到Condition的隊列尾部,阻塞當前線程。 二、signal()就是將Condition的頭結點移動到AQS等待節點尾部,讓其等待再次獲取鎖。 如下是AQS與Condition隊列的出入節點的示意圖,能夠經過這幾張圖看出線程節點在兩個隊列中的出入關係和條件。 一、初始化狀態:AQS等待隊列有3個Node,Condition隊列有1個Node(也可能一個都沒有)。
二、節點1執行Condition.await() (1)將head後移。 (2)釋放節點1的鎖,並從AQS等待隊列中移除。 (3)將節點1加入到Condition的等待隊列中。 (4)更新lastWaiter爲節點1。
三、節點2執行Condition.signal()操做 (5)將firstWaiter後移。 (6)將節點4移除Condition隊列。 (7)將節點4加入到AQS的等待隊列中去。 (8)更新AQS的等待隊列的tail。
面試官:說來容易,還得實踐,你能用Condition實現下生產者消費者嗎?
我:拿起了筆,花了五分鐘寫了個Demo:
/** * Condition實現簡單的生產者消費者 */ public class ConditionDemo { private LinkedList<String> buffer; // 容器 private int maxSize; //容量 private Lock lock; private Condition fullCondition; private Condition notFullCondition; ConditionDemo(int maxSize) { this.maxSize = maxSize; buffer = new LinkedList<>(); lock = new ReentrantLock(); fullCondition = lock.newCondition(); notFullCondition = lock.newCondition(); } /** * 生產者 * @param produceStr * @throws InterruptedException */ public void set(String produceStr) throws InterruptedException { //得到鎖 lock.lock(); try { while (maxSize == buffer.size()) { notFullCondition.await(); } buffer.add(produceStr); fullCondition.signal(); } finally { //釋放鎖 lock.unlock(); } } /** * 消費者 * @return * @throws InterruptedException */ public String get() throws InterruptedException { String consumeStr; lock.lock(); try { while (buffer.size() == 0) { fullCondition.await(); } consumeStr = buffer.pollFirst(); notFullCondition.signal(); } finally { lock.unlock(); } return consumeStr; } }
你們有什麼要說的,歡迎在評論區留言
一、點贊+評論(勾選「同時轉發」)