AbstractQueuedSynchronizer 底層源碼原理分析

概述

Java的內置鎖一直都是備受爭議的,在JDK 1.6以前,synchronized這個重量級鎖其性能一直都是較爲低下,雖然在1.6後,進行大量的鎖優化策略,可是與Lock相比synchronized仍是存在一些缺陷的:雖然synchronized提供了便捷性的隱式獲取鎖釋放鎖機制(基於JVM機制),可是它卻缺乏了獲取鎖與釋放鎖的可操做性,可中斷、超時獲取鎖,且它爲獨佔式在高併發場景下性能大打折扣。node

AQS,AbstractQueuedSynchronizer,即隊列同步器。它是構建鎖或者其餘同步組件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC併發包的做者(Doug Lea)指望它可以成爲實現大部分同步需求的基礎。它是JUC併發包中的核心基礎組件。安全

AQS解決了子類實現同步器時涉及到的大量細節問題,例如獲取同步狀態、FIFO同步隊列。基於AQS來構建同步器能夠帶來不少好處。它不只可以極大地減小實現工做,並且也沒必要處理在多個位置上發生的競爭問題。多線程

AQS的主要使用方式是繼承,子類經過繼承同步器並實現它的抽象方法來管理同步狀態。併發

AQS使用一個int類型的成員變量state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操做,固然AQS能夠確保對state的操做是安全的。框架

AQS經過內置的FIFO同步隊列來完成資源獲取線程的排隊工做,若是當前線程獲取同步狀態失敗(鎖)時,AQS則會將當前線程以及等待狀態等信息構形成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。高併發

AQS能夠實現獨佔鎖和共享鎖,RenntrantLock實現的是獨佔鎖,ReentrantReadWriteLock實現的是獨佔鎖和共享鎖,CountDownLatch實現的是共享鎖。oop

  1. 獨佔式exclusive。保證一次只有一個線程能夠通過阻塞點,只有一個線程能夠獲取到鎖。
  2. 共享式shared。能夠容許多個線程阻塞點,能夠多個線程同時獲取到鎖。

下面咱們經過源碼來分析下AQS的實現原理性能

AbstractQueuedSynchronizer類結構

經過AQS的類結構咱們能夠看到它內部有一個隊列和一個state的int變量。
隊列:經過一個雙向鏈表實現的隊列來存儲等待獲取鎖的線程。
state:鎖的狀態。
head、tail和state 都是volatile類型的變量,volatile能夠保證多線程的內存可見性。優化

同步隊列的基本結構以下:ui

同步隊列

同步器隊列Node元素的類結構以下:

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    //表示當前的線程被取消;
    static final int CANCELLED =  1;
    //表示當前節點的後繼節點包含的線程須要運行,也就是unpark;
    static final int SIGNAL    = -1;
    //表示當前節點在等待condition,也就是在condition隊列中;
    static final int CONDITION = -2;
    //表示當前場景下後續的acquireShared可以得以執行;
    static final int PROPAGATE = -3;
    //表示節點的狀態。默認爲0,表示當前節點在sync隊列中,等待着獲取鎖。
    //其它幾個狀態爲:CANCELLED、SIGNAL、CONDITION、PROPAGATE
    volatile int waitStatus;
    //前驅節點
    volatile Node prev;
    //後繼節點
    volatile Node next;
    //獲取鎖的線程
    volatile Thread thread;
    //存儲condition隊列中的後繼節點。
    Node nextWaiter;
    ......
}

從Node結構prev和next節點能夠看出它是一個雙向鏈表,waitStatus存儲了當前線程的狀態信息

waitStatus

  1. CANCELLED,值爲1,表示當前的線程被取消;
  2. SIGNAL,值爲-1,表示當前節點的後繼節點包含的線程須要運行,也就是unpark;
  3. CONDITION,值爲-2,表示當前節點在等待condition,也就是在condition隊列中;
  4. PROPAGATE,值爲-3,表示當前場景下後續的acquireShared可以得以執行;
  5. 值爲0,表示當前節點在sync隊列中,等待着獲取鎖。

