深刻淺出 Java Concurrency (8): 加鎖的原理 (Lock.lock)

接上篇,這篇從Lock.lock/unlock開始。特別說明在沒有特殊狀況下全部程序、API、文檔都是基於JDK 6.0的。

public void java.util.concurrent.locks.ReentrantLock.lock()html

獲取鎖。java

若是該鎖沒有被另外一個線程保持,則獲取該鎖並當即返回,將鎖的保持計數設置爲 1。node

若是當前線程已經保持該鎖,則將保持計數加 1,而且該方法當即返回。算法

若是該鎖被另外一個線程保持,則出於線程調度的目的,禁用當前線程,而且在得到鎖以前,該線程將一直處於休眠狀態,此時鎖保持計數被設置爲 1。併發

從上面的文檔能夠看出ReentrantLock是可重入鎖的實現。高併發

而內部是委託java.util.concurrent.locks.ReentrantLock.Sync.lock()實現的。post

java.util.concurrent.locks.ReentrantLock.Sync是抽象類,有java.util.concurrent.locks.ReentrantLock.FairSync和java.util.concurrent.locks.ReentrantLock.NonfairSync兩個實現,也就是常說的公平鎖和不公平鎖。性能

公平鎖和非公平鎖ui

若是獲取一個鎖是按照請求的順序獲得的,那麼就是公平鎖,不然就是非公平鎖。spa

在沒有深刻了解內部機制及實現以前,先了解下爲何會存在公平鎖和非公平鎖

公平鎖保證一個阻塞的線程最終可以得到鎖,由於是有序的,因此老是能夠按照請求的順序得到鎖

不公平鎖意味着後請求鎖的線程可能在其前面排列的休眠線程恢復前拿到鎖,這樣就有可能提升併發的性能。這是由於一般狀況下掛起的線程從新開始與它真正開始運行,兩者之間會產生嚴重的延時。所以非公平鎖就能夠利用這段時間完成操做。

這是非公平鎖在某些時候比公平鎖性能要好的緣由之一。

兩者在實現上的區別會在後面介紹,咱們先從公平鎖(FairSync)開始。

前面說過java.util.concurrent.locks.AbstractQueuedSynchronizer (AQS)是Lock的基礎,對於一個FairSync而言,lock()就直接調用AQS的acquire(int arg);

public final void acquire(int arg) 以獨佔模式獲取對象,忽略中斷。經過至少調用一次 tryAcquire(int) 來實現此方法,並在成功時返回。不然在成功以前,一直調用 tryAcquire(int) 將線程加入隊列,線程可能重複被阻塞或不被阻塞。

在介紹實現以前先要補充上一節的知識,對於一個AQS的實現而言,一般狀況下須要實現如下方法來描述如何鎖定線程。

  • tryAcquire(int) 試圖在獨佔模式下獲取對象狀態。此方法應該查詢是否容許它在獨佔模式下獲取對象狀態,若是容許,則獲取它。

    此方法老是由執行 acquire 的線程來調用。若是此方法報告失敗,則 acquire 方法能夠將線程加入隊列(若是尚未將它加入隊列),直到得到其餘某個線程釋放了該線程的信號。也就是說此方法是一種嘗試性方法,若是成功獲取鎖那最好,若是沒有成功也沒有關係,直接返回false。

  • tryRelease(int) 試圖設置狀態來反映獨佔模式下的一個釋放。 此方法老是由正在執行釋放的線程調用。釋放鎖可能失敗或者拋出異常,這個在後面會具體分析。
  • tryAcquireShared(int) 試圖在共享模式下獲取對象狀態。
  • tryReleaseShared(int) 試圖設置狀態來反映共享模式下的一個釋放。
  • isHeldExclusively() 若是對於當前(正調用的)線程,同步是以獨佔方式進行的,則返回 true

除了tryAcquire(int)外,其它方法會在後面具體介紹。首先對於ReentrantLock而言,不論是公平鎖仍是非公平鎖,都是獨佔鎖,也就是說同時可以有一個線程持有鎖。所以對於acquire(int arg)而言,arg==1。在AQS中acquire的實現以下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

這個看起來比較複雜,咱們分解如下4個步驟。

  1. 若是tryAcquire(arg)成功,那就沒有問題,已經拿到鎖,整個lock()過程就結束了。若是失敗進行操做2。
  2. 建立一個獨佔節點(Node)而且此節點加入CHL隊列末尾。進行操做3。
  3. 自旋嘗試獲取鎖,失敗根據前一個節點來決定是否掛起(park()),直到成功獲取到鎖。進行操做4。
  4. 若是當前線程已經中斷過,那麼就中斷當前線程(清除中斷位)。

這是一個比較複雜的過程,咱們循序漸進一個一個分析。

tryAcquire(acquires)

