鎖源碼分析-AQS實現

1 目錄

2 Lock和Condition接口

最近打算將jdk的Lock系列詳細的分析一下,解決如下幾個問題:node

  • AQS詳細分析
  • 獨佔鎖、共享鎖、讀寫鎖
  • 是否可重入
  • 公平鎖和非公平鎖的區別,這裏的公平和非公平是針對哪些線程來講的?代碼又如何實現?

Lock和synchronized是對應的,Condition和Object的監控方法(wait、notify)是對應的安全

2.1 Lock接口

先來大體看下接口方法併發

public interface Lock {

    void lock();
	void lockInterruptibly() throws InterruptedException;

    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    void unlock();

    Condition newCondition();
}

lock:在獲取鎖以前一直阻塞,而且不接受線程中斷響應app

lockInterruptibly:在獲取鎖以前一直阻塞,可是接受線程中斷響應,即一旦該線程被中斷,會退出阻塞,拋出InterruptedException異常,該方法執行結束less

tryLock:嘗試獲取鎖,無論成不成功立馬返回,返回結果表明是否成功獲取了鎖ide

tryLock(long time, TimeUnit unit):在一段時間內不斷嘗試獲取鎖,若是超時還未獲取鎖則拋出InterruptedException異常,該方法執行結束ui

unlock:釋放鎖,要和上面的獲取鎖方法成對出現this

newCondition:使用該鎖獲取一個Condition,Condition是和生產它的鎖是息息相關,綁定在一塊兒的。線程

##2.2 Condition接口code

先來大體看下接口方法

public interface Condition {

    void await() throws InterruptedException;
    void awaitUninterruptibly();
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();
    void signalAll();
}

就await和signal方法,和Object的wait、notify是相對應的。

await:阻塞,一直等待到該對象的signal或者signalAll被調用,退出阻塞,或者該線程被中斷,拋出InterruptedException異常,方法執行結束

awaitUninterruptibly:和上面差很少,可是不響應線程的中斷,即線程中斷對它沒影響

await(long time, TimeUnit unit)和awaitUntil(Date deadline):等待一段時間,一旦超時或者線程被中斷,都會拋出InterruptedException異常,方法執行結束

signal:喚醒一個處於await的線程,這裏都是針對同一個Condition對象來講的

signalAll:喚醒全部處於await的線程,這裏都是針對同一個Condition對象來講的

3 AbstractQueuedSynchronizer實現分析

3.1 AbstractQueuedSynchronizer方法概述

咱們常常會據說AQS,全稱就是這個AbstractQueuedSynchronizer。它在鎖中扮演什麼角色呢?

Lock的接口方法是針對用戶的使用而定義的,咱們在實現Lock的時候,就須要作以下事情,重點關注下這些事情的共性和異性

  • 指明什麼狀況下才叫獲取到鎖:如獨佔鎖,一旦有人佔據了,就不能獲取到鎖。如共享鎖,有人佔據,可是沒超過限制也能獲取鎖。這一部分應該是鎖的實現的業務代碼,每種鎖都有本身的業務邏輯。這一部分其實就是AbstractQueuedSynchronizer對子類留出的tryAcquire方法

  • 獲取不到鎖的時候該如何處理呢:咱們固然但願它們可以繼續等待,有一個隊列就最好不過了,一旦獲取鎖失敗就加入到等待隊列中排隊,隊列中的等待者依次再去競爭獲取鎖。這一部分代碼其實就和哪一種鎖沒太大關係了,因此應該是鎖的共性部分,這一部分其實就是AbstractQueuedSynchronizer實現的共性部分acquireQueued

那麼針對Lock接口定義的lock方法實現邏輯就通常以下了:

if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

表明着先試着獲取一下鎖,若是獲取不成功,就把以後的處理邏輯交給AbstractQueuedSynchronizer來處理。反正你只管告訴AbstractQueuedSynchronizer它怎樣才叫獲取到鎖,其餘的等待處理邏輯你就能夠不用再關心了。

以上也僅僅是部份內容,下面來全面看下AbstractQueuedSynchronizer對外留出的接口和已實現的共性部分

對外留出的接口:

  • tryAcquire:該方法向AQS解釋了怎麼才叫獲取一把獨佔鎖

  • tryRelease:該方法向AQS解釋了怎麼才叫釋放一把獨佔鎖

  • tryAcquireShared:該方法向AQS解釋了怎麼才叫獲取一把共享鎖

  • tryReleaseShared:該方法向AQS解釋了怎麼才叫釋放一把共享鎖

這些內容就是各類鎖自己的業務邏輯,屬於異性部分。

