小弟爲了進階本身的技術。一點點挖掘自身潛力。html
AQS abstractQueueSynchronizer(抽象隊列同步器),是什麼?java
答:它是用來構建鎖 或者 其餘同步器組件的重量級基礎框架,是整個JUC體系的基礎。經過內置FIFO隊列來完成獲取線程取鎖的排隊工做,並經過一個int類型變量標識持有鎖的狀態;node
前置知識點:設計模式
一、可重入鎖(遞歸鎖):api
sync(隱式鎖,jvm管理)和ReentrantLock(Lock顯式鎖,就是手動加解)是重入鎖的典型表明,爲能夠重複使用的鎖。一個變成多個流程,能夠獲取同一把鎖。app
可重入鎖概念: 是指一個線程,在外層方法獲取鎖的時候,再次進入該線程的內層方法會自動獲取鎖(必須是同一個對象),不會被阻塞。可避免死鎖 框架
舉例: 遞歸調用同一個 sync修飾的方法或者代碼塊。必須是一個對象才行。一個線程調用一個對象的method1,method1 調用method2,method2調用method3, 3個方法都是被sync修飾,這樣也是一個可重入鎖的例子 。jvm
再好比下面這種ide
static Object lock = new Object(); public void mm(){ synchronized (lock){ System.out.println("===========mm method"); synchronized (lock){ System.out.println("=========== method"); } } }
只有一個對象 和同步代碼塊,若是sycn中嵌套sync 並都是lock對象,那麼該線程就會持有當前對象的鎖,並可重入。反編譯後發現函數
public void mm(); Code: 0: getstatic #7 // Field lock:Ljava/lang/Object; 3: dup 4: astore_1 5: monitorenter 6: getstatic #8 // Field java/lang/System.out:Ljava/io/PrintStream; 9: ldc #9 // String ===========mm method 11: invokevirtual #10 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 14: aload_1 15: monitorexit 16: goto 24 19: astore_2 20: aload_1 21: monitorexit 22: aload_2 23: athrow 24: return Exception table: from to target type 6 16 19 any 19 22 19 any
sync同步代碼塊加解鎖,使用的命令爲monitorenter 和 monitorexit(同步方法標識是ACC_SYNCHRONIZED,在flag中),enter 爲加鎖,必須成對出現,但這裏卻又兩個exit。緣由爲第一個exit爲程序正常運行後的解鎖命令,並執行完後會執行goto到return ,也就是第24行,
第二個exit 爲當程序出現異常時,須要執行的解鎖命令;
如上就是可重入鎖的相關概念
二、什麼是LockSupport?
根據jdk8 的api文檔顯示定義爲: 用於建立鎖和其餘同步類的基本線程阻塞原語;
是一個線程阻塞工具類,全部方法均爲靜態,可讓線程在任意位置阻塞,阻塞後也有對應的喚醒方法。
先複習下object 對象的wait 和 notify 和Lock 的condition
-
wait 和notify 必須在sync 代碼塊中才能使用,不然報錯。非法的監視器
-
condition的await 和 signal方法也必須在lock 和unlock方法前執行,不然報錯,非法的監視器
-
線程必定要先 等待 ,再 被 喚醒,順序不能換
LockSupport 有兩個關鍵函數 park 和unpark,該類使用了Permit(許可證)的概念來阻塞和喚醒線程的功能。每一個線程都會有一個Permit,該Permit 只有兩個值 0 和1 ,默認是0。相似於信號量,但上限是1;
來看park方法:
public static void park() { //unsafe的方法。初始爲0 UNSAFE.park(false, 0L); }
禁止當前線程進行線程調度,除非Permit可用,就是1
若是Permit 爲1(有可用證書) 將變動爲0(線程仍然會處理業務邏輯),而且當即返回。不然當前線程對於線程調度目的將被禁用,並處於休眠狀態。直至發生三件事情之一:
-
一些其餘線程調用當前線程做爲目標的unpark ; 要麼
-
其餘一些線程當前線程爲interrupts ; 要麼
-
電話虛假(也就是說,沒有理由)返回。
這種方法不報告是哪一個線程致使該方法返回。 來電者應從新檢查致使線程首先停放的條件。 呼叫者還能夠肯定線程在返回時的中斷狀態。
小結:Permit默認0,因此一開始調用park,當前線程被阻塞,直到別的線程將當前線程的Permit修改成1,從park方法處被喚醒,處理業務,而後會將permit修改成0,並返回;若是permit爲1,調用park時會將permit修改成0,在執行業務邏輯到線程生命週期。與park方法定義吻合。
在看unpark方法:
public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); }
在調用unpark方法後,會將Thread線程的許可permit設置成1,會自動喚醒thread線程,即,以前阻塞中的LockSupport.park方法會當即返回,而後線程執行業務邏輯 。 且 unpark能夠在park以前執行。至關於執行park沒有效果。
三、AQS abstractQueueSynchronizer 源碼
剩餘前置知識爲: 公平鎖、非公平鎖、自旋鎖、鏈表、模板設計模式
AQS使用volatile修飾的int類型的變量 標識鎖的狀態,經過內置的FIFO隊列來完成資源獲取的排隊工做,將每條要去搶佔資源的線程封裝成node節點實現鎖的分配,經過CAS(自旋鎖)完成對state值的修改 ;
(1)node節點源碼
static final class Node { /** Marker to indicate a node is waiting in shared mode */ //共享節點 static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ //獨佔節點 static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ //線程被取消狀態 static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ // 後續線程須要喚醒 static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ //鄧丹condition喚醒 static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ //共享室同步狀態獲取 將會無條件傳播下去 static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ //初始爲0,狀態是上面幾種,標識當前節點在隊列中的狀態 volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ //前置節點 volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ //後置節點 volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ //當線程對象 volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }
node節點就是每個等待執行的線程。還有一個waitState狀態字段,標識當前等待中的線程狀態
根據node節點 繪畫一個aqs基本結構圖
解釋:state爲狀態位,aqs爲同步器。有head 和tail兩個 頭 尾節點,當state = 1時,代表同步器被佔用(或者說當前有線程持有了同一個對象的鎖),將後續線程添加到隊列中,並用雙向鏈表鏈接,遵循FIFO。
(2)以ReentrantLock的實現分析。由於他也實現了Lock 並內部持有同步器sync和AQS(以銀行櫃檯例子)
new ReentrantLock()或 new ReentrantLock(false)時,建立的是非公平鎖,而 ReentrantLock對象內部還有 兩個類 分別爲公平同步器和非公平同步器
static final class NonfairSync extends Sync //公平鎖 有一個判斷隊列中是否有排隊的線程,這是與上面鎖不一樣的獲取方式 static final class FairSync extends Sync
公平鎖解釋:先到先得,新線程在獲取鎖時,若是這個同步器的等待隊列中已經有線程在等待,那麼當前線程會先進入等待隊列;
非公平鎖解釋:新進來的線程不論是否有等待的線程,若是能夠獲取鎖,則馬上佔有鎖。
這裏還有一個關鍵的模板設計模式: 在查詢aqs的tryAcquire方法時發現,該方法直接拋出異常,這就是典型的模板設計模式,強制要求子類重寫該方法。不然不讓用
1.1 當線程a到櫃檯辦理業務時,會調用sync 的lock,即 a線程調用lock方法
final void lock() { //利用cas將當前對象的state 從0 設置成1,固然裏面還有一個偏移量 //意思就是若是是0 就設置爲1成功返回true if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
由於是第一個運行的線程,確定是true因此,將當前運行的線程設置爲a,即a線程佔用了同步器,獲取了鎖
1.2 當b線程運行lock時,發現不能將0設置成1(cas思想),就會運行acquire(1)方法
public final void acquire(int arg) { //這裏用到了模板設計模式,強制子類實現該方法 //由於默認使用非公平鎖,因此看NonfairSync if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } //該方法就是非公平鎖執行 等待的方法 final boolean nonfairTryAcquire(int acquires) { //獲取當前b線程 final Thread current = Thread.currentThread(); //獲取當前鎖的狀態是1,由於a線程已經獲取,並將state修改成1 int c = getState(); //有可能b在設置state時,a正辦理,到這兒時,a辦理完了。state爲0了。 if (c == 0) { //樂觀的將state 從0 修改成 1 if (compareAndSetState(0, acquires)) { //設置當前獲取鎖的線程爲b setExclusiveOwnerThread(current); return true; } } //有可能 a線程辦完業務。又回頭辦理了一個,因此當前線程持有鎖的線程依舊是a else if (current == getExclusiveOwnerThread()) { //2 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //設置state的值 setState(nextc); return true; } //若是b線程走到這裏,就證實b必須到等待隊列裏去了 return false; }
再來看邏輯運算符後面的邏輯
private Node addWaiter(Node mode) { //實例化一個節點,並將b 和 節點類型封裝成node Node node = new Node(Thread.currentThread(), mode); //等待隊列爲null 因此tail初始化確定是null //若是是線程c在b以後進來,tail就是b 節點 Node pred = tail; //c節點以後都走這個方法 if (pred != null) { //node的前置爲tail //c 的前置設置爲b node.prev = pred; //cas樂觀,比較 若是當前節點仍然是b 就將b 設置成c //b就是tail尾節點,將tail設置成c //這裏根據源碼可知,就是將tail的值設置成c 並不影響pred的值,仍是b if (compareAndSetTail(pred, node)) { //b 的下一個節點設置成c pred.next = node; return node; } } //線程b 入等待隊列 enq(node); return node; } //入隊列方法 private Node enq(final Node node) { //該方法相似於while(true) for (;;) { //獲取tail節點 Node t = tail; //初始化鎖等待隊列 if (t == null) { // Must initialize //設置頭部節點爲新的節點 //這裏看出,鎖等待隊列的第一個節點並不是b,而是一個空node,該node爲站位節點或者叫哨兵節點 if (compareAndSetHead(new Node())) //將頭尾都指向該節點 tail = head; } else { //第二次循環時,t爲空node,將b的前置設置爲空node node.prev = t; //設置tail節點爲b節點 if (compareAndSetTail(t, node)) { //空node節點的下一個節點爲b node節點 t.next = node; return t; } } } } /** * CAS head field. Used only by enq. */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); }
以上b線程的進入等待隊列的操做就完成了 ,但線程仍是活躍的,如何阻塞的呢?
下面接着看acquireQueued方法
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //自旋執行 for (;;) { //若是是b線程,這裏p就是b節點的前置節點 final Node p = node.predecessor(); //空節點就是head節點,但又調用了一次tryAcquire方法,想再嘗試獲取鎖資源 //若是a線程未處理完,那麼這裏返回false //若是a線程處理完成,那麼這裏就能夠獲取到鎖 if (p == head && tryAcquire(arg)) { //將head設置成b節點 setHead(node); //原空節點的下鏈接斷開 p.next = null; // help GC failed = false; return interrupted; } //第一次空節點進入should方法。返回false //當第二次循環到此處should方法返回true //執行parkAndCheckInterrupt方法,會將當前線程park,並獲取b線程的中斷狀態,若是未中斷返回false,並再次自旋一次 ,中斷爲true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //head節點就是空節點因此w=0 //空節點第二次進入時就是-1 int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; //若是b節點狀態是其餘,則將節點鏈接變化一下 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //ws = 0時,使用cas將驗證pred 和ws 的值,是空節點和0 並將ws修改成-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
以上b線程未獲取鎖 並被掛起的操做就完成了
1.3 當a線程調用unlock方法時:
public void unlock() { sync.release(1); } public final boolean release(int arg) { //判斷a線程是否完成業務。並釋放鎖,state=0 if (tryRelease(arg)) { //獲取頭部節點,就是空節點 Node h = head; //空節點在b獲取鎖時,狀態變動爲-1,因此這裏是true if (h != null && h.waitStatus != 0) //喚醒線程 unparkSuccessor(h); return true; } return false; } //aqs父類的模板方法,強制要求子類實現該方法 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } protected final boolean tryRelease(int releases) { //將鎖的狀態設置爲0 int c = getState() - releases; //判斷當前線程與 鎖的獨佔線程是否一致 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); //所得狀態標識 boolean free = false; if (c == 0) { free = true; //若是state=0證實a完成了業務。那麼鎖的獨佔狀態就應該恢復爲null setExclusiveOwnerThread(null); } //恢復鎖的state狀態 setState(c); return free; }
注意:這裏state是減1操做。若是ReentrantLock不斷可重入,那麼這裏是不能一次就歸零的。因此纔會有ReentrantLock 調了幾回lock 就是要調幾回unlock
a線程喚醒b線程
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) //空節點-1,設置成0 compareAndSetWaitStatus(node, ws, 0); //獲取b節點,非null 且 waitState=0 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //將b線程喚醒 LockSupport.unpark(s.thread); }
而 b線程還在acquireQueued方法裏自旋呢,不過自旋後就會獲取鎖。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //b線程在a未釋放鎖以前一直在自旋, for (;;) { final Node p = node.predecessor(); //當a釋放鎖後,b獲取到鎖,將state設置爲1 //並將空節點的全部鏈接斷開等待GC回收 //並返回當前線程 b 的中斷狀態 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //park當前線程b 並獲取b的中斷狀態,確定是false,且調用的是帶參的native方法,屢次調用會重置b線程的中斷狀態 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //調用該方法的if判斷若是進入了。會將b 中斷,並在不斷循環中再重置中斷狀態爲false
1.4 c線程的執行流程與b線程相似。會將b節點充等待隊列中移除,遵循FIFO
以上就是a線程執行完成後b線程開始執行的流程
以銀行櫃檯爲例,就是a客戶先到櫃檯前辦理業務(佔用同步器或者說是鎖),b客戶和c客戶只能在等待區等待。此時呢。a辦理完業務後可能直接離開了。那麼b就能夠佔用櫃檯(獲取同步器或者鎖),c繼續等待。。。還有一種就是a客戶辦理完業務。忽然想起來還有個業務要辦理,又坐在櫃檯前,這時a客戶就至關於重入鎖,再次獲取同步器,b和c只能等待。(若是a辦理完業務,忽然來了個d客戶,直接到櫃檯辦理業務。這是b和c都沒反應過來。也是能夠的,畢竟是非公平鎖,若是是公平鎖,那麼d客戶只能乖乖在c客戶後面排着)