ReentrantLock源碼解析

什麼是ReentrantLock

javadoc中的描述以下:java

一種可重入互斥鎖,具備與使用synchronized方法和語句訪問隱式監視器鎖相同的基本行爲和語義,同時具備額外的功能。node

最後成功加鎖而且沒有釋放該鎖的線程擁有一個ReentrantLock。當鎖不屬於其餘線程時,調用鎖的線程將返回併成功獲取鎖。若是當前線程已經擁有鎖,該方法將當即返回。可使用isHeldByCurrentThread和getHoldCount方法進行檢查。c#

該類的構造函數接受一個可選的公平性參數。當設置爲true時,在爭用下,鎖傾向於向等待時間最長的線程授予訪問權。不然,鎖並不保證任何特定的訪問順序。使用許多線程訪問的公平鎖的程序可能會顯示較低的整體吞吐量(即更慢),比起使用默認設置的程序,但在得到鎖和保證不會餓死方面,時間上的差別更小時間。可是請注意,鎖的公平性並不保證線程調度的公平性。所以,使用公平鎖的多個線程中的一個可能會在其餘活動線程沒有進展或當前沒有持有鎖的狀況下連續屢次得到該鎖。還要注意,非定時tryLock()方法不支持公平設置。若是鎖可用,即便其餘線程正在等待,它也會成功。多線程

建議在調用獲取鎖以後,老是當即使用try語句塊,最典型的是在構造以前/以後:app

 1  class X {
 2    private final ReentrantLock lock = new ReentrantLock();
 3    // ...
 4 
 5    public void m() {
 6      lock.lock();  // block until condition holds
 7      try {
 8        // ... method body
 9      } finally {
10        lock.unlock()
11      }
12    }
13  }

除了實現鎖接口以外,該類還定義了一些用於檢查鎖狀態的公共和受保護的方法。其中一些方法僅用於檢測和監視。函數

該類的序列化與內置鎖的行爲方式相同:反序列化鎖處於解鎖狀態,而與序列化時的狀態無關。ui

此鎖最多支持同一線程的2147483647個遞歸鎖。試圖超過此限制將致使鎖定方法拋出錯誤。this

公平鎖和非公平鎖

ReentrantLock類中定義了抽象靜態類java.util.concurrent.locks.ReentrantLock.Sync,繼承自java.util.concurrent.locks.AbstractQueuedSynchronizer。spa

ReentrantLock實例屬性中有Sync類型的sync。線程

 1     /** Synchronizer providing all implementation mechanics */
 2     private final Sync sync;
 3 
 4     /**
 5      * Base of synchronization control for this lock. Subclassed
 6      * into fair and nonfair versions below. Uses AQS state to
 7      * represent the number of holds on the lock.
 8      */
 9     abstract static class Sync extends AbstractQueuedSynchronizer {
10     ...
11     }

ReentrantLock有兩個構造函數,默認使用非公平策略。

 1     /**
 2      * Creates an instance of {@code ReentrantLock}.
 3      * This is equivalent to using {@code ReentrantLock(false)}.
 4      */
 5     public ReentrantLock() {
 6         sync = new NonfairSync();
 7     }
 8 
 9     /**
10      * Creates an instance of {@code ReentrantLock} with the
11      * given fairness policy.
12      *
13      * @param fair {@code true} if this lock should use a fair ordering policy
14      */
15     public ReentrantLock(boolean fair) {
16         sync = fair ? new FairSync() : new NonfairSync();
17     }

 

獲取鎖的實現

 ReentrantLock.lock()方法實際上是依賴於其內部sync對象的實現,下面以非公平策略(即內部sync對象是java.util.concurrent.locks.ReentrantLock.NonfairSync類型)爲例

