JAVA多線程之AQS的實現重入鎖ReentrantLock

在JAVA體系當中鎖是實現多線程中同步的一個重要機制, JAVA的鎖有synchronized這個是JVM層面 實現的鎖, JAVA實現的鎖同步有: Lock、ReadWriteLock、ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier,包括上一篇中講到的ThreadPoolExecutor中Worker執行用到鎖 也是來自於本文所講的AQS實現,因此要理解JAVA的鎖,除了JVM裏實現的外,只要搞懂AQS的就能夠了, 話很少說,趕忙開始吧java

AQS的數據結構

image.png

AbstractQueuedSynchronizer的類以及重要成員變量

下面代碼中能夠看出AQS的類繼承關係比較簡單,就是繼承AbstractOwnableSynchronizer類 並實現了Serializable接口,node

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
複製代碼

AbstractOwnableSynchronizer類功能比較簡單,主要是設置鎖的排他線程,設置排它線程表示是排它鎖,沒有設置表示共享鎖。markdown

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    protected AbstractOwnableSynchronizer() { }

    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}
複製代碼

下面是AQS中比較重要的是三個成員變量, head是等待隊列中頭節點, tail是等待隊列中尾結點,state是同步的狀態值。數據結構

/**
 * Head of the wait queue, lazily initialized.  Except for
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized.  Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;

/**
 * The synchronization state.
 */
private volatile int state;
複製代碼

加下來看下NOde類的數據結構的定義:
等待隊列中waitStatus(等待狀態)的的枚舉:
1.CANCELLED(1) 這個節點是因爲超時或者中斷。
2.SIGNAL(-1) 這個節點的後繼是被阻塞的,因此當前Node取消或者釋放鎖時,須要unpark去喚醒後繼節點。
3.CONDITION(-2) 這個節點是在條件等待隊列。
4.PROPAGATE(-3) 就是對當前的節點操做傳播到其餘節點, 好比doReleaseShared共享模式的釋放鎖就是傳播後續節點。 5.0 不是上面的任何一種狀態多線程

static final class Node {

