Java可重入鎖原理

1、 概述

本文首先介紹Lock接口、ReentrantLock的類層次結構以及鎖功能模板類AbstractQueuedSynchronizer的簡單原理,而後經過分析ReentrantLock的lock方法和unlock方法,來解釋ReentrantLock的內部原理,最後作一個總結。本文不涉及ReentrantLock中的條件變量。java

1.一、Lock接口

Lock接口,是對控制併發的工具的抽象。它比使用synchronized關鍵詞更靈活,而且可以支持條件變量。它是一種控制併發的工具,通常來講,它控制對某種共享資源的獨佔。也就是說,同一時間內只有一個線程能夠獲取這個鎖並佔用資源。其餘線程想要獲取鎖,必須等待這個線程釋放鎖。在Java實現中的ReentrantLock就是這樣的鎖。另一種鎖,它能夠容許多個線程讀取資源,可是隻能容許一個線程寫入資源,ReadWriteLock就是這樣一種特殊的鎖,簡稱讀寫鎖。下面是對Lock接口的幾個方法的整體描述:node

方法名稱 描述
lock 獲取鎖,若是鎖沒法獲取,那麼當前的線程就變爲不可被調度,直到鎖被獲取到
lockInterruptibly 獲取鎖,除非當前線程被中斷。若是獲取到了鎖,那麼當即返回,若是獲取不到,那麼當前線程變得不可被調度,一直休眠直到下面兩件事情發生:一、當前線程獲取到了鎖

 

二、其餘的線程中斷了當前的線程sql

tryLock 若是調用的時候可以獲取鎖,那麼就獲取鎖而且返回true,若是當前的鎖沒法獲取到,那麼這個方法會馬上返回false
tryLcok(long time,TimeUnit unit) 在指定時間內嘗試獲取鎖若是能夠獲取鎖,那麼獲取鎖而且返回true,若是當前的鎖沒法獲取,那麼當前的線程變得不可被調度,直到下面三件事之一發生:一、當前線程獲取到了鎖

 

二、當前線程被其餘線程中斷架構

三、指定的等待時間到了併發

unlock 釋放當前線程佔用的鎖
newCondition 返回一個與當前的鎖關聯的條件變量。在使用這個條件變量以前,當前線程必須佔用鎖。調用Condition的await方法,會在等待以前原子地釋放鎖,並在等待被喚醒後原子的獲取鎖

接下來,咱們將圍繞lock和unlock這兩個方法,來介紹整個ReentrantLock是怎麼工做的。在介紹ReentrantLock以前,咱們首先來看一下ReentrantLock的類層次結構以及和它密切相關的AbstractQueuedSynchronizer分佈式

1.二、ReentrantLock類層次結構

ReentrantLock實現了Lock接口,內部有三個內部類,Sync、NonfairSync、FairSync,Sync是一個抽象類型,它繼承AbstractQueuedSynchronizer,這個AbstractQueuedSynchronizer是一個模板類,它實現了許多和鎖相關的功能,並提供了鉤子方法供用戶實現,好比tryAcquire,tryRelease等。Sync實現了AbstractQueuedSynchronizer的tryRelease方法。NonfairSync和FairSync兩個類繼承自Sync,實現了lock方法,而後分別公平搶佔和非公平搶佔針對tryAcquire有不一樣的實現。高併發

1.三、AbstractQueuedSynchronizer

首先,AbstractQueuedSynchronizer繼承自AbstractOwnableSynchronizer,AbstractOwnableSynchronizer的實現很簡單,它表示獨佔的同步器,內部使用變量exclusiveOwnerThread表示獨佔的線程。工具

其次,AbstractQueuedSynchronizer內部使用CLH鎖隊列來將併發執行變成串行執行。整個隊列是一個雙向鏈表。每一個CLH鎖隊列的節點,會保存前一個節點和後一個節點的引用,當前節點對應的線程,以及一個狀態。這個狀態用來代表該線程是否應該block。當節點的前一個節點被釋放的時候,當前節點就被喚醒,成爲頭部。新加入的節點會放在隊列尾部。性能

2、 非公平鎖的lock方法

2.一、lock方法流程圖

2.二、lock方法詳細描述

一、在初始化ReentrantLock的時候,若是咱們不傳參數是否公平,那麼默認使用非公平鎖,也就是NonfairSync。學習

二、當咱們調用ReentrantLock的lock方法的時候,其實是調用了NonfairSync的lock方法,這個方法先用CAS操做,去嘗試搶佔該鎖。若是成功,就把當前線程設置在這個鎖上,表示搶佔成功。若是失敗,則調用acquire模板方法,等待搶佔。代碼以下:

1

2

3

4

5

6

7

8

9

10

11

12

static final class NonfairSync extends Sync {

