java併發編程系列:牛逼的AQS(上)

標籤: 「咱們都是小青蛙」公衆號文章java

設計java的大叔們爲了咱們方便的自定義各類同步工具,爲咱們提供了大殺器AbstractQueuedSynchronizer類,這是一個抽象類,如下咱們會簡稱AQS,翻譯成中文就是抽象隊列同步器。這傢伙老有用了,封裝了各類底層的同步細節,咱們程序員想自定義本身的同步工具的時候,只須要定義這個類的子類並覆蓋它提供的一些方法就行了。咱們前邊用到的顯式鎖ReentrantLock就是藉助了AQS的神力實現的,如今立刻來看看這個類的實現原理以及如何使用它自定義同步工具。node

同步狀態

AQS中維護了一個名叫state的字段,是由volatile修飾的,它就是所謂的同步狀態程序員

private volatile int state;
複製代碼

而且提供了幾個訪問這個字段的方法:算法

方法名 描述
protected final int getState() 獲取state的值
protected final void setState(int newState) 設置state的值
protected final boolean compareAndSetState(int expect, int update) 使用CAS方式更新state的值

能夠看到這幾個方法都是final修飾的,說明子類中沒法重寫它們。另外它們都是protected修飾的,說明只能在子類中使用這些方法。安全

在一些線程協調的場景中,一個線程在進行某些操做的時候其餘的線程都不能執行該操做,好比持有鎖時的操做,在同一時刻只能有一個線程持有鎖,咱們把這種情景稱爲獨佔模式;在另外一些線程協調的場景中,能夠同時容許多個線程同時進行某種操做,咱們把這種情景稱爲共享模式bash

咱們能夠經過修改state字段表明的同步狀態來實現多線程的獨佔模式或者共享模式多線程

好比在獨佔模式下,咱們能夠把state的初始值設置成0,每當某個線程要進行某項獨佔操做前,都須要判斷state的值是否是0,若是不是0的話意味着別的線程已經進入該操做,則本線程須要阻塞等待;若是是0的話就把state的值設置成1,本身進入該操做。這個先判斷再設置的過程咱們能夠經過CAS操做保證原子性,咱們把這個過程稱爲嘗試獲取同步狀態。若是一個線程獲取同步狀態成功了,那麼在另外一個線程嘗試獲取同步狀態的時候發現state的值已是1了就一直阻塞等待,直到獲取同步狀態成功的線程執行完了須要同步的操做後釋放同步狀態,也就是把state的值設置爲0,並通知後續等待的線程。併發

共享模式下的道理也差很少,好比說某項操做咱們容許10個線程同時進行,超過這個數量的線程就須要阻塞等待。那麼咱們就能夠把state的初始值設置爲10,一個線程嘗試獲取同步狀態的意思就是先判斷state的值是否大於0,若是不大於0的話意味着當前已經有10個線程在同時執行該操做,本線程須要阻塞等待;若是state的值大於0,那麼能夠把state的值減1後進入該操做,每當一個線程完成操做的時候須要釋放同步狀態,也就是把state的值加1,並通知後續等待的線程。ide

因此對於咱們自定義的同步工具來講,須要自定義獲取同步狀態與釋放同步狀態的方式,而AQS中的幾個方法正是用來作這個事兒的:工具

方法名 描述
protected boolean tryAcquire(int arg) 獨佔式的獲取同步狀態,獲取成功返回true,不然false
protected boolean tryRelease(int arg) 獨佔式的釋放同步狀態,釋放成功返回true,不然false
protected int tryAcquireShared(int arg) 共享式的獲取同步狀態,獲取成功返回true,不然false
protected boolean tryReleaseShared(int arg) 共享式的釋放同步狀態,釋放成功返回true,不然false
protected boolean isHeldExclusively() 在獨佔模式下,若是當前線程已經獲取到同步狀態,則返回 true;其餘狀況則返回 false

咱們說AQS是一個抽象類,咱們以tryAcquire爲例看看它在AQS中的實現:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
複製代碼