    /**標誌它是共享模式 */
    static final Node SHARED = new Node();
    /** 標誌他是排它模式 */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;
    ```
    volatile int waitStatus;
     /**
     * 鏈接當前節點的前一個Node,
     */
    volatile Node prev;
     /**
     * 鏈接當前節點的下一個一個Node,
     */
    volatile Node next;
    
    volatile Thread thread;

    /**
     * 指向條件等待隊列的下一個Node或者是SHARED的Node,由於條件隊列
     * 只有排它模式下才能訪問, 這裏只須要一個簡單的單向鏈表去保存等待條件
     * 變量的node,由於條件變量只能是排它鎖,因此用這個nextWaiter也能夠
     * SHARED變量去表示當前是共享鎖
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     * 判斷是否共享模式
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    /**
    * 返回等待隊列的前一個node
    */
   final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {    // Used to establish initial head or SHARED marker
    }
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
複製代碼

加鎖流程

首先看下ReentrantLock的獲取鎖的過程,它的構造函數初始化一個NonfairSync,也就是說 這個重入鎖是非公平鎖,app

public ReentrantLock() {
    sync = new NonfairSync();
}
複製代碼

從這裏看到ReentrantLoc默認建立的非公平鎖,調用lock會直接進行CAS嘗試搶鎖(這裏state=0表明無所,1表明有一個線程已經搶到了鎖),若是沒有搶到鎖,則正常調用AQS的acquire方法默認參數1去獲取鎖,函數

/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            //設置排他線程爲當前線程
            setExclusiveOwnerThread(Thread.currentThread());
        else
          // 搶鎖
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
複製代碼

那麼接下來重點看下AQS的acquire方法獲取鎖的邏輯 1 tryAcquire嘗試獲取鎖, 2. 若是獲取失敗,則加入一個排他的Node到等待隊列ui

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
         //獲取隊列
        acquireQueued(
        //加入等待隊列
        addWaiter(Node.EXCLUSIVE), arg))
        //中斷當前線程
        selfInterrupt();
}
複製代碼

非公平鎖,一開始就會嘗試搶鎖,而無論以前等待隊列中是否有等待的搶鎖的線程,this

  1. 此時state值爲0,則直接經過CAS將其值修改爲acquires(ReentrentLock是1),並設置排他線程爲它本身,這裏說明ReentrantLock的加的鎖是排它鎖。
  2. 當排它線程就是當前線程,則表示當前線程已經獲取鎖,這個時候表明是偏向鎖,增長acquires值,並賦值給state變量 ,表示同一線程同時同一把鎖的次數,
/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //直接嘗試獲取鎖
        if (compareAndSetState(0, acquires)) {
           //設置排它鎖爲當前線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //當前線程已經獲取鎖,偏向鎖的處理
    else if (current == getExclusiveOwnerThread()) {
        // 以前的state值,加上申請的值
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
            //賦值當前state
            setState(nextc);
        return true;
    }
    return false;
}
複製代碼

上面的tryAcquire這一步嘗試搶鎖失敗後,而後執行addWaiter(Node.EXCLUSIVE),新建一個排它模式的Node並加入等待隊列。atom

private Node addWaiter(Node mode) {
    //新建Node
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    //保存隊列尾部節點
    Node pred = tail;
    //若是尾節點不爲空
    if (pred != null) {
       //將新的節點prev指針指向以前的尾指針
        node.prev = pred;
       //CAS的原子操做將新建的Node加入隊列尾部
        if (compareAndSetTail(pred, node)) {
            //將以前的尾部的next指針新建的node
            pred.next = node;
            return node;
        }
    }
    //若是上面的經過CAS失敗了,則表示有其餘線程已經加入了隊列,則經過enq加入
    //新建的Node到等待隊列
    enq(node);
    return node;
}
複製代碼

下圖是上面node加入到隊列尾部的過程,這裏注意雙向隊列首先是新增的node的prev指針指向原先的tail,至於爲何在下面會有分析 image.png

下面enq就是經過for的無限循環中再以CAS方式加入了新建的Node,這樣保證新建的Node必定加入到等待隊列中。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        //等待隊列爲空
        if (t == null) { // Must initialize
            //設置虛頭節點,就是空的Node
            if (compareAndSetHead(new Node()))
                //將尾部節點指向都節點
                tail = head;
        } else {
           //首先讓新建的node的前驅節點指向當前隊列的尾部節點
            node.prev = t;
            //CAS設置新建的node爲尾部接節點
            if (compareAndSetTail(t, node)) {、
               //將新建的Node設置爲新的尾部節點
                t.next = node;
                return t;
            }
        }
    }
}
複製代碼

將新建的尾部節點加入等待隊列後,而後執行acquireQueued方法,參數是上面加入到等待隊列中的Node以及args(tryAcquire的acquire變量),這個方法的邏輯是處理新的Node加入等待隊列,而後嘗試從隊列中喚醒以前的等待隊列中的線程去運行,

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
           //獲取上一步加入隊列尾部的節點的前驅節點
            final Node p = node.predecessor();
             //若是前驅是head,說明以前乜有的線程了,則再次調用tryAcquire獲取鎖
            if (p == head && tryAcquire(arg)) {
           //設置新加入的node爲頭節點,而且設置node的thread爲空,status並無改變
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //若是node的前驅不是頭節點,則調用判斷前驅節點的waitStatus
            //是SIGNAL,則返回true
            if (shouldParkAfterFailedAcquire(p, node) &&
               //能走到這一步,說明上一步中前驅節點的waitStatus是SIGNAL,則執行  // LockSupport的park讓線程等待,直到有線程釋放鎖後執行LockSupport.unpark,而且檢查線程是否中斷
                parkAndCheckInterrupt())
                //設置中斷標誌爲true
                interrupted = true;
        }
    } finally {
        //若是此時failed是true,表示獲取鎖失敗,則執行cancelAcquire的,則取消獲        // 取鎖
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

shouldParkAfterFailedAcquire是獲取鎖後失敗後,須要park阻塞等待鎖,

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //獲取前驅節點的waitStatus
    int ws = pred.waitStatus;
    //若是是Node的等待狀態是SIGNAL,則直接返回true,表示當釋放鎖的時候能夠直接喚     //醒它,因此它是能夠直接能夠阻塞去等待鎖。
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
       //若是等待狀態大於0,表明是1(CANCELLED)取消狀態,
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
           //若是pred是取消狀態的Node,則刪除pred節點
          node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        //前驅節點的next節點指向新增的尾部node
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
         //若是等待狀態是0或者PROPAGATE,表示須要喚醒,可是不須要park,
         //調用者須要重試去保證它在park以前不能獲取鎖
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
複製代碼

若是獲取鎖失敗,acquireQueued方法中failed爲true時,調用cancelAcquire取消獲取鎖,

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    node.thread = null;
    // Skip cancelled predecessors
    //移除取消狀態的節點
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    // 獲取前驅的next節點
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    //設置當前狀態爲取消狀態
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    //若是當前節點
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        // 前驅節點不是頭節點
        if (pred != head &&
            //當前驅的waitStatus等於SIGNAL狀態
            ((ws = pred.waitStatus) == Node.SIGNAL ||
              //或則狀態小於0,表明不是取消狀態, 那麼設置狀態爲SIGNAL狀態
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
             //前驅節點的thread不爲空,表示不是虛頭節點
            pred.thread != null) {
            //獲取node的next節點
            Node next = node.next;
            //若是此時next不爲空 而且waitStatus小於0,表示有後續節點而且後續節點不是取消狀態
            if (next != null && next.waitStatus <= 0)
           //則將pred的next直接pred的下一個指針指向node的next, 至關於直接移除node節點
             compareAndSetNext(pred, predNext, next);
        } else {
          //1.前驅節點是頭節點
          //2.前驅節點不是SIGNAL狀態而且(waiteStatus是取消狀態或者成功設置waitStatus爲SIGNAL狀態)
          // 3.前驅的thread爲空,表明是前驅是虛頭節點
          // 以上三種狀況須要喚醒鏈表中從尾部開始遍歷開始往前找到最前的第一個
          //waitStatus不是取消狀態的節點
           unparkSuccessor(node);
        }
        //將node的next節點指向當前節點
        node.next = node; // help GC
    }
}
複製代碼

喚醒前驅節點的邏輯是從tail節點從尾部往前,經過找到鏈表上一個waiteStatus小於0的節點(即不是取消狀態的線程),而後執行LockSupport的unpark喚醒等待鎖的線程, 這個就是上面提到雙向鏈表是兩個指針,分兩步首先是加的prev指針,因此得從從後往前遍歷就能保證遍歷到所有節點

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    //表明此時節點須要喚醒
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製代碼

釋放鎖

接來下咱們看下如何釋放鎖,他同樣是來自於AQS父類的release方法,

public final boolean release(int arg) {
   //首先經過tryRelease方法抽象方法,由子類實現,實際由Sync類實現
    if (tryRelease(arg)) {
        // 獲取虛頭節點
        Node h = head;
        //若是頭節點不爲空,而且waitStatus不是爲0,則說有須要等待鎖的線程
        if (h != null && h.waitStatus != 0)
            //喚醒第一個等待鎖的線程
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製代碼

將state變量減去release後,若是c = 0,說明已經沒有無鎖了,那麼設置排它線程爲NULL, 並設置state變量爲0

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
複製代碼

總結 本文主要對於ReentrantLock的加排它鎖和釋放排它鎖的過程作了一次詳細的分析,主要是圍繞state變量還有CLH的雙端隊列展開,以及線程阻塞(LockSupport.park)和喚醒(LockSupport.park),最後ConditionObject沒有分析到,後面再寫。

相關文章
相關標籤/搜索