AbstractQueuedSynchronizer 詳解

AbstractQueuedSynchronizer是concurrent工具包的核心抽象類,也是lock,Semaphore、CountDownLatch的基礎。(CyclicBarriar內部是經過reentrantlock實現)java

一.源碼分析

1.繼承

  • AbstractQueuedSynchronizer繼承AbstractOwnableSynchronizer
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
  • AbstractOwnableSynchronizer:是一個同步器,用來設置佔用lock資源的線程
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    /**
     * 同步佔用線程.
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * 設置當前擁有獨佔訪問的線程。null參數表示沒有線程擁有訪問權。該方法不會強制執行任何同步或不穩定的字段訪問。
     */
    protected final void setExclusiveOwnerThread(Thread t) {
        exclusiveOwnerThread = t;
    }

    /**
     * 返回由setExclusiveOwnerThread所設置的線程,若是沒有設置,則返回null。該方法不會強制執行任何同步或volatile字段訪問。
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

2.node結構

  • node的原理:
Wait queue node class. 
等待隊列節點類

The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. 
CLH locks are normally used for spinlocks. 
等待隊列是「CLH」(克雷格、Landin和哈格斯登)的一個變體。CLH鎖一般用於自旋鎖。

We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node.
咱們使用它們來阻塞同步器,可是使用相同的基本策略,在當前節點的前驅中保存一些關於線程的控制信息。


A "status" field in each node keeps track of whether a thread should block.
A node is signalled when its predecessor releases. 
Each node of the queue otherwise serves as a specific-notification-style monitor holding a single waiting thread. 

每一個節點中的「status」字段能夠跟蹤是否應該阻塞線程。
當它的前驅釋放時,當前節點會被喚醒。
隊列的每一個節點都充當一個特定的-notific樣式監視器,它持有一個等待線程。


The status field does NOT control whether threads are granted locks etc though.
A thread may try to acquire if it is first in the queue. 
But being first does not guarantee success; 
it only gives the right to contend. 
So the currently released contender thread may need to rewait. 

狀態字段不能控制線程是否被授予鎖。
若是線程是隊列中的第一個線程,則可能嘗試獲取它。
但做爲第一併不保證成功;它只賦予了競爭的權利。
所以,當前發佈的競爭者可能須要從新等待。

To enqueue into a CLH lock, you atomically splice it in as new tail. To dequeue, you just set the head field. 

要將其放入CLH鎖中,您能夠將其做爲新的尾部進行拼接。對於dequeue,你只設置了head字段

      +------+  prev +-----+       +-----+
 head |      | <---- |     | <---- |     |  tail
      +------+       +-----+       +-----+
 
Insertion into a CLH queue requires only a single atomic operation on "tail", so there is a simple atomic point of demarcation from unqueued to queued. Similarly, dequeing involves only updating the "head". However, it takes a bit more work for nodes to determine who their successors are, in part to deal with possible cancellation due to timeouts and interrupts. 

插入到CLH隊列中只須要在「tail」上執行一個原子操做,所以有一個簡單的原子點,從不排隊到排隊。相似地,「彈出」只須要更新「頭部」。然而,節點須要更多的工做來肯定它們的後繼者是誰,這在必定程度上是因爲超時和中斷可能致使的取消。

The "prev" links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor. 
「prev」連接(不在原始的CLH鎖中使用)主要是用來處理取消的。若是一個節點被取消,它的後繼(一般)會與一個未被取消的前任從新聯繫。

For explanation of similar mechanics in the case of spin locks, see the papers by Scott and Scherer at http://www.cs.rochester.edu/u/scott/synchronization/ 

We also use "next" links to implement blocking mechanics. The thread id for each node is kept in its own node, so a predecessor signals the next node to wake up by traversing next link to determine which thread it is. Determination of successor must avoid races with newly queued nodes to set the "next" fields of their predecessors. This is solved when necessary by checking backwards from the atomically updated "tail" when a node's successor appears to be null. (Or, said differently, the next-links are an optimization so that we don't usually need a backward scan.) 

咱們還使用「下一個」連接來實現阻塞機制。每一個節點的線程id都保存在本身的節點中,所以,前一個節點將經過遍歷下一個連接來肯定下一個節點,以肯定它是哪一個線程。繼任者的肯定必須避免與新排隊節點的競爭,以設置其前任的「下一個」字段。當一個節點的繼任者看起來爲null時,經過從原子更新的「尾部」向後檢查,就能夠解決這個問題。(或者,換個說法,下一個連接是一個優化,所以咱們一般不須要向後掃描。)

Cancellation introduces some conservatism to the basic algorithms. Since we must poll for cancellation of other nodes, we can miss noticing whether a cancelled node is ahead or behind us. This is dealt with by always unparking successors upon cancellation, allowing them to stabilize on a new predecessor, unless we can identify an uncancelled predecessor who will carry this responsibility. 

取消對基本算法的保守性。因爲咱們必須爲取消其餘節點進行投票,因此咱們可能會忽略一個被取消的節點是在前面仍是後面。這是由取消後的繼任者來處理的,讓他們可以在新的前任上穩定下來,除非咱們可以肯定一個沒有被取消的前任,他將承擔這個責任。

CLH queues need a dummy header node to get started. But we don't create them on construction, because it would be wasted effort if there is never contention. Instead, the node is constructed and head and tail pointers are set upon first contention. 

CLH隊列須要一個虛擬頭節點來啓動。但咱們不會在初始化時創造它們,由於若是沒有競爭,就會白費力氣。相反,節點被構造,頭和尾指針被設置在第一個爭用上。

Threads waiting on Conditions use the same nodes, but use an additional link. Conditions only need to link nodes in simple (non-concurrent) linked queues because they are only accessed when exclusively held. Upon await, a node is inserted into a condition queue. Upon signal, the node is transferred to the main queue. A special value of status field is used to mark which queue a node is on. 

等待條件的線程使用相同的節點,可是使用附加的連接。條件只須要在簡單的(非併發的)鏈接隊列中連接節點,由於只有在獨佔的狀況下才訪問它們。在等待的時候,一個節點被插入到一個條件隊列中。在信號上,節點被轉移到主隊列。狀態字段的一個特殊值用來標記一個節點所在的隊列。
  • node結構:
static final class Node {
    //模式:共享||獨佔
    //共享模式
    static final Node SHARED = new Node();
    //獨佔模式
    static final Node EXCLUSIVE = null;

    //節點狀態:
    static final int CANCELLED =  1; //當前線程被取消
    static final int SIGNAL    = -1; //當前節點的後繼節點要運行,也就是unpark
    static final int CONDITION = -2; //當前節點在condition隊列中等待
    static final int PROPAGATE = -3; '//後繼的acquireShared可以得以執行,讀寫鎖和信號量使用'

    volatile int waitStatus; //節點狀態

    volatile Node prev; //前驅節點

    volatile Node next; //後繼節點

    volatile Thread thread; //節點對應的線程

    Node nextWaiter;    //下一個等待者
}

'waitStatus':節點狀態

 'SIGNAL': The successor of this node is (or will soon be) blocked (via park), so the current node must unpark its successor when it releases or cancels. To avoid races, acquire methods must first indicate they need a signal, then retry the atomic acquire, and then, on failure, block. 
 
 'SIGNAL':該節點的後續版本(或將很快)被阻塞(經過park),所以當前節點在釋放或取消時必須取消其後續版本。爲了不競爭,獲取方法必須首先代表它們須要一個信號,而後重試原子獲取,而後,在失敗時,阻塞
 
 'CANCELLED': This node is cancelled due to timeout or interrupt. Nodes never leave this state. In particular, a thread with cancelled node never again blocks.
 
 'CANCELLED':因爲超時或中斷,該節點被取消。節點永遠不會離開這個狀態。特別地,一個被取消節點的線程不再會阻塞
 
 'CONDITION': This node is currently on a condition queue. It will not be used as a sync queue node until transferred, at which time the status will be set to 0. (Use of this value here has nothing to do with the other uses of the field, but simplifies mechanics.) 
 
 'CONDITION': 該節點當前處於一個條件隊列中。在傳輸以前,它不會被用做同步隊列節點,在此期間,狀態將被設置爲0。(這個值的使用與該字段的其餘用途沒有任何關係,但簡化了力學。)
 
 'PROPAGATE': A releaseShared should be propagated to other nodes. This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.
 
 'PROPAGATE' :應該將一個釋放的節點傳播到其餘節點。這是在doReleaseShared中設置的(僅用於頭部節點),以確保傳播繼續,即便其餘操做已經進行了干預
 
 0: None of the above The values are arranged numerically to simplify use. 
 '0' :以上這些值都沒有按數字進行排列,以簡化使用
 
 Non-negative values mean that a node doesn't need to signal. So, most code doesn't need to check for particular values, just for sign. 
 
 非負的值意味着節點不須要發出信號。所以,大多數代碼不須要檢查特定的值,只是爲了符號。
 
 The field is initialized to 0 for normal sync nodes, and CONDITION for condition nodes. It is modified using CAS (or when possible, unconditional volatile writes).
 
 該字段被初始化爲0,用於正常的同步節點,以及條件節點的條件。它使用CAS進行修改(或者在可能的狀況下,無條件的volatile寫操做)
  • AbstractQueuedSynchronizer 屬性
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    /**
     * 頭節點
     */
    private transient volatile Node head;

    /**
     * 尾節點
     */
    private transient volatile Node tail;

    /**
     * 同步器狀態
     */
    private volatile int state;

    // Unsafe類實例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset; //state內存偏移地址
    private static final long headOffset;   //head內存偏移地址
    private static final long tailOffset;   //tail內存偏移地址
    private static final long waitStatusOffset; //waitStatus內存偏移地址
    private static final long nextOffset;   //next內存地址

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }
}