來看看鎖的共性部分相關方法:

  • acquire:獲取一把獨佔鎖的過程

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    就是先拿鎖實現的tryAcquire方法去嘗試獲取獨佔鎖,一旦獲取鎖失敗就進入隊列,交給AQS來處理,一旦成功就表示獲取到了一把鎖。

  • release:釋放一把獨佔鎖的過程

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    就是先拿鎖實現的tryRelease方法嘗試釋放獨佔鎖,一旦釋放成功,就通知隊列,有人釋放鎖了,隊列前面的能夠再次去競爭鎖了(這一部分下面詳細說明)

  • acquireShared:獲取一把共享鎖的過程

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    就是先拿鎖實現的tryAcquireShared方法嘗試獲取共享鎖,一旦獲取失敗,就進入隊列,交給AQS來處理

  • releaseShared:釋放一把共享鎖的過程

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    就是先拿鎖實現的tryReleaseShared方法嘗試釋放共享鎖,一旦釋放成功,就通知隊列,有人釋放鎖了

一個Lock只要實現了AQS的留出的業務部分代碼,就可使用AQS提供的上述方法來實現鎖的相關功能,如一個簡單的獨佔鎖實現以下(略去其餘一些代碼):

public class Mutex implements Lock{

	@Override
	public void lock() {
		sync.acquire(1);
	}

	@Override
	public void unlock() {
		sync.release(1);
	}

	private final Sync sync=new Sync();

	private static class Sync extends AbstractQueuedSynchronizer{

		@Override
		protected boolean tryAcquire(int arg) {
			if(compareAndSetState(0,1)){
				setExclusiveOwnerThread(Thread.currentThread());
				return true;
			}
			return false;
		}

		@Override
		protected boolean tryRelease(int arg) {
			if(getState()==0){
				throw new IllegalMonitorStateException();
			}
			setExclusiveOwnerThread(null);
			setState(0);
			return true;
		}	
	}

}

3.2 AbstractQueuedSynchronizer的狀態屬性state

從上面咱們就看到總是方法中會有各類的int參數,其實這是AbstractQueuedSynchronizer將獲取鎖的過程量化對數字的操做,而state變量就是用於記錄當前數字值的。以獨佔鎖爲例:

  • state=0 表示該鎖未被其餘線程獲取

  • 一旦有線程想獲取鎖,就能夠對state進行CAS增量操做,這個增量能夠是任意值,不過大多數都默認取1。也就是說一旦一個線程對state CAS操做成功就表明該線程獲取到了鎖,則state就變成1了。其餘操做對state CAS操做失敗的就表明沒獲取到鎖,就自動進入AQS管理流程

  • 其餘線程發現當前state值不等於0表示鎖已被其餘線程獲取,就自動進入AQS管理流程

  • 一旦獲取鎖的線程想釋放鎖,就能夠對state進行自減,即減到0,其餘線程又能夠去獲取鎖了

從上面的例子中能夠看到對state的幾種線程安全和非安全操做:

  • compareAndSetState(int expect, int update):線程安全的設置狀態,由於可能多個線程併發調用該方法,因此須要CAS來保證線程安全

  • setState(int newState):非線程安全的設置狀態,這種通常是針對只有一個線程獲取鎖的時候來釋放鎖,此時沒有併發的可能性,因此就不須要上述的compareAndSetState操做

  • getState():獲取狀態值

AQS提供了上述線程安全和非安全的設置狀態state的方法,供咱們在實現鎖的tryAcquire、tryRelease等方法的時候合理的時候它們。

3.3 AbstractQueuedSynchronizer的FIFO隊列

就以獲取獨佔鎖爲例來詳細看下該過程:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

前面簡單提到了,就是先拿鎖實現的tryAcquire方法去嘗試獲取獨佔鎖,一旦獲取鎖失敗就進入隊列,交給AQS來處理。AQS的處理簡單描述下就是將當前線程包裝成Node節點而後放到隊列中進程排隊,等待前面的Node節點都出隊了,被喚醒輪到本身再次去競爭鎖。

咱們先來認識下Node節點,大體以下結構:

static final class Node {

    volatile Node prev;
    volatile Node next;

    volatile Thread thread;

    Node nextWaiter;

	volatile int waitStatus;
}

private transient volatile Node head;

private transient volatile Node tail;

首先就是prev、next節點能夠構成一個雙向隊列。AQS中含有上述的head和tail兩個屬性一塊兒來構成FIFO隊列。

Thread thread則表明的是構成此節點的線程。

Node nextWaiter:是用於condition queue,用於構成一個單向的FIFO隊列,詳見下面。

volatile int waitStatus:則表示該節點當前的狀態,目前有以下狀態:

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate
 */
static final int PROPAGATE = -3;

CANCELLED:表示該節點所對應的線程由於獲取鎖的過程當中超時或者被中斷而被設置成此狀態

SIGNAL:表示該節點所對應的線程被阻塞,再也不被調度執行,須要等待其餘線程釋放鎖以後來喚醒它,才能再次加入鎖的競爭

