JUC鎖框架_AbstractQueuedSynchronizer詳細分析

 

 

AQS是JUC鎖框架中最重要的類,經過它來實現獨佔鎖和共享鎖的。本章是對AbstractQueuedSynchronizer源碼的徹底解析,分爲四個部分介紹:java

  1. CLH隊列即同步隊列:儲存着全部等待鎖的線程
  2. 獨佔鎖
  3. 共享鎖
  4. Condition條件

注: 還有一個AbstractQueuedLongSynchronizer類,它與AQS功能和實現幾乎同樣,惟一不一樣的是AQLS中表明鎖被獲取次數的成員變量state類型是long長整類型,而AQS中該成員變量是int類型。node

一. CLH隊列(線程同步隊列)

由於獲取鎖是有條件的,沒有獲取鎖的線程就要阻塞等待,那麼就要存儲這些等待的線程。安全

在AQS中咱們使用CLH隊列儲存這些等待的線程,但它並非直接儲存線程,而是儲存擁有線程的node節點。因此先介紹重要內部類Node。多線程

1.1 內部類Node

static final class Node { // 共享模式的標記 static final Node SHARED = new Node(); // 獨佔模式的標記 static final Node EXCLUSIVE = null; // waitStatus變量的值,標誌着線程被取消 static final int CANCELLED = 1; // waitStatus變量的值,標誌着後繼線程(即隊列中此節點以後的節點)須要被阻塞.(用於獨佔鎖) static final int SIGNAL = -1; // waitStatus變量的值,標誌着線程在Condition條件上等待阻塞.(用於Condition的await等待) static final int CONDITION = -2; // waitStatus變量的值,標誌着下一個acquireShared方法線程應該被容許。(用於共享鎖) static final int PROPAGATE = -3; // 標記着當前節點的狀態,默認狀態是0, 小於0的狀態都是有特殊做用,大於0的狀態表示已取消 volatile int waitStatus; // prev和next實現一個雙向鏈表 volatile Node prev; volatile Node next; // 該節點擁有的線程 volatile Thread thread; // 可能有兩種做用:1. 表示下一個在Condition條件上等待的節點 // 2. 表示是共享模式或者獨佔模式,注意第一種狀況節點必定是共享模式 Node nextWaiter; // 是否是共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 返回前一個節點prev,若是爲null,則拋出NullPointerException異常 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 用於建立鏈表頭head,或者共享模式SHARED Node() { } // 使用在addWaiter方法中 Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } // 使用在Condition條件中 Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } } 

重要的成員屬性:app

  1. waitStatus: 表示當前節點的狀態,默認狀態是0。總共有五個值CANCELLED、SIGNAL、CONDITION、PROPAGATE以及0。
  2. prev和next:記錄着當前節點前一個節點和後一個節點的引用。
  3. thread:當前節點擁有的線程。當擁有鎖的線程釋放鎖的時候,可能會調用LockSupport.unpark(thread),喚醒這個被阻塞的線程。
  4. nextWaiter:若是是SHARED,表示當前節點是共享模式,若是是null,當前節點是獨佔模式,若是是其餘值,當前節點也是獨佔模式,不過這個值也是Condition隊列的下一個節點。

注意:經過Node咱們能夠實現兩個隊列,一是經過prev和next實現CLH隊列(線程同步隊列,雙向隊列),二是nextWaiter實現Condition條件上的等待線程隊列(單向隊列),後一個咱們在Condition中介紹。框架

1.2 操做CLH隊列

1.2.1 存儲CLH隊列

// CLH隊列頭 private transient volatile Node head; // CLH隊列尾 private transient volatile Node tail; 

1.2.2 設置CLH隊列頭head

/** * 經過CAS函數設置head值,僅僅在enq方法中調用 */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } 

這個方法只在enq方法中調用,經過CAS函數設置head值,保證多線程安全ide

// 從新設置隊列頭head,它只在acquire系列的方法中調用 private void setHead(Node node) { head = node; // 線程也沒有意義了,由於該線程已經獲取到鎖了 node.thread = null; // 前一個節點已經沒有意義了 node.prev = null; } 

這個方法只在acquire系列的方法中調用,從新設置head,表示移除一些等待線程節點。函數

1.2.3 設置CLH隊列尾tail

/** * 經過CAS函數設置tail值,僅僅在enq方法中調用 */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } 

這個方法只在enq方法中調用,經過CAS函數設置tail值,保證多線程安全oop

1.2.4 將一個節點插入到CLH隊列尾

// 向隊列尾插入新節點,若是隊列沒有初始化,就先初始化。返回原先的隊列尾節點 private Node enq(final Node node) { for (;;) { Node t = tail; // t爲null,表示隊列爲空,先初始化隊列 if (t == null) { // 採用CAS函數即原子操做方式,設置隊列頭head值。 // 若是成功,再將head值賦值給鏈表尾tail。若是失敗,表示head值已經被其餘線程,那麼就進入循環下一次 if (compareAndSetHead(new Node())) tail = head; } else { // 新添加的node節點的前一個節點prev指向原來的隊列尾tail node.prev = t; // 採用CAS函數即原子操做方式,設置新隊列尾tail值。 if (compareAndSetTail(t, node)) { // 設置老的隊列尾tail的下一個節點next指向新添加的節點node t.next = node; return t; } } } } 

這個方法向CLH隊列尾插入一個新節點,若是隊列爲空,就先建立隊列再插入新節點,返回老的隊列尾節點。ui

1.2.5 將當前線程添加到CLH隊列尾

// 經過給定的模式mode(獨佔或者共享)爲當前線程建立新節點,並插入隊列中 private Node addWaiter(Node mode) { // 爲當前線程建立新的節點 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 若是隊列已經建立,就將新節點插入隊列尾。 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若是隊列沒有建立,經過enq方法建立隊列,並插入新的節點。 enq(node); return node; } 

爲當前線程建立一個新節點,再插入到CLH隊列尾,返回新建立的節點。

二. 獨佔鎖

咱們先想想獨佔鎖的功能是什麼?

獨佔鎖至少有兩個功能:

  1. 獲取鎖的功能。 當多個線程一塊兒獲取鎖的時候,只有一個線程能獲取到鎖,其餘線程必須在當前位置阻塞等待。
  2. 釋放鎖的功能。獲取鎖的線程釋放鎖資源,並且還必須能喚醒正在等待鎖資源的一個線程。
    帶着這些疑惑,咱們來看AQS中是怎麼實現的。

2.1 獲取獨佔鎖的方法

2.1.1 acquire方法