3.工做原理

  • 要點:node

    • 鎖.headNode用來在隊列中佔位,表明鎖擁有者的node
    • 線程的休眠和喚醒都是經過 pred.waitStatus/-1 來判斷
      • pred.waitStatus/-1 :當前線程lock(),本身休眠
      • pred.waitStatus/-1 :鎖佔用線程.unlock(),喚醒nextNode線程
    • 這樣的好處是,鎖的獲取都是排隊執行,減小了鎖競爭。
  • 流程圖算法

    • lock的時候:lock的時候,除了佔用鎖資源的線程。其它線程都會添加到synqueue中,並設置predNode的/waitstate=-1(用例判斷當前線程是否休眠)/,而後資金park(直接休眠)

輸入圖片說明

* **unlock的時候**

輸入圖片說明 1. t1.unlock(), * 設置鎖資源的state=0, * 檢測headNode的waiteState/-1,而後/喚醒下一個線程/ * 設置headNode的/waiteState=0/ 2. t2被喚醒,繼續lock() * 獲取鎖資源,設置鎖ower=t2 * 設置/鎖.head=t2Node/,將以前的headNode從隊列中剔除 * 設置/t2Node.thread=null,t2Node.pred=null/,留空t2Node,做爲佔位併發

  • ReentrantLock.lock() 調用
    • 非公平鎖走上面流程
    • 公平鎖會先去判斷隊列是否爲null