        final void lock() {

            if (compareAndSetState(0, 1))

                setExclusiveOwnerThread(Thread.currentThread());

            else

                acquire(1);

        }

 

        protected final boolean tryAcquire(int acquires) {

            return nonfairTryAcquire(acquires);

        }

}

 

三、調用acquire(1)實際上使用的是AbstractQueuedSynchronizer的acquire方法,它是一套鎖搶佔的模板,整體原理是先去嘗試獲取鎖,若是沒有獲取成功,就在CLH隊列中增長一個當前線程的節點,表示等待搶佔。而後進入CLH隊列的搶佔模式,進入的時候也會去執行一次獲取鎖的操做,若是仍是獲取不到,就調用LockSupport.park將當前線程掛起。那麼當前線程何時會被喚醒呢?當持有鎖的那個線程調用unlock的時候,會將CLH隊列的頭節點的下一個節點上的線程喚醒,調用的是LockSupport.unpark方法。acquire代碼比較簡單,具體以下:

 

1

2

3

4

5

public final void acquire(int arg) {

        if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

}

 

3.一、acquire方法內部先使用tryAcquire這個鉤子方法去嘗試再次獲取鎖,這個方法在NonfairSync這個類中其實就是使用了nonfairTryAcquire,具體實現原理是先比較當前鎖的狀態是不是0,若是是0,則嘗試去原子搶佔這個鎖(設置狀態爲1,而後把當前線程設置成獨佔線程),若是當前鎖的狀態不是0,就去比較當前線程和佔用鎖的線程是否是一個線程,若是是,會去增長狀態變量的值,從這裏看出可重入鎖之因此可重入,就是同一個線程能夠反覆使用它佔用的鎖。若是以上兩種狀況都不經過,則返回失敗false。代碼以下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

final boolean nonfairTryAcquire(int acquires) {

            final Thread current = Thread.currentThread();

            int c = getState();

            if (c == 0) {

                if (compareAndSetState(0, acquires)) {

                    setExclusiveOwnerThread(current);

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) {

                int nextc = c + acquires;

                if (nextc < 0) // overflow

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);

                return true;

            }

            return false;

        }

 

3.二、tryAcquire一旦返回false,就會則進入acquireQueued流程,也就是基於CLH隊列的搶佔模式:

3.2.一、首先,在CLH鎖隊列尾部增長一個等待節點,這個節點保存了當前線程,經過調用addWaiter實現,這裏須要考慮初始化的狀況,在第一個等待節點進入的時候,須要初始化一個頭節點而後把當前節點加入到尾部,後續則直接在尾部加入節點就好了。

代碼以下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

private Node addWaiter(Node mode) {

// 初始化一個節點,這個節點保存當前線程

        Node node = new Node(Thread.currentThread(), mode);

        // 當CLH隊列不爲空的視乎,直接在隊列尾部插入一個節點

        Node pred = tail;

        if (pred != null) {

            node.prev = pred;

            if (compareAndSetTail(pred, node)) {

                pred.next = node;

                return node;

            }

        }

// 當CLH隊列爲空的時候,調用enq方法初始化隊列

        enq(node);

        return node;

}

 

private Node enq(final Node node) {

        for (;;) {

            Node t = tail;

            if (t == null) { // 初始化節點,頭尾都指向一個空節點

                if (compareAndSetHead(new Node()))

                    tail = head;

            } else {// 考慮併發初始化

                node.prev = t;

                if (compareAndSetTail(t, node)) {

                    t.next = node;

                    return t;

                }

            }

        }

}

 

3.2.二、將節點增長到CLH隊列後,進入acquireQueued方法。

首先,外層是一個無限for循環,若是當前節點是頭節點的下個節點,而且經過tryAcquire獲取到了鎖,說明頭節點已經釋放了鎖,當前線程是被頭節點那個線程喚醒的,這時候就能夠將當前節點設置成頭節點,而且將failed標記設置成false,而後返回。至於上一個節點,它的next變量被設置爲null,在下次GC的時候會清理掉。

若是本次循環沒有獲取到鎖,就進入線程掛起階段,也就是shouldParkAfterFailedAcquire這個方法。

代碼以下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

final boolean acquireQueued(final Node node, int arg) {

        boolean failed = true;

        try {

            boolean interrupted = false;

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    failed = false;

                    return interrupted;

                }

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    interrupted = true;

            }

        } finally {

            if (failed)

                cancelAcquire(node);

        }

}

 

3.2.三、若是嘗試獲取鎖失敗,就會進入shouldParkAfterFailedAcquire方法,會判斷當前線程是否掛起,若是前一個節點已是SIGNAL狀態,則當前線程須要掛起。若是前一個節點是取消狀態,則須要將取消節點從隊列移除。若是前一個節點狀態是其餘狀態,則嘗試設置成SIGNAL狀態,並返回不須要掛起,從而進行第二次搶佔。完成上面的過後進入掛起階段。