CONDITION:表示該節點所對應的線程被阻塞,再也不被調度執行,在等待某一個condition的signal、signalAll方法的喚醒

PROPAGATE:只用於共享狀態的HEAD節點,目前還沒弄清楚,歡迎一塊兒來探討

節點建立後默認狀態值是0。

接下來咱們就要分別看下這個獨佔鎖的入隊和出隊過程以及共享鎖的入隊和出隊過程

###3.3.1 獨佔鎖的獲取和釋放過程

先來看看獲取

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

首先調用鎖本身的tryAcquire業務邏輯來嘗試獲取鎖,一旦獲取失敗,則進入AQS的處理流程,即acquireQueued方法

第一步:就是構造一個Node併入隊

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

這裏先嚐試經過CAS將新添加的Node節點放置當前tail的後面,若是tail爲空或者CAS失敗,就進入下面的for循環形式的CAS流程,前面的嘗試主要是爲了針對大部分狀況即(pred!=null的狀況下)可以快速處理,而for循環則是要考慮全部狀況,由於判斷邏輯就比較多了。如剛初始化的時候,head和tail都指向了一個空的Node,該Node並不須要獲取鎖。

第二步:入隊後的處理邏輯,是被阻塞呢?仍是持續不斷嘗試獲取鎖呢?

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

這個就要查看新構建的Node的前一個節點是否是head,若是是head,則該新節點能夠嘗試下獲取鎖,一旦獲取鎖成功就會設置head指向當前節點Node。

來重點說說這個head節點:剛初始化的時候head和tail都指向的是一個空的Node,head節點並無獲取到鎖,見上面。若是上述嘗試獲取鎖成功就重新設置head節點爲當前Node,此時head節點又是一個獲取了鎖的節點。

若是當前節點的前一個節點不是head或者是head可是嘗試獲取鎖失敗,此時就須要衡量下是否須要將當前節點阻塞,即shouldParkAfterFailedAcquire方法的邏輯:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

若是前置節點處於Node.SIGNAL狀態,則該Node則須要被阻塞,即還輪不到它。

若是前置節點狀態處於Node.CANCELLED狀態,則該Node須要從新更換前置節點

若是前置節點狀態處於其餘狀態,則須要把前置節點的狀態值設置成Node.SIGNAL,以便下一次循環嘗試獲取鎖失敗時,該節點被阻塞,即知足上述第一種狀況。從這裏看出,節點的狀態大部分是應用於後續節點行爲判斷使用的,而對自身的邏輯並沒什麼影響。

基本上不管如何,該節點沒有機會嘗試獲取鎖或者有機會嘗試獲取鎖可是又失敗時,最終都會被阻塞,則裏的阻塞使用的就是LockSupport.park:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

接下來看看釋放

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

先調用鎖的tryRelease具體業務邏輯,而後就會使用LockSupport.unpark喚醒head的下一個節點,至此在上述acquireQueued方法中被阻塞的那個Node(head的下一個Node)再也不阻塞,再次繼續for循環流程,又一次開始嘗試,若是獲取成功,則更新了head節點,則以前的head就表示出隊了,若是獲取還失敗,說明又被外部線程獲取到了,那就再一次的被park阻塞。

共享鎖的獲取和釋放過程就再也不說了,大部分都是相同的。

3.4 AbstractQueuedSynchronizer的condition queue

來看下AQS提供的Condition實現,簡單以下

public class ConditionObject implements Condition{
    private transient Node firstWaiter;
    private transient Node lastWaiter;
}

一個ConditionObject對象自己維護了一個FIFO單向隊列,這裏經過Node的Node nextWaiter屬性來創建關聯。

咱們來簡單看看Condition的await和signal:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

這裏再也不一點一旦詳細研究了,內容太多了,簡單看下總體的邏輯:

前提:在調用Condition的await和signal的方法前必須先獲取到鎖

第一步:首先構造一個Node,初始化狀態爲Node.CONDITION,並放到對應ConditionObject的lastWaiter上

第二步:釋放鎖

第三步:while循環裏檢查該Node是否在同步隊列即上述的FIFO雙向隊列,不在的話,則被park住,等待unpark的喚醒

第四步:收到了unpark喚醒(在喚醒的時候就順便將該Node加入了上述的FIFO雙向隊列中了,所以能夠跳出while循環了),跳出了while循環

第五步:該Node已經進入了上述的FIFO雙向隊列中了,開始了上面介紹的邏輯

再來看看喚醒操做:

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

就是unpark頭節點Node,而且將該Node放入上述的FIFO雙向隊列中。

#4 結束語

下一篇就來詳細的介紹幾個具體的基於AQS的鎖的實現。

相關文章
相關標籤/搜索