Java併發包源碼學習系列:CLH同步隊列及同步資源獲取與釋放

本篇學習目標

  • 回顧CLH同步隊列的結構。
  • 學習獨佔式資源獲取和釋放的流程。

CLH隊列的結構

我在Java併發包源碼學習系列:AbstractQueuedSynchronizer#同步隊列與Node節點已經粗略地介紹了一下CLH的結構,本篇主要解析該同步隊列的相關操做,所以在這邊再回顧一下:html

AQS經過內置的FIFO同步雙向隊列來完成資源獲取線程的排隊工做,內部經過節點head【其實是虛擬節點,真正的第一個線程在head.next的位置】和tail記錄隊首和隊尾元素,隊列元素類型爲Node。java

  • 若是當前線程獲取同步狀態失敗(鎖)時,AQS 則會將當前線程以及等待狀態等信息構形成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程
  • 當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。

接下來將要經過AQS以獨佔式的獲取和釋放資源的具體案例來詳解內置CLH阻塞隊列的工做流程,接着往下看吧。node

資源獲取

public final void acquire(int arg) {
        if (!tryAcquire(arg) && // tryAcquire由子類實現,表示獲取鎖,若是成功,這個方法直接返回了
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 若是獲取失敗,執行
            selfInterrupt();
    }
  • tryAcquire(int)是AQS提供給子類實現的鉤子方法,子類能夠自定義實現獨佔式獲取資源的方式,獲取成功則返回true,失敗則返回false。
  • 若是tryAcquire方法獲取資源成功就直接返回了,失敗的化就會執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))的邏輯,咱們能夠將其進行拆分,分爲兩步:
    • addWaiter(Node.EXCLUSIVE):將該線程包裝成爲獨佔式的節點,加入隊列中。
    • acquireQueued(node,arg):若是當前節點是等待節點的第一個,即head.next,就嘗試獲取資源。若是該方法返回true,則會進入selfInterrupt()的邏輯,進行阻塞。

接下來咱們分別來看看addWaiteracquireQueued兩個方法。編程

入隊Node addWaiter(Node mode)

根據傳入的mode參數決定獨佔或共享模式,爲當前線程建立節點,併入隊。多線程

// 其實就是把當前線程包裝一下,設置模式,造成節點,加入隊列
	private Node addWaiter(Node mode) {
        // 根據mode和thread建立節點
        Node node = new Node(Thread.currentThread(), mode);
        // 記錄一下原尾節點
        Node pred = tail;
        // 尾節點不爲null,隊列不爲空,快速嘗試加入隊尾。
        if (pred != null) {
            // 讓node的prev指向尾節點
            node.prev = pred;
            // CAS操做設置node爲新的尾節點,tail = node
            if (compareAndSetTail(pred, node)) {
                // 設置成功,讓原尾節點的next指向新的node,實現雙向連接
                pred.next = node;
                // 入隊成功,返回
                return node;
            }
        }
        // 快速入隊失敗,進行不斷嘗試
        enq(node);
        return node;
    }

幾個注意點:併發

  • 入隊的操做其實就是將線程經過指定模式包裝爲Node節點,若是隊列尾節點不爲null,利用CAS嘗試快速加入隊尾。
  • 快速入隊失敗的緣由有兩個:
    • 隊列爲空,即尚未進行初始化。
    • CAS設置尾節點的時候失敗。
  • 在第一次快速入隊失敗後,將會走到enq(node)邏輯,不斷進行嘗試,直到設置成功。

不斷嘗試Node enq(final Node node)

