java公平鎖和非公平鎖

最經常使用的方式:node

 View Code安全

一、對於ReentrantLock須要掌握如下幾點數據結構

  • ReentrantLock的建立(公平鎖/非公平鎖)
  • 上鎖:lock()
  • 解鎖:unlock()

首先說一下類結構:ui

  • ReentrantLock-->Lock
  • NonfairSync/FairSync-->Sync-->AbstractQueuedSynchronizer-->AbstractOwnableSynchronizer
  • NonfairSync/FairSync-->Sync是ReentrantLock的三個內部類
  • Node是AbstractQueuedSynchronizer的內部類

注意:上邊這四條線,對應關係:"子類"-->"父類"this

 

二、ReentrantLock的建立線程

  • 支持公平鎖(先進來的線程先執行)
  • 支持非公平鎖(後進來的線程也可能先執行)

非公平鎖與非公平鎖的建立隊列

  • 非公平鎖:ReentrantLock()或ReentrantLock(false)
    final ReentrantLock lock = new ReentrantLock();
  • 公平鎖:ReentrantLock(true)
    final ReentrantLock lock = new ReentrantLock(true)

默認狀況下使用非公平鎖。ci

源代碼以下:get

ReentrantLock:同步

複製代碼

/** 同步器:內部類Sync的一個引用 */
    private final Sync sync;

    /**
     * 建立一個非公平鎖
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * 建立一個鎖
     * @param fair true-->公平鎖  false-->非公平鎖
     */
    public ReentrantLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
    }

複製代碼

上述源代碼中出現了三個內部類Sync/NonfairSync/FairSync,這裏只列出類的定義,至於這三個類中的具體的方法會在後續的第一次引用的時候介紹。

Sync/NonfairSync/FairSync類定義:

複製代碼

/**
     * 該鎖同步控制的一個基類.下邊有兩個子類:非公平機制和公平機制.使用了AbstractQueuedSynchronizer類的
     */
    static abstract class Sync extends AbstractQueuedSynchronizer

    /**
     * 非公平鎖同步器
     */
    final static class NonfairSync extends Sync

    /**
     * 公平鎖同步器
     */
    final static class FairSync extends Sync

複製代碼

 

三、非公平鎖的lock()

具體使用方法:

lock.lock();

下面先介紹一下這個整體步驟的簡化版,而後會給出詳細的源代碼,並在源代碼的lock()方法部分給出詳細版的步驟。

 

簡化版的步驟:(非公平鎖的核心)

基於CAS嘗試將state(鎖數量)從0設置爲1

A、若是設置成功,設置當前線程爲獨佔鎖的線程;

B、若是設置失敗,還會再獲取一次鎖數量,

B一、若是鎖數量爲0,再基於CAS嘗試將state(鎖數量)從0設置爲1一次,若是設置成功,設置當前線程爲獨佔鎖的線程;

B二、若是鎖數量不爲0或者上邊的嘗試又失敗了,查看當前線程是否是已是獨佔鎖的線程了,若是是,則將當前的鎖數量+1;若是不是,則將該線程封裝在一個Node內,並加入到等待隊列中去。等待被其前一個線程節點喚醒。

 

源代碼:(再介紹源代碼以前,內心有一個獲取鎖的步驟的總的一個印象,就是上邊這個"簡化版的步驟")

3.一、ReentrantLock:lock()

複製代碼

/**
     *獲取一個鎖
     *三種狀況:
     *一、若是當下這個鎖沒有被任何線程(包括當前線程)持有,則當即獲取鎖,鎖數量==1,以後再執行相應的業務邏輯
     *二、若是當前線程正在持有這個鎖,那麼鎖數量+1,以後再執行相應的業務邏輯
     *三、若是當下鎖被另外一個線程所持有,則當前線程處於休眠狀態,直到得到鎖以後,當前線程被喚醒,鎖數量==1,再執行相應的業務邏輯
     */
    public void lock() {
        sync.lock();//調用NonfairSync(非公平鎖)或FairSync(公平鎖)的lock()方法
    }

