深刻併發之(一) 歷來ReentrantLock看AbstractQueuedSynchronizer源碼

深刻併發包之(一) 歷來ReentrantLock看AbstractQueuedSynchronizer源碼

這篇文章是一篇併發相關的源碼分析文章,主要從源碼級別分析AQS操做(主要關於阻塞鎖的實現),從而加深對併發中ReentrantLockReentrantReadWriteLock兩種鎖的理解。java

AQS概述

下面引用JDK文檔中對AQS類的描述:node

爲實現依賴於先進先出 (FIFO) 等待隊列的阻塞鎖和相關同步器(信號量、事件,等等)提供一個框架。此類的設計目標是成爲依靠單個原子 int 值來表示狀態的大多數同步器的一個有用基礎。併發

在這篇文章中咱們只關注有關鎖的實現的相關部分,那麼這段話就能夠這樣理解。app

AQS類的內部實現了一個FIFO的等待隊列(實際是一個鏈表),用來保存線程等相關內容,同時,利用這個隊列來實現一個阻塞鎖。框架

同時AQS中有一個int類型的值state,這個值有相關的原子操做,咱們可使用相關的方法來修改這個值。函數

當state的值等於0時,咱們認爲沒有線程得到了鎖;當state的值大於0時,咱們認爲已經有線程得到了鎖,同時,線程每重入一次,state的值加一,這樣就能夠實現鎖的重入源碼分析

AQS中咱們有獨佔共享兩種模式,所以咱們既能夠實現獨佔鎖,也能夠實現共享鎖。ui

隊列的實現

AQS的隊列的實現主要有一個內部類Node,來表示隊列的結點。this

static final class Node {
    //結點的狀態,signal等
    volatile int waitStatus;
    //結點的前驅結點
    volatile Node prev;
    //結點的後繼結點
    volatile Node next;
    //結點保存的線程
    volatile Thread thread;
    //表示當前結點的模式,獨佔仍是共享
    Node nextWaiter;
}

由此咱們能夠看出,所謂的FIFO隊列其實是一個雙向鏈表,結點中的主要數據就是狀態和線程。咱們會將全部獲取鎖可是沒有成功的線程保存到隊列中,這樣咱們就能夠在此基礎上實現其餘功能,例如公平鎖和非公平鎖等內容。線程

下面給出結點狀態的可能取值:

//因爲超時或者中斷,線程處於取消狀態
static final int CANCELLED =  1;
//線程處於阻塞狀態
static final int SIGNAL    = -1;
//線程再等待知足條件
static final int CONDITION = -2;
//線程狀態須要向後傳播
static final int PROPAGATE = -3;

結點的默認狀態爲0,這種狀態表示結點的狀態不屬於上面的任意一種。當結點狀態大於0時,意味着線程不須要被阻塞,所以,不少時候JDK在校驗的時候沒有校驗到具體值,而是一種範圍校驗。

既然實現了隊列,那麼天然須要維護隊列中的結點,後面咱們會介紹相關方法。

ReentrantLock加鎖方法的概述

首先咱們先來描述一下ReentrantLock使用AQS實現加鎖須要實現的功能。

一、 將全部須要加鎖的線程維護到隊列中

二、 將全部須要等待的線程阻塞

三、 將全部被取消的線程從隊列中刪除

四、 將全部被中斷的線程中斷

五、 將得到鎖的線程從隊列中刪除

AQS的使用

使用AQS來實現一個鎖,咱們須要在類的內部聲明一個非公共的類來繼承AQS類,而且實現AQS類中的部分方法。

通常來講,咱們須要本身重寫方法以下

//獨佔獲取鎖
tryAcquire(int)
//獨佔模式釋放鎖
tryRelease(int)
//共享模式獲取鎖
tryAcquireShared(int)
//共享模式釋放鎖
tryReleaseShared(int)

在AQS中,上面幾個方法體是直接拋出異常,因此若是咱們沒有本身重寫,那麼加鎖方法是沒法調用的。

