ReetrantLock源碼分析

ReentrantLock類的大部分邏輯,都是其均繼承自AQS的內部類Sync實現的java

啥是AQS:

Java併發編程核心在於java.concurrent.util包而juc當中的大多數同步器實現都是圍繞着共同的基礎行爲,好比等待隊列、條件隊列、獨佔獲取、共享獲取等,而這個行爲的抽象就是基於AbstractQueuedSynchronizer簡稱AQS 它定義了一套多線程訪問共享資源的同步器框架,是一個依賴狀態(state)的同步器node

以公平鎖爲例子:算法

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(true);
        lock.lock(); //加鎖 斷點處
        try {
            Thread.sleep(5000);
        }catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
          lock.unlock();
        }
    

公平鎖、非公平鎖

public ReentrantLock(boolean fair) //ReetrantLock的有參構造函數
    sync = fair ? new FairSync() : new NonfairSync();
}

在加鎖行打斷點運行, 咱們能夠看到參數:編程

image-20201105163128535

image-20201105163128535多線程

記住這幾個值,後面會用到.併發

單步步入 F7框架

image-20201105163427610

image-20201105163427610less

看到lock()調用了Sync實例中的lock()方法函數

咱們能夠看到Sync是在ReentrantLock類中的一個抽象內部類 繼承於AbstractQueuedSynchronizer (AQS)(抽象隊列同步器) image-20201105163720831ui

點擊Sync類中lock抽象接口方法 咱們能夠發現ReetrantLock有公平和非公平鎖兩種方式實現方式.因爲本章咱們使用公平鎖講解因此咱們選擇公平鎖的實現方式繼續向下調試代碼 image-20201105165340504

 /**      * Sync object for fair locks      */
    static final class FairSync extends Sync //公平鎖實現方式 繼承於Sync
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);//獲取鎖 傳入參數1
        }
..

向下⬇️

  public final void acquire(int arg) {  //acquire方法接收參數 1
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           //這裏有兩個操做 使用短路與 若是第一個操做成功第二個操做不執行
      //1.嘗試以獨佔模式獲取鎖,在成功時返回 
      //2.線程進入隊列排隊
            selfInterrupt();
    }

方式1 首先嚐試獲取鎖

  /**          * Fair version of tryAcquire.  Don't grant access unless          * recursive call or no waiters or is first.          */
        protected final boolean tryAcquire(int acquires) {  //參數:1
            final Thread current = Thread.currentThread();  //獲取當前線程
            int c = getState(); //還記得咱們第一個斷點截圖的state值是0
            if (c == 0) {   //是0 說明咱們是第一次獲取鎖,若是不是0說明是重入,會進入else
                if (!hasQueuedPredecessors() &&
         //公平鎖和非公平鎖,主要是在方法 tryAcquire 中,是否有 !hasQueuedPredecessors() 判斷。
                    compareAndSetState(0, acquires)) {//經過cas操做state更新,指望值是0,新值是1
                    setExclusiveOwnerThread(current);//設置當前擁有獨佔訪問的線程。
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) { //若是 是重入狀況 會進入這裏.
          //進入前會判斷是不是當前線程拿的鎖
                int nextc = c + acquires; //假設重入次數爲2  那麼nextc = 2+1 =3
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc); //更新state值
                return true;
            }
            return false;
        }

先判斷state是否爲0,若是爲0就執行上面提到的lock方法的前半部分,經過CAS操做將state的值從0變爲1,不然判斷當前線程是否爲exclusiveOwnerThread,而後把state++,也就是重入鎖的體現,咱們注意前半部分是經過CAS來保證同步,後半部分並無同步的體現,緣由是:後半部分是線程重入,再次得到鎖時才觸發的操做,此時當前線程擁有鎖,因此對ReentrantLock的屬性操做是無需加鎖的。若是tryAcquire()獲取失敗,則要執行addWaiter()向等待隊列中添加一個獨佔模式的節點。

public final boolean hasQueuedPredecessors() {
    Node t = tail; // 根據初始化順序倒序獲取字段
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
//在這個判斷中主要就是看當前線程是否是同步隊列的首位,是:true、否:false
//這部分涉及公平鎖的實現,CLH(Craig,Landin andHagersten)。三個做者的首字母組合

啥是CLH

CLH鎖即Craig, Landin, and Hagersten (CLH) locks。CLH鎖是一個自旋鎖。能確保無飢餓性。提供先來先服務的公平性。 爲何說JUC中的實現是基於CLH的「變種」,由於原始CLH隊列,通常用於實現自旋鎖。而JUC中的實現,獲取不到鎖的線程,通常會時而阻塞,時而喚醒。

image-20201106144400019 1.獲取不到鎖的線程,會進入隊尾,而後自旋,直到其前驅線程釋放鎖,具體位置是放在tail後的null位置,並讓新對象的next指向Null 2.若是head成功拿到了鎖 此時 把head中包含的線程指向爲Null並將head的前任設置爲head後清除現任head

方式2 沒獲取到鎖,須要進入隊列排隊:

進入方式須要改造項目讓多個線程進行搶佔資源改造以下:

import java.util.concurrent.locks.ReentrantLock;

class Scratch {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(true);
        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                lock.lock();
                try {
                    Thread.sleep(100);
                    System.out.println(Thread.currentThread().getName()+"工做結束....");
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }).start();
        }
    }
}