複製代碼

3.二、NonfairSync:lock()

複製代碼

/**
         * 1)首先基於CAS將state(鎖數量)從0設置爲1,若是設置成功,設置當前線程爲獨佔鎖的線程;-->請求成功-->第一次插隊
         * 2)若是設置失敗(即當前的鎖數量可能已經爲1了,即在嘗試的過程當中,已經被其餘線程先一步佔有了鎖),這個時候當前線程執行acquire(1)方法
         * 2.1)acquire(1)方法首先調用下邊的tryAcquire(1)方法,在該方法中,首先獲取鎖數量狀態,
         * 2.1.1)若是爲0(證實該獨佔鎖已被釋放,當下沒有線程在使用),這個時候咱們繼續使用CAS將state(鎖數量)從0設置爲1,若是設置成功,當前線程獨佔鎖;-->請求成功-->第二次插隊;固然,若是設置不成功,直接返回false
         * 2.2.2)若是不爲0,就去判斷當前的線程是否是就是當下獨佔鎖的線程,若是是,就將當前的鎖數量狀態值+1(這也就是可重入鎖的名稱的來源)-->請求成功
         * 
         * 下邊的流程一句話:請求失敗後,將當前線程鏈入隊尾並掛起,以後等待被喚醒。
         * 
         * 2.2.3)若是最後在tryAcquire(1)方法中上述的執行都沒成功,即請求沒有成功,則返回false,繼續執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法
         * 2.2)在上述方法中,首先會使用addWaiter(Node.EXCLUSIVE)將當前線程封裝進Node節點node,而後將該節點加入等待隊列(先快速入隊,若是快速入隊不成功,其使用正常入隊方法無限循環一直到Node節點入隊爲止)
         * 2.2.1)快速入隊:若是同步等待隊列存在尾節點,將使用CAS嘗試將尾節點設置爲node,並將以前的尾節點插入到node以前
         * 2.2.2)正常入隊:若是同步等待隊列不存在尾節點或者上述CAS嘗試不成功的話,就執行正常入隊(該方法是一個無限循環的過程,即直到入隊爲止)-->第一次阻塞
         * 2.2.2.1)若是尾節點爲空(初始化同步等待隊列),建立一個dummy節點,並將該節點經過CAS嘗試設置到頭節點上去,設置成功的話,將尾節點也指向該dummy節點(即頭節點和尾節點都指向該dummy節點)
         * 2.2.2.1)若是尾節點不爲空,執行與快速入隊相同的邏輯,即便用CAS嘗試將尾節點設置爲node,並將以前的尾節點插入到node以前
         * 最後,若是順利入隊的話,就返回入隊的節點node,若是不順利的話,無限循環去執行2.2)下邊的流程,直到入隊爲止
         * 2.3)node節點入隊以後,就去執行acquireQueued(final Node node, int arg)(這又是一個無限循環的過程,這裏須要注意的是,無限循環等於阻塞,多個線程能夠同時無限循環--每一個線程均可以執行本身的循環,這樣才能使在後邊排隊的節點不斷前進)
         * 2.3.1)獲取node的前驅節點p,若是p是頭節點,就繼續使用tryAcquire(1)方法去嘗試請求成功,-->第三次插隊(固然,此次插隊不必定不會使其得到執行權,請看下邊一條),
         * 2.3.1.1)若是第一次請求就成功,不用中斷本身的線程,若是是以後的循環中將線程掛起以後又請求成功了,使用selfInterrupt()中斷本身
         * (注意p==head&&tryAcquire(1)成功是惟一跳出循環的方法,在這以前會一直阻塞在這裏,直到其餘線程在執行的過程當中,不斷的將p的前邊的節點減小,直到p成爲了head且node請求成功了--即node被喚醒了,才退出循環)
         * 2.3.1.2)若是p不是頭節點,或者tryAcquire(1)請求不成功,就去執行shouldParkAfterFailedAcquire(Node pred, Node node)來檢測當前節點是否是能夠安全的被掛起,
         * 2.3.1.2.1)若是node的前驅節點pred的等待狀態是SIGNAL(便可以喚醒下一個節點的線程),則node節點的線程能夠安全掛起,執行2.3.1.3)
         * 2.3.1.2.2)若是node的前驅節點pred的等待狀態是CANCELLED,則pred的線程被取消了,咱們會將pred以前的連續幾個被取消的前驅節點從隊列中剔除,返回false(即不能掛起),以後繼續執行2.3)中上述的代碼
         * 2.3.1.2.3)若是node的前驅節點pred的等待狀態是除了上述兩種的其餘狀態,則使用CAS嘗試將前驅節點的等待狀態設爲SIGNAL,並返回false(由於CAS可能會失敗,這裏無論失敗與否,都返回false,下一次執行該方法的以後,pred的等待狀態就是SIGNAL了),以後繼續執行2.3)中上述的代碼
         * 2.3.1.3)若是能夠安全掛起,就執行parkAndCheckInterrupt()掛起當前線程,以後,繼續執行2.3)中以前的代碼
         * 最後,直到該節點的前驅節點p以前的全部節點都執行完畢爲止,咱們的p成爲了頭節點,而且tryAcquire(1)請求成功,跳出循環,去執行。
         * (在p變爲頭節點以前的整個過程當中,咱們發現這個過程是不會被中斷的)
         * 2.3.2)固然在2.3.1)中產生了異常,咱們就會執行cancelAcquire(Node node)取消node的獲取鎖的意圖。
         */
        final void lock() {
            if (compareAndSetState(0, 1))//若是CAS嘗試成功
                setExclusiveOwnerThread(Thread.currentThread());//設置當前線程爲獨佔鎖的線程
            else
                acquire(1);
        }

