AbstractQueuedSynchronizer

AQS 是用來構建鎖和同步工具的基本框架。本文主要基於 AQS 做者 Doug Lea 的論文 The java.util.concurrent Synchronizer Framework 和 JDK 1.8 的文檔。java

這篇文章也同時發佈在個人博客中。node

不過英文好的話,仍是直接看論文吧。編程

設計要求

同步(維持變量在各個線程間狀態的一致性)至少須要兩種操做:設計模式

  • acquire:阻塞線程直到*同步狀態(synchronization state)*容許線程運行
  • release:改變同步狀態,而且 unblock 一個或多個阻塞的線程

同時支持兩種模式:bash

  • exclusive mode:一次只容許一個線程改變同步狀態
  • shared mode:多個線程同時改變同步狀態可能成功;一次同時喚醒多個線程

同時框架須要一些高級功能:併發

  • 非阻塞和阻塞地改變同步狀態(如tryLocklock
  • 超時功能,超時即放棄嘗試
  • 響應線程中斷

實現

同步器的基本思想很直接簡潔,用僞代碼表示以下:框架

acquireide

while (syncronization state does not allow acquire) {
  enqueue current thread if not already queued;
  possiblly block current thread;
}
複製代碼

release工具

update synchronization state;
if (state may premit a blocked thread acuire) {
  unblock one or more queued thread;
}
複製代碼

要實現這兩個操做,須要三個基本模塊的配合:優化

  • 以原子操做管理同步狀態
  • block 和 unblock 線程
  • 維護 FIFO 隊列

Synchronization state

AQS 使用一個 32 位整數(int)來表明共享資源,也就是同步狀態。

該整數能夠表現任何狀態。好比, Semaphore 用它來表現剩餘的許可數,ReentrantLock 用它來表現擁有它的線程已經請求了多少次鎖;FutureTask 用它來表現任務的狀態 (還沒有開始、運行、完成和取消)

Blocking

AQS 使用 JUC 包下LockSupport中的pack()unpack()方法來阻塞和喚醒進程。最終會調用Unsafe.park()Unsafe.unpack()兩個 native 方法,最終的阻塞線程和喚醒線程具體實現仍是由操做系統來實現的。

Queue

AQS 維護一個 FIFO 的隊列,來管理阻塞的線程,能夠實現公平性(也能夠不公平),也就是同時支持公平鎖和非公平鎖兩種模式。內部使用 CLH Lock,可是作了不少優化,好比CLH 鎖不是自旋的而是阻塞的。

AQS 中的 CLH lock 和原汁原味的 CLH lock 相比,主要有兩點不一樣:

  • 不使用自旋鎖而是阻塞鎖,調用pack()unpack()實現。

  • 節點有顯式的後繼節點next,原來的 CLH lock 不須要顯式的鏈表由於當前一個節點爲釋放鎖時,後一個節點在一直輪詢,因此它可以拿到鎖。而 AQS 的鎖是阻塞的,須要調用unpack(Thread)來喚醒請求鎖的線程,因此須要知道它的後繼節點。

AQS 同時還設置了一個signal bit來避免沒必要要的pack()unpack()調用。在調用pack()以前,首先設置signal bit爲 true,而後再次檢查節點狀態,若是還不能拿到鎖,就調用pack()阻塞線程。

這樣就能夠用更加詳細的僞代碼來描述acquirerelease,這裏只考慮exclusive mode、不可中斷的、沒有超時功能的狀況:

acquire:

if (!tryAcquire(arg)) {
  node = create and enqueue new node;
  pred = node's effective predecessor; while (pred is not head node || !tryAcquire(arg)) { if (pred's signal bit is set)
    	park();
    else 
    	compareAndSet pred's signal bit to ture pred = node's effective predecessor;
  }
  head = node;
}
複製代碼

release:

if (tryRelease(arg) && head node's signal bit is set) { compareAndSet head's signal bit to false;
  unpack head's successor, if one exists }s 複製代碼

使用

實現一個同步器須要實現下面的方法:

tryAcquire()
tryRelease()
tryAcquireShared()
tryReleaseShared()
isHeldExclusively()
複製代碼

以上方法不須要所有實現,根據獲取的鎖的種類能夠選擇實現不一樣的方法.

  • 支持獨佔 (排他) 獲取鎖的同步器應該實現tryAcquiretryReleaseisHeldExclusively
  • 支持共享獲取的同步器應該實現tryAcquireSharedtryReleaseSharedisHeldExclusively
  • 固然也能夠同時支持 exclusive 模式和 shared 模式,好比ReentrantReadWriteLock

實現一個同步器最好的設計模式是把功能委託給一個AQS的私有內部子類,而不是直接繼承 AQS 來實現(這樣會破壞同步器的簡潔性,調用者可能會調用 AQS 的其餘方法破壞同步狀態)。

例子

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** * @author leer * Created at 4/25/19 6:24 PM * 一個不可重入的互斥鎖 */
public class Mutex {
  static final class Sync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int ignore) {
      return compareAndSetState(0, 1);
    }

    @Override
    protected boolean tryRelease(int ignore) {
      setState(0);
      return true;
    }
  }
  private final Sync sync = new Sync();
  
  public void lock() {
    sync.acquire(0);
  }
  
  public void unlock() {
    sync.release(0);
  }
}
複製代碼

AQS 在 Synchronizers 中的具體實現

ReentrantLock

  • ReentrantLock 是可重入的:因此須要記錄當前線程獲取原子狀態的次數,若是次數爲零,那麼就說明這個線程放棄了鎖(也有可能其餘線程佔據着鎖從而須要等待),若是次數大於 1,也就是得到了重進入的效果,而其餘線程只能被 park 住,直到這個線程重進入鎖次數變成 0 而釋放原子狀態
  • ReentrantLock 有公平鎖和非公平鎖兩種模式:對應的, ReentrantLock 內部有兩個 AQS 的子類。(the fair one disabling barging)

非公平鎖的tryAcquire實現:

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
複製代碼

ReentrantReadWriteLock

ReentrantReadWriteLock 使用 同步狀態的 16 位來存放讀鎖計數,另外的 16 位存放寫鎖計數。

  • 寫鎖和ReentrantLock相似
  • 讀鎖使用 shared 模式的 AQS來支持多個讀者同時讀

Semaphore

Semaphore使用同步狀態來保存當前可用許可數量。它重寫tryAcquireShared來減小計數來模擬獲取資源,若是計數小於 0 則會阻塞線程;重寫tryReleaseShared來模擬釋放資源。同時它也有公平模式和非公平模式。

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
複製代碼

CountDownLatch

Semaphore相似,同步狀態保存當前的計數值。countDown()調用releaseShared()await()方法調用acquireShared(),等待計數器到零。

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
複製代碼

FutureTask

Only when JDK version < 1.7

FutureTask使用同步狀態保存Future任務的狀態(initial、running、cancelled、done)。

設置和取消一個任務將調用release(),調用Future.get()等待結果將會調用acquire()

參考

相關文章
相關標籤/搜索