代碼以下:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

        int ws = pred.waitStatus;

        if (ws == Node.SIGNAL)

            //

            return true;

        if (ws > 0) {

            //

            do {

                node.prev = pred = pred.prev;

            } while (pred.waitStatus > 0);

            pred.next = node;

        } else {

            //

            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

        }

        return false;

    }

 

3.2.四、當進入掛起階段,會進入parkAndCheckInterrupt方法,則會調用LockSupport.park(this)將當前線程掛起。代碼:

 

1

2

3

4

private final boolean parkAndCheckInterrupt() {

        LockSupport.park(this);

        return Thread.interrupted();

}

 

3、 非公平鎖的unlock方法

3.一、unlock方法的活動圖

3.二、unlock方法詳細描述

一、調用unlock方法,實際上是直接調用AbstractQueuedSynchronizer的release操做。

二、進入release方法,內部先嚐試tryRelease操做,主要是去除鎖的獨佔線程,而後將狀態減一,這裏減一主要是考慮到可重入鎖可能自身會屢次佔用鎖,只有當狀態變成0,才表示徹底釋放了鎖。

三、一旦tryRelease成功,則將CHL隊列的頭節點的狀態設置爲0,而後喚醒下一個非取消的節點線程。

四、一旦下一個節點的線程被喚醒,被喚醒的線程就會進入acquireQueued代碼流程中,去獲取鎖。

具體代碼以下:

unlock代碼:

1

2

3

public void unlock() {

        sync.release(1);

}

 

release方法代碼:

1

2

3

4

5

6

7

8

9

public final boolean release(int arg) {

        if (tryRelease(arg)) {

            Node h = head;

            if (h != null && h.waitStatus != 0)

                unparkSuccessor(h);

            return true;

        }

        return false;

}

 

Sync中通用的tryRelease方法代碼:

1

2

3

4

5

6

7

8

9

10

11

12

protected final boolean tryRelease(int releases) {

     int c = getState() - releases;

    if (Thread.currentThread() != getExclusiveOwnerThread())

                throw new IllegalMonitorStateException();

    boolean free = false;

    if (c == 0) {

          free = true;

          setExclusiveOwnerThread(null);

    }

    setState(c);

    return free;

}

 

unparkSuccessor代碼:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;

        if (ws < 0)

            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;

        if (s == null || s.waitStatus > 0) {

            s = null;

            for (Node t = tail; t != null && t != node; t = t.prev)

                if (t.waitStatus <= 0)

                    s = t;

        }

        if (s != null)

            LockSupport.unpark(s.thread);

}

 

4、 公平鎖和非公平鎖的區別

公平鎖和非公平鎖,在CHL隊列搶佔模式上都是一致的,也就是在進入acquireQueued這個方法以後都同樣,它們的區別在初次搶佔上有區別,也就是tryAcquire上的區別,下面是二者內部調用關係的簡圖:

 

Java

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

NonfairSync

lock —> compareAndSetState

                | —> setExclusiveOwnerThread

      —> accquire

     | —> tryAcquire

                           |—>nonfairTryAcquire

                |—> acquireQueued

 

FairSync

lock —> acquire

               | —> tryAcquire

                           |—>!hasQueuePredecessors

                           |—>compareAndSetState

                           |—>setExclusiveOwnerThread

               |—> acquireQueued

 

真正的區別就是公平鎖多了hasQueuePredecessors這個方法,這個方法用於判斷CHL隊列中是否有節點,對於公平鎖,若是CHL隊列有節點,則新進入競爭的線程必定要在CHL上排隊,而非公平鎖則是無視CHL隊列中的節點,直接進行競爭搶佔,這就有可能致使CHL隊列上的節點永遠獲取不到鎖,這就是非公平鎖之因此不公平的緣由。

5、 總結

線程使用ReentrantLock獲取鎖分爲兩個階段,第一個階段是初次競爭,第二個階段是基於CHL隊列的競爭。在初次競爭的時候是否考慮隊列節點直接區分出了公平鎖和非公平鎖。在基於CHL隊列的鎖競爭中,依靠CAS操做保證原子操做,依靠LockSupport來作線程的掛起和喚醒,使用隊列來保證併發執行變成了串行執行,從而消除了併發所帶來的問題。整體來講,ReentrantLock是一個比較輕量級的鎖,並且使用面向對象的思想去實現了鎖的功能,比原來的synchronized關鍵字更加好理解。

歡迎歡迎學Java的朋友們加入java架構交流: 855835163 羣內提供免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用本身每一分每一秒的時間來學習提高本身,不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!

相關文章
相關標籤/搜索