併發Lock之ReentrantLock實現原理

咱們在以前介紹了併發編程的鎖機制:synchronized和lock,lock接口的重要實現類是可重入鎖ReentrantLock。而上一篇併發Lock之AQS(AbstractQueuedSynchronizer)詳解介紹了AQS,談到ReentrantLock,不得不談抽象類AbstractQueuedSynchronizer(AQS)。AQS定義了一套多線程訪問共享資源的同步器框架,ReentrantLock的實現依賴於該同步器。本文在介紹過AQS,結合其具體的實現類ReentrantLock分析實現原理。java

ReentrantLock類圖

ReentrantLock
ReentrantLock類圖

ReentrantLock實現了Lock接口,內部有三個內部類,Sync、NonfairSync、FairSync,Sync是一個抽象類型,它繼承AbstractQueuedSynchronizer,這個AbstractQueuedSynchronizer是一個模板類,它實現了許多和鎖相關的功能,並提供了鉤子方法供用戶實現,好比tryAcquire,tryRelease等。Sync實現了AbstractQueuedSynchronizer的tryRelease方法。NonfairSync和FairSync兩個類繼承自Sync,實現了lock方法,公平搶佔和非公平搶佔針對tryAcquire有不一樣的實現。本文重點介紹ReentrantLock默認的實現,即非公平鎖的獲取鎖和釋放鎖的實現。node

非公平鎖的lock方法

lock方法

  • 在初始化ReentrantLock的時候,若是咱們不傳參數,那麼默認使用非公平鎖,也就是NonfairSync。
public ReentrantLock() {   
  sync = new NonfairSync();  
} 
複製代碼
  • 當咱們調用ReentrantLock的lock方法的時候,其實是調用了NonfairSync的lock方法,這個方法先用CAS操做,去嘗試搶佔該鎖。若是成功,就把當前線程設置在這個鎖上,表示搶佔成功。若是失敗,則調用acquire模板方法,等待搶佔。代碼以下:
static final class NonfairSync extends Sync {
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
複製代碼
  • 上面調用acquire(1)實際上使用的是AbstractQueuedSynchronizer的acquire方法,它是一套鎖搶佔的模板,整體原理是先去嘗試獲取鎖,若是沒有獲取成功,就在CLH隊列中增長一個當前線程的節點,表示等待搶佔。而後進入CLH隊列的搶佔模式,進入的時候也會去執行一次獲取鎖的操做,若是仍是獲取不到,就調用LockSupport.park將當前線程掛起。那麼當前線程何時會被喚醒呢?當持有鎖的那個線程調用unlock的時候,會將CLH隊列的頭節點的下一個節點上的線程喚醒,調用的是LockSupport.unpark方法。acquire代碼比較簡單,具體以下:
public final void acquire(int arg) {
	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
	    selfInterrupt();
}
複製代碼
  • acquire方法內部先使用tryAcquire這個鉤子方法去嘗試再次獲取鎖,這個方法在NonfairSync這個類中其實就是使用了nonfairTryAcquire,具體實現原理是先比較當前鎖的狀態是不是0,若是是0,則嘗試去原子搶佔這個鎖(設置狀態爲1,而後把當前線程設置成獨佔線程),若是當前鎖的狀態不是0,就去比較當前線程和佔用鎖的線程是否是一個線程,若是是,會去增長狀態變量的值,從這裏看出可重入鎖之因此可重入,就是同一個線程能夠反覆使用它佔用的鎖。若是以上兩種狀況都不經過,則返回失敗false。代碼以下:
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
複製代碼
  • tryAcquire一旦返回false,就會則進入acquireQueued流程,也就是基於CLH隊列的搶佔模式

首先,在CLH鎖隊列尾部增長一個等待節點,這個節點保存了當前線程,經過調用addWaiter實現,這裏須要考慮初始化的狀況,在第一個等待節點進入的時候,須要初始化一個頭節點而後把當前節點加入到尾部,後續則直接在尾部加入節點就好了。編程

private Node addWaiter(Node mode) {
	// 初始化一個節點,這個節點保存當前線程
    Node node = new Node(Thread.currentThread(), mode);
    // 當CLH隊列不爲空的視乎,直接在隊列尾部插入一個節點
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
	// 當CLH隊列爲空的時候,調用enq方法初始化隊列
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 初始化節點,頭尾都指向一個空節點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {// 考慮併發初始化
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
複製代碼
  • 將節點增長到CLH隊列後,進入acquireQueued方法。

首先,外層是一個無限for循環,若是當前節點是頭節點的下個節點,而且經過tryAcquire獲取到了鎖,說明頭節點已經釋放了鎖,當前線程是被頭節點那個線程喚醒的,這時候就能夠將當前節點設置成頭節點,而且將failed標記設置成false,而後返回。至於上一個節點,它的next變量被設置爲null,在下次GC的時候會清理掉。安全

若是本次循環沒有獲取到鎖,就進入線程掛起階段,也就是shouldParkAfterFailedAcquire這個方法。微信

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼
  • 若是嘗試獲取鎖失敗,就會進入shouldParkAfterFailedAcquire方法,會判斷當前線程是否掛起,若是前一個節點已是SIGNAL狀態,則當前線程須要掛起。若是前一個節點是取消狀態,則須要將取消節點從隊列移除。若是前一個節點狀態是其餘狀態,則嘗試設置成SIGNAL狀態,並返回不須要掛起,從而進行第二次搶佔。完成上面的過後進入掛起階段。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //
        return true;
    if (ws > 0) {
        //
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
複製代碼
  • 當進入掛起階段,會進入parkAndCheckInterrupt方法,則會調用LockSupport.park(this)將當前線程掛起。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

複製代碼

非公平鎖的unlock方法

  • 調用unlock方法,實際上是直接調用AbstractQueuedSynchronizer的release操做。
public void unlock() {
	sync.release(1);
}
複製代碼
  • 進入release方法,內部先嚐試tryRelease操做,主要是去除鎖的獨佔線程,而後將狀態減一,這裏減一主要是考慮到可重入鎖可能自身會屢次佔用鎖,只有當狀態變成0,才表示徹底釋放了鎖。
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製代碼
  • 一旦tryRelease成功,則將CHL隊列的頭節點的狀態設置爲0,而後喚醒下一個非取消的節點線程。
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
          free = true;
          setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
 }
複製代碼
  • 一旦下一個節點的線程被喚醒,被喚醒的線程就會進入acquireQueued代碼流程中,去獲取鎖。
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) 
       LockSupport.unpark(s.thread);
}
複製代碼

tryLock

在ReetrantLock的tryLock(long timeout, TimeUnit unit)提供了超時獲取鎖的功能。它的語義是在指定的時間內若是獲取到鎖就返回true,獲取不到則返回false。這種機制避免了線程無限期的等待鎖釋放。多線程

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
   return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
複製代碼

具體看一下內部類裏面的方法tryAcquireNanos併發

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
   if (Thread.interrupted())
       throw new InterruptedException();
   return tryAcquire(arg) ||
       doAcquireNanos(arg, nanosTimeout);
}
複製代碼