1     public void lock() {
2         sync.lock();
3     }

 

 NonfairSync類的lock()實現中,先調用AbstractQueuedSynchronizer.compareAndSetState(int expect, int update),設置AbstractQueuedSynchronizer的state爲1。若是設置成功,調用AbstractOwnableSynchronizer.setExclusiveOwnerThread(Thread thread)設置exclusiveOwnerThread屬性爲當前線程。若是不成功,調用AbstractQueuedSynchronizer#acquire(int arg)嘗試獲取鎖。

 1         /**
 2          * Performs lock.  Try immediate barge, backing up to normal
 3          * acquire on failure.
 4          */
 5         final void lock() {
 6             if (compareAndSetState(0, 1))
 7                 setExclusiveOwnerThread(Thread.currentThread());
 8             else
 9                 acquire(1);
10         }

在AbstractQueuedSynchronizer#acquire(int arg)方法中,if的第一個判斷條件調用ReentrantLock.NonfairSync#tryAcquire(int acquires)方法嘗試獲取鎖,該方法直接調用了抽象靜態類ReentrantLock.Sync#nonfairTryAcquire(int acquires)方法。若是返回false,則添加進FIFO等待隊列中。

1     public final void acquire(int arg) {
2         if (!tryAcquire(arg) &&
3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4             selfInterrupt();
5     }

抽象靜態類ReentrantLock.Sync#nonfairTryAcquire(int acquires)方法中,若是此時state爲0,說明鎖已經被其餘線程釋放,則再次嘗試獲取鎖。若是當前線程已經持有了鎖,則會把state+1,當state<0時,則拋出錯誤,所以最多能重複加鎖Int.MAX_VALUE次。若是仍舊獲取不到鎖,則返回false。

 1        final boolean nonfairTryAcquire(int acquires) {
 2             final Thread current = Thread.currentThread();
 3             int c = getState();
 4             if (c == 0) {
 5                 if (compareAndSetState(0, acquires)) {
 6                     setExclusiveOwnerThread(current);
 7                     return true;
 8                 }
 9             }
10             else if (current == getExclusiveOwnerThread()) {
11                 int nextc = c + acquires;
12                 if (nextc < 0) // overflow
13                     throw new Error("Maximum lock count exceeded");
14                 setState(nextc);
15                 return true;
16             }
17             return false;
18         }

若是沒有獲取到鎖,則線程應該被阻塞,首先調用AbstractQueuedSynchronizer#addWaiter(Node mode) 將當前線程建立爲Node並添加到隊列中。方法內部先判斷是否已經存在尾部節點。若是已經存在,將這個node添加到原尾部節點的尾部。若是不存在,調用AbstractQueuedSynchronizer#enq(final Node node)建立隊列。

 1     private Node addWaiter(Node mode) {
 2         Node node = new Node(Thread.currentThread(), mode);
 3         // Try the fast path of enq; backup to full enq on failure
 4         Node pred = tail;
 5         if (pred != null) {
 6             node.prev = pred;
 7             if (compareAndSetTail(pred, node)) {
 8                 pred.next = node;
 9                 return node;
10             }
11         }
12         enq(node);
13         return node;
14     }

在AbstractQueuedSynchronizer#enq(final Node node)方法中,經過自旋和CAS操做保證能寫入隊列的尾部。

 1     private Node enq(final Node node) {
 2         for (;;) {
 3             Node t = tail;
 4             if (t == null) { // Must initialize
 5                 if (compareAndSetHead(new Node()))
 6                     tail = head;
 7             } else {
 8                 node.prev = t;
 9                 if (compareAndSetTail(t, node)) {
10                     t.next = node;
11                     return t;
12                 }
13             }
14         }
15     }

與該線程相關聯的Node對象建立以後,調用AbstractQueuedSynchronizer#acquireQueued(final Node node, int arg)方法。若是當前node是頭部節點,則再次嘗試獲取鎖。

 1     final boolean acquireQueued(final Node node, int arg) {
 2         boolean failed = true;
 3         try {
 4             boolean interrupted = false;
 5             for (;;) {
 6                 final Node p = node.predecessor();
 7                 if (p == head && tryAcquire(arg)) {
 8                     setHead(node);
 9                     p.next = null; // help GC
10                     failed = false;
11                     return interrupted;
12                 }
13                 if (shouldParkAfterFailedAcquire(p, node) &&
14                     parkAndCheckInterrupt())
15                     interrupted = true;
16             }
17         } finally {
18             if (failed)
19                 cancelAcquire(node);
20         }
21     }

若是仍然沒法獲取到,則調用AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire(Node pred, Node node)檢測及更新node的status爲Node.SIGNAL狀態。

 1     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2         int ws = pred.waitStatus;
 3         if (ws == Node.SIGNAL)
 4             /*
 5              * This node has already set status asking a release
 6              * to signal it, so it can safely park.
 7              */
 8             return true;
 9         if (ws > 0) {
10             /*
11              * Predecessor was cancelled. Skip over predecessors and
12              * indicate retry.
13              */
14             do {
15                 node.prev = pred = pred.prev;
16             } while (pred.waitStatus > 0);
17             pred.next = node;
18         } else {
19             /*
20              * waitStatus must be 0 or PROPAGATE.  Indicate that we
21              * need a signal, but don't park yet.  Caller will need to
22              * retry to make sure it cannot acquire before parking.
23              */
24             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
25         }
26         return false;
27     }

