AbstractQueuedSynchronizer原理剖析

不管是公平鎖仍是非公平鎖,它們的實現都依賴於AbstractQueuedSynchronizer,它提供了一個基於先進先出等待隊列 實現block locks和synchronizers的框架。特性以下node

  • 僅經過一個 int 類型來表明狀態。對於ReentrantLock而言,他就是線程持有鎖的次數,當次數爲0時,表明鎖沒有被持有,正數表明被持有的次數,負數則是超出了鎖的持有範圍,有可能存在死循環
  • 支持獨佔模式(默認的)和共享模式。在獨佔模式中,去獲取一個已經被其它線程擁有的鎖只會失敗,共享模式中,則多個線程是能夠成功的。

lock()原理

當ReentrantLock獲取鎖失敗時,會執行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)框架

private Node addWaiter(Node mode) { 
    Node node = new Node(Thread.currentThread(), mode); //建立一個節點,存儲當前的線程,以及鎖持有的模式,對於 ReentrantLock來講就是 獨佔 型
    // Try the fast path of enq; backup to full enq on failure    
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {//CAS操做,若是當前的尾部節點沒有被其它線程更改,那麼把新的節點設置成隊列的尾部
            pred.next = node;
            return node;
        }
    }
    enq(node);//首次入隊
    return node;
}

獲取失敗進行入隊操做,首先就是往隊列中添加一個正在等待的節點Node性能

圖片描述
從Node自己的結構能夠看到,AQS(AbstractQueuedSynchronizer)自己就維護了一個雙向鏈表,用來存放等待中的線程。鏈表的每一個節點,表明那個線程,是獨佔仍是共享鎖。
建立好節點以後,便執行入隊操做,對於首次建立隊列ui

private Node enq(final Node node) {
    for (;;) {
       //藉助CAS機制實現無鎖操做,因此須要一直執行直到CAS成功
        Node t = tail;
        if (t == null) { // 初始化發生在第一次建立隊列,這樣的好處是,當競爭不激烈的時候,實際上也就不會發生這些操做,性能也會好些
             if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

能夠看到,入隊也就是從隊尾插入新的等待線程,入隊完畢,也就開始去進行不斷的嘗試,直到獲取鎖成功,能夠看到,對於lock來講,其實已是阻塞了this

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); //優先執行當前節點的前一個節點
            if (p == head && tryAcquire(arg)) { //僅噹噹前節點的前一個節點是head,纔去獲取線程,這裏能夠看出其實先等待的線程是會優先處理,也就是FIFO原則
                setHead(node); 
                p.next = null; // help GC    ,釋放掉當前線程在隊列中的引用,也能夠看作’出隊'了            
                failed = false;
                //執行到這裏說明獲取鎖成功
                return interrupted;
            }
            //執行到這裏說明存在競爭,有多個線程都在等待一個鎖
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) //裏面會對當前線程執行中斷,當被喚醒時,繼續循環
                    //若是線程被中斷,設置中斷標記,區別於 doAcquireInterruptibly,doAcquireInterruptibly是直接拋出異常,這也就是 lockInterruptibly可以拋出中斷的緣由
                    interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
從這裏能夠看到,不管鎖是公平鎖仍是非公平鎖,只要被放入了等待隊列,此時的執行依然是誰先等待就先執行誰 ,非公平鎖體如今新來的線程會無視已經等了的線程,能夠優先去搶鎖,因此公平體如今第一次參與搶鎖的線程會去等待已經在等待隊列中的線程,非公平並非說從已經在等待的線程隊列裏面隨便選一個

shouldParkAfterFailedAcquire的源碼以下spa

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; //查看前一個節點的等待狀態
    if (ws == Node.SIGNAL)
            //已經嘗試過獲取鎖,能夠執行park了
            return true;
    if (ws > 0) {   
        do {
            //去掉隊列中全部已經取消的線程
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {             
        //此時當前線程的前一個節點的等待狀態一定是0或者PROGATE,這代表當前線程在park以前能夠再嘗試一次去獲取鎖,也就是說前一個節點可能剛獲取到SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

waitStatus:等待的狀態,共有5種線程

  • SIGNAL:,代表它的前一個節點須要執行 unparking;
  • CANCELLED:當前節點保存的線程因爲超時或者中斷被取消了;
  • CONDITION:接檔正處於條件隊列中,執行了await;
  • PROPAGATE:一個共享的鎖須要傳遞釋放信號到其它節點
  • 0:非上述4中狀態,有多是剛獲取signal,此時它的值是0,也有多是新建的head節點

parkAndCheckInterrupt主要是park當前線程code

private final boolean parkAndCheckInterrupt() {
    //當獲取不到許可時,阻塞線程,解除阻塞狀態的狀況以下:
    //1 某個線程對這個線程調用了unpark方法
    //2 某個線程中斷了這個線程
    //3 這個方法毫無理由的返回了 [park比較奇特的地方],基於這樣,調用的時候必須去判斷park的條件,以及當它返回的時候,去設置中斷的狀態
    LockSupport.park(this); 
    //返回線程的中斷狀態
    return Thread.interrupted();
}

至此lock()執行結束隊列

unlock()原理

當執行unlock時,ReentrentLock執行對應的Release圖片

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        //執行這裏表示所已經被釋放,可讓它的下一個節點來搶鎖
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); //h.waitStatus == 0 表示尚未執行park,天然不須要unpark
        return true;
    }
    return false;
}

若是release成功,即當前線程持有的全部鎖都已經釋放,那麼就能夠執行 unparkSuccessor,從源碼能夠看到,unpark是從頭部開始進行的,結合lock的原理,可知AQS自己就是一個先進先出的隊列
unparkSuccessor源碼以下

private void unparkSuccessor(Node node) {
     int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

     Node s = node.next; //有可能線程被取消了,因此從尾部往前遍歷,到最近一個沒有被取消的線程
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t; //確保線程沒有取消
    }
    if (s != null)
        LockSupport.unpark(s.thread); //恢復線程
}