1.默認建立NonfairSync(非公平鎖)

        final void lock() {
            if (compareAndSetState(0, 1))   '直接參與鎖競爭'
                setExclusiveOwnerThread(Thread.currentThread()); //成功則設置鎖的擁有者爲當前線程
            else
                acquire(1);     //失敗則調用 AbstractQueuedSynchronizer.acquire獲取鎖
        }





1.2 公平鎖 fairSync 
        final void lock() {
            acquire(1); "AbstractQueuedSynchronizer.acquire "
        }
        
        "公平鎖的tryAcquire"
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c / 0) {
                if (!hasQueuedPredecessors() && 
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current / getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }        
        
        
        "公平鎖的hasQueuedPredecessors:判斷node隊列的頭尾是否相同,節點的下一個節點是否爲null"
        public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) / null || s.thread != Thread.currentThread());
        }    

2. AbstractQueuedSynchronizer.acquire 

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    2.1 tryAcquire:調用ReentrantLock.NonfairSync.tryAcquire(1),最後調用Sync.nonfairTryAcquire
    2.2 addWaiter(Node.EXCLUSIVE):添加當前線程到隊列中
    2.3 acquireQueued(final Node node, int arg):循環獲取鎖資源(cas)
    2.4 selfInterrupt():本身中斷。
    
'2.1  tryAcquire(1)獲取鎖資源:調用ReentrantLock.NonfairSync.tryAcquire(1)'

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires); //調用Sync.nonfairTryAcquire
    }

    '2.1.1 調用Sync.nonfairTryAcquire(1)'
    "非公平鎖的tryAcquire"
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState(); //當前lock資源狀態
        if (c / 0) {   //無線程佔有
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current / getExclusiveOwnerThread()) {    //
            int nextc = c + acquires;   //可重入,將狀態值增長
            if (nextc < 0) // overflow  //防止泄漏
                throw new Error("Maximum lock count exceeded");
            setState(nextc);    //從新設置狀態
            return true;
        }
        return false;
    }    

'2.2 addWaiter(Node.EXCLUSIVE) :添加隊列'

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode); //建立node
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;   //獲取鎖隊列的尾節點
        if (pred != null) { //尾節點不爲null
            node.prev = pred;   //將node節點的前驅設置爲尾節點
            if (compareAndSetTail(pred, node)) {   //替換尾節點 
                pred.next = node; //尾節點的後繼爲當前節點
                return node; //返回節點
            }
        }
        enq(node);  //隊列尚未初始化
        return node;
    }
    
    '隊列尚未初始化'
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t / null) { // Must initialize
                if (compareAndSetHead(new Node()))  //初始化:設置頭節點
                    tail = head;    //初始化:將尾節點設置頭節點
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }    

