【java併發編程實戰6】AQS之獨佔鎖ReentrantLock實現

前言

自從JDK1.5後,jdk新增一個併發工具包java.util.concurrent,提供了一系列的併發工具類。而今天咱們須要學習的是java.util.concurrent.lock也就是它下面的lock包,其中有一個最爲常見類ReentrantLockjava

咱們知道ReentrantLock的功能是實現代碼段的併發訪問控制,也就是一般意義上所說的鎖。以前咱們也學習過一種鎖的實現,也就是synchronized關鍵詞,synchronized是在字節碼層面,經過對象的監視器鎖實現的。那麼ReentrantLock又是怎麼實現的呢?node

若是不看源碼,可能會覺得它的實現是經過相似於synchronized,經過對象的監視器鎖實現的。但事實上它僅僅是一個工具類!沒有使用更「高級」的機器指令,不是關鍵字,也不依靠JDK編譯時的特殊處理,僅僅做爲一個普普統統的類就完成了代碼塊的併發訪問控制,這就更讓人疑問它怎麼實現的代碼塊的併發訪問控制的了。api

咱們查看源碼發現,它是經過繼承抽象類實現的AbstractQueuedSynchronizer,爲了方便描述,接下來我將用AQS代替AbstractQueuedSynchronizer併發

關於AQS

AQS,它是用來構建鎖或者其餘同步組建的基礎框架,咱們見過許多同步工具類都是基於它構建的。包括 ReentrantLock、CountDownLatch等。在深刻了解AQS瞭解以前,咱們須要知道鎖跟AQS的區別。鎖,它是面向使用者的,它定義了使用者與鎖交互的接口,隱藏了實現的細節;而AQS面像的是鎖的實現者,它簡化了鎖的實現。鎖與AQS很好的隔離使用者與實現者所須要關注的領域。那麼咱們今天就做爲一個鎖的實現者,一步一步分析鎖的實現。

AQS又稱同步器,它的內部有一個int成員變量state表示同步狀態,還有一個內置的FIFO隊列來實現資源獲取線程的排隊工做。經過它們咱們就能實現鎖。框架

在實現鎖以前,咱們須要考慮作爲鎖的使用者,鎖會有哪幾種?工具

一般來講,鎖分爲兩種,一種是獨佔鎖(排它鎖,互斥鎖),另外一種就是共享鎖了。根據這兩類,其實AQS也給咱們提供了兩套API。而咱們做爲鎖的實現者,一般都是要麼所有實現它的獨佔api,要麼實現它的共享api,而不會出現一塊兒實現的。即便juc內置的ReentrantReadWriteLock也是經過兩個子類分別來實現的。學習

鎖的實現

獨佔鎖

獨佔鎖又名互斥鎖,同一時間,只有一個線程能獲取到鎖,其他的線程都會被阻塞等待。其中咱們經常使用的ReentrantLock就是一種獨佔鎖,咱們一塊兒來ReentrantLock 分析ReentrantLock的同時看一看AQS的實現,再推理出AQS獨特的設計思路和實現方式。最後,再看其共享控制功能的實現。ui

首先咱們來看看獲取鎖的過程spa

加鎖

咱們查看ReentrantLock的源碼。來分析它的lock方法線程

public void lock() {
        sync.lock();
   }

與咱們以前分析的同樣,鎖的具體實現由內部的代理類完成,lock只是暴露給鎖的使用者的一套api。使用過ReentrantLock的同窗應該知道,ReentrantLock又分爲公平鎖和非公平鎖,因此,ReentrantLock內部只有兩個sync的實現。