複製代碼

注意:在這個方法中,我列出了一個線程獲取鎖的詳細的過程,本身看註釋。

下面列出NonfairSync:lock()中調用的幾個方法與相關屬性。

3.2.一、AbstractQueuedSynchronizer:鎖數量state屬性+相關方法:

複製代碼

/**
     * 鎖數量
     */
    private volatile int state;

    /**
     * 獲取鎖數量
     */
    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

複製代碼

注意:state是volatile型的

3.2.二、AbstractOwnableSynchronizer:屬性+setExclusiveOwnerThread(Thread t)

複製代碼

/**
     * 當前擁有獨佔鎖的線程
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * 設置獨佔鎖的線程爲線程t
     */
    protected final void setExclusiveOwnerThread(Thread t) {
        exclusiveOwnerThread = t;
    }

複製代碼

3.2.三、AbstractQueuedSynchronizer:屬性+acquire(int arg)

複製代碼

/**
     * 獲取鎖的方法
     * @param arg
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();//中斷本身
    }

複製代碼

在介紹上邊這個方法以前,先要說一下AbstractQueuedSynchronizer的一個內部類Node的總體構造,源代碼以下:

複製代碼

/**
     * 同步等待隊列(雙向鏈表)中的節點
     */
    static final class Node {
        /** 線程被取消了 */
        static final int CANCELLED = 1;
        /** 
         * 若是前驅節點的等待狀態是SIGNAL,表示當前節點未來能夠被喚醒,那麼當前節點就能夠安全的掛起了 
         * 不然,當前節點不能掛起 
         */
        static final int SIGNAL = -1;
        /**線程正在等待條件*/
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** 一個標記:用於代表該節點正在獨佔鎖模式下進行等待 */
        static final Node EXCLUSIVE = null;
        //值就是前四個int(CANCELLED/SIGNAL/CONDITION/PROPAGATE),再加一個0

        volatile int waitStatus;
        /**前驅節點*/
        volatile Node prev;

        /**後繼節點*/
        volatile Node next;

        /**節點中的線程*/
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special value SHARED.
         * Because condition queues are accessed only when holding in exclusive
         * mode, we just need a simple linked queue to hold nodes while they are
         * waiting on conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive, we save a
         * field by using special value to indicate shared mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 返回該節點前一個節點
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() { // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) { // 用於addWaiter中
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

複製代碼

注意:這裏我給出了Node類的完整版,其中部分屬性與方法是在共享鎖的模式下使用的,而咱們這裏的ReentrantLock是一個獨佔鎖,只需關注其中的與獨佔鎖相關的部分就好(具體有註釋)

 

3.三、AbstractQueuedSynchronizer:acquire(int arg)方法中使用到的兩個方法

3.3.一、NonfairSync:tryAcquire(int acquires)

 View Code

Syn:

 View Code

注意:這個方法就完成了"簡化版的步驟"中的"A/B/B1"三步,若是上述的請求不能成功,就要執行下邊的代碼了,

下邊的代碼,用一句話介紹:請求失敗後,將當前線程鏈入隊尾並掛起,以後等待被喚醒。在你看下邊的代碼的時候內心默記着這句話。

3.3.二、AbstractQueuedSynchronizer:addWaiter(Node mode)

 View Code

AbstractQueuedSynchronizer:enq(final Node node)

 View Code

注意:這裏就是一個完整的入隊方法,具體邏輯看註釋和ReentrantLock:lock()的註釋部分的相關部分。

3.3.三、AbstractQueuedSynchronizer:acquireQueued(final Node node, int arg)

 View Code

AbstractQueuedSynchronizer:shouldParkAfterFailedAcquire(Node pred, Node node)

 View Code

AbstractQueuedSynchronizer:

 View Code

 

以上就是一個線程獲取非公平鎖的整個過程(lock())。

 

四、公平鎖的lock()

具體用法與非公平鎖同樣

若是掌握了非公平鎖的流程,那麼掌握公平鎖的流程會很是簡單,只有兩點不一樣(最後會講)。

 

簡化版的步驟:(公平鎖的核心)

獲取一次鎖數量,

B一、若是鎖數量爲0,若是當前線程是等待隊列中的頭節點,基於CAS嘗試將state(鎖數量)從0設置爲1一次,若是設置成功,設置當前線程爲獨佔鎖的線程;

B二、若是鎖數量不爲0或者當前線程不是等待隊列中的頭節點或者上邊的嘗試又失敗了,查看當前線程是否是已是獨佔鎖的線程了,若是是,則將當前的鎖數量+1;若是不是,則將該線程封裝在一個Node內,並加入到等待隊列中去。等待被其前一個線程節點喚醒。

 

源代碼:

4.一、ReentrantLock:lock()

 View Code

4.二、FairSync:lock()

 View Code

4.三、AbstractQueuedSynchronizer:acquire(int arg)就是非公平鎖使用的那個方法

4.3.一、FairSync:tryAcquire(int acquires)

 View Code

最後,若是請求失敗後,將當前線程鏈入隊尾並掛起,以後等待被喚醒,下邊的代碼與非公平鎖同樣。

 

總結:公平鎖與非公平鎖對比

  • FairSync:lock()少了插隊部分(即少了CAS嘗試將state從0設爲1,進而得到鎖的過程)
  • FairSync:tryAcquire(int acquires)多了須要判斷當前線程是否在等待隊列首部的邏輯(實際上就是少了再次插隊的過程,可是CAS獲取仍是有的)。

最後說一句,

  • ReentrantLock是基於AbstractQueuedSynchronizer實現的,AbstractQueuedSynchronizer能夠實現獨佔鎖也能夠實現共享鎖,ReentrantLock只是使用了其中的獨佔鎖模式
  • 這一起代碼比較多,邏輯比較複雜,最好在閱讀的過程當中,能夠拿一根筆畫畫入隊等與數據結構相關的圖
  • 必定要記住"簡化版的步驟",這是整個非公平鎖與公平鎖的核心
相關文章
相關標籤/搜索