若是成功,調用java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt()方法將調用LockSupport#park(java.lang.Object)方法,將該線程設置爲等待狀態

1     private final boolean parkAndCheckInterrupt() {
2         LockSupport.park(this);
3         return Thread.interrupted();
4     }

 

釋放鎖的實現

ReentrantLock#unlock其實是調用了AbstractQueuedSynchronizer#release(int arg) 方法

1     public void unlock() {
2         sync.release(1);
3     }

在AbstractQueuedSynchronizer#release(int releases)方法中,先調用ReentrantLock.Sync#tryRelease(int releases)方法嘗試釋放鎖

1     public final boolean release(int arg) {
2         if (tryRelease(arg)) {
3             Node h = head;
4             if (h != null && h.waitStatus != 0)
5                 unparkSuccessor(h);
6             return true;
7         }
8         return false;
9     }

在ReentrantLock.Sync#tryRelease(int releases)方法嘗試釋放鎖時,只有當c == 0時纔將鎖釋放

 1         protected final boolean tryRelease(int releases) {
 2             int c = getState() - releases;
 3             if (Thread.currentThread() != getExclusiveOwnerThread())
 4                 throw new IllegalMonitorStateException();
 5             boolean free = false;
 6             if (c == 0) {
 7                 free = true;
 8                 setExclusiveOwnerThread(null);
 9             }
10             setState(c);
11             return free;
12         }

鎖釋放後,調用AbstractQueuedSynchronizer#unparkSuccessor(Node node)方法將隊列的頭部節點喚醒

 1     private void unparkSuccessor(Node node) {
 2         /*
 3          * If status is negative (i.e., possibly needing signal) try
 4          * to clear in anticipation of signalling.  It is OK if this
 5          * fails or if status is changed by waiting thread.
 6          */
 7         int ws = node.waitStatus;
 8         if (ws < 0)
 9             compareAndSetWaitStatus(node, ws, 0);
10 
11         /*
12          * Thread to unpark is held in successor, which is normally
13          * just the next node.  But if cancelled or apparently null,
14          * traverse backwards from tail to find the actual
15          * non-cancelled successor.
16          */
17         Node s = node.next;
18         if (s == null || s.waitStatus > 0) {
19             s = null;
20             for (Node t = tail; t != null && t != node; t = t.prev)
21                 if (t.waitStatus <= 0)
22                     s = t;
23         }
24         if (s != null)
25             LockSupport.unpark(s.thread);
26     }

因爲線程是在調用lock()方法時,在AbstractQueuedSynchronizer#acquireQueued(final Node node, int arg)方法的第14行被掛起,喚醒後會繼續執行後續的代碼,再次進入for循環,獲取到鎖後,將自己節點設置爲head節點,原head節點的next設置爲null,以便進行回收。

相關文章
相關標籤/搜索