查看ReentrantLock類的源碼,其內部有一個類Sync類。,繼承了AQS類,而且重寫了部分方法。因爲ReentrantLockReentrantLock實現了公平鎖和非公平鎖,因此,其類的內部又有兩個類NonfairSyncFairSync類繼承了Sync類。

加鎖代碼思路

上面的介紹應該讓你們已經對AQS的使用和基本功能有所瞭解了,接下來,咱們將跟隨ReentrantLock源碼的思路,一步步分析加鎖和解鎖的流程。

博主會把JDK中的源碼主要內容粘貼到文章中,好了,開始吧。

下面這段是ReentrantLock中的加鎖部分

public void lock() {
    sync.lock();
}

能夠看出加鎖部分實際是調用了Sync類的lock()方法。
當咱們查看lock()方法時發現,這個方法是一個抽象方法,實際上,在公平鎖和非公平鎖中,這個方法的實現是不一樣,所以,這個方法是由NonfairSyncFairSync類實現的,下面以非公平鎖中的實現爲例。

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

加鎖過程概覽見下圖

NonfairSync

加鎖過程當中,首先經過一個CAS操做,若是state的值爲0(即沒有線程得到了獨佔鎖),那麼將其設置爲1(表示已經有一個線程得到了獨佔鎖),若是設置成功,那麼加鎖就成功了,而後調用方法setExclusiveOwnerThread,將得到鎖的線程設置爲當前線程。

因爲是非公平鎖,因此不須要考慮線程的等待時間等因素。

同時須要注意,若是CAS操做失敗,也就是說已經有線程已經得到了獨佔鎖,這時咱們調用acquire函數獲取鎖,這裏應該是考慮到鎖重入等狀況。同時若是是第一個線程進入,就能夠直接得到鎖,效率較高。

下面給出acquire方法,這個方法其實是由AQS給出的實現。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

這裏能夠看出acquire方法會調用咱們本身實現的tryAcquire方法來嘗試得到鎖,若是獲取鎖成功,那麼方法就直接結束了,若是獲取鎖失敗會繼續執行後面的方法。

tryAcquire方法

那麼,接下來查看非公平鎖中的tryAcquire方法。

注意,tryAcquire方法是由ReentrantLock類實現的

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

查看源碼,nonfairTryAcquire方法是在Sync類中給出的。

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //沒有線程得到過鎖
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            /**
             * 已經有線程獲取了鎖,因爲是獨佔鎖,因此其餘線程不能夠
             *再次進入,可是有一種特殊狀況,就是當前線程或得到鎖的
             *線程是同一個,這時知足可重入
             */
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

這段代碼的總體流程以下圖:

tryAcquire

這段代碼的思路以下:

  • 若是state的值是0,那麼證實獨佔鎖沒有被佔有,咱們可嘗試CAS操做,若是CAS操做成功,那麼當前線程得到了鎖,返回爲真。
  • 若是state的值不爲0,那麼說明已經有線程得到了鎖,其餘線程就不該該能夠得到鎖,可是,這裏有一種狀況,就是重入,若是已經佔有鎖的線程和當前線程是同一線程,那麼就能夠得到鎖,而且須要將state的值加一,表示重入了一次,這裏還對state的值進行了校驗,以防溢出,這裏每一個線程的重入次數其實是有限的。

addWaiter以及acquireQueued方法

這裏讓咱們的目光會到acquire方法中,假如獲取鎖失敗,那麼接下來咱們須要作的就是維護FIFO鏈表,而且讓全部沒有得到鎖的線程進入等待隊列,直到得到鎖。

爲了方便,這裏將acquire方法再次給出

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

下面介紹方法addWaiteracquireQueued,這裏Node.EXCLUSIVE實際上就是null,這裏是用來區分節點是等待獨佔鎖仍是共享鎖的。