下面咱們經過如下五個方面來介紹AQS是怎麼實現的鎖的獲取和釋放的

  1. 獨佔式得到鎖
  2. 獨佔式釋放鎖
  3. 共享式得到鎖
  4. 共享式釋放鎖
    5.獨佔超時得到鎖

1.獨佔式得到鎖

acquire方法代碼以下:

public final void acquire(int arg) {
        //嘗試得到鎖,獲取不到則加入到隊列中等待獲取
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  1. 首先執行tryAcquire方法,嘗試得到鎖。
  2. 若是獲取失敗則進入addWaiter方法,構造同步節點(獨佔式Node.EXCLUSIVE),將該節點添加到同步隊列尾部,並返回此節點,進入acquireQueued方法。
  3. acquireQueued方法,這個新節點死是循環的方式獲取同步狀態,若是獲取不到則阻塞節點中的線程,阻塞後的節點等待前驅節點來喚醒或阻塞線程被中斷。

addWaiter方法代碼以下:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        //將該節點添加到隊列尾部
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //若是前驅節點爲null,則進入enq方法經過自旋方式入隊列
    enq(node);
    return node;
}

將構造的同步節點加入到同步隊列中

  1. 使用鏈表的方式把該Node節點添加到隊列尾部,若是tail的前驅節點不爲空(隊列不爲空),則進行CAS添加到隊列尾部。
  2. 若是更新失敗(存在併發競爭更新),則進入enq方法進行添加

enq方法代碼以下:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //若是隊列爲空,則經過CAS把當前Node設置成頭節點
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //若是隊列不爲空,則向隊列尾部添加Node
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

該方法使用CAS自旋的方式來保證向隊列中添加Node(同步節點簡寫Node)

  1. 若是隊列爲空,則把當前Node設置成頭節點
  2. 若是隊列不爲空,則向隊列尾部添加Node

acquireQueued方法代碼以下:

final boolean acquireQueued(final Node node, int arg) {  
    boolean failed = true;  
    try {  
        boolean interrupted = false;  
        for (;;) {  
            //找到當前節點的前驅節點
            final Node p = node.predecessor();  
            //檢測p是否爲頭節點,若是是,再次調用tryAcquire方法 
            if (p == head && tryAcquire(arg)) {  
                //若是p節點是頭節點且tryAcquire方法返回true。那麼將當前節點設置爲頭節點。
                setHead(node);  
                p.next = null; // help GC  
                failed = false;  
                return interrupted;  
            }  
            //若是p節點不是頭節點,或者tryAcquire返回false,說明請求失敗。  
            //那麼首先須要判斷請求失敗後node節點是否應該被阻塞,若是應該  
            //被阻塞,那麼阻塞node節點,並檢測中斷狀態。  
            if (shouldParkAfterFailedAcquire(p, node) &&  
                parkAndCheckInterrupt())  
                //若是有中斷,設置中斷狀態。  
                interrupted = true;  
        }  
    } finally {  
        if (failed) //最後檢測一下若是請求失敗(異常退出),取消請求。  
            cancelAcquire(node);  
    }  
}

在acquireQueued方法中,當前線程經過自旋的方式來嘗試獲取同步狀態,

  1. 若是當前節點的前驅節點頭節點才能嘗試得到鎖,若是得到成功,則把當前線程設置成頭結點,把以前的頭結點從隊列中移除,等待垃圾回收(沒有對象引用)
  2. 若是獲取鎖失敗則進入shouldParkAfterFailedAcquire方法中檢測當前節點是否能夠被安全的掛起(阻塞),若是能夠安全掛起則進入parkAndCheckInterrupt方法,把當前線程掛起,並檢查剛線程是否執行了interrupted方法。

經過上面的代碼咱們能夠發現AQS內部的同步隊列是FIFO的方式存取的。節點自旋獲取同步狀態的行爲以下圖所示

節點自旋獲取同步狀態

