一文帶你學會AQS和併發工具類的關係2

1.建立公平鎖

1.使用方式

Lock reentrantLock = new ReentrantLock(true);
reentrantLock.lock(); //加鎖
try{
  // todo
} finally{
  reentrantLock.unlock(); // 釋放鎖
}

2.建立公平鎖

      在new ReentrantLock(true)的時候加入關鍵字truejava

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

      當傳入的參數值爲true的時候建立的對象爲new FairSync()公平鎖。node

2.加鎖的實現

1.普通的獲取鎖

reentrantLock.lock(); //加鎖

      加鎖的實際調用的方法是建立的公平鎖裏面的lock方法
圖片併發

static final class FairSync extends Sync {
    final void lock() {
        acquire(1);
    }
    ...
}

      代碼中的acquire方法和非公平鎖中的acquire方法同樣都是調用的AQS中的final方法ide

## AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

      不過不一樣之處是這裏面的tryAcquire(arg)方法是調用的公平鎖裏面實現的方法
圖片工具

      這個方法其實和非公平鎖方法特別類似,只有一處不一樣公平鎖中含有一個特殊的方法叫作hasQueuedPredecessors()該方法也是AQS中的方法,該方法的實質就是要判斷該節點的前驅節點是不是head節點ui

## AbstractQueuedSynchronizer
public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

      剩下的部分和前一篇分析的非公平鎖幾乎是一個流程this

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 設置當前線程爲當前鎖的獨佔線程
            setExclusiveOwnerThread(current);
            // 獲取鎖成功
            return true;
        }
    }
    // 若是是當前線程持有的鎖信息,在原來的state的值上加上acquires的值
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 設置state的值
        setState(nextc);
        // 獲取鎖成功
        return true;
    }
    // 獲取鎖失敗了才返回false
    return false;
}

      注意一下只有當返回false的時候纔是tryAcquire失敗的時候。此時就會走到繁瑣的addWaiter(Node.EXCLUSIVE)方法線程

2.普通獲取鎖失敗

      若是前面tryAcquire失敗就會進行接下來的addWaiter(Node.EXCLUSIVE)3d

## AbstractQueuedSynchronizer
private Node addWaiter(Node mode) {
    // 建立一個新的node節點 mode 爲Node.EXCLUSIVE = null
    Node node = new Node(Thread.currentThread(), mode);
    // 獲取尾部節點
    Node pred = tail;
    // 若是尾部節點不爲空的話將新加入的節點設置成尾節點並返回當前node節點
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 若是尾部節點爲空整明當前隊列是空值得須要將當前節點入隊的時候先初始化隊列
    enq(node);
    return node;
}

3.節點入隊方法

      enq(node)方法是節點入隊的方法咱們來分析一下,enq入隊方法也是AQS中的方法,注意該方法的死循環,不管如何也要將該節點加入到隊列中。指針

## AbstractQueuedSynchronizer
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 若是尾節點爲空的話,那麼須要插入一個新的節點當頭節點
        if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
    // 若是不爲空的話,將當前節點變爲尾節點並返回當前節點的前驅節點
        node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

      其實和非公平鎖的addWaiter(Node node)是同樣的流程,分析完。

4.acquireQueued方法

      此時當前節點已經被加入到了阻塞隊列中了,進入到了acquireQueued方法。該方法也是AQS中的方法。

## AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
         // 獲取當前節點的前驅節點
            final Node p = node.predecessor();
            // 若是當前節點的前驅節點是頭節點的話會再一次執行tryAcquire方法獲
            // 取鎖
            if (p == head && tryAcquire(arg)) {
             // 設置當前節點爲頭節點
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

      注意 setHead(node) 中具體的實現細節thread爲null,prev也爲null其實就是若是當前節點的前驅節點爲頭節點的話,那麼當前節點變成了頭節點也就是以前阻塞隊列的虛擬頭節點。

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

      若是不是頭節點或者tryAcquire()方法執行失敗執行下面的更加繁瑣的方法shouldParkAfterFailedAcquire(p, node),若是該方法返回true纔會執行到下面的parkAndCheckInterrupt()方法,這兩個方法都是AQS中的方法。

## AbstractQueuedSynchronizer
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取前驅節點的狀態
int ws = pred.waitStatus;
// 若是前驅節點的狀態爲SIGNAL那麼直接就能夠沉睡了,由於若是一個節點要是進入
    // 阻塞隊列的話,那麼他的前驅節點的waitStatus必須是SIGNAL狀態。
    if (ws == Node.SIGNAL)
return true;
// 若是前驅節點不是Node.SIGNAL狀態就往前遍歷一值尋找節點的waitStatus必須
    // 是SIGNAL狀態的節點
    if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
    // 若是沒有找到符合條件的節點,那麼就將當前節點的前驅節點的waitStatus
        // 設置成SIGNAL狀態
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

      若是返回的值是false,就要注意此時又繼續進入了下一次死循環中,由於若是往前遍歷的過程當中有可能他的前驅節點變成了頭節點,那麼就能夠再次的獲取鎖,若是不是的話那麼只能
執行parkAndCheckInterrupt()方法進行線程的掛起了。

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

5.取消請求

      不管如何最終都走到了cancelAcquire方法

private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
// 跳過全部取消請求的節點
    while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// 將當前節點設置成取消狀態,爲了後續遍歷跳過咱們
    node.waitStatus = Node.CANCELLED;
// 若是當前節點是尾節點,而且將當前節點的前驅節點設置成尾節點成功
    if (node == tail && compareAndSetTail(node, pred)) {
    // 當前節點的前驅節點的後續節點爲空
    compareAndSetNext(pred, predNext, null);
} else {
    int ws;
// 若是前驅節點不是頭節點
        if (pred != head &&
        // 前驅節點的狀態是Node.SIGNAL或者前驅節點的waitStatus設置           
        // 成Node.SIGNAL
        ((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// 前驅節點的thread 不爲空
            pred.thread != null) {
// 獲取當前節點的後繼節點
            Node next = node.next;
// 若是後繼節點不爲空,而且後繼節點waitStatus 小於0
            if (next != null && next.waitStatus <= 0)
// 將當前節點的後繼節點設置成當前節點的前驅節點的後繼節點
            compareAndSetNext(pred, predNext, next);
} else {
// 若是上面當前節點的前驅節點是head或者其餘條件不知足那麼就喚醒當前節點
        unparkSuccessor(node);
}
node.next = node; // help GC
}
}

      unparkSuccessor(node)喚醒當前節點,該方法也是AbstractQueuedSynchronizer中的方法

private void unparkSuccessor(Node node) {
    // 獲取當前節點的狀態
    int ws = node.waitStatus;
    // 若是當前節點狀態小於0那麼設置成0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 獲取當前節點的後繼節點
    Node s = node.next;
    // 若是後繼節點爲空,或者後繼節點的狀態小於0
    if (s == null || s.waitStatus > 0) {
        // 後繼節點置爲null。視爲取消請求的節點
        s = null;
        // 獲取尾節點,而且尾節點不爲空,不是當前節點,那麼就往前遍歷尋找
        // 節點waitStatus 狀態小於0的節點賦予給當前節點的後繼節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
     // 喚醒後繼節點
        LockSupport.unpark(s.thread);
}

3.獲取鎖的流程圖

      流程圖和上一篇非公平鎖的獲取流程圖十分類似只有一點點區別這裏就不過多的描述了。

4.釋放鎖的實現

4.1釋放鎖代碼分析

      嘗試釋放此鎖。若是當前線程是此鎖的持有者,則保留計數將減小。 若是保持計數如今爲零,則釋放鎖定。 若是當前線程不是此鎖的持有者,則拋出IllegalMonitorStateException。

## ReentrantLock
public void unlock() {
    sync.release(1);
}

sync.release(1) 調用的是AbstractQueuedSynchronizer中的release方法

## AbstractQueuedSynchronizer
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

分析tryRelease(arg)方法
圖片

      tryRelease(arg)該方法調用的是ReentrantLock中

protected final boolean tryRelease(int releases) {
// 獲取當前鎖持有的線程數量和須要釋放的值進行相減
    int c = getState() - releases; 
    // 若是當前線程不是鎖佔有的線程拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 若是此時c = 0就意味着state = 0,當前鎖沒有被任意線程佔有
    // 將當前所的佔有線程設置爲空
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 設置state的值爲 0
    setState(c);
    return free;
}

      若是頭節點不爲空,而且waitStatus != 0,喚醒後續節點若是存在的話。
這裏的判斷條件爲何是h != null && h.waitStatus != 0?

由於h == null的話,Head還沒初始化。初始狀況下,head == null,第一個節點入隊,Head會被初始化一個虛擬節點。因此說,這裏若是還沒來得及入隊,就會出現head == null 的狀況。

  1. h != null && waitStatus == 0 代表後繼節點對應的線程仍在運行中,不須要喚醒
  2. h != null && waitStatus < 0 代表後繼節點可能被阻塞了,須要喚醒
    private void unparkSuccessor(Node node) {
    // 獲取頭結點waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 獲取當前節點的下一個節點
    Node s = node.next;
    //若是下個節點是null或者下個節點被cancelled,就找到隊列最開始的非cancelled的節點
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 就從尾部節點開始找往前遍歷,找到隊列中第一個waitStatus<0的節點。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 若是當前節點的下個節點不爲空,並且狀態<=0,就把當前節點喚醒
    if (s != null)
        LockSupport.unpark(s.thread);
    }

    爲何要從後往前找第一個非Cancelled的節點呢?
    看一下addWaiter方法

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

      咱們從這裏能夠看到,節點入隊並非原子操做,也就是說,node.prev = pred, compareAndSetTail(pred, node) 這兩個地方能夠看做Tail入隊的原子操做,可是此時pred.next = node;還沒執行,若是這個時候執行了unparkSuccessor方法,就沒辦法從前日後找了,因此須要從後往前找。還有一點緣由,在產生CANCELLED狀態節點的時候,先斷開的是Next指針,Prev指針並未斷開,所以也是必需要從後往前遍歷纔可以遍歷徹底部的Node
因此,若是是從前日後找,因爲極端狀況下入隊的非原子操做和CANCELLED節點產生過程當中斷開Next指針的操做,可能會致使沒法遍歷全部的節點。因此,喚醒對應的線程後,對應的線程就會繼續往下執行。

4.2 釋放鎖流程圖

圖片

5.注意

      下一篇講解併發工具包下的LockSupport,謝謝你們的關注和支持!有問題但願你們指出,共同進步!!!

相關文章
相關標籤/搜索