Java多線程--AQS

ReentrantLock和AQS的關係

首先咱們來看看,若是用java併發包下的ReentrantLock來加鎖和釋放鎖,是個什麼樣的:java

1 ReentrantLock reentrantLock = new ReentrantLock();
2 reentrantLock.lock();
3 //業務代碼
4 reentrantLock.unlock();

 上面那段代碼就是搞一個Lock對象,而後加鎖和釋放鎖。那麼,這個跟AQS有啥關係?關係大了去了,由於java併發包下不少API都是基於AQS來實現的加鎖和釋放鎖等功能的,AQS是java併發包的基礎類。上一部分源碼:node

 1 public class ReentrantLock implements Lock, java.io.Serializable {
 2     private static final long serialVersionUID = 7373984872572414699L;
 3     /** Synchronizer providing all implementation mechanics */
 4     private final Sync sync;
 5 
 6     /**
 7      * Base of synchronization control for this lock. Subclassed
 8      * into fair and nonfair versions below. Uses AQS state to
 9      * represent the number of holds on the lock.
10      */
11     abstract static class Sync extends AbstractQueuedSynchronizer {
12         private static final long serialVersionUID = -5179523762034025860L;
13 
14         /**
15          * Performs {@link Lock#lock}. The main reason for subclassing
16          * is to allow fast path for nonfair version.
17          */
18         abstract void lock();
19 
20         /**
21          * Performs non-fair tryLock.  tryAcquire is implemented in
22          * subclasses, but both need nonfair try for trylock method.
23          */
24         final boolean nonfairTryAcquire(int acquires) {
25             final Thread current = Thread.currentThread();
26             int c = getState();
27             if (c == 0) {
28                 if (compareAndSetState(0, acquires)) {
29                     setExclusiveOwnerThread(current);
30                     return true;
31                 }
32             }
33             else if (current == getExclusiveOwnerThread()) {
34                 int nextc = c + acquires;
35                 if (nextc < 0) // overflow
36                     throw new Error("Maximum lock count exceeded");
37                 setState(nextc);
38                 return true;
39             }
40             return false;
41         }
42 
43         protected final boolean tryRelease(int releases) {
44             int c = getState() - releases;
45             if (Thread.currentThread() != getExclusiveOwnerThread())
46                 throw new IllegalMonitorStateException();
47             boolean free = false;
48             if (c == 0) {
49                 free = true;
50                 setExclusiveOwnerThread(null);
51             }
52             setState(c);
53             return free;
54         }
55 
56         protected final boolean isHeldExclusively() {
57             // While we must in general read state before owner,
58             // we don't need to do so to check if current thread is owner
59             return getExclusiveOwnerThread() == Thread.currentThread();
60         }
61 
62         final ConditionObject newCondition() {
63             return new ConditionObject();
64         }
65 
66         // Methods relayed from outer class
67 
68         final Thread getOwner() {
69             return getState() == 0 ? null : getExclusiveOwnerThread();
70         }
71 
72         final int getHoldCount() {
73             return isHeldExclusively() ? getState() : 0;
74         }
75 
76         final boolean isLocked() {
77             return getState() != 0;
78         }
79 
80         /**
81          * Reconstitutes the instance from a stream (that is, deserializes it).
82          */
83         private void readObject(java.io.ObjectInputStream s)
84             throws java.io.IOException, ClassNotFoundException {
85             s.defaultReadObject();
86             setState(0); // reset to unlocked state
87         }
88     }
89 }

說白了,ReentrantLock內部包含了一個AQS對象,也就是AbstractQueuedSynchronizer類型的對象。這個AQS對象就是ReentrantLock能夠實現加鎖和釋放鎖的關鍵性的核心組件。併發

ReentrantLock加鎖和釋放鎖的底層原理