/** * 獲取獨佔鎖。若是沒有獲取到,線程就會阻塞等待,直到獲取鎖。不會響應中斷異常 * @param arg */ public final void acquire(int arg) { // 1. 先調用tryAcquire方法,嘗試獲取獨佔鎖,返回true,表示獲取到鎖,不須要執行acquireQueued方法。 // 2. 調用acquireQueued方法,先調用addWaiter方法爲當前線程建立一個節點node,並插入隊列中, // 而後調用acquireQueued方法去獲取鎖,若是不成功,就會讓當前線程阻塞,當鎖釋放時纔會被喚醒。 // acquireQueued方法返回值表示在線程等待過程當中,是否有另外一個線程調用該線程的interrupt方法,發起中斷。 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 

將這個方法一下格式,你們就很好理解了

public final void acquire(int arg) { // 1.先調用tryAcquire方法,嘗試獲取獨佔鎖,返回true則直接返回 if (tryAcquire(arg)) return; // 2. 調用addWaiter方法爲當前線程建立一個節點node,並插入隊列中 Node node = addWaiter(Node.EXCLUSIVE); // 調用acquireQueued方法去獲取鎖, // acquireQueued方法返回值表示在線程等待過程當中,是否有另外一個線程調用該線程的interrupt方法,發起中斷。 boolean interrupted = acquireQueued(node, arg); // 若是interrupted爲true,則當前線程要發起中斷請求 if (interrupted) { selfInterrupt(); } } 

2.1.2 tryAcquire方法

// 嘗試去獲取獨佔鎖,當即返回。若是返回true表示獲取鎖成功。 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } 

若是子類想實現獨佔鎖,則必須重寫這個方法,不然拋出異常。這個方法的做用是當前線程嘗試獲取鎖,若是獲取到鎖,就會返回true,並更改鎖資源。沒有獲取到鎖返回false。

注:這個方法是當即返回的,不會阻塞當前線程

下面是ReentrantLock中FairSync的tryAcquire方法實現

// 嘗試獲取鎖,與非公平鎖最大的不一樣就是調用hasQueuedPredecessors()方法 // hasQueuedPredecessors方法返回true,表示等待線程隊列中有一個線程在當前線程以前, // 根據公平鎖的規則,當前線程不能獲取鎖。 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 獲取鎖的記錄狀態 int c = getState(); // 若是c==0表示當前鎖是空閒的 if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 判斷當前線程是否是獨佔鎖的線程 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 更改鎖的記錄狀態 setState(nextc); return true; } return false; } 

2.1.3 acquireQueued方法

addWaiter方法已經在上面講解了。acquireQueued方法做用就是獲取鎖,若是沒有獲取到,就讓當前線程阻塞等待。

/** * 想要獲取鎖的 acquire系列方法,都會這個方法來獲取鎖 * 循環經過tryAcquire方法不斷去獲取鎖,若是沒有獲取成功, * 就有可能調用parkAndCheckInterrupt方法,讓當前線程阻塞 * @param node 想要獲取鎖的節點 * @param arg * @return 返回true,表示在線程等待的過程當中,線程被中斷了 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 表示線程在等待過程當中,是否被中斷了 boolean interrupted = false; // 經過死循環,直到node節點的線程獲取到鎖,才返回 for (;;) { // 獲取node的前一個節點 final Node p = node.predecessor(); // 若是前一個節點是隊列頭head,而且嘗試獲取鎖成功 // 那麼當前線程就不須要阻塞等待,繼續執行 if (p == head && tryAcquire(arg)) { // 將節點node設置爲新的隊列頭 setHead(node); // help GC p.next = null; // 不須要調用cancelAcquire方法 failed = false; return interrupted; } // 當p節點的狀態是Node.SIGNAL時,就會調用parkAndCheckInterrupt方法,阻塞node線程 // node線程被阻塞,有兩種方式喚醒, // 1.是在unparkSuccessor(Node node)方法,會喚醒被阻塞的node線程,返回false // 2.node線程被調用了interrupt方法,線程被喚醒,返回true // 在這裏只是簡單地將interrupted = true,沒有跳出for的死循環,繼續嘗試獲取鎖 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // failed爲true,表示發生異常,非正常退出 // 則將node節點的狀態設置成CANCELLED,表示node節點所在線程已取消,不須要喚醒了。 if (failed) cancelAcquire(node); } } 

主要流程:

  1. 經過for (;;)死循環,直到node節點的線程獲取到鎖,才跳出循環。
  2. 獲取node節點的前一個節點p。
  3. 當前一個節點p時CLH隊列頭節點時,調用tryAcquire方法嘗試去獲取鎖,若是獲取成功,就將節點node設置成CLH隊列頭節點(至關於移除節點node和以前的節點)而後return返回。
    注意:只有當node節點的前一個節點是隊列頭節點時,纔會嘗試獲取鎖,因此獲取鎖是有順序的,按照添加到CLH隊列時的順序。
  4. 調用shouldParkAfterFailedAcquire方法,來決定是否要阻塞當前線程。
  5. 調用parkAndCheckInterrupt方法,阻塞當前線程。
  6. 若是當前線程發生異常,非正常退出,那麼會在finally模塊中調用cancelAcquire(node)方法,取消當前節點狀態。

注意:這裏當嘗試獲取鎖失敗時,並無當即阻塞當前線程,可是由於在for (;;)死循環裏,會繼續循環,方法不會返回。

2.1.4 shouldParkAfterFailedAcquire方法

這個方法的返回值決定是否要阻塞當前線程

/** * 根據前一個節點pred的狀態,來判斷當前線程是否應該被阻塞 * @param pred : node節點的前一個節點 * @param node * @return 返回true 表示當前線程應該被阻塞,以後應該會調用parkAndCheckInterrupt方法來阻塞當前線程 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 若是前一個pred的狀態是Node.SIGNAL,那麼直接返回true,當前線程應該被阻塞 return true; if (ws > 0) { // 若是前一個節點狀態是Node.CANCELLED(大於0就是CANCELLED), // 表示前一個節點所在線程已經被喚醒了,要從CLH隊列中移除CANCELLED的節點。 // 因此從pred節點一直向前查找直到找到不是CANCELLED狀態的節點,並把它賦值給node.prev, // 表示node節點的前一個節點已經改變。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 此時前一個節點pred的狀態只能是0或者PROPAGATE,不多是CONDITION狀態 // CONDITION(這個是特殊狀態,只在condition列表中節點中存在,CLH隊列中不存在這個狀態的節點) // 將前一個節點pred的狀態設置成Node.SIGNAL,這樣在下一次循環時,就是直接阻塞當前線程 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 

咱們發現是根據前一個節點的狀態,來決定是否阻塞當前線程。而前一個節點狀態是在哪裏改變的呢?驚奇地發現也是在這個方法中改變的。

  1. 若是前一個節點狀態是Node.SIGNAL,那麼直接返回true,阻塞當前線程
  2. 若是前一個節點狀態是Node.CANCELLED(大於0就是CANCELLED),表示前一個節點所在線程已經被喚醒了,要從CLH隊列中移除CANCELLED的節點。因此從pred節點一直向前查找直到找到不是CANCELLED狀態的節點。
    並把它賦值給node.prev,表示node節點的前一個節點已經改變。在acquireQueued方法中進行下一次循環。
  3. 不是前面兩種狀態,那麼就將前一個節點狀態設置成Node.SIGNAL,表示須要阻塞當前線程,這樣再下一次循環時,就會直接阻塞當前線程。

2.1.5 parkAndCheckInterrupt 方法

阻塞當前線程,線程被喚醒後返回當前線程中斷狀態

/** * 阻塞當前線程,線程被喚醒後返回當前線程中斷狀態 */ private final boolean parkAndCheckInterrupt() { // 經過LockSupport.park方法,阻塞當前線程 LockSupport.park(this); // 當前線程被喚醒後,返回當前線程中斷狀態 return Thread.interrupted(); } 

經過LockSupport.park(this)阻塞當前線程。

2.1.6 cancelAcquire方法

將node節點的狀態設置成CANCELLED,表示node節點所在線程已取消,不須要喚醒了。

// 將node節點的狀態設置成CANCELLED,表示node節點所在線程已取消,不須要喚醒了。 private void cancelAcquire(Node node) { // 若是node爲null,就直接返回 if (node == null) return; // node.thread = null; // 跳過那些已取消的節點,在隊列中找到在node節點前面的第一次狀態不是已取消的節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 記錄pred原來的下一個節點,用於CAS函數更新時使用 Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 將node節點狀態設置爲已取消Node.CANCELLED; node.waitStatus = Node.CANCELLED; // 若是node節點是隊列尾節點,那麼就將pred節點設置爲新的隊列尾節點 if (node == tail && compareAndSetTail(node, pred)) { // 而且設置pred節點的下一個節點next爲null compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } 

2.1.7 小結

  1. tryAcquire方法嘗試獲取獨佔鎖,由子類實現
  2. acquireQueued方法,方法內採用for死循環,先調用tryAcquire方法,嘗試獲取鎖,若是成功,則跳出循環方法返回。 若是失敗,就可能阻塞當前線程。當別的線程鎖釋放的時候,可能會喚醒這個線程,而後再次進行循環判斷,調用tryAcquire方法,嘗試獲取鎖。
  3. 若是發生異常,該線程被喚醒,因此要取消節點node的狀態,由於節點node所在線程不是在阻塞狀態了。

注:還有其餘獲取獨佔鎖的方法,例如doAcquireInterruptibly、doAcquireNanos都在文章最後的源碼解析中,這裏就不作解析了,具體原理都差很少。

2.2 釋放獨佔鎖的方法

2.2.1 release方法

// 在獨佔鎖模式下,釋放鎖的操做 public final boolean release(int arg) { // 調用tryRelease方法,嘗試去釋放鎖,由子類具體實現 if (tryRelease(arg)) { Node h = head; // 若是隊列頭節點的狀態不是0,那麼隊列中就可能存在須要喚醒的等待節點。 // 還記得咱們在acquireQueued(final Node node, int arg)獲取鎖的方法中,若是節點node沒有獲取到鎖, // 那麼咱們會將節點node的前一個節點狀態設置爲Node.SIGNAL,而後調用parkAndCheckInterrupt方法 // 將節點node所在線程阻塞。 // 在這裏就是經過unparkSuccessor方法,進而調用LockSupport.unpark(s.thread)方法,喚醒被阻塞的線程 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 
  1. 調用tryRelease方法釋放鎖資源,返回true表示鎖資源徹底釋放了,返回false表示還持有鎖資源。
  2. 若是鎖資源徹底被釋放了,就要喚醒等待鎖資源的線程。調用unparkSuccessor方法喚醒一個等待線程
    注:CLH隊列頭節點h爲null,表示隊列爲空,沒有節點。節點h的狀態是0,表示沒有CLH隊列中沒有被阻塞的線程。

2.2.2 tryRelease方法

// 嘗試去釋放當前線程持有的獨佔鎖,當即返回。若是返回true表示釋放鎖成功 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } 

若是子類想實現獨佔鎖,則必須重寫這個方法,不然拋出異常。做用是釋放當前線程持有的鎖,返回true表示已經徹底釋放鎖資源,返回false,表示還持有鎖資源。

注:對於獨佔鎖來講,同一時間只能有一個線程持有這個鎖,可是這個線程能夠重複地獲取鎖,由於被鎖住的模塊,再次進入另外一個被這個鎖鎖住的模塊,是容許的。這個就作可重入性,因此對於可重入的鎖釋放操做,也須要屢次。

下面是ReentrantLock中Sync的tryRelease方法實現

protected final boolean tryRelease(int releases) { // c表示新的鎖的記錄狀態 int c = getState() - releases; // 若是當前線程不是獨佔鎖的線程,就拋出IllegalMonitorStateException異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 標誌是否能夠釋放鎖 boolean free = false; // 當新的鎖的記錄狀態爲0時,表示能夠釋放鎖 if (c == 0) { free = true; // 設置獨佔鎖的線程爲null setExclusiveOwnerThread(null); } setState(c); return free; } 

2.2.3 unparkSuccessor方法

// 喚醒node節點的下一個非取消狀態的節點所在線程(即waitStatus<=0) private void unparkSuccessor(Node node) { // 獲取node節點的狀態 int ws = node.waitStatus; // 若是小於0,就將狀態從新設置爲0,表示這個node節點已經完成了 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 下一個節點 Node s = node.next; // 若是下一個節點爲null,或者狀態是已取消,那麼就要尋找下一個非取消狀態的節點 if (s == null || s.waitStatus > 0) { // 先將s設置爲null,s不是非取消狀態的節點 s = null; // 從隊列尾向前遍歷,直到遍歷到node節點 for (Node t = tail; t != null && t != node; t = t.prev) // 由於是從後向前遍歷,因此不斷覆蓋找到的值,這樣才能獲得node節點後下一個非取消狀態的節點 if (t.waitStatus <= 0) s = t; } // 若是s不爲null,表示存在非取消狀態的節點。那麼調用LockSupport.unpark方法,喚醒這個節點的線程 if (s != null) LockSupport.unpark(s.thread); } 

這個方法的做用是喚醒node節點的下一個非取消狀態的節點所在線程。

  1. 將node節點的狀態設置爲0
  2. 尋找到下一個非取消狀態的節點s
  3. 若是節點s不爲null,則調用LockSupport.unpark(s.thread)方法喚醒s所在線程。
    注:喚醒線程也是有順序的,就是添加到CLH隊列線程的順序。

2.2.4 小結

  1. 調用tryRelease方法去釋放當前持有的鎖資源。
  2. 若是徹底釋放了鎖資源,那麼就調用unparkSuccessor方法,去喚醒一個等待鎖的線程。

三. 共享鎖

共享鎖與獨佔鎖相比,共享鎖可能被多個線程共同持有

3.1 獲取共享鎖的方法

3.1.1 acquireShared方法

// 獲取共享鎖 public final void acquireShared(int arg) { // 嘗試去獲取共享鎖,若是返回值小於0表示獲取共享鎖失敗 if (tryAcquireShared(arg) < 0) // 調用doAcquireShared方法去獲取共享鎖 doAcquireShared(arg); } 

調用tryAcquireShared方法嘗試獲取共享鎖,若是返回值小於0表示獲取共享鎖失敗.則繼續調用doAcquireShared方法獲取共享鎖。

3.1.2 tryAcquireShared方法

// 嘗試去獲取共享鎖,當即返回。返回值大於等於0,表示獲取共享鎖成功 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } 

若是子類想實現共享鎖,則必須重寫這個方法,不然拋出異常。做用是嘗試獲取共享鎖,返回值大於等於0,表示獲取共享鎖成功。

下面是ReentrantReadWriteLock中Sync的tryReleaseShared方法實現,這個咱們會在ReentrantReadWriteLock章節中重點介紹的。

protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 當前線程是第一個獲取讀鎖(共享鎖)的線程 if (firstReader == current) { // 將firstReaderHoldCount減一,若是就是1,那麼表示該線程須要釋放讀鎖(共享鎖), // 將firstReader設置爲null if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; // 獲取當前線程的HoldCounter變量 if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); // 將rh變量的count減一, int count = rh.count; if (count <= 1) { readHolds.remove(); // count <= 0表示當前線程就沒有獲取到讀鎖(共享鎖),這裏釋放就拋出異常。 if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); // 由於讀鎖是利用高16位儲存的,低16位的數據是要屏蔽的, // 因此這裏減去SHARED_UNIT(65536),至關於減一 // 表示一個讀鎖已經釋放 int nextc = c - SHARED_UNIT; // 利用CAS函數從新設置state值 if (compareAndSetState(c, nextc)) return nextc == 0; } } 

3.1.3 doAcquireShared方法

/** * 獲取共享鎖,獲取失敗,則會阻塞當前線程,直到獲取共享鎖返回 * @param arg the acquire argument */ private void doAcquireShared(int arg) { // 爲當前線程建立共享鎖節點node final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 若是節點node前一個節點是同步隊列頭節點。就會調用tryAcquireShared方法嘗試獲取共享鎖 if (p == head) { int r = tryAcquireShared(arg); // 若是返回值大於0,表示獲取共享鎖成功 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 若是節點p的狀態是Node.SIGNAL,就是調用parkAndCheckInterrupt方法阻塞當前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // failed爲true,表示發生異常, // 則將node節點的狀態設置成CANCELLED,表示node節點所在線程已取消,不須要喚醒了 if (failed) cancelAcquire(node); } } 

這個方法與獨佔鎖的acquireQueued方法相比較,不一樣的有三點:

  1. doAcquireShared方法,調用addWaiter(Node.SHARED)方法,爲當前線程建立一個共享模式的節點node。而acquireQueued方法是由外部傳遞來的。
  2. doAcquireShared方法沒有返回值,acquireQueued方法會返回布爾類型的值,是當前線程中斷標誌位值
  3. 最大的區別是從新設置CLH隊列頭的方法不同。doAcquireShared方法調用setHeadAndPropagate方法,而acquireQueued方法調用setHead方法。

3.1.4 setHeadAndPropagate方法

// 從新設置CLH隊列頭,若是CLH隊列頭的下一個節點爲null或者共享模式, // 那麼就要喚醒共享鎖上等待的線程 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 設置新的同步隊列頭head setHead(node); // 若是propagate大於0, if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 獲取新的CLH隊列頭的下一個節點s Node s = node.next; // 若是節點s是空或者共享模式節點,那麼就要喚醒共享鎖上等待的線程 if (s == null || s.isShared()) doReleaseShared(); } } 

3.2 釋放共享鎖的方法

3.2.1 releaseShared方法

// 釋放共享鎖 public final boolean releaseShared(int arg) { // 嘗試釋放共享鎖 if (tryReleaseShared(arg)) { // 喚醒等待共享鎖的線程 doReleaseShared(); return true; } return false; } 

3.2.2 tryReleaseShared方法

// 嘗試去釋放共享鎖 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } 

若是子類想實現共享鎖,則必須重寫這個方法,不然拋出異常。做用是釋放當前線程持有的鎖,返回true表示已經徹底釋放鎖資源,返回false,表示還持有鎖資源。

3.2.3 doReleaseShared方法

// 會喚醒等待共享鎖的線程 private void doReleaseShared() { for (;;) { // 將同步隊列頭賦值給節點h Node h = head; // 若是節點h不爲null,且不等於同步隊列尾 if (h != null && h != tail) { // 獲得節點h的狀態 int ws = h.waitStatus; // 若是狀態是Node.SIGNAL,就要喚醒節點h後繼節點的線程 if (ws == Node.SIGNAL) { // 將節點h的狀態設置成0,若是設置失敗,就繼續循環,再試一次。 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒節點h後繼節點的線程 unparkSuccessor(h); } // 若是節點h的狀態是0,就設置ws的狀態是PROPAGATE。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若是同步隊列頭head節點發生改變,繼續循環, // 若是沒有改變,就跳出循環 if (h == head) break; } } 

四. Condition條件

Condition是爲了實現線程之間相互等待的問題。注意Condition對象只能在獨佔鎖中才能使用。

考慮一下狀況,有兩個線程,生產者線程,消費者線程。當消費者線程消費東西時,發現沒有東西,這時它就要等待,讓生產者線程生產東西后,在通知它消費。
由於操做的是同一個資源,因此要加鎖,防止多線程衝突。而鎖在同一時間只能有一個線程持有,因此消費者在讓線程等待前,必須釋放鎖,且喚醒另外一個等待鎖的線程。
那麼在AQS中Condition條件又是如何實現的呢?

  1. 首先內部存在一個Condition隊列,存儲着全部在此Condition條件等待的線程。
  2. await系列方法:讓當前持有鎖的線程釋放鎖,並喚醒一個在CLH隊列上等待鎖的線程,再爲當前線程建立一個node節點,插入到Condition隊列(注意不是插入到CLH隊列中)
  3. signal系列方法:其實這裏沒有喚醒任何線程,而是將Condition隊列上的等待節點插入到CLH隊列中,因此當持有鎖的線程執行完畢釋放鎖時,就會喚醒CLH隊列中的一個線程,這個時候纔會喚醒線程。

4.1 await系列方法

4.1.1 await方法

/** * 讓當前持有鎖的線程阻塞等待,並釋放鎖。若是有中斷請求,則拋出InterruptedException異常 * @throws InterruptedException */ public final void await() throws InterruptedException { // 若是當前線程中斷標誌位是true,就拋出InterruptedException異常 if (Thread.interrupted()) throw new InterruptedException(); // 爲當前線程建立新的Node節點,而且將這個節點插入到Condition隊列中了 Node node = addConditionWaiter(); // 釋放當前線程佔有的鎖,並喚醒CLH隊列一個等待線程 int savedState = fullyRelease(node); int interruptMode = 0; // 若是節點node不在同步隊列中(注意不是Condition隊列) while (!isOnSyncQueue(node)) { // 阻塞當前線程,那麼怎麼喚醒這個線程呢? // 首先咱們必須調用signal或者signalAll將這個節點node加入到同步隊列。 // 只有這樣unparkSuccessor(Node node)方法,纔有可能喚醒被阻塞的線程 LockSupport.park(this); // 若是當前線程產生中斷請求,就跳出循環 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 若是節點node已經在同步隊列中了,獲取同步鎖,只有獲得鎖才能繼續執行,不然線程繼續阻塞等待 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清除Condition隊列中狀態不是Node.CONDITION的節點 if (node.nextWaiter != null) unlinkCancelledWaiters(); // 是否要拋出異常,或者發出中斷請求 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } 

方法流程:

  1. addConditionWaiter方法:爲當前線程建立新的Node節點,而且將這個節點插入到Condition隊列中了
  2. fullyRelease方法:釋放當前線程佔有的鎖,並喚醒CLH隊列一個等待線程
  3. isOnSyncQueue 方法:若是返回false,表示節點node不在CLH隊列中,即沒有調用過 signal系列方法,因此調用LockSupport.park(this)方法阻塞當前線程。
  4. 若是跳出while循環,表示節點node已經在CLH隊列中,那麼調用acquireQueued方法去獲取鎖。
  5. 清除Condition隊列中狀態不是Node.CONDITION的節點

4.1.2 addConditionWaiter方法

爲當前線程建立新的Node節點,而且將這個節點插入到Condition隊列中了

private Node addConditionWaiter() { Node t = lastWaiter; // 若是Condition隊列尾節點的狀態不是Node.CONDITION if (t != null && t.waitStatus != Node.CONDITION) { // 清除Condition隊列中,狀態不是Node.CONDITION的節點, // 而且可能會從新設置firstWaiter和lastWaiter unlinkCancelledWaiters(); // 從新將Condition隊列尾賦值給t t = lastWaiter; } // 爲當前線程建立一個狀態爲Node.CONDITION的節點 Node node = new Node(Thread.currentThread(), Node.CONDITION); // 若是t爲null,表示Condition隊列爲空,將node節點賦值給鏈表頭 if (t == null) firstWaiter = node; else // 將新節點node插入到Condition隊列尾 t.nextWaiter = node; // 將新節點node設置爲新的Condition隊列尾 lastWaiter = node; return node; } 

4.1.3 fullyRelease方法

釋放當前線程佔有的鎖,並喚醒CLH隊列一個等待線程

/** * 釋放當前線程佔有的鎖,並喚醒CLH隊列一個等待線程 * 若是失敗就拋出異常,設置node節點的狀態是Node.CANCELLED * @return */ final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 釋放當前線程佔有的鎖 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } 

4.1.4 isOnSyncQueue

節點node是否是在CLH隊列中

// 節點node是否是在CLH隊列中 final boolean isOnSyncQueue(Node node) { // 若是node的狀態是Node.CONDITION,或者node沒有前一個節點prev, // 那麼返回false,節點node不在同步隊列中 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 若是node有下一個節點next,那麼它必定在同步隊列中 if (node.next != null) // If has successor, it must be on queue return true; // 從同步隊列中查找節點node return findNodeFromTail(node); } // 在同步隊列中從後向前查找節點node,若是找到返回true,不然返回false private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } 

4.1.5 acquireQueued方法

獲取獨佔鎖,這個在獨佔鎖章節已經說過

4.1.6 unlinkCancelledWaiters 方法

清除Condition隊列中狀態不是Node.CONDITION的節點

private void unlinkCancelledWaiters() { // condition隊列頭賦值給t Node t = firstWaiter; // 這個trail節點,只是起輔助做用 Node trail = null; while (t != null) { //獲得下一個節點next。當節點是condition時候,nextWaiter表示condition隊列的下一個節點 Node next = t.nextWaiter; // 若是節點t的狀態不是CONDITION,那麼該節點就要從condition隊列中移除 if (t.waitStatus != Node.CONDITION) { // 將節點t的nextWaiter設置爲null t.nextWaiter = null; // 若是trail爲null,表示原先的condition隊列頭節點實效,須要設置新的condition隊列頭 if (trail == null) firstWaiter = next; else // 將節點t從condition隊列中移除,由於改變了引用的指向,從condition隊列中已經找不到節點t了 trail.nextWaiter = next; // 若是next爲null,表示原先的condition隊列尾節點也實效,從新設置隊列尾節點 if (next == null) lastWaiter = trail; } else // 遍歷到的有效節點 trail = t; // 將next賦值給t,遍歷完整個condition隊列 t = next; } } 

4.1.7 reportInterruptAfterWait方法

/** * 若是interruptMode是THROW_IE,就拋出InterruptedException異常 * 若是interruptMode是REINTERRUPT,則當前線程再發出中斷請求 * 不然就什麼都不作 */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } 

4.2 signal系列方法

4.2.1 signal方法

// 若是condition隊列不爲空,將condition隊列頭節點插入到同步隊列中 public final void signal() { // 若是當前線程不是獨佔鎖線程,就拋出IllegalMonitorStateException異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 將Condition隊列頭賦值給節點first Node first = firstWaiter; if (first != null) // 將Condition隊列中的first節點插入到CLH隊列中 doSignal(first); } 

若是condition隊列不爲空,就調用doSignal方法將condition隊列頭節點插入到CLH隊列中。

4.2.2 doSignal方法

// 將Condition隊列中的first節點插入到CLH隊列中 private void doSignal(Node first) { do { // 原先的Condition隊列頭節點取消,因此從新賦值Condition隊列頭節點 // 若是新的Condition隊列頭節點爲null,表示Condition隊列爲空了 // ,因此也要設置Condition隊列尾lastWaiter爲null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 取消first節點nextWaiter引用 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } 

爲何使用while循環,由於只有是Node.CONDITION狀態的節點才能插入CLH隊列,若是不是這個狀態,那麼循環Condition隊列下一個節點。

4.2.3 transferForSignal方法

// 返回true表示節點node插入到同步隊列中,返回false表示節點node沒有插入到同步隊列中 final boolean transferForSignal(Node node) { // 若是節點node的狀態不是Node.CONDITION,或者更新狀態失敗, // 說明該node節點已經插入到同步隊列中,因此直接返回false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 將節點node插入到同步隊列中,p是原先同步隊列尾節點,也是node節點的前一個節點 Node p = enq(node); int ws = p.waitStatus; // 若是前一個節點是已取消狀態,或者不能將它設置成Node.SIGNAL狀態。 // 就說明節點p以後也不會發起喚醒下一個node節點線程的操做, // 因此這裏直接調用 LockSupport.unpark(node.thread)方法,喚醒節點node所在線程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } 
  1. 狀態不是 Node.CONDITION的節點,是不能從Condition隊列中插入到CLH隊列中。直接返回false
  2. 調用enq方法,將節點node插入到同步隊列中,p是原先同步隊列尾節點,也是node節點的前一個節點
  3. 若是前一個節點是已取消狀態,或者不能將它設置成Node.SIGNAL狀態。那麼就要LockSupport.unpark(node.thread)方法喚醒node節點所在線程。

4.2.4 signalAll 方法

// 將condition隊列中全部的節點都插入到同步隊列中 public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } 

4.2.5 doSignalAll方法

/** * 將condition隊列中全部的節點都插入到同步隊列中 * @param first condition隊列頭節點 */ private void doSignalAll(Node first) { // 表示將condition隊列設置爲空 lastWaiter = firstWaiter = null; do { // 獲得condition隊列的下一個節點 Node next = first.nextWaiter; first.nextWaiter = null; // 將節點first插入到同步隊列中 transferForSignal(first); first = next; // 循環遍歷condition隊列中全部的節點 } while (first != null); } 

循環遍歷整個condition隊列,調用transferForSignal方法,將節點插入到CLH隊列中。

4.3 小結

Condition只能使用在獨佔鎖中。它內部有一個Condition隊列記錄全部在Condition條件等待的線程(即就是調用await系列方法後等待的線程).
await系列方法:會讓當前線程釋放持有的鎖,並喚醒在CLH隊列上的一個等待鎖的線程,再將當前線程插入到Condition隊列中(注意不是CLH隊列)
signal系列方法:並非喚醒線程,而是將Condition隊列中的節點插入到CLH隊列中。

總結

使用AQS類來實現獨佔鎖和共享鎖:

  1. 內部有一個CLH隊列,用來記錄全部等待鎖的線程
  2. 經過 acquire系列方法用來獲取獨佔鎖,獲取失敗,則阻塞當前線程
  3. 經過release方法用來釋放獨佔鎖,釋放成功,則會喚醒一個等待獨佔鎖的線程。
  4. 經過acquireShared系列方法用來獲取共享鎖。
  5. 經過releaseShared方法用來釋放共享鎖。
  6. 經過Condition來實現線程之間相互等待的。

示例

import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; // 實現一個可重入的獨佔鎖, 必須複寫tryAcquire和tryRelease方法 class Sync extends AbstractQueuedSynchronizer { // 嘗試獲取獨佔鎖, 利用鎖的獲取次數state屬性 @Override protected boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 獲取鎖的記錄狀態state int c = getState(); // 若是c==0表示當前鎖是空閒的 if (c == 0) { // 經過CAS原子操做方式設置鎖的狀態,若是爲true,表示當前線程獲取的鎖, // 爲false,鎖的狀態被其餘線程更改,當前線程獲取的鎖失敗 if (compareAndSetState(0, acquires)) { // 設置當前線程爲獨佔鎖的線程 setExclusiveOwnerThread(current); return true; } } // 判斷當前線程是否是獨佔鎖的線程,由於是可重入鎖 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 更改鎖的記錄狀態 setState(nextc); return true; } return false; } // 釋放持有的獨佔鎖, 由於是可重入鎖,因此只有當c等於0的時候,表示當前持有鎖的徹底釋放了鎖。 @Override protected boolean tryRelease(int releases) { // c表示新的鎖的記錄狀態 int c = getState() - releases; // 若是當前線程不是獨佔鎖的線程,就拋出IllegalMonitorStateException異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 標誌是否能夠釋放鎖 boolean free = false; // 當新的鎖的記錄狀態爲0時,表示能夠釋放鎖 if (c == 0) { free = true; // 設置獨佔鎖的線程爲null setExclusiveOwnerThread(null); } setState(c); return free; } } public class AQSTest { public static void newThread(Sync sync, String name, int time) { new Thread(new Runnable() { @Override public void run() { System.out.println("線程"+Thread.currentThread().getName()+" 開始運行,準備獲取鎖"); // 經過acquire方法,獲取鎖,若是沒有獲取到,就等待 sync.acquire(1); try { System.out.println("====線程"+Thread.currentThread().getName()+" 在run方法中獲取了鎖"); lockAgain(); try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } finally { System.out.println("----線程"+Thread.currentThread().getName()+" 在run方法中釋放了鎖"); sync.release(1); } } private void lockAgain() { sync.acquire(1); try { System.out.println("====線程"+Thread.currentThread().getName()+" 在lockAgain方法中再次獲取了鎖"); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } finally { System.out.println("----線程"+Thread.currentThread().getName()+" 在lockAgain方法中釋放了鎖"); sync.release(1); } } },name).start(); } public static void main(String[] args) { Sync sync = new Sync(); newThread(sync, "t1111", 1000); newThread(sync, "t2222", 1000); newThread(sync, "t3333", 1000); } } 

利用AbstractQueuedSynchronizer簡單實現一個可重入的獨佔鎖。

  1. 要實現獨佔鎖,必須重寫tryAcquire和tryRelease方法。不然在獲取鎖和釋放鎖的時候,會拋出異常。
  2. 直接的調用AQS類的acquire(1)和release(1)方法獲取鎖和釋放鎖。

輸出結果是

線程t1111 開始運行,準備獲取鎖
====線程t1111 在run方法中獲取了鎖
====線程t1111  在lockAgain方法中再次獲取了鎖
線程t2222 開始運行,準備獲取鎖
線程t3333 開始運行,準備獲取鎖
----線程t1111 在lockAgain方法中釋放了鎖
----線程t1111 在run方法中釋放了鎖
====線程t2222 在run方法中獲取了鎖
====線程t2222  在lockAgain方法中再次獲取了鎖
----線程t2222 在lockAgain方法中釋放了鎖
----線程t2222 在run方法中釋放了鎖
====線程t3333 在run方法中獲取了鎖
====線程t3333  在lockAgain方法中再次獲取了鎖
----線程t3333 在lockAgain方法中釋放了鎖
----線程t3333 在run方法中釋放了鎖

附錄

package java.util.concurrent.locks; import sun.misc.Unsafe; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; protected AbstractQueuedSynchronizer() { } static final class Node { // 共享模式的標記 static final Node SHARED = new Node(); // 獨佔模式的標記 static final Node EXCLUSIVE = null; // waitStatus變量的值,標誌着線程被取消 static final int CANCELLED = 1; // waitStatus變量的值,標誌着後繼線程(即隊列中此節點以後的節點)須要被阻塞.(用於獨佔鎖) static final int SIGNAL = -1; // waitStatus變量的值,標誌着線程在Condition條件上等待阻塞.(用於Condition的await等待) static final int CONDITION = -2; // waitStatus變量的值,標誌着下一個acquireShared方法線程應該被容許。(用於共享鎖) static final int PROPAGATE = -3; // 標記着當前節點的狀態,默認狀態是0, 小於0的狀態都是有特殊做用,大於0的狀態表示已取消 volatile int waitStatus; // prev和next實現一個雙向鏈表 volatile Node prev; volatile Node next; // 該節點擁有的線程 volatile Thread thread; // 可能有兩種做用:1. 表示下一個在Condition條件上等待的節點 // 2. 表示是共享模式或者獨佔模式,注意第一種狀況節點必定是共享模式 Node nextWaiter; // 是否是共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 返回前一個節點prev,若是爲null,則拋出NullPointerException異常 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 用於建立鏈表頭head,或者共享模式SHARED Node() { } // 使用在addWaiter方法中 Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } // 使用在Condition條件中 Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } } // CLH隊列頭 private transient volatile Node head; // CLH隊列尾 private transient volatile Node tail; // 用來記錄當前鎖被獲取的次數,當state==0,表示尚未被任何線程獲取 private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } // 採用CAS函數。比較並交換函數,它是原子操做函數;即,經過CAS操做的數據都是以原子方式進行的 protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } // Queuing utilities static final long spinForTimeoutThreshold = 1000L; // 向隊列尾插入新節點,若是隊列沒有初始化,就先初始化。返回原先的隊列尾節點 private Node enq(final Node node) { for (;;) { Node t = tail; // t爲null,表示隊列爲空,先初始化隊列 if (t == null) { // 採用CAS函數即原子操做方式,設置隊列頭head值。 // 若是成功,再將head值賦值給鏈表尾tail。若是失敗,表示head值已經被其餘線程,那麼就進入循環下一次 if (compareAndSetHead(new Node())) tail = head; } else { // 新添加的node節點的前一個節點prev指向原來的隊列尾tail node.prev = t; // 採用CAS函數即原子操做方式,設置新隊列尾tail值。 if (compareAndSetTail(t, node)) { // 設置老的隊列尾tail的下一個節點next指向新添加的節點node t.next = node; return t; } } } } // 經過給定的模式mode(獨佔或者共享)爲當前線程建立新節點,並插入隊列中 private Node addWaiter(Node mode) { // 爲當前線程建立新的節點 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 若是隊列已經建立,就將新節點插入隊列尾。 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若是隊列沒有建立,經過enq方法建立隊列,並插入新的節點。 enq(node); return node; } // 從新設置隊列頭head,它只在acquire系列的方法中調用 private void setHead(Node node) { head = node; // 線程也沒有意義了,由於該線程已經獲取到鎖了 node.thread = null; // 前一個節點已經沒有意義了 node.prev = null; } // 喚醒node節點的下一個非取消狀態的節點所在線程(即waitStatus<=0) private void unparkSuccessor(Node node) { // 獲取node節點的狀態 int ws = node.waitStatus; // 若是小於0,就將狀態從新設置爲0,表示這個node節點已經完成了 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 下一個節點 Node s = node.next; // 若是下一個節點爲null,或者狀態是已取消,那麼就要尋找下一個非取消狀態的節點 if (s == null || s.waitStatus > 0) { // 先將s設置爲null,s不是非取消狀態的節點 s = null; // 從隊列尾向前遍歷,直到遍歷到node節點 for (Node t = tail; t != null && t != node; t = t.prev) // 由於是從後向前遍歷,因此不斷覆蓋找到的值,這樣才能獲得node節點後下一個非取消狀態的節點 if (t.waitStatus <= 0) s = t; } // 若是s不爲null,表示存在非取消狀態的節點。那麼調用LockSupport.unpark方法,喚醒這個節點的線程 if (s != null) LockSupport.unpark(s.thread); } // 會喚醒等待共享鎖的線程 private void doReleaseShared() { for (;;) { // 將同步隊列頭賦值給節點h Node h = head; // 若是節點h不爲null,且不等於同步隊列尾 if (h != null && h != tail) { // 獲得節點h的狀態 int ws = h.waitStatus; // 若是狀態是Node.SIGNAL,就要喚醒節點h後繼節點的線程 if (ws == Node.SIGNAL) { // 將節點h的狀態設置成0,若是設置失敗,就繼續循環,再試一次。 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒節點h後繼節點的線程 unparkSuccessor(h); } // 若是節點h的狀態是0,就設置ws的狀態是PROPAGATE。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若是同步隊列頭head節點發生改變,繼續循環, // 若是沒有改變,就跳出循環 if (h == head) break; } } // 從新設置CLH隊列頭,若是CLH隊列頭的下一個節點爲null或者共享模式, // 那麼就要喚醒共享鎖上等待的線程 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 設置新的同步隊列頭head setHead(node); // 若是propagate大於0, if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 獲取新的CLH隊列頭的下一個節點s Node s = node.next; // 若是節點s是空或者共享模式節點,那麼就要喚醒共享鎖上等待的線程 if (s == null || s.isShared()) doReleaseShared(); } } // 則將node節點的狀態設置成CANCELLED,表示node節點所在線程已取消,不須要喚醒了。 private void cancelAcquire(Node node) { // 若是node爲null,就直接返回 if (node == null) return; // node.thread = null; // 跳過那些已取消的節點,在隊列中找到在node節點前面的第一次狀態不是已取消的節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 記錄pred原來的下一個節點,用於CAS函數更新時使用 Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 將node節點狀態設置爲已取消Node.CANCELLED; node.waitStatus = Node.CANCELLED; // 若是node節點是隊列尾節點,那麼就將pred節點設置爲新的隊列尾節點 if (node == tail && compareAndSetTail(node, pred)) { // 而且設置pred節點的下一個節點next爲null compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } /** * 根據前一個節點pred的狀態,來判斷當前線程是否應該被阻塞 * @param pred : node節點的前一個節點 * @param node * @return 返回true 表示當前線程應該被阻塞,以後應該會調用parkAndCheckInterrupt方法來阻塞當前線程 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 若是前一個pred的狀態是Node.SIGNAL,那麼直接返回true,當前線程應該被阻塞 return true; if (ws > 0) { // 若是前一個節點狀態是Node.CANCELLED(大於0就是CANCELLED), // 表示前一個節點所在線程已經被喚醒了,要從CLH隊列中移除CANCELLED的節點。 // 因此從pred節點一直向前查找直到找到不是CANCELLED狀態的節點,並把它賦值給node.prev, // 表示node節點的前一個節點已經改變。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 此時前一個節點pred的狀態只能是0或者PROPAGATE,不多是CONDITION狀態 // CONDITION(這個是特殊狀態,只在condition列表中節點中存在,CLH隊列中不存在這個狀態的節點) // 將前一個節點pred的狀態設置成Node.SIGNAL,這樣在下一次循環時,就是直接阻塞當前線程 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * 當前線程發出中斷通知 */ static void selfInterrupt() { Thread.currentThread().interrupt(); } /** * 阻塞當前線程,線程被喚醒後返回當前線程中斷狀態 */ private final boolean parkAndCheckInterrupt() { // 經過LockSupport.park方法,阻塞當前線程 LockSupport.park(this); // 當前線程被喚醒後,返回當前線程中斷狀態 return Thread.interrupted(); } /** * 想要獲取鎖的 acquire系列方法,都會這個方法來獲取鎖 * 循環經過tryAcquire方法不斷去獲取鎖,若是沒有獲取成功,就有可能調用parkAndCheckInterrupt方法,讓當前線程阻塞 * @param node 想要獲取鎖的節點 * @param arg * @return 返回true,表示在線程等待的過程當中,線程被中斷了 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 表示線程在等待過程當中,是否被中斷了 boolean interrupted = false; // 經過死循環,直到node節點的線程獲取到鎖,才返回 for (;;) { // 獲取node的前一個節點 final Node p = node.predecessor(); // 若是前一個節點是隊列頭head,而且嘗試獲取鎖成功 // 那麼當前線程就不須要阻塞等待,繼續執行 if (p == head && tryAcquire(arg)) { // 將節點node設置爲新的隊列頭 setHead(node); // help GC p.next = null; // 不須要調用cancelAcquire方法 failed = false; return interrupted; } // 當p節點的狀態是Node.SIGNAL時,就會調用parkAndCheckInterrupt方法,阻塞node線程 // node線程被阻塞,有兩種方式喚醒, // 1.是在unparkSuccessor(Node node)方法,會喚醒被阻塞的node線程,返回false // 2.node線程被調用了interrupt方法,線程被喚醒,返回true // 在這裏只是簡單地將interrupted = true,沒有跳出for的死循環,繼續嘗試獲取鎖 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // failed爲true,表示發生異常, // 則將node節點的狀態設置成CANCELLED,表示node節點所在線程已取消,不須要喚醒了。 if (failed) cancelAcquire(node); } } /** * 這個方法與acquireQueued(final Node node, int arg)方法流程幾乎同樣 * 只不過當parkAndCheckInterrupt返回true時,直接拋出異常。 * @param arg * @throws InterruptedException */ private void doAcquireInterruptibly(int arg) throws InterruptedException { // 爲當前線程建立節點node,並插入到隊列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 經過死循環,直到node節點的線程獲取到鎖,或者當前線程有中斷請求會拋出中斷異常 for (;;) { final Node p = node.predecessor(); // 若是前一個節點是隊列頭head,而且嘗試獲取鎖成功 // 將該節點node設置成隊列頭head if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } // 當p節點的狀態是Node.SIGNAL時,就會調用parkAndCheckInterrupt方法,阻塞node線程 // node線程被阻塞,有兩種方式喚醒, // 1.是在unparkSuccessor(Node node)方法,會喚醒被阻塞的node線程,返回false // 2.node線程被調用了interrupt方法,線程被喚醒,返回true // 在這裏若是parkAndCheckInterrupt返回true,就會拋出InterruptedException異常 // 跳出死循環,方法返回 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } /** * 嘗試在必定的時間nanosTimeout內獲取鎖,超時了就返回false * */ private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 計算截止時間 final long deadline = System.nanoTime() + nanosTimeout; // 爲當前線程建立節點node,並插入到隊列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); // 若是前一個節點是隊列頭head,而且嘗試獲取鎖成功 // 將該節點node設置成隊列頭head if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 計算剩餘時間 nanosTimeout = deadline - System.nanoTime(); // 剩餘時間小於等於0,就直接返回false,獲取鎖失敗 if (nanosTimeout <= 0L) return false; // 當p節點的狀態是Node.SIGNAL時,調用LockSupport.parkNanos阻塞當前線程 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) // 當前線程阻塞nanosTimeout時間 LockSupport.parkNanos(this, nanosTimeout); // 若是當前線程中斷標誌位是true,拋出InterruptedException異常 if (Thread.interrupted()) throw new InterruptedException(); } } finally { // failed爲true,表示發生異常, // 則將node節點的狀態設置成CANCELLED,表示node節點所在線程已取消,不須要喚醒了 if (failed) cancelAcquire(node); } } /** * 獲取共享鎖,獲取失敗,則會阻塞當前線程,直到獲取共享鎖返回 * @param arg the acquire argument */ private void doAcquireShared(int arg) { // 爲當前線程建立共享鎖節點node final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 若是節點node前一個節點是同步隊列頭節點。就會調用tryAcquireShared方法嘗試獲取共享鎖 if (p == head) { int r = tryAcquireShared(arg); // 若是返回值大於0,表示獲取共享鎖成功 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 若是節點p的狀態是Node.SIGNAL,就是調用parkAndCheckInterrupt方法阻塞當前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // failed爲true,表示發生異常, // 則將node節點的狀態設置成CANCELLED,表示node節點所在線程已取消,不須要喚醒了 if (failed) cancelAcquire(node); } } /** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { // failed爲true,表示發生異常, // 則將node節點的狀態設置成CANCELLED,表示node節點所在線程已取消,不須要喚醒了 if (failed) cancelAcquire(node); } } private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } // Main exported methods // 嘗試去獲取獨佔鎖,當即返回。若是返回true表示獲取鎖成功。 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // 嘗試去釋放當前線程持有的獨佔鎖,當即返回。若是返回true表示釋放鎖成功 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // 嘗試去獲取共享鎖,當即返回。返回值大於等於0,表示獲取共享鎖成功 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // 嘗試去釋放共享鎖 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } /** * 獲取獨佔鎖。若是沒有獲取到,線程就會阻塞等待,直到獲取鎖。不會響應中斷異常 * @param arg */ public final void acquire(int arg) { // 1. 先調用tryAcquire方法,嘗試獲取獨佔鎖,返回true,表示獲取到鎖,不須要執行acquireQueued方法。 // 2. 調用acquireQueued方法,先調用addWaiter方法爲當前線程建立一個節點node,並插入隊列中, // 而後調用acquireQueued方法去獲取鎖,若是不成功,就會讓當前線程阻塞,當鎖釋放時纔會被喚醒。 // acquireQueued方法返回值表示在線程等待過程當中,是否有另外一個線程調用該線程的interrupt方法,發起中斷。 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // public final void acquire(int arg) { // // 1.先調用tryAcquire方法,嘗試獲取獨佔鎖,返回true則直接返回 // if (tryAcquire(arg)) return; // // 2. 調用addWaiter方法爲當前線程建立一個節點node,並插入隊列中 // Node node = addWaiter(Node.EXCLUSIVE); // // 調用acquireQueued方法去獲取鎖, // // acquireQueued方法返回值表示在線程等待過程當中,是否有另外一個線程調用該線程的interrupt方法,發起中斷。 // boolean interrupted = acquireQueued(node, arg); // // 若是interrupted爲true,則當前線程要發起中斷請求 // if (interrupted) { // selfInterrupt(); // } // } /** * 獲取獨佔鎖。若是沒有獲取到,線程就會阻塞等待,直到獲取鎖。若是有中斷請求就會產生中斷異常 * @param arg */ public final void acquireInterruptibly(int arg) throws InterruptedException { // 若是當前線程中斷標誌位是true,那麼直接拋出中斷異常 if (Thread.interrupted()) throw new InterruptedException(); // 先調用tryAcquire方法,嘗試獲取獨佔鎖,返回true,表示獲取到鎖,不須要執行doAcquireInterruptibly方法。 if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } /** * 嘗試在必定時間內獲取獨佔鎖。若是有中斷請求就會產生中斷異常。 * 返回true,表示獲取鎖成功 */ public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 若是當前線程中斷標誌位是true,那麼直接拋出中斷異常 if (Thread.interrupted()) throw new InterruptedException(); // 先調用tryAcquire方法,嘗試獲取獨佔鎖,返回true,表示獲取到鎖。 // 不然調用doAcquireNanos方法獲取鎖 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } // 在獨佔鎖模式下,釋放鎖的操做 public final boolean release(int arg) { // 調用tryRelease方法,嘗試去釋放鎖,由子類具體實現 if (tryRelease(arg)) { Node h = head; // 若是隊列頭節點的狀態不是0,那麼隊列中就可能存在須要喚醒的等待節點。 // 還記得咱們在acquireQueued(final Node node, int arg)獲取鎖的方法中,若是節點node沒有獲取到鎖, // 那麼咱們會將節點node的前一個節點狀態設置爲Node.SIGNAL,而後調用parkAndCheckInterrupt方法 // 將節點node所在線程阻塞。 // 在這裏就是經過unparkSuccessor方法,進而調用LockSupport.unpark(s.thread)方法,喚醒被阻塞的線程 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 獲取共享鎖 public final void acquireShared(int arg) { // 嘗試去獲取共享鎖,若是返回值小於0表示獲取共享鎖失敗 if (tryAcquireShared(arg) < 0) // 調用doAcquireShared方法去獲取共享鎖 doAcquireShared(arg); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } // 釋放共享鎖 public final boolean releaseShared(int arg) { // 嘗試釋放共享鎖 if (tryReleaseShared(arg)) { // 喚醒等待共享鎖的線程 doReleaseShared(); return true; } return false; } // Queue inspection methods public final boolean hasQueuedThreads() { return head != tail; } public final boolean hasContended() { return head != null; } public final Thread getFirstQueuedThread() { // handle only fast path, else relay return (head == tail) ? null : fullGetFirstQueuedThread(); } /** * Version of getFirstQueuedThread called when fastpath fails */ private Thread fullGetFirstQueuedThread() { Node h, s; Thread st; if (((h = head) != null && (s = h.next) != null && s.prev == head && (st = s.thread) != null) || ((h = head) != null && (s = h.next) != null && s.prev == head && (st = s.thread) != null)) return st; Node t = tail; Thread firstThread = null; while (t != null && t != head) { Thread tt = t.thread; if (tt != null) firstThread = tt; t = t.prev; } return firstThread; } public final boolean isQueued(Thread thread) { if (thread == null) throw new NullPointerException(); for (Node p = tail; p != null; p = p.prev) if (p.thread == thread) return true; return false; } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; } // 返回false表示隊列爲null,或者當前線程節點在是隊列頭的下一個節點。 // 返回true表示有一個線程節點在當前線程節點以前 public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } // Instrumentation and monitoring methods public final int getQueueLength() { int n = 0; for (Node p = tail; p != null; p = p.prev) { if (p.thread != null) ++n; } return n; } public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { Thread t = p.thread; if (t != null) list.add(t); } return list; } public final Collection<Thread> getExclusiveQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { if (!p.isShared()) { Thread t = p.thread; if (t != null) list.add(t); } } return list; } public final Collection<Thread> getSharedQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { if (p.isShared()) { Thread t = p.thread; if (t != null) list.add(t); } } return list; } public String toString() { int s = getState(); String q = hasQueuedThreads() ? "non" : ""; return super.toString() + "[State = " + s + ", " + q + "empty queue]"; } // Internal support methods for Conditions // 節點node是否是在CLH隊列中 final boolean isOnSyncQueue(Node node) { // 若是node的狀態是Node.CONDITION,或者node沒有前一個節點prev, // 那麼返回false,節點node不在同步隊列中 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 若是node有下一個節點next,那麼它必定在同步隊列中 if (node.next != null) // If has successor, it must be on queue return true; // 從同步隊列中查找節點node return findNodeFromTail(node); } // 在同步隊列中從後向前查找節點node,若是找到返回true,不然返回false private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } // 返回true表示節點node插入到同步隊列中,返回false表示節點node沒有插入到同步隊列中 final boolean transferForSignal(Node node) { // 若是節點node的狀態不是Node.CONDITION,或者更新狀態失敗, // 說明該node節點已經插入到同步隊列中,因此直接返回false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 將節點node插入到同步隊列中,p是原先同步隊列尾節點,也是node節點的前一個節點 Node p = enq(node); int ws = p.waitStatus; // 若是前一個節點是已取消狀態,或者不能將它設置成Node.SIGNAL狀態。 // 就說明節點p以後也不會發起喚醒下一個node節點線程的操做, // 因此這裏直接調用 LockSupport.unpark(node.thread)方法,喚醒節點node所在線程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } // 等待節點node存放到同步隊列中,若是是當前線程插入的,返回true,若是是另外一個線程插入的,返回false。 final boolean transferAfterCancelledWait(Node node) { // 當節點node狀態仍是Node.CONDITION,改變它的狀態是0,而後插入到同步隊列中。 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } // 若是節點node不在同步隊列中,當前線程讓出執行權 while (!isOnSyncQueue(node)) Thread.yield(); return false; } /** * 釋放當前線程佔有的鎖,並喚醒CLH隊列一個等待線程 * 若是失敗就拋出異常,設置node節點的狀態是Node.CANCELLED * @return */ final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 釋放當前線程佔有的鎖 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } // Instrumentation methods for conditions public final boolean owns(ConditionObject condition) { return condition.isOwnedBy(this); } public final boolean hasWaiters(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.hasWaiters(); } public final int getWaitQueueLength(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.getWaitQueueLength(); } public final Collection<Thread> getWaitingThreads(ConditionObject condition) { if (!owns(condition)) throw new IllegalArgumentException("Not owner"); return condition.getWaitingThreads(); } public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** Condition隊列頭. */ private transient Node firstWaiter; /** Condition隊列尾. */ private transient Node lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } // Internal methods /** * 爲當前線程建立新的Node節點,而且將這個節點插入到Condition隊列中了 * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // 若是Condition隊列尾節點的狀態不是Node.CONDITION if (t != null && t.waitStatus != Node.CONDITION) { // 清除Condition隊列中,狀態不是Node.CONDITION的節點, // 而且可能會從新設置firstWaiter和lastWaiter unlinkCancelledWaiters(); // 從新將Condition隊列尾賦值給t t = lastWaiter; } // 爲當前線程建立一個狀態爲Node.CONDITION的節點 Node node = new Node(Thread.currentThread(), Node.CONDITION); // 若是t爲null,表示Condition隊列爲空,將node節點賦值給鏈表頭 if (t == null) firstWaiter = node; else // 將新節點node插入到Condition隊列尾 t.nextWaiter = node; // 將新節點node設置爲新的Condition隊列尾 lastWaiter = node; return node; } // 將Condition隊列中的first節點插入到CLH隊列中 private void doSignal(Node first) { do { // 原先的Condition隊列頭節點取消,因此從新賦值Condition隊列頭節點 // 若是新的Condition隊列頭節點爲null,表示Condition隊列爲空了 // ,因此也要設置Condition隊列尾lastWaiter爲null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 取消first節點nextWaiter引用 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * 將condition隊列中全部的節點都插入到同步隊列中 * @param first condition隊列頭節點 */ private void doSignalAll(Node first) { // 表示將condition隊列設置爲空 lastWaiter = firstWaiter = null; do { // 獲得condition隊列的下一個節點 Node next = first.nextWaiter; first.nextWaiter = null; // 將節點first插入到同步隊列中 transferForSignal(first); first = next; // 循環遍歷condition隊列中全部的節點 } while (first != null); } // 清除Condition隊列中狀態不是Node.CONDITION的節點 private void unlinkCancelledWaiters() { // condition隊列頭賦值給t Node t = firstWaiter; // 這個trail節點,只是起輔助做用 Node trail = null; while (t != null) { //獲得下一個節點next。當節點是condition時候,nextWaiter表示condition隊列的下一個節點 Node next = t.nextWaiter; // 若是節點t的狀態不是CONDITION,那麼該節點就要從condition隊列中移除 if (t.waitStatus != Node.CONDITION) { // 將節點t的nextWaiter設置爲null t.nextWaiter = null; // 若是trail爲null,表示原先的condition隊列頭節點實效,須要設置新的condition隊列頭 if (trail == null) firstWaiter = next; else // 將節點t從condition隊列中移除,由於改變了引用的指向,從condition隊列中已經找不到節點t了 trail.nextWaiter = next; // 若是next爲null,表示原先的condition隊列尾節點也實效,從新設置隊列尾節點 if (next == null) lastWaiter = trail; } else // 遍歷到的有效節點 trail = t; // 將next賦值給t,遍歷完整個condition隊列 t = next; } } // public methods // 若是condition隊列不爲空,將condition隊列頭節點插入到同步隊列中 public final void signal() { // 若是當前線程不是獨佔鎖線程,就拋出IllegalMonitorStateException異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 將Condition隊列頭賦值給節點first Node first = firstWaiter; if (first != null) // 將Condition隊列中的first節點插入到CLH隊列中 doSignal(first); } // 將condition隊列中全部的節點都插入到同步隊列中 public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } // 讓當前持有鎖的線程阻塞等待,並釋放鎖。若是線程等待期間發出中斷請求,不會產生中斷異常 public final void awaitUninterruptibly() { // 爲當前線程建立新的Node節點,而且將這個節點插入到Condition隊列中了 Node node = addConditionWaiter(); // 釋放當前線程佔有的鎖,並喚醒其餘線程 int savedState = fullyRelease(node); boolean interrupted = false; // 若是節點node不在同步隊列中(注意不是Condition隊列),就阻塞當前線程 while (!isOnSyncQueue(node)) { // 阻塞當前線程 LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } // 若是節點node已經在同步隊列中了,獲取同步鎖,只有獲得鎖才能繼續執行,不然線程繼續阻塞等待 if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } /** Mode meaning to reinterrupt on exit from wait */ private static final int REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ private static final int THROW_IE = -1; /** * 若是線程沒有發起了中斷請求,返回0. * 若是線程發起了中斷請求,且中斷請求在signalled(即調用signal或signalAll)以前返回THROW_IE * 中斷請求在signalled以後返回REINTERRUPT */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** * 若是interruptMode是THROW_IE,就拋出InterruptedException異常 * 若是interruptMode是REINTERRUPT,則當前線程再發出中斷請求 * 不然就什麼都不作 */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } /** * 讓當前持有鎖的線程阻塞等待,並釋放鎖。若是有中斷請求,則拋出InterruptedException異常 * @throws InterruptedException */ public final void await() throws InterruptedException { // 若是當前線程中斷標誌位是true,就拋出InterruptedException異常 if (Thread.interrupted()) throw new InterruptedException(); // 爲當前線程建立新的Node節點,而且將這個節點插入到Condition隊列中了 Node node = addConditionWaiter(); // 釋放當前線程佔有的鎖,並喚醒CLH隊列一個等待線程 int savedState = fullyRelease(node); int interruptMode = 0; // 若是節點node不在同步隊列中(注意不是Condition隊列) while (!isOnSyncQueue(node)) { // 阻塞當前線程,那麼怎麼喚醒這個線程呢? // 首先咱們必須調用signal或者signalAll將這個節點node加入到同步隊列。 // 只有這樣unparkSuccessor(Node node)方法,纔有可能喚醒被阻塞的線程 LockSupport.park(this); // 若是當前線程產生中斷請求,就跳出循環 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 若是節點node已經在同步隊列中了,獲取同步鎖,只有獲得鎖才能繼續執行,不然線程繼續阻塞等待 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清除Condition隊列中狀態不是Node.CONDITION的節點 if (node.nextWaiter != null) unlinkCancelledWaiters(); // 是否要拋出異常,或者發出中斷請求 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } // 獲取 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清除Condition隊列中狀態不是Node.CONDITION的節點 if (node.nextWaiter != null) unlinkCancelledWaiters(); // 是否要拋出異常,或者發出中斷請求 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } /** * 返回true:表示這個condition對象是由sync對象建立的。 */ final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; } /** * 這個condition對象上是否有等待線程,即condition隊列不爲空,且有一個節點的狀態是 Node.CONDITION */ protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 遍歷condition隊列 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true; } return false; } /** * 返回condition對象上等待線程的個數,即遍歷condition隊列,計算節點的狀態是Node.CONDITION的個數 */ protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; // 遍歷condition隊列 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { // 計算節點的狀態是Node.CONDITION的個數 if (w.waitStatus == Node.CONDITION) ++n; } return n; } /** * 返回condition對象上等待線程的集合。 */ protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); // 遍歷condition隊列 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { // 當節點的狀態是Node.CONDITION,將節點的線程添加到list集合中 if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } } // 支持compareAndSet操做 private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } /** * 經過CAS函數設置head值,僅僅在enq方法中調用 */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * 經過CAS函數設置tail值,僅僅在enq方法中調用 */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } /** * 經過CAS函數設置node對象的waitStatus值 */ private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } /** * 經過CAS函數設置node對象的next值 */ private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } } 
相關文章
相關標籤/搜索