前段時間在面試,發現面試官都有問到同步器AQS的相關問題。AQS爲Java中幾乎全部的鎖和同步器提供一個基礎框架,派生出如ReentrantLock、Semaphore、CountDownLatch等AQS全家桶。本文基於AQS原理的幾個核心點,談談對AbstractQueuedSynchronizer的理解,並實現一個自定義同步器。java
AQS全稱是AbstractQueuedSynchronizer,即抽象同步隊列。下面看一下AQS的類圖結構: node
爲了方便下面幾個關鍵點的理解,你們先熟悉一下AQS的類圖結構。面試
在AQS中維持了一個單一的共享狀態state,來實現同步器同步。看一下state的相關代碼以下:
複製代碼
/** * The synchronization state. */
private volatile int state;
/** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */
protected final int getState() {
return state;
}
/** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */
protected final void setState(int newState) {
state = newState;
}
/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
複製代碼
談到CLH隊列,咱們結合以上state狀態,先來看一下AQS原理圖:算法
CLH(Craig, Landin, and Hagersten locks) 同步隊列 是一個FIFO雙向隊列,其內部經過節點head和tail記錄隊首和隊尾元素,隊列元素的類型爲Node。AQS依賴它來完成同步狀態state的管理,當前線程若是獲取同步狀態失敗時,AQS則會將當前線程已經等待狀態等信息構形成一個節點(Node)並將其加入到CLH同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。編程
CLH同步隊列中,一個節點表示一個線程,它保存着線程的引用(thread)、狀態(waitStatus)、前驅節點(prev)、後繼節點(next),condition隊列的後續節點(nextWaiter)以下圖:設計模式
waitStatus幾種狀態狀態:安全
咱們再看一下CLH隊列入列以及出列的代碼:bash
CLH隊列入列就是tail指向新節點、新節點的prev指向當前最後的節點,當前最後一個節點的next指向當前節點。addWaiter方法以下:多線程
//構造Node
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure(快速嘗試添加尾節點)
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS設置尾節點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//屢次嘗試
enq(node);
return node;
}
複製代碼
由以上代碼可得,addWaiter設置尾節點失敗的話,調用enq(Node node)方法設置尾節點,enq方法以下:併發
private Node enq(final Node node) {
//死循環嘗試,知道成功爲止
for (;;) {
Node t = tail;
//tail 不存在,設置爲首節點
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
首節點的線程釋放同步狀態後,將會喚醒它的後繼節點(next),然後繼節點將會在獲取同步狀態成功時將本身設置爲首節點。能夠看一下如下兩段源碼:
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
複製代碼
private void unparkSuccessor(Node node) {
/* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
CAS算法,能夠看一下我工做實戰中仿造CAS算法解決併發問題的實現 juejin.im/post/5d0616…
咱們都知道,synchronized控制同步的時候,能夠配合Object的wait()、notify(),notifyAll() 系列方法能夠實現等待/通知模式。而Lock呢?它提供了條件Condition接口,配合await(),signal(),signalAll() 等方法也能夠實現等待/通知機制。ConditionObject實現了Condition接口,給AQS提供條件變量的支持 。
咱們先來看一下圖:
ConditionObject隊列與CLH隊列的愛恨情仇:
區別:
AQS支持兩種同步模式:獨佔式和共享式。
同一時刻僅有一個線程持有同步狀態,如ReentrantLock。又可分爲公平鎖和非公平鎖。
公平鎖: 按照線程在隊列中的排隊順序,有禮貌的,先到者先拿到鎖。
非公平鎖: 當線程要獲取鎖時,無視隊列順序直接去搶鎖,不講道理的,誰搶到就是誰的。
acquire(int arg)是獨佔式獲取同步狀態的方法,咱們來看一下源碼:
public final void acquire(long arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
//構造Node
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure(快速嘗試添加尾節點)
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS設置尾節點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//屢次嘗試
enq(node);
return node;
}
複製代碼
final boolean acquireQueued(final Node node, long arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
複製代碼
結合源代碼,可得acquire(int arg)方法流程圖,以下:
多個線程可同時執行,如Semaphore/CountDownLatch等都是共享式的產物。
acquireShared(long arg)是共享式獲取同步狀態的方法,能夠看一下源碼:
public final void acquireShared(long arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
複製代碼
由上可得,先調用tryAcquireShared(int arg)方法嘗試獲取同步狀態,若是獲取失敗,調用doAcquireShared(int arg)自旋方式獲取同步狀態,方法源碼以下:
private void doAcquireShared(long arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
long r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
模板方法模式: 在一個方法中定義一個算法的骨架,而將一些步驟延遲到子類中。模板方法使得子類能夠在不改變算法結構的狀況下,從新定義算法中的某些步驟。
模板方法模式生活中的例子: 假設咱們要去北京旅遊,那麼咱們能夠坐高鐵或者飛機,或者火車,那麼定義交通方式的抽象類,能夠有如下模板:買票->安檢->乘坐xx交通工具->到達北京。讓子類繼承該抽象類,實現對應的模板方法。
AQS定義的一些模板方法以下:isHeldExclusively()//該線程是否正在獨佔資源。只有用到condition才須要去實現它。
tryAcquire(int)//獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease(int)//獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。 tryAcquireShared(int)//共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
tryReleaseShared(int)//共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。
簡言之,就是AQS提供tryAcquire,tryAcquireShared等模板方法,給子類實現自定義的同步器。
基於以上分析,咱們都知道state,CLH隊列,ConditionObject隊列 等這些關鍵點,你要實現自定義鎖的話,首先須要肯定你要實現的是獨佔鎖仍是共享鎖,定義原子變量state的含義,再定義一個內部類去繼承AQS,重寫對應的模板方法。
咱們來看一下基於 AQS 實現的不可重入的獨佔鎖的demo,來自《Java併發編程之美》:
public class NonReentrantLock implements Lock,Serializable{
//內部類,自定義同步器
static class Sync extends AbstractQueuedSynchronizer {
//是否鎖已經被持有
public boolean isHeldExclusively() {
return getState() == 1;
}
//若是state爲0 則嘗試獲取鎖
public boolean tryAcquire(int arg) {
assert arg== 1 ;
//CAS設置狀態,能保證操做的原子性,當前爲狀態爲0,操做成功狀態改成1
if(compareAndSetState(0, 1)){
//設置當前獨佔的線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//嘗試釋放鎖,設置state爲0
public boolean tryRelease(int arg) {
assert arg ==1;
//若是同步器同步器狀態等於0,則拋出監視器非法狀態異常
if(getState() == 0)
throw new IllegalMonitorStateException();
//設置獨佔鎖的線程爲null
setExclusiveOwnerThread(null);
//設置同步狀態爲0
setState(0);
return true;
}
//返回Condition,每一個Condition都包含了一個Condition隊列
Condition newCondition(){
return new ConditionObject();
}
}
//建立一個Sync來作具體的工做
private final Sync sync= new Sync ();
@Override
public void lock() {
sync.acquire(1);
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
複製代碼
NonReentrantLockDemoTest:
public class NonReentrantLockDemoTest {
private static NonReentrantLock nonReentrantLock = new NonReentrantLock();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
nonReentrantLock.lock();
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
nonReentrantLock.unlock();
}
});
thread.start();
}
}
}
複製代碼
運行結果:
AQS派生出如ReentrantLock、Semaphore等AQS全家桶,接下來能夠看一下它們的使用案例。
使用ReentrantLock來實現個簡單線程安全的list,以下:
public class ReentrantLockList {
// 線程不安全的list
private ArrayList<String> array = new ArrayList<>();
//獨佔鎖
private volatile ReentrantLock lock = new ReentrantLock();
//添加元素
public void add(String e){
lock.lock();
try {
array.add(e);
}finally {
lock.unlock();
}
}
//刪除元素
public void remove(String e){
lock.lock();
try {
array.remove(e);
}finally {
lock.unlock();
}
}
//獲取元素
public String get(int index){
lock.lock();
try {
return array.get(index);
}finally {
lock.unlock();
}
}
}
複製代碼
Java多線程有一到比較經典的面試題:ABC三個線程順序輸出,循環10遍。
public class ABCSemaphore {
private static Semaphore A = new Semaphore(1);
private static Semaphore B = new Semaphore(1);
private static Semaphore C = new Semaphore(1);
static class ThreadA extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
A.acquire();
System.out.print("A");
B.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class ThreadB extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
B.acquire();
System.out.print("B");
C.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class ThreadC extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
C.acquire();
System.out.print("C");
A.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
// 開始只有A能夠獲取, BC都不能夠獲取, 保證了A最早執行
B.acquire();
C.acquire();
new ThreadA().start();
new ThreadB().start();
new ThreadC().start();
}
複製代碼
歡迎你們關注,你們一塊兒學習,一塊兒討論。