private Node enq(final Node node) {
        // 自旋,俗稱死循環,直到設置成功爲止
        for (;;) {
            // 記錄原尾節點
            Node t = tail;
            // 第一種狀況:隊列爲空,原先head和tail都爲null,
            // 經過CAS設置head爲哨兵節點,若是設置成功,tail也指向哨兵節點
            if (t == null) { // Must initialize
                // 初始化head節點
                if (compareAndSetHead(new Node()))
                    // tail指向head,下個線程來的時候,tail就不爲null了,就走到了else分支
                    tail = head;
            // 第二種狀況:CAS設置尾節點失敗的狀況,和addWaiter同樣,只不過它在for(;;)中
            } else {
                // 入隊,將新節點的prev指向tail
                node.prev = t;
                // CAS設置node爲尾部節點
                if (compareAndSetTail(t, node)) {
                    //原來的tail的next指向node
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq的過程是自選設置隊尾的過程,若是設置成功,就返回。若是設置失敗,則一直嘗試設置,理念就是,我總能等待設置成功那一天。app

咱們還能夠發現,head是延遲初始化的,在第一個節點嘗試入隊的時候,head爲null,這時使用了new Node()建立了一個不表明任何線程的節點,做爲虛擬頭節點,且咱們須要注意它的waitStatus初始化爲0,這一點對咱們以後分析有指導意義。ide

若是是CAS失敗致使重複嘗試,那就仍是讓他繼續CAS好了。oop

boolean acquireQueued(Node, int)

// 這個方法若是返回true,代碼將進入selfInterrupt()
	final boolean acquireQueued(final Node node, int arg) {
        // 注意默認爲true
        boolean failed = true;
        try {
            // 是否中斷
            boolean interrupted = false;
            // 自旋,即死循環
            for (;;) {
                // 獲得node的前驅節點
                final Node p = node.predecessor();
                // 咱們知道head是虛擬的頭節點,p==head表示若是node爲阻塞隊列的第一個真實節點
                // 就執行tryAcquire邏輯,這裏tryAcquire也須要由子類實現
                if (p == head && tryAcquire(arg)) {
                    // tryAcquire獲取成功走到這,執行setHead出隊操做 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 走到這有兩種狀況 1.node不是第一個節點 2.tryAcquire爭奪鎖失敗了
                // 這裏就判斷 若是當前線程爭鎖失敗,是否須要掛起當前這個線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 死循環退出,只有tryAcquire獲取鎖失敗的時候failed才爲true
            if (failed)
                cancelAcquire(node);
        }
    }

出隊void setHead(Node)

CLU同步隊列遵循FIFO,首節點的線程釋放同步狀態後,喚醒下一個節點。將隊首節點出隊的操做實際上就是,將head指針指向將要出隊的節點就能夠了。源碼分析

private void setHead(Node node) {
        // head指針指向node
        head = node;
        // 釋放資源
        node.thread = null;
        node.prev = null;
    }

boolean shouldParkAfterFailedAcquire(Node,Node)

/**
     * 走到這有兩種狀況 1.node不是第一個節點 2.tryAcquire爭奪鎖失敗了
     * 這裏就判斷 若是當前線程爭鎖失敗,是否須要掛起當前這個線程
     *
     * 這裏pred是前驅節點, node就是當前節點
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驅節點的waitStatus
        int ws = pred.waitStatus;
        // 前驅節點爲SIGNAL【-1】直接返回true,表示當前節點能夠被直接掛起
        if (ws == Node.SIGNAL)
            return true;
        // ws>0 CANCEL 說明前驅節點取消了排隊
        if (ws > 0) {
            // 下面這段循環其實就是跳過全部取消的節點,找到第一個正常的節點
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 將該節點的後繼指向node,創建雙向鏈接
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             * 官方說明:走到這waitStatus只能是0或propagate,默認狀況下,當有新節點入隊時,waitStatus老是爲0
             * 下面用CAS操做將前驅節點的waitStatus值設置爲signal
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // 返回false,接着會再進入循環,此時前驅節點爲signal,返回true
        return false;
    }

針對前驅節點的waitStatus有三種狀況:

等待狀態不會爲 Node.CONDITION ,由於它用在 ConditonObject 中

  1. ws==-1,即爲Node.SIGNAL,表示當前節點node能夠被直接掛起,在pred線程釋放同步狀態時,會對node線程進行喚醒。
  2. ws > 0,即爲Node.CANCELLED,說明前驅節點已經取消了排隊【多是超時,多是被中斷】,則須要找到前面沒有取消的前驅節點,一直找,直到找到爲止。
  3. ws == 0 or ws == Node.PROPAGATE:
    • 默認狀況下,當有新節點入隊時,waitStatus老是爲0,用CAS操做將前驅節點的waitStatus值設置爲signal,下一次進來的時候,就走到了第一個分支。
    • 當釋放鎖的時候,會將佔用鎖的節點的ws狀態更新爲0。

PROPAGATE表示共享模式下,前驅節點不只會喚醒後繼節點,同時也可能會喚醒後繼的後繼。

咱們能夠發現,這個方法在第一次走進來的時候是不會返回true的。緣由在於,返回true的條件時前驅節點的狀態爲SIGNAL,而第一次的時候尚未給前驅節點設置SIGNAL呢,只有在CAS設置了狀態以後,第二次進來纔會返回true。

那SIGNAL的意義究竟是什麼呢?

這裏引用:併發編程——詳解 AQS CLH 鎖 # 爲何 AQS 須要一個虛擬 head 節點

waitStatus這裏用ws簡稱,每一個節點都有ws變量,用於表示該節點的狀態。初始化的時候爲0,若是被取消爲1,signal爲-1。

若是某個節點的狀態是signal的,那麼在該節點釋放鎖的時候,它須要喚醒下一個節點。

所以,每一個節點在休眠以前,若是沒有將前驅節點的ws設置爲signal,那麼它將永遠沒法被喚醒。

所以咱們會發現上面當前驅節點的ws爲0或propagate的時候,採用cas操做將ws設置爲signal,目的就是讓上一個節點釋放鎖的時候可以通知本身。

boolean parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
        // 掛起當前線程
        LockSupport.park(this);
        return Thread.interrupted();
    }

shouldParkAfterFailedAcquire方法返回true以後,就會調用該方法,掛起當前線程。

LockSupport.park(this)方法掛起的線程有兩種途徑被喚醒:1.被unpark() 2.被interrupt()。

須要注意這裏的Thread.interrupted()會清除中斷標記位。

void cancelAcquire(node)

上面tryAcquire獲取鎖失敗的時候,會走到這個方法。

private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
		// 將節點的線程置空
        node.thread = null;

        // 跳過全部的取消的節點
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        // 這裏在沒有併發的狀況下,preNext和node是一致的
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here. 能夠直接寫而不是用CAS
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        // 設置node節點爲取消狀態
        node.waitStatus = Node.CANCELLED;

        // 若是node爲尾節點就CAS將pred設置爲新尾節點
        if (node == tail && compareAndSetTail(node, pred)) {
            // 設置成功以後,CAS將pred的下一個節點置爲空
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head && // pred不是首節點
                ((ws = pred.waitStatus) == Node.SIGNAL || // pred的ws爲SIGNAL 或 能夠被CAS設置爲SIGNAL
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) { // pred線程非空
                // 保存node 的下一個節點
                Node next = node.next; 
                // node的下一個節點不是cancelled,就cas設置pred的下一個節點爲next
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 上面的狀況除外,則走到這個分支,喚醒node的下一個可喚醒節點線程
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

釋放資源

boolean release(int arg)

public final boolean release(int arg) {
        if (tryRelease(arg)) { // 子類實現tryRelease方法
            // 得到當前head
            Node h = head;
            // head不爲null而且head的等待狀態不爲0
            if (h != null && h.waitStatus != 0)
                // 喚醒下一個能夠被喚醒的線程,不必定是next哦
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  • tryRelease(int)是AQS提供給子類實現的鉤子方法,子類能夠自定義實現獨佔式釋放資源的方式,釋放成功並返回true,不然返回false。
  • unparkSuccessor(node)方法用於喚醒等待隊列中下一個能夠被喚醒的線程,不必定是下一個節點next,好比它多是取消狀態。
  • head 的ws必須不等於0,爲何呢?當一個節點嘗試掛起本身以前,都會將前置節點設置成SIGNAL -1,就算是第一個加入隊列的節點,在獲取鎖失敗後,也會將虛擬節點設置的 ws 設置成 SIGNAL,而這個判斷也是防止多線程重複釋放,接下來咱們也能看到釋放的時候,將ws設置爲0的操做。

void unparkSuccessor(Node node)

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        // 若是node的waitStatus<0爲signal,CAS修改成0
        // 將 head 節點的 ws 改爲 0,清除信號。表示,他已經釋放過了。不能重複釋放。
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // 喚醒後繼節點,可是有可能後繼節點取消了等待 即 waitStatus == 1
        Node s = node.next;
        // 若是後繼節點爲空或者它已經放棄鎖了
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 從隊尾往前找,找到沒有沒取消的全部節點排在最前面的【直到t爲null或t==node才退出循環嘛】
            for (Node t = tail; t != null && t != node; t = t.prev)
                // 若是>0表示節點被取消了,就一直向前找唄,找到以後不會return,還會一直向前
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 若是後繼節點存在且沒有被取消,會走到這,直接喚醒後繼節點便可
        if (s != null)
            LockSupport.unpark(s.thread);
    }

參考閱讀

相關文章
相關標籤/搜索