喔😯個人天,居然只是拋出個異常,這不科學。是的,在AQS中的確沒有實現這個方法,不一樣的同步工具針對的具體併發場景不一樣,因此如何獲取同步狀態和如何釋放同步狀態是須要咱們在自定義的AQS子類中實現的,若是咱們自定義的同步工具須要在獨佔模式下工做,那麼咱們就重寫tryAcquiretryReleaseisHeldExclusively方法,若是是在共享模式下工做,那麼咱們就重寫tryAcquireSharedtryReleaseShared方法。好比在獨佔模式下咱們能夠這樣定義一個AQS子類:

public class Sync extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int arg) {
        return compareAndSetState(0, 1);
    }

    @Override
    protected boolean tryRelease(int arg) {
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }
}
複製代碼

tryAcquire表示嘗試獲取同步狀態,咱們這裏定義了一種極其簡單的獲取方式,就是使用CAS的方式把state的值設置成1,若是成功則返回true,失敗則返回falsetryRelease表示嘗試釋放同步狀態,這裏一樣採用了一種極其簡單的釋放算法,直接把state的值設置成0就行了。isHeldExclusively就表示當前是否有線程已經獲取到了同步狀態。若是你有更復雜的場景,可使用更復雜的獲取和釋放算法來重寫這些方法

經過上邊的嘮叨,咱們只是瞭解了啥是個同步狀態,學會了如何經過繼承AQS來自定義獨佔模式和共享模式下獲取和釋放同步狀態的各類方法,可是你會驚訝的發現會了這些仍然沒有什麼卵用。咱們指望的效果是一個線程獲取同步狀態成功會當即返回true,並繼續執行某些須要同步的操做,在操做完成後釋放同步狀態,若是獲取同步狀態失敗的話會當即返回false,而且進入阻塞等待狀態,那線程是怎麼進入等待狀態的呢?不要走開,下節更精彩。

同步隊列

AQS中還維護了一個所謂的同步隊列,這個隊列的節點類被定義成了一個靜態內部類,它的主要字段以下:

static final class Node {
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
}
複製代碼

AQS中定義一個頭節點引用,一個尾節點引用

private transient volatile Node head;
private transient volatile Node tail;
複製代碼

經過這兩個節點就能夠控制到這個隊列,也就是說能夠在隊列上進行諸如插入和移除操做。能夠看到Node類中有一個Thread類型的字段,這代表每個節點都表明一個線程。咱們指望的效果是當一個線程獲取同步狀態失敗以後,就把這個線程阻塞幷包裝成Node節點插入到這個同步隊列中,當獲取同步狀態成功的線程釋放同步狀態的時候,同時通知在隊列中下一個未獲取到同步狀態的節點,讓該節點的線程再次去獲取同步狀態

這個節點類的其餘字段的意思咱們以後遇到會詳細嘮叨,咱們先看一下獨佔模式共享模式下在什麼狀況下會往這個同步隊列裏添加節點,什麼狀況下會從它裏邊移除節點,以及線程阻塞和恢復的實現細節。

獨佔式同步狀態獲取與釋放

獨佔模式下,同一個時刻只能有一個線程獲取到同步狀態,其餘同時去獲取同步狀態的線程會被包裝成一個Node節點放到同步隊列中,直到獲取到同步狀態的線程釋放掉同步狀態才能繼續執行。初始狀態的同步隊列是一個空隊列,裏邊一個節點也沒有,就長這樣:

image_1c3cqmj2g1ve01vd36i71ui91jo645.png-5.5kB

接下來咱們就要詳細看一下獲取同步狀態失敗的線程是如何被包裝成Node節點插入到隊列中同時阻塞等待的。

前邊說過,獲取和釋放同步狀態的方式是由咱們自定義的,在獨佔模式須要咱們定義AQS的子類而且重寫下邊這些方法:

protected boolean tryAcquire(int arg) protected boolean tryRelease(int arg) protected boolean isHeldExclusively() 複製代碼

在定義了這些方法後,誰去調用它們呢?AQS裏定義了一些調用它們的方法,這些方法都是由public final修飾的:

方法名 描述
void acquire(int arg) 獨佔式獲取同步狀態,若是獲取成功則返回,若是失敗則將當前線程包裝成Node節點插入同步隊列中。
void acquireInterruptibly(int arg) 與上個方法意思相同,只不過一個線程在執行本方法過程當中被別的線程中斷,則拋出InterruptedException異常。
boolean tryAcquireNanos(int arg, long nanos) 在上個方法的基礎上加了超時限制,若是在給定時間內沒有獲取到同步狀態,則返回false,不然返回true
boolean release(int arg) 獨佔式的釋放同步狀態。