shouldParkAfterFailedAcquire方法代碼以下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //得到前驅節點狀態
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
           //若是前驅節點狀態爲SIGNAL,當前線程則能夠阻塞。
           return true;
        if (ws > 0) {
            do {
                //判斷若是前驅節點狀態爲CANCELLED,那就一直往前找,直到找到最近一個正常等待的狀態
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //並將當前Node排在它的後邊。
            pred.next = node;
        } else {
            //若是前驅節點正常,則修改前驅節點狀態爲SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

節點的狀態以下表:

狀態 說明
CANCELLED 1 等待超時或者中斷,須要從同步隊列中取消
SIGNAL -1 後繼節點出於等待狀態,當前節點釋放鎖後將會喚醒後繼節點
CONDITION -2 節點在等待隊列中,節點線程等待在Condition上,其它線程對Condition調用signal()方法後,該節點將會從等待同步隊列中移到同步隊列中,而後等待獲取鎖。
PROPAGATE -3 表示下一次共享式同步狀態獲取將會無條件地傳播下去
INITIAL 0 初始狀態
  1. 首先獲取前驅節點的狀態ws
  2. 若是ws爲SIGNAL則表示能夠被前驅節點喚醒,當前線程就能夠掛起,等待前驅節點喚醒,返回true(能夠掛起)
  3. 若是ws>0說明,前驅節點取消了,並循環查找此前驅節點以前全部連續取消的節點。並返回false(不能掛起)。
  4. 嘗試將當前節點的前驅節點的等待狀態設爲SIGNAL

parkAndCheckInterrupt方法代碼以下:

private final boolean parkAndCheckInterrupt() {
    //阻塞當前線程
    LockSupport.park(this);
    //判斷是否中斷來喚醒的
    return Thread.interrupted();
}
  1. 調用LockSupport.park(this);進行阻塞當前線程
  2. 若是被喚醒判斷是否是被中斷的(喚醒有兩種可能性,一種是unpark,一種是interrupter)

2. 獨佔式釋放鎖

release方法代碼以下:

public final boolean release(int arg) {
        //嘗試釋放鎖
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //喚醒後繼節點
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease(int arg) 方法應該由實現AQS的子類來實現具體的邏輯。

  1. 首先經過tryRelease方法釋放鎖若是釋放鎖成功,執行第2步。
  2. 經過調用unparkSuccessor() 方法來喚醒頭結點的後繼節點。該方法內部是經過LockSupport.unpark(s.thread);來喚醒後繼節點的。

3. 共享式得到鎖

acquireShared方法代碼以下:

public final void acquireShared(int arg) {
    //嘗試獲取的鎖,若是獲取失敗執行doAcquireShared方法。
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

tryAcquireShared()嘗試獲取鎖,若是獲取失敗則經過doAcquireShared()進入等待隊列,直到獲取到資源爲止才返回。

這裏tryAcquireShared()須要自定義同步器去實現。
AQS中規定:負值表明獲取失敗,非負數標識獲取成功。

doAcquireShared方法代碼以下:

private void doAcquireShared(int arg) {
    //構建共享Node
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //獲取前驅節點
            final Node p = node.predecessor();
            //若是是頭節點進行嘗試得到鎖
            if (p == head) {
                //若是返回值大於等於0,則說明得到鎖
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //當前節點設置爲隊列頭,並
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在acquireQueued方法中,當前線程也經過自旋的方式來嘗試獲取同步狀態,同獨享式得到鎖同樣

  1. 若是當前節點的前驅節點頭節點才能嘗試得到鎖,若是得到成功,則把當前線程設置成頭結點,把以前的頭結點從隊列中移除,等待垃圾回收(沒有對象引用)
  2. 若是獲取鎖失敗則進入shouldParkAfterFailedAcquire方法中檢測當前節點是否能夠被安全的掛起(阻塞),若是能夠安全掛起則進入parkAndCheckInterrupt方法,把當前線程掛起,並檢查剛線程是否執行了interrupted方法。

setHeadAndPropagate方法代碼以下:

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        //若是propagate >0,說明共享鎖還有能夠進行得到鎖,繼續喚醒下一個節點
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

設置當前節點爲頭結點,並調用了doReleaseShared()方法,acquireShared方法最終調用了release方法,得看下爲何。緣由其實也很簡單,shared模式下是容許多個線程持有一把鎖的,其中tryAcquire的返回值標誌了是否容許其餘線程繼續進入。若是容許的話,須要喚醒隊列中等待的線程。其中doReleaseShared方法的邏輯很簡單,就是喚醒後繼線程。

所以acquireShared的主要邏輯就是嘗試加鎖,若是容許其餘線程繼續加鎖,那麼喚醒後繼線程,若是失敗,那麼入隊阻塞等待。

4. 共享式釋放鎖

releaseShared方法代碼以下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

tryReleaseShared(int arg) 方法應該由實現AQS的子類來實現具體的邏輯。

  1. 首先經過tryReleaseShared方法釋放鎖若是釋放鎖成功,執行第2步。
  2. 經過調用unparkSuccessor() 方法來喚醒頭結點的後繼節點。該方法內部是經過LockSupport.unpark(s.thread);來喚醒後繼節點的。

doReleaseShared方法代碼以下:

private void doReleaseShared() {
    for (;;) {
        // 獲取隊列的頭節點
        Node h = head;
        // 若是頭節點不爲null,而且頭節點不等於tail節點。
        if (h != null && h != tail) {
            // 獲取頭節點對應的線程的狀態
            int ws = h.waitStatus;
            // 若是頭節點對應的線程是SIGNAL狀態,則意味着「頭節點的下一個節點所對應的線程」須要被unpark喚醒。
            if (ws == Node.SIGNAL) {
                // 設置「頭節點對應的線程狀態」爲空狀態。失敗的話,則繼續循環。
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 喚醒「頭節點的下一個節點所對應的線程」。
                unparkSuccessor(h);
            }
            // 若是頭節點對應的線程是空狀態,則設置「尾節點對應的線程所擁有的共享鎖」爲其它線程獲取鎖的空狀態。
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 若是頭節點發生變化,則繼續循環。不然,退出循環。
        if (h == head)                   // loop if head changed
            break;
    }
}

該方法主要是喚醒後繼節點。對於可以支持多個線程同時訪問的併發組件(好比Semaphore),它和獨佔式主要區別在於tryReleaseShared(int arg)方法必須確保同步狀態(或者資源數)線程安全釋放,通常是經過循環和CAS來保證的,由於釋放同步狀態的操做會同時來自多個線程。

5. 獨佔超時得到鎖

doAcquireNanos方法代碼以下:

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    //計算出超時時間點
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            //計算剩餘超時時間,超時時間點deadline減去當前時間點System.nanoTime()獲得還應該睡眠的時間
            nanosTimeout = deadline - System.nanoTime();
            //若是超時,返回false,獲取鎖失敗
            if (nanosTimeout <= 0L)
                return false;
            //判斷是否須要阻塞當前線程
            //若是須要,在判斷當前剩餘納秒數是否大於1000
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                //阻塞 nanosTimeout納秒數
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

該方法在自旋過程當中,當節點的前驅節點爲頭節點時嘗試獲取同步狀態,若是獲取成功則從該方法返回,這個過程和獨佔式同步獲取的過程相似,可是在同步狀態獲取失敗的處理上有所不一樣。若是當前線程獲取同步狀態失敗,則首先從新計算超時間隔nanosTimeout,則判斷是否超時(nanosTimeout小於等於0表示已經超時),若是沒有超時,則使當前線程等待nanosTimeout納秒(當已到設置的超時時間,該線程會從LockSupport.parkNanos(Object blocker,long nanos)方法返回)。

若是nanosTimeout小於等於spinForTimeoutThreshold(1000納秒)時,將不會使該線程進行 超時等待,而是進入快速的自旋過程。緣由在於,很是短的超時等待沒法作到十分精確,若是 這時再進行超時等待,相反會讓nanosTimeout的超時從總體上表現得反而不精確。所以,在超 時很是短的場景下,同步器會進入無條件的快速自旋。

相關文章
相關標籤/搜索