如今若是有一個線程過來嘗試用ReentrantLock的lock()方法進行加鎖,會發生什麼事情呢?ui

 1 public abstract class AbstractQueuedSynchronizer
 2     extends AbstractOwnableSynchronizer
 3     implements java.io.Serializable {
 4 
 5    /**
 6     * The thread that enqueued this node.  Initialized on
 7     * construction and nulled out after use.
 8     */
 9     volatile Thread thread;
10 
11     /**
12      * The synchronization state.
13      */
14     private volatile int state;
15 
16 }

這個AQS對象內部有一個核心的變量叫作state,是int類型的,表明了加鎖的狀態。初始狀態下,這個state的值是0。另外,這個AQS內部還有一個關鍵變量,用來記錄當前加鎖的是哪一個線程,初始化狀態下,這個變量是null。接着線程1跑過來調用ReentrantLock的lock()方法嘗試進行加鎖,這個加鎖的過程,直接就是用CAS操做將state值從0變爲1。若是以前沒人加過鎖,那麼state的值確定是0,此時線程1就能夠加鎖成功。一旦線程1加鎖成功了以後,就能夠設置當前加鎖線程是本身。this

AQS就是併發包裏的一個核心組件,裏面有state變量、加鎖線程變量等核心的東西,維護了加鎖狀態。ReentrantLock這種東西只是一個外層的API,內核中的鎖機制實現都是依賴AQS組件的。這個ReentrantLock之因此用Reentrant打頭,意思就是他是一個可重入鎖。意思就是你能夠對一個ReentrantLock對象屢次執行lock()加鎖和unlock()釋放鎖,也就是能夠對一個鎖加屢次,叫作可重入加鎖。你們看明白了那個state變量以後,就知道了如何進行可重入加鎖!其實每次線程1可重入加鎖一次,會判斷一下當前加鎖線程就是本身,那麼他本身就能夠可重入屢次加鎖,每次加鎖就是把state的值給累加1,別的沒啥變化,實現原理以下:spa

 1 public class ReentrantLock implements Lock, java.io.Serializable {
 2     /**
 3      * Sync object for non-fair locks
 4      */
 5     static final class NonfairSync extends Sync {
 6         private static final long serialVersionUID = 7316153563782823691L;
 7 
 8         /**
 9          * Performs lock.  Try immediate barge, backing up to normal
10          * acquire on failure.
11          */
12         final void lock() {
13             if (compareAndSetState(0, 1))
14                 setExclusiveOwnerThread(Thread.currentThread());
15             else
16                 acquire(1);
17         }
18 
19         protected final boolean tryAcquire(int acquires) {
20             return nonfairTryAcquire(acquires);
21         }
22     }
23 }
24 
25 public abstract class AbstractQueuedSynchronizer
26     extends AbstractOwnableSynchronizer
27     implements java.io.Serializable {
28 
29     /**
30      * Acquires in exclusive mode, ignoring interrupts.  Implemented
31      * by invoking at least once {@link #tryAcquire},
32      * returning on success.  Otherwise the thread is queued, possibly
33      * repeatedly blocking and unblocking, invoking {@link
34      * #tryAcquire} until success.  This method can be used
35      * to implement method {@link Lock#lock}.
36      *
37      * @param arg the acquire argument.  This value is conveyed to
38      *        {@link #tryAcquire} but is otherwise uninterpreted and
39      *        can represent anything you like.
40      */
41     public final void acquire(int arg) {
42         if (!tryAcquire(arg) &&
43             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
44             selfInterrupt();
45     }
46 
47 }

接着,若是線程1加鎖了以後,線程2跑過來加鎖會怎麼樣呢?咱們來看看鎖的互斥是如何實現的,線程2跑過來一下看到state的值不是0,因此CAS操做將state從0變爲1的過程會失敗,由於state的值當前爲1,說明已經有人加鎖了!接着線程2會看一下,是否是本身以前加的鎖啊?固然不是了,「加鎖線程」這個變量明確記錄了是線程1佔用了這個鎖,因此線程2此時就是加鎖失敗。接着,線程2會將本身放入AQS中的一個等待隊列,由於本身嘗試加鎖失敗了,此時就要將本身放入隊列中來等待,等待線程1釋放鎖以後,本身就能夠從新嘗試加鎖了,因此你們能夠看到,AQS是如此的核心。AQS內部還有一個等待隊列,專門放那些加鎖失敗的線程。線程

 1 /**
 2  * Condition implementation for a {@link
 3  * AbstractQueuedSynchronizer} serving as the basis of a {@link
 4  * Lock} implementation.
 5  *
 6  * <p>Method documentation for this class describes mechanics,
 7  * not behavioral specifications from the point of view of Lock
 8  * and Condition users. Exported versions of this class will in
 9  * general need to be accompanied by documentation describing
10  * condition semantics that rely on those of the associated
11  * {@code AbstractQueuedSynchronizer}.
12  *
13  * <p>This class is Serializable, but all fields are transient,
14  * so deserialized conditions have no waiters.
15  */
16 public class ConditionObject implements Condition, java.io.Serializable {
17         private static final long serialVersionUID = 1173984872572414699L;
18         /** First node of condition queue. */
19         private transient Node firstWaiter;
20         /** Last node of condition queue. */
21         private transient Node lastWaiter;
22 
23         /**
24          * Creates a new {@code ConditionObject} instance.
25          */
26         public ConditionObject() { }
27 
28         // Internal methods
29 
30         /**
31          * Adds a new waiter to wait queue.
32          * @return its new wait node
33          */
34         private Node addConditionWaiter() {
35             Node t = lastWaiter;
36             // If lastWaiter is cancelled, clean out.
37             if (t != null && t.waitStatus != Node.CONDITION) {
38                 unlinkCancelledWaiters();
39                 t = lastWaiter;
40             }
41             Node node = new Node(Thread.currentThread(), Node.CONDITION);
42             if (t == null)
43                 firstWaiter = node;
44             else
45                 t.nextWaiter = node;
46             lastWaiter = node;
47             return node;
48         }
49 }

接着,線程1在執行完本身的業務邏輯代碼以後,就會釋放鎖,他釋放鎖的過程很是的簡單,就是將AQS內的state變量的值遞減1,若是state值爲0,則完全釋放鎖,會將「加鎖線程」變量也設置爲null!code

 1 public class ReentrantLock implements Lock, java.io.Serializable {  
 2     /**
 3      * Attempts to release this lock.
 4      *
 5      * <p>If the current thread is the holder of this lock then the hold
 6      * count is decremented.  If the hold count is now zero then the lock
 7      * is released.  If the current thread is not the holder of this
 8      * lock then {@link IllegalMonitorStateException} is thrown.
 9      *
10      * @throws IllegalMonitorStateException if the current thread does not
11      *         hold this lock
12      */
13     public void unlock() {
14         sync.release(1);
15     }
16 }
17 
18 public abstract class AbstractQueuedSynchronizer
19     extends AbstractOwnableSynchronizer
20     implements java.io.Serializable {
21      public final boolean release(int arg) {
22         if (tryRelease(arg)) {
23             Node h = head;
24             if (h != null && h.waitStatus != 0)
25                 unparkSuccessor(h);
26             return true;
27         }
28         return false;
29     }
30 }

 

接下來,會從等待隊列的隊頭喚醒線程2從新嘗試加鎖。好!線程2如今就從新嘗試加鎖,這時仍是用CAS操做將state從0變爲1,此時就會成功,成功以後表明加鎖成功,就會將state設置爲1。此外,還要把「加鎖線程」設置爲線程2本身,同時線程2本身就從等待隊列中出隊了。orm

其實一句話總結:AQS就是一個併發包的基礎組件,用來實現各類鎖,各類同步組件的。它包含了state變量、加鎖線程、等待隊列等併發中的核心組件。對象

相關文章
相關標籤/搜索