突然擺了這麼多方法可能有點突兀哈,咱們先看一下acquire方法的源代碼:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
複製代碼

代碼顯示acquire方法其實是經過tryAcquire方法來獲取同步狀態的,若是tryAcquire方法返回true則結束,若是返回false則繼續執行。這個tryAcquire方法就是咱們本身規定的獲取同步狀態的方式。假設如今有一個線程已經獲取到了同步狀態,而線程t1同時調用tryAcquire方法嘗試獲取同步狀態,結果就是獲取失敗,會先執行addWaiter方法,咱們一塊兒來看一下這個方法:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);  //構造一個新節點
    Node pred = tail;
    if (pred != null) { //尾節點不爲空,插入到隊列最後
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {       //更新tail,而且把新節點插入到列表最後
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {    //tail節點爲空,初始化隊列
            if (compareAndSetHead(new Node()))  //設置head節點
                tail = head;
        } else {    //tail節點不爲空,開始真正插入節點
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
複製代碼

能夠看到,這個addWaiter方法就是向隊列中插入節點的方法。首先會構造一個Node節點,假設這個節點爲節點1,它的thread字段就是當前線程t2,這個節點被剛剛建立出來的樣子就是這樣:

image_1c3dakn72sg8n8s1ib71jeu19pp3t.png-19.6kB

而後咱們再分析一下具體的插入過程。若是tail節點不爲空,直接把新節點插入到隊列後邊就返回了,若是tail節點爲空,調用enq方法先初始化一下headtail節點以後再把新節點插入到隊列後邊。enq方法的這幾行初始化隊列的代碼須要特別注意:

if (t == null) {    //tail節點爲空,初始化隊列
    if (compareAndSetHead(new Node()))  //設置head節點
        tail = head;
} else {
    //真正插入節點的過程
}
複製代碼

也就是說在隊列爲空的時候會先讓headtail引用指向同一個節點後再進行插入操做,而這個節點居然就是簡簡單單的new Node(),真是沒有任何添加劑呀~ 咱們先把這個節點稱爲0號節點吧,這個節點的任何一個字段都沒有被賦值,因此在第一次節點插入後,隊列其實長這樣:

image_1c3dapsu21h36kbj1rev135718d05q.png-42.3kB

其中的節點1纔是咱們真正插入的節點,表明獲取同步狀態失敗的線程,0號節點是在初始化過程當中建立的,咱們以後再看它有什麼用。

addWaiter方法調用完會返回新插入的那個節點,也就是節點1acquire方法會接着調用acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();  //獲取前一個節點
            if (p == head && tryAcquire(arg)) { 前一個節點是頭節點再次嘗試獲取同步狀態
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

能夠看到,若是新插入的節點的前一個節點是頭節點的話,會再次調用tryAcquire嘗試獲取同步狀態,這個主要是怕獲取同步狀態的線程很快就把同步狀態給釋放了,因此在當前線程阻塞以前抱着僥倖的心理再試試能不能成功獲取到同步狀態,若是僥倖能夠獲取,那就調用setHead方法把頭節點換成本身:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
複製代碼

同時把本Node節點的thread字段設置爲null,意味着本身成爲了0號節點

若是當前Node節點不是頭節點或者已經獲取到同步狀態的線程並無釋放同步狀態,那就乖乖的往下執行shouldParkAfterFailedAcquire方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;   //前一個節點的狀態
    if (ws == Node.SIGNAL)  //Node.SIGNAL的值是-1
        return true;
    if (ws > 0) {   //當前線程已被取消操做,把處於取消狀態的節點都移除掉
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {    //設置前一個節點的狀態爲-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
複製代碼

這個方法是對Node節點中的waitStatus的各類操做。若是當前節點的前一個節點的waitStatusNode.SIGNAL,也就是-1,那麼意味着當前節點能夠被阻塞,若是前一個節點的waitStatus大於0,意味着該節點表明的線程已經被取消操做了,須要把全部waitStatus大於0的節點都移除掉,若是前一個節點的waitStatus既不是-1,也不大於0,就把若是前一個節點的waitStatus設置成Node.SIGNAL。咱們知道Node類裏定義了一些表明waitStatus的靜態變量,咱們來看看waitStatus的各個值都是什麼意思吧:

靜態變量 描述
Node.CANCELLED 1 節點對應的線程已經被取消了(咱們後邊詳細會說線程如何被取消)
Node.SIGNAL -1 表示後邊的節點對應的線程處於等待狀態
Node.CONDITION -2 表示節點在等待隊列中(稍後會詳細說什麼是等待隊列)
Node.PROPAGATE -3 表示下一次共享式同步狀態獲取將被無條件的傳播下去(稍後再說共享式同步狀態的獲取與釋放時詳細嘮叨)
0 初始狀態

如今咱們重點關注waitStauts0或者-1的狀況。目前咱們的當前節點是節點1,它對應着當前線程,當前節點的前一個節點是0號節點。在一開始,全部的Node節點的waitStatus都是0,因此在第一次調用shouldParkAfterFailedAcquire方法時,當前節點的前一個節點,也就是0號節點waitStatus會被設置成Node.SIGNAL當即返回false,這個狀態的意思就是說0號節點後邊的節點都處於等待狀態,如今的隊列已經變成了這個樣子:

image_1c3datk24q84tg05qi14di14qk67.png-59.7kB

因爲acquireQueued方法是一個循環,在第二次執行到shouldParkAfterFailedAcquire方法時,因爲0號節點waitStatus已經爲Node.SIGNAL了,因此shouldParkAfterFailedAcquire方法會返回true,而後繼續執行parkAndCheckInterrupt方法:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
複製代碼

LockSupport.park(this)方法:

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);     //調用底層方法阻塞線程
    setBlocker(t, null);
}
複製代碼

其中的UNSAFE.park(false, 0L)方法以下:

public native void park(boolean var1, long var2);
複製代碼

這個就表示當即阻塞線程,這是一個底層方法,咱們程序員就不用關心操做系統時如何阻塞線程的了。呼~至此,咱們在獨佔模式下跑完了一個獲取不到同步狀態的線程是怎麼被插入到同步隊列以及被阻塞的過程。這個過程須要你們多看幾遍,畢竟比較麻煩哈~

若是此時再新來一個線程t2調用acquire方法要求獲取同步狀態的話,它一樣會被包裝成Node插入同步隊列的,效果就像下圖同樣:

image_1c3dbas6bkii33r5c0g3f3ss6k.png-60.4kB

你們注意一下節點1waitStauts已經變成-1了,別忘了waitStauts值爲-1的時候,也就是Node.SIGNAL意味着它的下一個節點處於等待狀態,由於0號節點節點1waitStauts值都爲-1,也就意味着它們兩個的後繼節點,也就是節點1節點2都處於等待狀態。

以上就是線程t1t2在某個線程已經獲取了同步狀態的狀況下調用acquire方法時所產生的後果,acquireInterruptiblyacquire方法基本一致,只不過它是可中斷的,也就是說在一個線程調用acquireInterruptibly因爲沒有獲取到同步狀態而發生阻塞以後,若是有別的線程中斷了這個線程,則acquireInterruptibly方法會拋出InterruptedException異常並返回。tryAcquireNanos也是支持中斷的,只不過還帶有一個超時時間,若是超出了該時間tryAcquireNanos尚未返回,則返回false

若是一個線程在各類acquire方法中獲取同步狀態失敗的話,會被包裝成Node節點放到同步隊列,這個能夠看做是一個插入過程。有進就有出,若是一個線程完成了獨佔操做,就須要釋放同步狀態,同時把同步隊列第一個(非0號節點)節點表明的線程叫醒,在咱們上邊的例子中就是節點1,讓它繼續執行,這個釋放同步狀態的過程就須要調用release方法了:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製代碼

能夠看到這個方法會用到咱們在AQS子類裏重寫的tryRelease方法,若是成功的釋放了同步狀態,那麼就繼續往下執行,若是頭節點head不爲null而且headwaitStatus不爲0,就執行unparkSuccessor方法:

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;   //節點的等待狀態
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next; 
        if (s == null || s.waitStatus > 0) {    //若是node爲最後一個節點或者node的後繼節點被取消了
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)   
                if (t.waitStatus <= 0)  //找到離頭節點最近的waitStatus爲負數的節點
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);   //喚醒該節點對應的線程
    }
複製代碼

咱們如今的頭節點head指向的是0號節點,它的狀態爲-1,因此它的waitStatus首先會被設置成0,接着它的後繼節點,也就是節點1表明的線程會被這樣調用LockSupport.unpark(s.thread),這個方法的意思就是喚醒節點1對應的線程t2,把節點1thread設置爲null並把它設置爲頭節點,修改後的隊列就長下邊這樣:

image_1c3dfd25n1nc1acd1c7btcvd7u71.png-41.9kB

因此如今等待隊列裏只有一個t2線程是阻塞的。這就是釋放同步狀態的過程。

獨佔式同步工具舉例

看完了獨佔式同步狀態獲取與釋放的原理,咱們能夠嘗試自定義一個簡單的獨佔式同步工具,咱們經常使用的就是一個獨佔式同步工具,咱們下邊來定義一個簡單的鎖:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class PlainLock {

    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private Sync sync = new Sync();


    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }
}
複製代碼

咱們在PlainLock中定義了一個AQS子類Sync,重寫了一些方法來自定義了在獨佔模式下獲取和釋放同步狀態的方式,靜態內部類就是AQS子類在咱們自定義同步工具中最多見的定義方式。而後在PlainLock裏定義了lock方法表明加鎖,unlock方法代碼解鎖,具體的方法調用咱們上邊都快說吐了,這裏就不想讓你再吐一次了,看一下這個鎖的應用:

public class Increment {

    private int i;

    private PlainLock lock = new PlainLock();

    public void increase() {
        lock.lock();
        i++;
        lock.unlock();
    }

    public int getI() {
        return i;
    }

    public static void test(int threadNum, int loopTimes) {
        Increment increment = new Increment();

        Thread[] threads = new Thread[threadNum];

        for (int i = 0; i < threads.length; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < loopTimes; i++) {
                        increment.increase();
                    }
                }
            });
            threads[i] = t;
            t.start();
        }

        for (Thread t : threads) {  //main線程等待其餘線程都執行完成
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        System.out.println(threadNum + "個線程,循環" + loopTimes + "次結果:" + increment.getI());
    }

    public static void main(String[] args) {
        test(20, 1);
        test(20, 10);
        test(20, 100);
        test(20, 1000);
        test(20, 10000);
        test(20, 100000);
        test(20, 1000000);
    }
}
複製代碼

