Java併發AQS原理分析(一)

咱們說的AQS就是AbstractQueuedSynchronizer,他在java.util.concurrent.locks包下,這個類是Java併發的一個核心類。第一次知道有這個類是在看可重入鎖ReentrantLock中,在ReentrantLock中有一個內部類Sync繼承於AbstractQueuedSynchronizer,是ReentrantLock的核心實現。在併發包中的鎖幾乎都是基於AQS來構建的,可是在看源碼的時候就會發現他們並無直接繼承AbstractQueuedSynchronizer,而是經過內部類Sync實現。java

abstract static class Sync extends AbstractQueuedSynchronizer

這裏注意的是AbstractQueuedSynchronizer是一個抽象類,定義了基本的框架。AQS核心是用一個變量state來表示狀態.
AQS也就是AbstractQueuedSynchronizer這個類只是定義了一個隊列管理線程,對於線程的狀態是子類維護的,咱們能夠理解爲師一個同步隊列,當有線程獲取鎖失敗時(多線程爭用資源被阻塞時會進入此隊列),線程會被添加到隊列的隊尾node

總結:安全

  • AQS只是負責管理線程阻塞隊列。
  • 線程的阻塞和喚醒

同步器是實現鎖的關鍵(例如AQS隊列同步器),利用同步器實現鎖的定義。鎖匙面向用戶的,它定義了使用者和鎖交互的接口,可是隱藏了實現的細節。同步器則是鎖的實現,因此他是在鎖的背後默默作着貢獻,用戶不能直接的接觸到他,他簡化了鎖的實現方式,屏蔽了同步狀態管理、線程之間的排隊、等待、喚醒等操做。這樣設計很好的隔離了使用者和實現者關注的領域。多線程

上面的表示了隊列的形態,head表示隊列的頭節點,tail表示隊列的尾節點。在源碼中他們的定義使用volatile定義的。使用volatile關鍵字保證了變量在內存中的可見性,詳見:volatile關鍵字解析。保證某個線程在出隊入隊時被其餘線程看到。併發

private transient volatile Node head;//頭節點
private transient volatile Node tail;//尾節點

AbstractQueuedSynchronizer這個類中還有一個內部類Node,用於構建隊列元素的節點類。框架


在AQS中定義了兩種資源共享方式:源碼分析

  • Exclusive:獨佔式
  • Share:共享式ui

    當以獨佔模式獲取時,嘗試經過其餘線程獲取不能成功。 多線程獲取的共享模式可能(但不須要)成功。 當共享模式獲取成功時,下一個等待線程(若是存在)也必須肯定它是否也能夠獲取。 在不一樣模式下等待的線程共享相同的FIFO隊列。.net

在不一樣的實現類中爲了實現不一樣的功能,會採用不一樣的共享方式,例如可重入鎖ReentrantLock採用的就是獨佔鎖。
AQS的不一樣實現類,不須要關注線程等待隊列的維護和管理(線程阻塞入隊、喚醒出隊),在AQS中這些是已經定義好的,不一樣的同步器只須要對如下方法進行實現便可:線程

//獨佔方式嘗試獲取資源
protected boolean tryAcquire(int arg)
//獨佔方式嘗試釋放資源
protected boolean tryRelease(int arg)
//共享方式嘗試獲取資源,返回值0表示成功可是沒有剩餘資源,負數表示失敗,正數表示成功且有剩餘資源
protected int tryAcquireShared(int arg)
//共享方式嘗試釋放資源
protected boolean tryReleaseShared(int arg)

全部自定義的同步器只須要肯定本身是那種資源貢獻方式便可:共享式、獨佔式。也能夠同時實現共享式和獨佔式ReentrantReadWriteLock讀寫鎖,多個線程能夠同時進行讀操做,可是隻能有一個線程進行寫操做。


獨佔模式同步狀態獲取:

首先先從代碼開始執行的地方看:

以獨佔模式獲取資源,忽略中斷。(若是獲取到資源,直接返回結果,不然進入等待隊列,等待再次獲取資源。) 經過調用至少一次tryAcquire(int)實現,成功返回。 不然線程排隊,可能會重複阻塞和解除阻塞,直到成功才調用tryAcquire(int)

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

方法執行的順序:

  • 調用tryAcquire()方法嘗試去獲取資源,具體在子類中進行實現
  • 調用addWaiter()方法把當前線程標記爲獨佔式,並加入到隊列的尾部

這裏須要講一下addWaiter()方法中的第一個參數,線程等待隊列中的元素都是利用Node這個內部類存儲的,在Node中有兩個成員變量分別聲明瞭資源共享方式:

static final Node SHARED = new Node();//共享式
        static final Node EXCLUSIVE = null;//獨佔式
  • 調用acquireQueued()方法,讓線程在隊列中等待獲取資源,獲取資源後返回,若是在這個等待過程當中線程被中斷過,返回true,不然返回false

在方法中首先調用tryAcquire(int)方法,該方法在AbstractQueuedSynchronizer並無實現,須要子類去實現:

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

第二步調用addWaiter()方法:該方法是負責維護線程等待隊列的方法,因此在AbstractQueuedSynchronizer中實現了該方法:具體是建立了一個節點類,把節點放在隊尾,若是失敗調用enq(node)方法(隊尾節點爲空)。

addWaiter()方法:

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