回到這裏發現等待隊列addWaiter方法添加了一個Null 將本身加入CLH隊列的尾部 關注公衆號:[JAVA寶典]

image-20201105171529838

image-20201105171529838

發現等待隊列addWaiter方法添加了一個Null null的含義是: image-20201105171624412

使用null是私有佔用資源模式,使用new Node()是 共享模式.(順帶一提,writelock不互斥,就是使用的共享模式)

Node還有幾個等待狀態:

 /**          * Status field, taking on only the values:          *   SIGNAL:     The successor of this node is (or will soon be)          *               blocked (via park), so the current node must          *               unpark its successor when it releases or          *               cancels. To avoid races, acquire methods must          *               first indicate they need a signal,          *               then retry the atomic acquire, and then,          *               on failure, block.          *   CANCELLED:  This node is cancelled due to timeout or interrupt.          *               Nodes never leave this state. In particular,          *               a thread with cancelled node never again blocks.          *   CONDITION:  This node is currently on a condition queue.          *               It will not be used as a sync queue node          *               until transferred, at which time the status          *               will be set to 0. (Use of this value here has          *               nothing to do with the other uses of the          *               field, but simplifies mechanics.)          *   PROPAGATE:  A releaseShared should be propagated to other          *               nodes. This is set (for head node only) in          *               doReleaseShared to ensure propagation          *               continues, even if other operations have          *               since intervened.          *   0:          None of the above          *          * The values are arranged numerically to simplify use.          * Non-negative values mean that a node doesn't need to          * signal. So, most code doesn't need to check for particular          * values, just for sign.          *          * The field is initialized to 0 for normal sync nodes, and          * CONDITION for condition nodes.  It is modified using CAS          * (or when possible, unconditional volatile writes).          */
        volatile int waitStatus;

在addWaiter(Node.EXCLUSIVE)處斷點:

 /**      * 根據給定的模式爲當前節點建立一個Node      *      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared    //      * @return the new node      */
    private Node addWaiter(Node mode) {
        //線程對應的Node
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        //尾節點不爲空
        if (pred != null) {
            //當前node的前驅指向尾節點
            node.prev = pred;
            //將當前node設置爲新的尾節點
            //若是cas操做失敗,說明線程競爭
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //lockfree的方式插入隊尾
        enq(node);  //只有在 tail == null時才進入
        return node;
    }

先找到等待隊列的tail節點pred,若是pred!=null,就把當前線程添加到pred後面進入等待隊列,若是不存在tail節點執行enq()

    private Node enq(final Node node) {
        //經典的lockfree算法:循環+CAS
        for (;;) {
            Node t = tail;
            //尾節點爲空
            if (t == null) { // Must initialize
                //初始化頭節點
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

這裏進行了循環,若是此時存在了tail就執行同上一步驟的添加隊尾操做,若是依然不存在,就把當前線程做爲head結點。 插入節點後,調用acquireQueued()進行阻塞

    /**      * Acquires in exclusive uninterruptible mode for thread already in      * queue. Used by condition wait methods as well as acquire.      *      * @param node the node      * @param arg the acquire argument      * @return {@code true} if interrupted while waiting      */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null// help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

先獲取當前節點的前一節點p,若是p是head的話就再進行一次tryAcquire(arg)操做,若是成功就返回,不然就執行shouldParkAfterFailedAcquire、parkAndCheckInterrupt來達到阻塞效果;

 

unlock

在unlock處打斷點

進入了

    public void unlock() {
        sync.release(1);
    }

image-20201106142501425

image-20201106142501425

繼續跟進發現進入release方法,繼續查看tryRelease(arg)方法

嘗試釋放鎖有三種實現 咱們點ReetrantLock實現方式 image-20201106142623929

源碼: image-20201106142942146

        protected final boolean tryRelease(int releases) //最後進入到實際解鎖源碼中
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread()) //判斷持有線程和當前執行線程是不是同一個,不然報錯
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) { //判斷釋放掉releases參數後是不是0 若是 爲0 設置當前排他鎖的佔有線程爲Null
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);//更新state爲0
            return free;//返回解鎖是否成功
        }
相關文章
相關標籤/搜索