執行結果:

20個線程,循環1次結果:20
20個線程,循環10次結果:200
20個線程,循環100次結果:2000
20個線程,循環1000次結果:20000
20個線程,循環10000次結果:200000
20個線程,循環100000次結果:2000000
20個線程,循環1000000次結果:20000000
複製代碼

很顯然這個咱們只寫了幾行代碼的鎖已經起了做用,這就是AQS的強大之處,咱們只須要寫不多的東西就能夠構建一個同步工具,並且不用考慮底層複雜的同步狀態管理、線程的排隊、等待與喚醒等等機制。

共享式同步狀態獲取與釋放

共享式獲取與獨佔式獲取的最大不一樣就是在同一時刻是否有多個線程能夠同時獲取到同步狀態。獲取不到同步狀態的線程也須要被包裝成Node節點後阻塞的,而能夠訪問同步隊列的方法就是下邊這些:

|void acquireShared(int arg)|共享式獲取同步狀態,若是失敗則將當前線程包裝成Node節點插入同步隊列中。。| |void acquireSharedInterruptibly(int arg)|與上個方法意思相同,只不過一個線程在執行本方法過程當中被別的線程中斷,則拋出InterruptedException異常。| |boolean tryAcquireSharedNanos(int arg, long nanos)|在上個方法的基礎上加了超時限制,若是在給定時間內沒有獲取到同步狀態,則返回false,不然返回true。| |boolean releaseShared(int arg)|共享式的釋放同步狀態。|