/**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync{..}
     /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync{..}
  • 公平鎖 :每一個線程獲取鎖的順序是按照調用lock方法的前後順序來的。
  • 非公平鎖:每一個線程獲取鎖的順序是不會按照調用lock方法的前後順序來的。徹底看運氣。

因此咱們徹底能夠猜想到,這個公平與不公平的區別就體如今鎖的獲取過程。咱們以公平鎖爲例,來分析獲取鎖過程,最後對比非公平鎖的過程,尋找差別。

lock

查看FairSync的lock方法

final void lock() {
            acquire(1);
        }

這裏它調用到了父類AQS的acquire方法,因此咱們繼續查看acquire方法的代碼

acquire

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
            selfInterrupt();
    }

查看方法方法的註釋咱們能夠知道這個方法的做用,這裏我簡單的翻譯一下.

Acquires方法是一個獨佔鎖模式的方法,它是不會響應中斷的。它至少執行一次tryAcquire去獲取鎖,若是返回true,則表明獲取鎖成功,不然它將會被加入等待隊列阻塞,直到從新嘗試獲取鎖成功。因此咱們須要看看嘗試獲取鎖的方法tryAcquire的實現

tryAcruire

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

拋出一個異常,沒有實現。因此咱們須要查看它的子類,在咱們這裏就是FairSync的實現。

這裏也會你們會有疑惑,沒有實現爲何不寫成抽象方法呢,前面咱們提到過,咱們不會同時在一個類中實現獨佔鎖跟共享鎖的api,那麼tryAcruire是屬於獨佔鎖,那麼若是我想一個共享鎖也要從新獨佔鎖的方法嗎?因此大師的設計是絕對沒有問題的。
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();//獲取當前線程
            int c = getState();  //獲取父類AQS中的標誌位
            if (c == 0) {
                if (!hasQueuedPredecessors() && 
                    //若是隊列中沒有其餘線程  說明沒有線程正在佔有鎖!
                    compareAndSetState(0, acquires)) { 
                    //修改一下狀態位,注意:這裏的acquires是在lock的時候傳遞來的,從上面的圖中能夠知道,這個值是寫死的1
                    setExclusiveOwnerThread(current);
                    //若是經過CAS操做將狀態爲更新成功則表明當前線程獲取鎖,所以,將當前線程設置到AQS的一個變量中,說明這個線程拿走了鎖。
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
             //若是不爲0 意味着,鎖已經被拿走了,可是,由於ReentrantLock是重入鎖,
             //是能夠重複lock,unlock的,只要成對出現行。一次。這裏還要再判斷一次 獲取鎖的線程是否是當前請求鎖的線程。
                int nextc = c + acquires;//若是是的,累加在state字段上就能夠了。
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

目前爲止,若是獲取鎖成功,則返回true,獲取鎖的過程結束,若是獲取失敗,則返回false

按照以前的邏輯,若是線程獲取鎖失敗,則會被放入到隊列中,可是在放入以前,須要給線程包裝一下。

那麼這個addWaiter就是包裝線程而且放入到隊列的過程實現的方法。

addWaiter

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

註釋: 把當前線程做爲一個節點添加到隊列中,而且爲這個節點設置模式

模式: 也就是獨佔模式/共享模式,在這裏模式是形參,因此咱們看看起調方

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) Node.EXCLUSIVE 就表明這是獨佔鎖模式。

建立好節點後,將節點加入到隊列尾部,此處,在隊列不爲空的時候,先嚐試經過cas方式修改尾節點爲最新的節點,若是修改失敗,意味着有併發,這個時候纔會進入enq中死循環,「自旋」方式修改。

將線程的節點接入到隊裏中後,固然還須要作一件事:將當前線程掛起!這個事,由acquireQueued來作。

在解釋acquireQueued以前,咱們須要先看下AQS中隊列的內存結構,咱們知道,隊列由Node類型的節點組成,其中至少有兩個變量,一個封裝線程,一個封裝節點類型。

而實際上,它的內存結構是這樣的(第一次節點插入時,第一個節點是一個空節點,表明有一個線程已經獲取鎖,事實上,隊列的第一個節點就是表明持有鎖的節點):
0730009.png

黃色節點爲隊列默認的頭節點,每次有線程競爭失敗,進入隊列後其實都是插入到隊列的尾節點(tail後面)後面。這個從enq方法能夠看出來,上文中有提到enq方法爲將節點插入隊列的方法:

enq

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                // 一個空的節點,一般表明獲取鎖的線程
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued

接着咱們來看看當節點被放入到隊列中,如何將線程掛起,也就是看看acquireQueued方法的實現。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 獲取當前節點前驅結點
                final Node p = node.predecessor();
                // 若是前驅節點是head,那麼它就是等待隊列中的第一個線程
                // 由於咱們知道head就是獲取線程的節點,那麼它就有機會再次獲取鎖
                if (p == head && tryAcquire(arg)) {
                    //成功後,將上圖中的黃色節點移除,Node1變成頭節點。 也證明了head就是獲取鎖的線程的節點。
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 一、檢查前一個節點的狀態,判斷是否要掛起
                // 二、若是須要掛起,則經過JUC下的LockSopport類的靜態方法park掛起當前線程,直到被喚醒。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 若是發生異常
            if (failed)
                // 取消請求,也就是將當前節點重隊列中移除。
                cancelAcquire(node);
        }
    }

這裏我還須要解釋的是:

一、Node節點除了存儲當前線程以外,節點類型,前驅後驅指針以後,還存儲一個叫waitStatus的變量,該變量用於描述節點的狀態。共有四種狀態。

/** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

分別表示:

  • 1 = 取消狀態,該節點將會被隊列移除。
  • -1 = 等待狀態,後驅節點處於等待狀態。
  • -2 = 等待被通知,該節點將會阻塞至被該鎖的condition的await方法喚醒。
  • -3 = 共享傳播狀態,表明該節點的狀態會向後傳播。

到此爲止,一個線程對於鎖的一次競爭才告於段落,結果有兩種,要麼成功獲取到鎖(不用進入到AQS隊列中),要麼,獲取失敗,被掛起,等待下次喚醒後繼續循環嘗試獲取鎖,值得注意的是,AQS的隊列爲FIFO隊列,因此,每次被CPU假喚醒,且當前線程不是出在頭節點的位置,也是會被掛起的。AQS經過這樣的方式,實現了競爭的排隊策略。

釋放鎖

看完了加鎖,再看釋放鎖。咱們先不看代碼也能夠猜想到釋放鎖須要的步驟。

  • 隊列的頭節點是當前獲取鎖的線程,因此咱們須要移除頭節點
  • 釋放鎖,喚醒頭節點後驅節點來競爭鎖

接下來咱們查看源碼來驗證咱們的猜測是否在正確。

unlock

public void unlock() {
    sync.release(1);
}

unlock方法調用AQS的release方法,由於咱們的acquire的時候傳入的是1,也就是同步狀態量+1,那麼對應的解鎖就要-1。

release

public final boolean release(int arg) {
        // 嘗試釋放鎖
        if (tryRelease(arg)) {
            // 釋放鎖成功,獲取當前隊列的頭節點
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 喚醒當前節點的下一個節點
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease

一樣的它是交給子類實現的

protected final boolean tryRelease(int releases) {
        
            int c = getState() - releases;
            // 當前線程不是獲取鎖的線程 拋出異常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 由於是重入的關係,不是每次釋放鎖c都等於0,直到最後一次釋放鎖時,才通知AQS不須要再記錄哪一個線程正在獲取鎖。
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
    }

unparkSuccessor

釋放鎖成功以後,就喚醒頭節點後驅節點來競爭鎖

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

值得注意的是,尋找的順序是從隊列尾部開始往前去找的最前面的一個waitStatus小於0的節點。由於大於0 就是1狀態的節點是取消狀態。

公平鎖與非公平鎖

到此咱們鎖獲取跟鎖的釋放已經分析的差很少。那麼公平鎖跟非公平鎖的區別在於加鎖的過程。對比代碼

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
}
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
}

從代碼中也能夠看出來,非公平在公平鎖的加鎖的邏輯以前先直接cas修改一次state變量(嘗試獲取鎖),成功就返回,不成功再排隊,從而達到不排隊直接搶佔的目的。

最後歡迎你們關注一下個人我的公衆號。一塊兒交流一塊兒學習,有問必答。
公衆號

相關文章
相關標籤/搜索