'2.3 acquireQueued(final Node node, int arg):循環獲取鎖資源(cas)'    

    ① 判斷結點的前驅是否爲head而且當前節點是否成功獲取(資源)。

  ② 若步驟①均知足,則設置結點爲head,以後會判斷是否finally模塊,而後返回。

  ③ 若步驟①不知足,則判斷是否須要park當前線程,是否須要park當前線程的邏輯是判斷結點的前驅結點的狀態是否爲SIGNAL,如果,則park當前結點,不然,不進行park操做。

  ④ 若park了當前線程,以後某個線程對本線程unpark後,而且本線程也得到機會運行。那麼,將會繼續進行步驟①的判斷。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;  //默認當前節點獲取失敗
        try {
            boolean interrupted = false;
            for (;;) {  //無限循環
                final Node p = node.predecessor();  //獲取node節點的前驅節點
                if (p / head && tryAcquire(arg)) { //若是前驅爲頭結點,而且當前節點獲取鎖
                    setHead(node);  //將當前節點獲取鎖
                    p.next = null; // help GC   //前驅從隊列中刪除
                    failed = false; 
                    return interrupted; //終端標誌
                }
                //若是獲取鎖資源失敗
                '檢測前驅的狀態'
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed) 
                //獲取失敗
                cancelAcquire(node);
        }
    }
    
    '當獲取鎖資源失敗,檢測前驅的狀態並更新當前節點狀態'
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;   //前驅狀態
        if (ws / Node.SIGNAL) //當前驅狀態爲 SIGNAL
     
            // 能夠進行park操做
            return true;
        
        if (ws > 0) {
            表示狀態爲CANCELLED,爲1
            do {
                node.prev = pred = pred.prev; // 找到pred結點前面最近的一個狀態不爲CANCELLED的結點
            } while (pred.waitStatus > 0);
            pred.next = node; // 賦值pred結點的next域   
        } else { 
            // 爲PROPAGATE -3 或者是0 表示無狀態,(爲CONDITION -2時,表示此節點在condition queue中) 
            // 比較並設置前驅結點的狀態爲SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // 不能進行park操做
        return false;
    }
    
    '進行park操做而且返回該線程是否被中斷'
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);  // 在許可可用以前禁用當前線程,而且設置了blocker
        return Thread.interrupted();  // 當前線程是否已被中斷,並清除中斷標記位
    }
    
    '獲取失敗,取消繼續獲取(資源)'
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        //節點爲null,返回
        if (node / null)
            return;
        //設置node的線程爲null
        node.thread = null;

        // 獲取前期,並找到前驅中狀態不爲 CANCELLED 的節點
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        // 設置節點的狀態爲CANCELLED
        node.waitStatus = Node.CANCELLED;

        // 若是節點是尾節點,設置尾節點是前驅
        if (node / tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null); //前期下一個節點爲null
        } else { //當前節點不是尾節點
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) / Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                // (pred結點不爲頭結點,而且pred結點的狀態爲SIGNAL)或者 
                // pred結點狀態小於等於0,而且比較並設置等待狀態爲SIGNAL成功,而且pred結點所封裝的線程不爲空
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);  '釋放當前節點'
            }

            node.next = node; // help GC
        }
    }    
    'unparkSuccessor:釋放後繼結點'
    private void unparkSuccessor(Node node) {

        // 獲取node結點的等待狀態
        int ws = node.waitStatus;
        if (ws < 0) 
            // 狀態值小於0,爲SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
            // 比較而且設置結點等待狀態,設置爲0
            compareAndSetWaitStatus(node, ws, 0);


        // 獲取node節點的下一個結點
        Node s = node.next;
        if (s / null || s.waitStatus > 0) { 
            // 下一個結點爲空或者下一個節點的等待狀態大於0,即爲CANCELLED
            // s賦值爲空
            s = null; 
            // 從尾結點開始從後往前開始遍歷
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0) 
                    // 找到等待狀態小於等於0的結點,找到最前的狀態小於等於0的結點
                    // 保存結點
                    s = t;
        }
        if (s != null) // 該結點不爲爲空,釋放許可
            LockSupport.unpark(s.thread);'喚醒下一個節點'
    }
  • ReentrantLock.lock() 調用
'調用sync.release(1)'
    public final boolean release(int arg) {
        if (tryRelease(arg)) {  
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
'減小鎖的持有次數'
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases; //獲取釋放的次數
        if (Thread.currentThread() != getExclusiveOwnerThread())    //若是當前節點沒有持有鎖資源則拋出異常
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c / 0) {   //鎖資源爲0
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
相關文章
相關標籤/搜索