哈,和獨佔模式下的方法長得很是像嘛,只是每一個方法中都加了一個Shared單詞。它們的功能也是同樣同樣的,以acquireShared方法爲例:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
複製代碼

這個方法會調用咱們自定義的AQS子類中的tryAcquireShared方法去獲取同步狀態,只不過tryAcquireShared的返回值是一個int值,該值不小於0的時候表示獲取同步狀態成功,則acquireShared方法直接返回,什麼都不作;若是該返回值大於0的時候,表示獲取同步狀態失敗,則會把該線程包裝成Node節點插入同步隊列,插入過程和獨佔模式下的過程差很少,咱們這就很少廢話了。

另外兩個acquire方法也很少廢話了,只不過一個是可中斷的,一個是支持超時的~

釋放同步狀態的方法也和獨佔模式的差很少:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
複製代碼

這個方法會調用咱們自定義的AQS子類中的tryReleaseShared方法去釋放同步狀態,若是釋放成功的話會移除同步隊列中的一個阻塞節點。與獨佔模式不一樣的一點是,可能同時會有多個線程釋釋放同步狀態,也就是可能多個線程會同時移除同步隊列中的阻塞節點,哈哈,如何保證移除過程的安全性?這個問題就不看源碼了,你們本身嘗試着寫寫。