對於公平鎖而言,它的實現方式以下:

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (isFirst(current) &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

在這段代碼中,前面說明對於AQS存在一個state來描述當前有多少線程持有鎖。因爲AQS支持共享鎖(例如讀寫鎖,後面會繼續講),因此這裏state>=0,可是因爲ReentrantLock是獨佔鎖,因此這裏不妨理解爲0<=state,acquires=1。isFirst(current)是一個很複雜的邏輯,包括踢出無用的節點等複雜過程,這裏暫且不提,大致上的意思是說判斷AQS是否爲空或者當前線程是否在隊列頭(爲了區分公平與非公平鎖)。

  1. 若是當前鎖有其它線程持有,c!=0,進行操做2。不然,若是當前線程在AQS隊列頭部,則嘗試將AQS狀態state設爲acquires(等於1),成功後將AQS獨佔線程設爲當前線程返回true,不然進行2。這裏能夠看到compareAndSetState就是使用了CAS操做。
  2. 判斷當前線程與AQS的獨佔線程是否相同,若是相同,那麼就將當前狀態位加1(這裏+1後結果爲負數後面會講,這裏暫且不理它),修改狀態位,返回true,不然進行3。這裏之因此不是將當前狀態位設置爲1,而是修改成舊值+1呢?這是由於ReentrantLock是可重入鎖,同一個線程每持有一次就+1。
  3. 返回false。

比較非公平鎖的tryAcquire實現java.util.concurrent.locks.ReentrantLock.Sync.nonfairTryAcquire(int),公平鎖多了一個判斷當前節點是否在隊列頭,這個就保證了是否按照請求鎖的順序來決定獲取鎖的順序(同一個線程的屢次獲取鎖除外)。

如今再回頭看公平鎖和非公平鎖的lock()方法。公平鎖只有一句acquire(1);而非公平鎖的調用以下:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

很顯然,非公平鎖在第一次獲取鎖,或者其它線程釋放鎖後(可能等待),優先採用compareAndSetState(0,1)而後設置AQS獨佔線程而持有鎖,這樣有時候比acquire(1)順序檢查鎖持有而要高效。即便在重入鎖上,也就是compareAndSetState(0,1)失敗,可是是當前線程持有鎖上,非公平鎖也沒有問題。

addWaiter(mode)

tryAcquire失敗就意味着入隊列了。此時AQS的隊列中節點Node就開始發揮做用了。通常狀況下AQS支持獨佔鎖和共享鎖,而獨佔鎖在Node中就意味着條件(Condition)隊列爲空(上一篇中介紹過相關概念)。在java.util.concurrent.locks.AbstractQueuedSynchronizer.Node中有兩個常量,

static final Node EXCLUSIVE = null; //獨佔節點模式

static final Node SHARED = new Node(); //共享節點模式

addWaiter(mode)中的mode就是節點模式,也就是共享鎖仍是獨佔鎖模式。

前面一再強調ReentrantLock是獨佔鎖模式。

private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);
     // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;
     if (pred != null) {
         node.prev = pred;
         if (compareAndSetTail(pred, node)) {
             pred.next = node;
             return node;
         }
     }
     enq(node);
     return node;
}

上面是節點如隊列的一部分。當前僅當隊列不爲空而且將新節點插入尾部成功後直接返回新節點。不然進入enq(Node)進行操做。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            Node h = new Node(); // Dummy header
            h.next = node;
            node.prev = h;
            if (compareAndSetHead(h)) {
                tail = node;
                return h;
            }
        }
        else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq(Node)去隊列操做實現了CHL隊列的算法,若是爲空就建立頭結點,而後同時比較節點尾部是不是改變來決定CAS操做是否成功,當且僅當成功後纔將爲不節點的下一個節點指向爲新節點。能夠看到這裏仍然是CAS操做。

acquireQueued(node,arg)

自旋請求鎖,若是可能的話掛起線程,直到獲得鎖,返回當前線程是否中斷過(若是park()過而且中斷過的話有一個interrupted中斷位)。

final boolean acquireQueued(final Node node, int arg) {
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } catch (RuntimeException ex) {
        cancelAcquire(node);
        throw ex;
    }
}

下面的分析就須要用到上節節點的狀態描述了。acquireQueued過程是這樣的:

  1. 若是當前節點是AQS隊列的頭結點(若是第一個節點是DUMP節點也就是傀儡節點,那麼第二個節點實際上就是頭結點了),就嘗試在此獲取鎖tryAcquire(arg)。若是成功就將頭結點設置爲當前節點(無論第一個結點是不是DUMP節點),返回中斷位。不然進行2。
  2. 檢測當前節點是否應該park(),若是應該park()就掛起當前線程而且返回當前線程中斷位。進行操做1。

一個節點是否該park()是關鍵,這是由方法java.util.concurrent.locks.AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node, Node)實現的。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int s = pred.waitStatus;
    if (s < 0) return true;
    if (s > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
    return false;
}

  1. 若是前一個節點的等待狀態waitStatus<0,也就是前面的節點尚未得到到鎖,那麼返回true,表示當前節點(線程)就應該park()了。不然進行2。
  2. 若是前一個節點的等待狀態waitStatus>0,也就是前一個節點被CANCELLED了,那麼就將前一個節點去掉,遞歸此操做直到全部前一個節點的waitStatus<=0,進行4。不然進行3。
  3. 前一個節點等待狀態waitStatus=0,修改前一個節點狀態位爲SINGAL,表示後面有節點等待你處理,須要根據它的等待狀態來決定是否該park()。進行4。
  4. 返回false,表示線程不該該park()。

selfInterrupt()

private static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

若是線程曾經中斷過(或者阻塞過)(好比手動interrupt()或者超時等等,那麼就再中斷一次,中斷兩次的意思就是清除中斷位)。

大致上整個Lock.lock()就這樣一個流程。除了lock()方法外,還有lockInterruptibly()/tryLock()/unlock()/newCondition()等,在接下來的章節中會一一介紹。

相關文章
相關標籤/搜索