最近結合書籍《Java併發編程藝術》一直在看AQS的源碼,發現AQS核心就是:利用內置的FIFO雙向隊列結構來實現線程排隊獲取int變量的同步狀態,以此奠基了不少併發包中大部分實現基礎,好比ReentranLock等。今天又是週末,便來總結下最近看的消化後的內容。node
主要參考資料《Java併發編程藝術》(有須要的小夥伴能夠找我,我這裏只有電子PDF)結合ReentranLock、AQS等源碼。算法
博文中的流程圖,結構圖等都是我理解以後一步步親自畫的,若是轉載,請標明謝謝!編程
AQS使用的同步隊列是基於一種CLH鎖算法來實現(引用網上資料對CLH簡單介紹):安全
CLH鎖也是一種基於鏈表的可擴展、高性能、公平的自旋鎖,申請線程只在本地變量上自旋,它不斷輪詢前驅的狀態,若是發現前驅釋放了鎖就結束自旋;併發
結點之間是經過隱形的鏈表相連,之因此叫隱形的鏈表是因爲這些結點之間沒有明顯的next指針,而是經過myPred所指向的結點的變化狀況來影響myNode的行爲;oop
當一個線程需要獲取鎖時,會建立一個新的QNode。將當中的locked設置爲true表示需要獲取鎖。而後線程對tail域調用getAndSet方法,使本身成爲隊列的尾部。同一時候獲取一個指向其前趨的引用myPred,而後該線程就在前趨結點的locked字段上旋轉。直到前趨結點釋放鎖。性能
當一個線程需要釋放鎖時,將當前結點的locked域設置爲false,同一時候回收前趨結點。線程A需要獲取鎖。其myNode域爲true。些時tail指向線程A的結點,而後線程B也增長到線程A後面。tail指向線程B的結點。而後線程A和B都在它的myPred域上旋轉,一旦它的myPred結點的locked字段變爲false,它就可以獲取鎖。ui
而在源碼中也有這樣的介紹:this
/**
* Wait queue node class.
*
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks.
* ...........
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
* ..............
在AQS中的同步隊列結構以及獲取/釋放鎖都是基於此實現的,這裏咱們先放一個我畫的基本結構來理解AQS同步隊列,再進一步介紹一些細節。atom
根據以上圖咱們看到:
在源碼中咱們能夠看到:
// 內部類Node節點 static final class Node{...} // 同步隊列的head引用 private transient volatile Node head; // 同步隊列的tail引用 private transient volatile Node tail;
那麼Node結構的具體構成是什麼呢?咱們具體看內部類Node的源碼:
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /** 等待狀態: * 0 INITAIL: 初始狀態 * 1 CANCELLED: 因爲等待超時或者被中斷,須要從同步隊列中取消等待,節點進入該狀態不會被改變 * -1 SIGNAL: 當前節點釋放同步狀態或被取消,則等待狀態的後繼節點被通知 * -2 CONDITION: 節點在等待隊列中,線程在Condition上,須要其它線程調用Condition的signal()方法才能從等待隊轉移到同步隊列 * -3 PROPAGATE: 表示下一個共享式同步狀態將會無條件被傳播下去 */ volatile int waitStatus; /** 前驅結點 */ volatile Node prev; /** 後繼節點 */ volatile Node next; /** 獲取同步狀態的線程 */ volatile Thread thread; /** 等待隊列中的後繼節點 */ Node nextWaiter; /** 判斷Node是不是共享模式 */ final boolean isShared() { return nextWaiter == SHARED; } /** 返回前驅結點 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
從源碼中能夠發現:同步隊列中的節點Node用來保存獲取同步狀態失敗的線程引用、等待狀態以及前驅和後繼節點。
節點是構成同步隊列的基礎,沒有成功獲取同步狀態的線程將成爲節點加入該隊列的尾部。當一個線程沒法獲取同步狀態時,會被構形成節點並加入同步隊列中,經過CAS保證設置尾節點這一步是線程安全的,此時才能認爲當前節點(線程)成功加入同步隊列與尾節點創建聯繫。具體的實現邏輯請看下面介紹!
經過調用同步器acquire(int arg)方法能夠獲取同步狀態,該方法中斷不敏感,也就是因爲線程獲取同步狀態失敗後進入同步隊列中,後序線程對進行中斷操做時,線程不會從同步隊列中移出
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
同步狀態獲取主要的流程步驟:
1)首先調用自定義同步器實現tryAcquire(int arg)方法,該方法保證線程安全的獲取同步狀態。
2)若是獲取失敗則構造同步節點(獨佔式Node.EXCLUSIVE)並經過addWaiter(Node ndoe)方法將該節點加入到同步隊列的尾部,同時enq(node)經過for(;;)循環保證安全設置尾節點。
private Node addWaiter(Node mode) { // 根據給定模式構造Node Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 嘗試在尾部添加 if (pred != null) { node.prev = pred; // cas方式保證正確添加尾節點 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // enq主要是經過for(;;)死循環來確保節點正確添加 // 在for(;;)死循環中,經過cas將節點設置爲尾節點時,才返回;不然一直嘗試設置 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize 當tail節點爲null時,必須初始化構造好 head節點 if (compareAndSetHead(new Node())) tail = head; } else { // 不然就經過cas開始添加尾節點 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
假設原隊列中存在Node-1到Node-4節點,此時某個線程獲取同步狀態失敗則構成成Node-5經過CAS方式加入隊列(下圖忽略自旋環節)。
3)節點進入同步隊列以後「自旋」,即acquireQueued(final Node node, int arg)方法,在這個方法中,當前node死循環嘗試獲取鎖狀態,可是隻有node的前驅結點是Head才能嘗試獲取同步狀態,獲取成功以後當即設置當前節點爲Head,併成功返回。不然就會一直自旋。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 當前node節點的前驅是Head時(p == head),纔能有資格去嘗試獲取同步狀態(tryAcquire(arg)) // 這是由於當前節點的前驅結點得到同步狀態,才能喚醒後繼節點,即當前節點 if (p == head && tryAcquire(arg)) { // 以上條件知足以後 setHead(node); // 設置當前節點爲Head p.next = null; // help GC // 釋放ndoe的前驅節點 failed = false; return interrupted; } // 線程被中斷或者前驅結點被釋放,則繼續進入檢查:p == head && tryAcquire(arg if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
此時新加入的Node-5節點也開始自旋,此時的Head(Node-1)已經獲取到了同步狀態,而Node-2退出了自旋,成爲了新的Head。
文字總結:
1)同步器會維護一個雙向FIFO隊列,獲取同步失敗的線程將會被構形成Node加入隊尾(而且作自旋檢查:檢查前驅結點是不是Head);
2)當前線程想要得到同步狀態,前提是其前驅結點是頭結點,而且得到了同步狀態;
3)當Head調用release(int arg)釋放鎖的同時會喚醒後繼節點(即當前節點),後繼節點結束自旋
流程圖總結:
同步器的release方法:釋放鎖的同時,喚醒後繼節點(進而時後繼節點從新獲取同步狀態)
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 該方法會喚醒Head節點的後繼節點,使其重試嘗試獲取同步狀態 unparkSuccessor(h); return true; } return false; }
UnparkSuccessor(Node node)方法使用LookSupport(LockSupport.unpark)喚醒處於等待狀態的線程(以後會慢慢看源碼介紹)。
共享鎖跟獨佔式鎖最大的不一樣就是:某一時刻有多個線程同時獲取到同步狀態,獲取判斷是否獲取同步狀態成功的關鍵,獲取到的同步狀態要大於等於0。而其餘步驟基本都是一致的,仍是從源碼開始分析起:帶後綴Share都爲共享式同步方法。
1)acquireShared(int arg)獲取同步狀態:若是獲取失敗則加入隊尾,而且檢查是否具有退出自旋的條件(前驅結點是頭結點而且能成功獲取同步狀態)
public final void acquireShared(int arg) { // tryAcquireShared 獲取同步狀態,大於0纔是獲取狀態成功,不然就是失敗 if (tryAcquireShared(arg) < 0) // 獲取狀態失敗則構造共享Node,加入隊列; // 而且檢查是否具有退出自旋的條件:即preNode爲head,而且能獲取到同步狀態 doAcquireShared(arg); }
2)doAcquireShared(arg):獲取失敗的Node加入隊列,若是當前節點的前驅結點是頭結點的話,嘗試獲取同步狀態,若是大於等於0則在for(;;)中退出(退出自旋)。
private void doAcquireShared(int arg) { // 構造共享模式的Node final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); // 前驅節點是頭結點,而且能獲取狀態成功,則return返回,退出死循環(自旋) if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
3)releaseShared(int arg):釋放同步狀態,經過loop+CAS方式釋放多個線程的同步狀態。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 經過loop+CAS方式釋放多個線程的同步狀態 doReleaseShared(); return true; } return false; }
一、實現一個不可重入的互斥鎖Mutex
二、實現指定共享數量的共享鎖MyShareLock
--------------------------------未完待續(爲了加深理解畫圖寫代碼花費時間較長,因此慢慢來保證質量,不着急!)-------------------------------------------