共享式同步工具舉例

假設某個操做只能同時有兩個線程操做,其餘的線程須要處於等待狀態,咱們能夠這麼定義這個鎖:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class DoubleLock {


    private static class Sync extends AbstractQueuedSynchronizer {

        public Sync() {
            super();
            setState(2);    //設置同步狀態的值
        }

        @Override
        protected int tryAcquireShared(int arg) {
            while (true) {
                int cur = getState();
                int next = getState() - arg;
                if (compareAndSetState(cur, next)) {
                    return next;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            while (true) {
                int cur = getState();
                int next = cur + arg;
                if (compareAndSetState(cur, next)) {
                    return true;
                }
            }
        }
    }
    
    private Sync sync = new Sync();
    
    public void lock() {
        sync.acquireShared(1);     
    }
    
    public void unlock() {
        sync.releaseShared(1);
    }
}
複製代碼

state的初始值釋2,每當一個線程調用tryAcquireShared獲取到同步狀態時,state的值都會減1,當state的值爲0時,其餘線程就沒法獲取到同步狀態從而被包裝成Node節點進入同步隊列等待。

AQS中其餘針對同步隊列的重要方法

除了一系列acquirerelease方法,AQS還提供了許多直接訪問這個隊列的方法,它們由都是public final修飾的:

方法名 描述
boolean hasQueuedThreads() 是否有正在等待獲取同步狀態的線程。
boolean hasContended() 是否某個線程曾經由於獲取不到同步狀態而阻塞
Thread getFirstQueuedThread() 返回隊列中第一個(等待時間最長的)線程,若是目前沒有將任何線程加入隊列,則返回 null
boolean isQueued(Thread thread) 若是給定線程的當前已加入同步隊列,則返回 true。
int getQueueLength() 返回等待獲取同步狀態的線程數估計值,由於在構造該結果時,多線程環境下實際線程集合可能發生大的變化
Collection<Thread> getQueuedThreads() 返回包含可能正在等待獲取的線程 collection,由於在構造該結果時,多線程環境下實際線程集合可能發生大的變化

若是有須要的話,能夠在咱們自定義的同步工具中使用它們。

題外話

寫文章挺累的,有時候你以爲閱讀挺流暢的,那實際上是背後無數次修改的結果。若是你以爲不錯請幫忙轉發一下,萬分感謝~ 這裏是個人公衆號「咱們都是小青蛙」,裏邊有更多技術乾貨,時不時扯一下犢子,歡迎關注:

小冊

另外,做者還寫了一本MySQL小冊:《MySQL是怎樣運行的:從根兒上理解MySQL》的連接 。小冊的內容主要是從小白的角度出發,用比較通俗的語言講解關於MySQL進階的一些核心概念,好比記錄、索引、頁面、表空間、查詢優化、事務和鎖等,總共的字數大約是三四十萬字,配有上百幅原創插圖。主要是想下降普通程序員學習MySQL進階的難度,讓學習曲線更平滑一點~ 有在MySQL進階方面有疑惑的同窗能夠看一下:

相關文章
相關標籤/搜索