若是線程被中斷了,那麼直接拋出InterruptedException。若是未中斷,先嚐試獲取鎖,獲取成功就直接返回,獲取失敗則進入doAcquireNanos。tryAcquire咱們已經看過,這裏重點看一下doAcquireNanos作了什麼。框架

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    // 起始時間
    long lastTime = System.nanoTime();
    // 線程入隊
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 又是自旋!
        for (;;) {
            // 獲取前驅節點
            final Node p = node.predecessor();
            // 若是前驅是頭節點而且佔用鎖成功,則將當前節點變成頭結點
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 若是已經超時,返回false
            if (nanosTimeout <= 0)
                return false;
            // 超時時間未到,且須要掛起
            if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                // 阻塞當前線程直到超時時間到期
                LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            // 更新nanosTimeout
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                //相應中斷
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

doAcquireNanos的流程簡述爲:線程先入等待隊列,而後開始自旋,嘗試獲取鎖,獲取成功就返回,失敗則在隊列裏找一個安全點把本身掛起直到超時時間過時。這裏爲何還須要循環呢?由於當前線程節點的前驅狀態可能不是SIGNAL,那麼在當前這一輪循環中線程不會被掛起,而後更新超時時間,開始新一輪的嘗試。ui

總結

ReentrantLock是可重入的鎖,其內部使用的就是獨佔模式的AQS。公平鎖和非公平鎖不一樣之處在於,公平鎖在獲取鎖的時候,不會先去檢查state狀態,而是直接執行aqcuire(1)。公平鎖多了hasQueuePredecessors這個方法,這個方法用於判斷CHL隊列中是否有節點,對於公平鎖,若是CHL隊列有節點,則新進入競爭的線程必定要在CHL上排隊,而非公平鎖則是無視CHL隊列中的節點,直接進行競爭搶佔,這就有可能致使CHL隊列上的節點永遠獲取不到鎖,這就是非公平鎖之因此不公平的緣由,這裏再也不贅述。this

訂閱最新文章,歡迎關注個人公衆號

微信公衆號

參考

  1. Java中可重入鎖ReentrantLock原理剖析
  2. ReentrantLock實現原理
相關文章
相關標籤/搜索