至此unlock()完畢

await的原理

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();//當前線程已經中斷了,拋出中斷異常
    //添加一個新的waiter到condition queue中,這個新的Node的waitStatus會被標記爲CONDITION
    Node node = addConditionWaiter(); 
    //釋放當前線程擁有的鎖,即從sync queue中去掉當前線程
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
    //若是當前線程不在持有鎖的隊列裏頭,對他進行休眠,當其它線程執行 unlock的時候,釋放鎖,就會執行unpark操做,此時它會被喚醒,喚醒後,若是它在syn隊列裏頭,開始繼續往下執行。(這個插入操做則是由signal完成)
        LockSupport.park(this); 
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;//等待的過程當中線程中斷了,退出
    }
//從新競爭鎖,至關於執行了lock操做
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    //再次去獲取鎖,若是當前的線程在park的時候是被中斷了,而且ConditionObject並非因爲中斷返回,這裏再次標記爲中斷
        interruptMode = REINTERRUPT;  
    if (node.nextWaiter != null) 
    //清除非Condition模式的線程,而在signal中有先關操做將conditon的線程設置成非condition
         unlinkCancelledWaiters();
    if (interruptMode != 0)
    //上報等待的過程當中發生了中斷,若是是要拋出中斷,就拋出,不然再次執行中斷
        reportInterruptAfterWait(interruptMode); 
}

isOnSyncQueue源碼以下

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
    //node自己是調用了 await 方法,或者沒有在獲取鎖的隊列裏頭,[若是在裏頭一定有一個前置的節點]
        return false; 
    if (node.next != null) 
    //當前節點存在下一個節點,那麼它確定是執行過 enq ,即獲取過鎖
        return true; 
    // CAS失敗的時候,有可能 node.rev是沒有的,所以須要從頭至尾遍歷一次
   return findNodeFromTail(node); 
}

checkInterruptWhileWaiting源碼以下

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    //線程中斷從新獲取鎖,而且設置waitStatus爲0,以便後續線程從condition queue清除
        enq(node); 
        return true;
    }
    while (!isOnSyncQueue(node))
    //若是CAS失敗,只要當前節點沒有在Sync queue中,那麼一直自旋,每次都會交出執行權限
        Thread.yield(); 
    return false;
}

能夠看到,await其實就是釋放線程原有的鎖,並把它放入conditon隊列中,而後執行阻塞。等喚醒的時候,從新獲取鎖,並清掉condition queue中的線程。 至此await執行結束

singnal的原理

public final void signal() {
    if (!isHeldExclusively()) //只有當前線程持有了鎖,才能釋放
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);//優先釋放隊列頭的,也就是等待時間最長的condition node
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
//將節點從condition queue轉移到sync queue
final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false; //設置爲非等待失敗,則不繼續轉移
//CAS設置等待狀態爲0成功
Node p = enq(node); //新節點放入sync queue,並返回原來的尾部節點,也就是新節點的前一個節點
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //參考shouldParkAfterFailedAcquire
        LockSupport.unpark(node.thread);//若是當前節點的前一個節點線程已經取消,或者將當前節點的前一個節點線程的waitStatus設置成SIGNAL失敗,則直接喚醒當前線程
    return true;
}

能夠看到signal最關鍵的信息就是去掉等待隊列中的CONDITION狀態,並將線程加入sync隊列,至此signal結束

相關文章
相關標籤/搜索