給出addWaiter的代碼,這部分代碼有AQS實現

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);//新建節點
        Node pred = tail;
        //若是隊列有值那麼能夠直接將當前節點加入到隊列尾部,注意這裏若是多個結點同時進入是有可能發生CAS操做失敗的狀況,因此接下來就須要enq方法
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //調用enq初始化隊列並將結點加入隊列中
        enq(node);
        return node;
    }

這部分代碼的主要職責是維護鏈表的順序,生成一個節點,將節點維護到鏈表的尾部,若是鏈表爲空或者CAS設置tail失敗那麼就須要enq方法來初始化鏈表,並將當前節點維護到鏈表的尾部。

private Node enq(final Node node) {
        //死循環,節點會自旋,直到成功加入隊列,若是有併發進入這個方法,也不會有問題
        for (;;) {
            Node t = tail;
            if (t == null) { // 若是隊列爲空,須要初始化一個空的Node節點,並讓head和tail都指向它
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {//將節點放入隊列的尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

給出acquireQueued的代碼,這部分代碼由AQS實現

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //若是前驅結點爲頭結點,那麼只有當前結點在等待獲取鎖,那麼就能夠再次嘗試獲取鎖,若是獲取成功就能夠反悔了
                if (p == head && tryAcquire(arg)) {
                    //獲取鎖成功後釋放結點
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判斷當前結點是否被中斷
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這部分代碼實際也是一個死循環,當有一個結點進入後會一直自旋,知道成功得到鎖。若是在等待的過程當中,線程被中斷,則不會響應,可是會標記一下,當該結點的線程得到鎖後,再對線程進行中斷處理。

經過兩幅圖片介紹一下acquireQueued方法的兩種狀況

狀況一,隊列中只有頭結點和當前結點

acquireQueued1

狀況二,當前結點不是隊列中除頭結點外的惟一結點

狀況二

下面咱們詳細介紹兩個方法,自旋的兩個重要函數shouldParkAfterFailedAcquireparkAndCheckInterrupt

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //若是前驅結點的狀態爲SIGNAL,那麼當前結點能夠park即讓其等待
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {//前驅結點的狀態爲cancle
            //把全部狀態爲cancle的結點所有跳過,即從隊列中刪除
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //將前驅結點的狀態設置爲SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
private final boolean parkAndCheckInterrupt() {
        //上面函數返回值爲真,才能進入這個函數,表示當前結點能夠被park,而且再檢查線程是否被中斷,並清除中斷狀態,暫時不作響應
        LockSupport.park(this);
        return Thread.interrupted();
    }

目前進行到這裏加鎖還剩最後一步,即對線程中斷的處理,若是線程被中斷,而且得到了鎖,那麼會調用方法selfInterrupt來進行中斷處理。

static void selfInterrupt() {
        //再次調用線程中段方法
        Thread.currentThread().interrupt();
    }

釋放鎖代碼思路

通過上面的折磨,終於走到了釋放鎖部分的代碼,這部分代碼相對於加鎖部分要簡單不少。

public void unlock() {
        sync.release(1);
    }
    
    public final boolean release(int arg) {
        //嘗試釋放鎖,釋放成功那麼須要將其餘等待中的結點喚醒
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            //若是調用釋放鎖的線程和佔有鎖的線程不是同一個,拋出異常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //因爲鎖的可重入,只有當state的值爲0的時候才能真正釋放鎖
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            //修改State的值
            setState(c);
            return free;
        }

釋放鎖部分的代碼實際上很簡單,因爲只有得到鎖得線程纔可以釋放鎖,所以,若是調用釋放鎖的線程和佔有鎖的線程不是同一個,則須要拋出異常。另一個須要注意的點就是,因爲鎖的可重入性,咱們使用了state的值來表示鎖的重入次數,所以,只有當state的值變爲0,咱們纔可以真正的釋放鎖,可是,不管是否釋放了鎖,咱們都須要將state的值修改。

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
相關文章
相關標籤/搜索