上面的方法判斷,若是添加到隊尾失敗
enq()方法:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            //若是隊列爲空(隊尾元素爲空)建立節點添加進去
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    //把tail指向head
                    tail = head;
            } else {
                //正常添加到隊尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在上面的代碼中添加節點都用到了比較和交換(CAS,能夠說是一種在併發環境下的解決方法),compareAndSetTail()方法可以確保節點能被安全的添加進隊列中,在多線程環境下沒法保證一個元素被正確的添加到隊列的尾部。由於進入隊列的元素都是放在隊尾的,爲了保證數據的正確性,因此在設置尾節點的時候使用CAS
第三步調用acquireQueued()方法,目的是爲了在隊列中等待被喚醒使用資源,由於以前的操做失敗後,線程會被放入隊尾,隊列是先進先出的結構,因此在隊尾的線程必須等待被喚醒。方法中主要有一個死循環,咱們稱他叫自旋,只有當條件知足的時候,得到同步狀態,退出自旋。
acquireQueued()方法:

final boolean acquireQueued(final Node node, int arg) {
        //設置成功標記
        boolean failed = true;
        try {
            //設置中斷標記
            boolean interrupted = false;
            for (;;) {
                //得到node的前驅節點
                final Node p = node.predecessor();
                //判斷前驅結點是不是頭節點
                if (p == head && tryAcquire(arg)) {
                    //把node設置爲頭結點
                    setHead(node);
                    //把p節點的前驅設置爲null,見下面的解釋
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判斷是否繼續等待
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

把p節點的前驅設置爲null,也就是以前的head節點,在上面源碼中後面的註釋標記爲help GC功能,解釋一下:在調用上面的setHead()方法的時候,方法的內部已經將當前節點的前驅結點設置爲null,在這裏再次設置一遍,爲了保證當前節點的前驅結點順利被回收(當前節點設置爲頭節點,那麼以前的頭節點就要被釋放,模擬一個正常的出隊過程)。本身畫圖更好理解。

setHead()方法:

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

這裏分析上面調用的acquireQueued()方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //獲取前驅節點的狀態
        int ws = pred.waitStatus;
        //若是當前節點狀態值爲SIGNAL這個值,表明當前線程應該被掛起,等待被喚醒
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            //若是大於0表明將當前節點的前驅節點移除
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //小於0時把前驅結點狀態值設置爲SIGNAL,目的是爲了前驅判斷後將當前節點掛起(通知本身一下)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

在這裏咱們須要看一下Node這個類中定義的關於狀態值的定義:

//表示線程已取消,做廢狀態
        static final int CANCELLED =  1;
        //表示後繼節點應該等待當前節點釋放資源後喚醒其後繼節點
        static final int SIGNAL    = -1;
        //表示當前正處於等待狀態
        static final int CONDITION = -2;
        //表示狀態須要向後傳播
        static final int PROPAGATE = -3;
  • CANCELLED 取消狀態
  • SIGNAL 等待觸發狀態
  • CONDITION 等待條件狀態
  • PROPAGATE 狀態須要向後傳播

等待隊列是FIFO先進先出,只有前一個節點的狀態爲SIGNAL時,當前節點的線程才能被掛起。 因此在方法調用的時候把前驅結點設置爲SIGNAL。
由於前一節點被置爲SIGNAL說明後面有線程須要執行,可是還輪不到它後面的線程執行,後面線程必定要找一個前驅節點不爲CANCEL的節點,而後把它設置爲SIGNAL而後原地掛起,等待喚醒。 由於SIGNAL執行完了會喚醒緊接着的後面一個。


總結:
AQS中定義的acquire()模板方法,具體經過調用子類中的tryAcquire()方法嘗試去獲取資源,成功則返回,失敗調用addWaiter()將當前線程添加到阻塞隊列的隊尾,同時標記爲獨佔狀態。acquireQueued()方法經過自旋獲取同步狀態(該方法使線程在等待隊列中等待休息,當有機會時嘗試獲取資源),節點嘗試獲取資源的條件是當前節點的前驅節點是頭節點,嘗試獲取到資源後才返回,在整個等待過程當中若是發生過中斷,不作響應,在獲取資源後調用selfInterrupt()方法設置中斷。


獨佔模式下同步狀態的釋放:

上面根據源碼分析了獨佔模式下得到鎖的過程主要調用了模板方法acquire()方法向下分析,接着咱們分析它的相反的方法,獨佔模式下釋放鎖的過程,仍是一個模板方法release()

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            //獲得頭節點
            Node h = head;
            //判斷頭節點不爲空,狀態值符合條件
            if (h != null && h.waitStatus != 0)
                //喚醒下一個等待線程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease()方法依然須要子類去本身實現

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

unparkSuccessor()方法:

private void unparkSuccessor(Node node) {
        //得到當前線程的狀態值
        int ws = node.waitStatus;
        if (ws < 0)
            //小於0時置零
            compareAndSetWaitStatus(node, ws, 0);
        //得到當前節點的後繼節點
        Node s = node.next;
        //判斷爲空和狀態值是否大於0
        if (s == null || s.waitStatus > 0) {
            s = null;
            //從尾節點向前遍歷,須要喚醒的線程一般是存儲在下一個節點中的
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //喚醒線程
            LockSupport.unpark(s.thread);
    }

unpark()方法喚醒的是等待隊列中最前面的線程,以後會再次執行上面的過程。

總結:在獲取同步狀時,在使用者的角度看在使用鎖時,同步器會維護一個同步隊列,獲取狀態失敗的線程會被加入這個隊列並進行自旋;當該節點的前驅節點是頭節點的時候而且得到了同步狀態時移出隊列。在釋放的時候,調用tryRelease()釋放並喚醒後繼節點。

相關文章
相關標籤/搜索