Java併發編程--AQS

概述

  抽象隊列同步器(AbstractQueuedSynchronizer,簡稱AQS)是用來構建鎖或者其餘同步組件的基礎框架,它使用一個整型的volatile變量(命名爲state)來維護同步狀態,經過內置的FIFO隊列來完成資源獲取線程的排隊工做。html

  volatile變量的讀寫和CAS是concurrent包得以實現的基礎。CAS表示若是當前狀態值等於預期值,則以原子方式將同步狀態設置爲給定的更新值,此操做具備volatile讀和寫的內存語義。AQS經過volatile的讀/寫和CAS所具備的volatile讀和寫的內存語義來實現線程之間的通訊。java

  高層類   Lock  同步器  阻塞隊列  Executor  併發容器
  基礎類 AQS  非阻塞數據結構  原子變量類
  volatile變量的讀/寫  CAS

  concurrent包的實現結構如上圖所示,AQS、非阻塞數據結構和原子變量類等基礎類都是基於volatile變量的讀/寫和CAS實現,而像Lock、同步器、阻塞隊列、Executor和併發容器等高層類又是基於基礎類實現。node

AQS的域和方法

  域

1 private transient volatile Node head; //同步隊列的head節點
2 private transient volatile Node tail; //同步隊列的tail節點
3 private volatile int state; //同步狀態

 

  方法

    AQS提供的能夠修改同步狀態的3個方法:編程

1 protected final int getState();  //獲取同步狀態
2 protected final void setState(int newState);  //設置同步狀態
3 protected final boolean compareAndSetState(int expect, int update);  //CAS設置同步狀態

 

    AQS提供的模板方法,主要有如下三類:安全

      1)獨佔式獲取和釋放同步狀態數據結構

1 public final void acquire(int arg) //獨佔式獲取同步狀態,若是不成功會進入同步隊列等待。
2 public final void acquireInterruptibly(int arg) //與acquire不一樣的是,能響應中斷
3 public final boolean tryAcquireNanos(int arg, long nanosTimeout) //增長超時機制
4 
5 public final boolean release(int arg) //獨佔式釋放同步狀態,該方法會調用重寫的tryRelease(int arg)。

 

        以上三種獲取同步狀態的方法都會調用自定義的tryAcquire(int arg)方法,acquire的獲取失敗時的入隊等待機制、acquireInterruptibly的響應中斷機制、tryAcquireNanos的超時機制等AQS已經實現好,這樣開發人員只須要實現本身的獲取同步狀態機制,就能夠大大下降了實現一個可靠自定義同步組件的門檻。併發

      2)共享式獲取和釋放同步狀態app

1 public final void acquireShared(int arg) //共享式獲取同步狀態,若是不成功會進入同步隊列等待。與獨佔式不一樣的是,同一時刻能夠有多個線程獲取到同步狀態。
2 public final void acquireSharedInterruptibly(int arg) //可響應中斷
3 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) //超時機制
4 
5 public final boolean releaseShared(int arg) //共享式釋放同步狀態,該方法會調用重寫的tryReleaseShared(int arg)。

 

        一樣以上三種獲取同步狀態的方法會調用自定義的tryAcquireShared方法。框架

      3)查詢同步隊列中的等待線程情ide

1 publicfinalCollection<Thread>getQueuedThreads()
2 publicfinalbooleanhasQueuedThreads()//返回包含可能正在等待獲取的線程列表,須要遍歷鏈表。返回的只是個估計值,且是無序的。這個方法的主要是爲子類提供的監視同步隊列措施而設計。
3 。。。

 

    

    AQS提供的自定義方法

      以上AQS的方法都爲final方法,不能被子類重寫,由於它們對於任何自定義同步器應該是不須要更改的,下面爲AQS提供的能夠重寫的方法。開發者須要根據自定義同步組件的特色,重寫如下方法。這些方法的實如今內部必須是線程安全的,一般應該很短而且不被阻塞。

1 protected boolean tryAcquire(int arg) //獨佔式獲取同步狀態,此方法應該查詢是否容許它在獨佔模式下獲取對象狀態,若是容許,則獲取它。返回值語義:true表明獲取成功,false表明獲取失敗。
2 protected boolean tryRelease(int arg) //獨佔式釋放同步狀態
3 
4 protected int tryAcquireShared(int arg) //共享式獲取同步狀態,返回值語義:負數表明獲取失敗、0表明獲取成功但沒有剩餘資源、正數表明獲取成功,還有剩餘資源。
5 protected boolean tryReleaseShared(int arg) //共享式釋放同步狀態
6 
7 protected boolean isHeldExclusively() //AQS是否被當前線程所獨佔

 

AQS的使用

  怎麼使用AQS實現自定義同步組件?

  自定義同步組件實例一:獨佔鎖(使用獨佔式的獲取與釋放)。

    Mutex同步組件同一時刻只容許一個線程佔用鎖,不支持可重入。0表示未鎖定狀態,1表示鎖定狀態。 

 1 class Mutex implements Lock, java.io.Serializable {
 2  
 3     //靜態內部類,自定義同步器
 4     private static class Sync extends AbstractQueuedSynchronizer {
 5     
 6         // 釋放處於佔用狀態(重寫isHeldExclusively)Report whether in locked state
 7         protected boolean isHeldExclusively() { 
 8             return getState() == 1; 
 9         }
10 
11         // 獨佔式獲取鎖(重寫tryAcquire) Acquire the lock if state is zero
12         public boolean tryAcquire(int acquires) {
13             assert acquires == 1; // Otherwise unused
14             if (compareAndSetState(0, 1)) {    //CAS設置狀態爲1。
15                 setExclusiveOwnerThread(Thread.currentThread());
16                 return true;
17             }
18             return false;
19         }
20 
21         // 獨佔式釋放鎖(重寫tryRelease) Release the lock by setting state to zero
22         protected boolean tryRelease(int releases) {
23             assert releases == 1; // Otherwise unused
24             if (getState() == 0) //獲取狀態
25                 throw new IllegalMonitorStateException();
26             setExclusiveOwnerThread(null);
27             setState(0);    //設置狀態爲0
28             return true;
29         }
30        
31         // Provide a Condition
32         //每一個Condition都包含一個隊列
33         Condition newCondition() { return new ConditionObject(); }
34 
35         // Deserialize properly
36         private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
37             s.defaultReadObject();
38             setState(0); // reset to unlocked state
39         }
40     }
41 
42     // The sync object does all the hard work. We just forward to it.
43     private final Sync sync = new Sync();
44     
45     //僅須要將操做代理到sync
46     public void lock()                { sync.acquire(1); }    //調用AQS的模板方法,
47     public boolean tryLock()          { return sync.tryAcquire(1); }
48     public void unlock()              { sync.release(1); }
49     public Condition newCondition()   { return sync.newCondition(); }
50     public boolean isLocked()         { return sync.isHeldExclusively(); }
51     public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
52     public void lockInterruptibly() throws InterruptedException { 
53         sync.acquireInterruptibly(1);
54     }
55     public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
56         return sync.tryAcquireNanos(1, unit.toNanos(timeout));
57     }
58 }

 

  自定義同步組件實例二:鎖存器(使用共享的獲取與釋放方法)

    BooleanLatch可用在多個線程須要等待某個事件發生才能繼續執行的狀況中。初始狀態state=0, 此時全部線程獲取同步狀態方法tryAcquireShared返回-1,即獲取失敗,入等待隊列。直到有線程調用tryReleaseShared釋放同步狀態,被阻塞的狀態纔會進行執行。

 1 class BooleanLatch {
 2 
 3     private static class Sync extends AbstractQueuedSynchronizer {
 4         boolean isSignalled() { return getState() != 0; }
 5 
 6         protected int tryAcquireShared(int ignore) {
 7             return isSignalled()? 1 : -1;
 8         }
 9         
10         protected boolean tryReleaseShared(int ignore) {
11             setState(1);
12             return true;
13         }
14     }
15 
16     private final Sync sync = new Sync();
17     public boolean isSignalled() { return sync.isSignalled(); }
18     public void signal()         { sync.releaseShared(1); }
19     public void await() throws InterruptedException {
20         sync.acquireSharedInterruptibly(1);
21     }
22 }

 

AQS的實現原理

  同步隊列

    同步隊列中的節點Node用來保存獲取同步狀態失敗的線程引用、等待狀態、以及前驅和後繼節點。

    AQS中兩個域:head節點和tail節點,組成一個FIFO的雙向隊列。

      private transient volatile Node head;

      private transient volatile Node tail;

    Node源碼以下。

 1 static final class Node {
 2     /** Marker to indicate a node is waiting in shared mode */
 3     static final Node SHARED = new Node();    //共享方式
 4     /** Marker to indicate a node is waiting in exclusive mode */
 5     static final Node EXCLUSIVE = null;        //獨佔方式
 6 
 7     /** waitStatus value to indicate thread has cancelled */
 8     static final int CANCELLED =  1;    //waitStatus=1爲取消狀態
 9     /** waitStatus value to indicate successor's thread needs unparking */
10     static final int SIGNAL    = -1;    //後繼節點的線程處於等待狀態,須要被喚醒。
11     /** waitStatus value to indicate thread is waiting on condition */
12     static final int CONDITION = -2;    //當前線程在condition上等待
13     static final int PROPAGATE = -3;    //表示下一次共享式同步狀態獲取將會無條件的被傳播下去。
14 
15     volatile int waitStatus;    //等待狀態,0-初始狀態
16     volatile Node prev;            //前驅節點
17     volatile Node next;            //後繼節點
18     volatile Thread thread;        //獲取同步的線程
19     Node nextWaiter;            
20 
21     final boolean isShared() {
22         return nextWaiter == SHARED;
23     }
24     //返回前驅節點
25     final Node predecessor() throws NullPointerException {
26         Node p = prev;
27         if (p == null)
28             throw new NullPointerException();
29         else
30             return p;
31     }
32 
33     Node() {    // Used to establish initial head or SHARED marker
34     }
35 
36     Node(Thread thread, Node mode) {     // Used by addWaiter
37         this.nextWaiter = mode;
38         this.thread = thread;
39     }
40 
41     Node(Thread thread, int waitStatus) { // Used by Condition
42         this.waitStatus = waitStatus;
43         this.thread = thread;
44     }
45 }

 

  獨佔式獲取同步狀態

    1)調用自定義的tryAcquire方法,該方法要保證線程線程安全的獲取同步狀態(如Mutex中的tryAcquire使用CAS更新保證原子性),若是獲取成功則return。

    2)若是獲取失敗,構造Node,並經過addWaiter方法將節點插入隊列尾部。Node.EXCLUSIVE表示節點以獨佔方式等待。

    3)acquireQueued方法中該節點自旋方式嘗試獲取同步狀態。若是獲取不到同步狀態,則阻塞節點中的線程,被阻塞線程喚醒依靠前驅節點的出隊或阻塞線程被終端來實現。

 1 //該方法對中斷不敏感,也就是因爲線程獲取同步狀態失敗後進入同步隊列中,後續對線程進行中斷操做時,線程不會從同步隊列中移出.
 2 public final void acquire(int arg) {
 3     if (!tryAcquire(arg) &&
 4         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 5         selfInterrupt();
 6 }
 7 
 8 //將節點添加到隊尾
 9 private Node addWaiter(Node mode) {
10     Node node = new Node(Thread.currentThread(), mode);
11     // Try the fast path of enq; backup to full enq on failure
12     //快速嘗試入隊,若是失敗則須要調用enq(node)方法入隊,這樣作有什麼好處?有必定機率減小一次方法調用
13     //compareAndSetTail保證Node入隊是線程安全的
14     Node pred = tail;
15     if (pred != null) {
16         node.prev = pred;
17         if (compareAndSetTail(pred, node)) {
18             pred.next = node;
19             return node;
20         }
21     }
22     enq(node);
23     return node;
24 }
25 
26 //初始化或自旋CAS直到入隊成功
27 private Node enq(final Node node) {
28     for (;;) {
29         Node t = tail;
30         if (t == null) { // Must initialize
31             if (compareAndSetHead(new Node()))
32                 tail = head;
33         } else {
34             node.prev = t;
35             if (compareAndSetTail(t, node)) {
36                 t.next = node;
37                 return t;
38             }
39         }
40     }
41 }

 

    入同步隊列以後怎麼獲取同步狀態?阻塞機制是怎樣的?

 1 //自旋方式嘗試獲取同步狀態
 2 final boolean acquireQueued(final Node node, int arg) {
 3     boolean failed = true;
 4     try {
 5         boolean interrupted = false;
 6         for (;;) {
 7             final Node p = node.predecessor();    //獲取當前節點的前驅節點
 8             if (p == head && tryAcquire(arg)) {    //若是前驅節點是head節點則嘗試獲取同步狀態
 9                 setHead(node);
10                 p.next = null; // help GC
11                 failed = false;
12                 return interrupted;
13             }
14             if (shouldParkAfterFailedAcquire(p, node) &&
15                 parkAndCheckInterrupt())    //判斷當前線程是否應該被阻塞,若是是則阻塞直到被喚醒繼續循環,若是不是則再次嘗試獲取同步狀態。
16                 interrupted = true;
17         }
18     } finally {
19         if (failed)
20             cancelAcquire(node);
21     }
22 }
23 
24 //判斷當前線程是否應該被阻塞,若是線程應該被阻塞則返回true。檢查和更新獲取同步狀態失敗Node的前驅節點的waitStatus。
25 //其中pred爲node的前驅節點
26 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
27     int ws = pred.waitStatus;
28     if (ws == Node.SIGNAL)
29         /*
30          * This node has already set status asking a release
31          * to signal it, so it can safely park.
32          */
33         //前驅節點已經設置爲SIGNAL狀態,在前驅節點的線程釋放同步狀態會喚醒當前Node的線程。
34         return true;
35     if (ws > 0) {
36         /*
37          * Predecessor was cancelled. Skip over predecessors and
38          * indicate retry.
39          */
40          //前驅節點是cancelled狀態,跳過被取消的Node,直到向前找到waitStatus > 0的Node做爲當前節點的前驅,而後重試獲取同步狀態。
41         do {
42             node.prev = pred = pred.prev;
43         } while (pred.waitStatus > 0);
44         pred.next = node;
45     } else {
46         /* waitStatus = 0是初始狀態。
47          * waitStatus must be 0 or PROPAGATE.  Indicate that we
48          * need a signal, but don't park yet.  Caller will need to
49          * retry to make sure it cannot acquire before parking.
50          */
51         //將前驅節點的等待狀態改成SIGNAL。
52         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
53     }
54     return false;
55 }
56 
57 //阻塞線程
58 private final boolean parkAndCheckInterrupt() {
59     LockSupport.park(this);    //使用LockSupport阻塞當前線程
60     return Thread.interrupted();
61 }

 

    在獲取同步狀態時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中並在隊列中進行自旋;移出隊列(或中止自旋)的條件是前驅節點爲頭節點且成功獲取了同步狀態。

  獨佔式釋放同步狀態

    在釋放同步狀態時,同步器調用tryRelease(int arg)方法釋放同步狀態,而後喚醒頭節點的後繼節點。

 1 public final boolean release(int arg) {
 2     if (tryRelease(arg)) {
 3         Node h = head;
 4         if (h != null && h.waitStatus != 0)
 5             unparkSuccessor(h);    //喚醒後繼節點線程
 6         return true;
 7     }
 8     return false;
 9 }
10 
11 //喚醒後繼節點
12 private void unparkSuccessor(Node node) {
13     /*
14      * If status is negative (i.e., possibly needing signal) try
15      * to clear in anticipation of signalling.  It is OK if this
16      * fails or if status is changed by waiting thread.
17      */
18      //將當前節點的waitStatus改成0-原始狀態,目的是什麼?
19     int ws = node.waitStatus;
20     if (ws < 0)
21         compareAndSetWaitStatus(node, ws, 0);
22 
23     /*
24      * Thread to unpark is held in successor, which is normally
25      * just the next node.  But if cancelled or apparently null,
26      * traverse backwards from tail to find the actual
27      * non-cancelled successor.
28      */
29      //若是後繼節點爲null或被取消,則從tail向前找到正常的後繼節點
30     Node s = node.next;
31     if (s == null || s.waitStatus > 0) {
32         s = null;
33         for (Node t = tail; t != null && t != node; t = t.prev)
34             if (t.waitStatus <= 0)
35                 s = t;
36     }
37     if (s != null)
38         LockSupport.unpark(s.thread);    //喚醒後繼節點
39 }

 

  共享式獲取同步狀態

    1)首先調用自定義方法tryAcquireShared嘗試獲取同步狀態,至少調用一次tryAcquireShared方法,若是返回值>=0,則獲取成功,return;不然執行步驟2),

    2)當獲取失敗時,爲當前線程以共享方式建立Node並插入同步隊列。

    3)入隊後,以自旋方式嘗試獲取同步狀態,若是前驅節點爲head節點,則嘗試獲取同步狀態,獲取失敗,則阻塞線程。

 1 //共享式獲取同步狀態,忽略異常。
 2 //注意:實現自定義方法tryAcquireShared時,要遵循AQS定義的返回值語義,負數表明獲取失敗、0表明獲取成功但沒有剩餘資源、正數表明獲取成功,還有剩餘資源。
 3 public final void acquireShared(int arg) {
 4     if (tryAcquireShared(arg) < 0)    //
 5         doAcquireShared(arg);
 6 }
 7 
 8 private void doAcquireShared(int arg) {
 9     final Node node = addWaiter(Node.SHARED);    //爲當前線程以共享方式建立Node並插入同步隊列尾部。
10     boolean failed = true;
11     try {
12         boolean interrupted = false;
13         for (;;) {
14             final Node p = node.predecessor();
15             if (p == head) {    //若是前驅節點爲head節點,則嘗試獲取同步狀態
16                 int r = tryAcquireShared(arg);
17                 if (r >= 0) {    //獲取成功
18                     setHeadAndPropagate(node, r);    //設置node爲head節點,還有剩餘資源則繼續喚醒後繼的node
19                     p.next = null; // help GC
20                     if (interrupted)    //若是等待過程當中被中斷過,則中斷當前線程
21                         selfInterrupt();    //Thread.currentThread().interrupt();
22                     failed = false;
23                     return;
24                 }
25             }
26             //shouldParkAfterFailedAcquire方法判斷當前線程是否應該被阻塞,若是是則調用parkAndCheckInterrupt阻塞當前線程
27             if (shouldParkAfterFailedAcquire(p, node) &&
28                 parkAndCheckInterrupt())
29                 interrupted = true;
30         }
31     } finally {
32         if (failed)
33             cancelAcquire(node);
34     }
35 }
36 
37 //獲取成功後,設置node爲head節點,
38 private void setHeadAndPropagate(Node node, int propagate) {
39     Node h = head; // Record old head for check below
40     setHead(node);    //設置node爲head節點,所以node出隊
41     //若是還有剩餘資源,嘗試喚醒node節點的後繼節點
42     /*
43      * Try to signal next queued node if:
44      *   Propagation was indicated by caller,
45      *     or was recorded (as h.waitStatus) by a previous operation
46      *     (note: this uses sign-check of waitStatus because
47      *      PROPAGATE status may transition to SIGNAL.)
48      * and
49      *   The next node is waiting in shared mode,
50      *     or we don't know, because it appears null
51      *
52      * The conservatism in both of these checks may cause
53      * unnecessary wake-ups, but only when there are multiple
54      * racing acquires/releases, so most need signals now or soon
55      * anyway.
56      */
57     if (propagate > 0 || h == null || h.waitStatus < 0) {
58         Node s = node.next;
59         if (s == null || s.isShared())
60             doReleaseShared();
61     }
62 }
63 
64 //設置node爲head節點,所以node出隊
65 private void setHead(Node node) {
66     head = node;
67     node.thread = null;
68     node.prev = null;
69 }

 

  共享式釋放同步狀態

 1 public final boolean releaseShared(int arg) {
 2     if (tryReleaseShared(arg)) {
 3         doReleaseShared();
 4         return true;
 5     }
 6     return false;
 7 }
 8 
 9 //喚醒後繼節點線程並確保被傳播
10 private void doReleaseShared() {
11     /*
12      * Ensure that a release propagates, even if there are other
13      * in-progress acquires/releases.  This proceeds in the usual
14      * way of trying to unparkSuccessor of head if it needs
15      * signal. But if it does not, status is set to PROPAGATE to
16      * ensure that upon release, propagation continues.
17      * Additionally, we must loop in case a new node is added
18      * while we are doing this. Also, unlike other uses of
19      * unparkSuccessor, we need to know if CAS to reset status
20      * fails, if so rechecking.
21      */
22     for (;;) {
23         Node h = head;
24         if (h != null && h != tail) {
25             int ws = h.waitStatus;
26             if (ws == Node.SIGNAL) {
27                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
28                     continue;            // loop to recheck cases
29                 unparkSuccessor(h);    //喚醒後繼節點
30             }
31             else if (ws == 0 &&
32                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
33                 continue;                // loop on failed CAS
34         }
35         if (h == head)                   // loop if head changed
36             break;
37     }
38 }

 

  獨佔式可中斷獲取

    可響應中斷獲取與普通獲取的區別:當線程被中斷時,會當即返回,並拋出InterruptedException,執行cancelAcquire方法取消獲取同步狀態;而普通獲取只是將中斷標誌位置爲true,但線程依舊會阻塞在等待隊列中。

 1 public final void acquireInterruptibly(int arg)
 2         throws InterruptedException {
 3     if (Thread.interrupted())
 4         throw new InterruptedException();    //拋出中斷異常
 5     if (!tryAcquire(arg))
 6         doAcquireInterruptibly(arg);
 7 }
 8 
 9 private void doAcquireInterruptibly(int arg)
10     throws InterruptedException {
11     final Node node = addWaiter(Node.EXCLUSIVE);
12     boolean failed = true;
13     try {
14         for (;;) {
15             final Node p = node.predecessor();
16             if (p == head && tryAcquire(arg)) {
17                 setHead(node);
18                 p.next = null; // help GC
19                 failed = false;
20                 return;
21             }
22             if (shouldParkAfterFailedAcquire(p, node) &&
23                 parkAndCheckInterrupt())
24                 throw new InterruptedException();    //惟一的區別,拋出中斷異常。而普通的獲取操做,只是將中斷標誌位置爲true。
25         }
26     } finally {
27         if (failed)
28             cancelAcquire(node);
29     }
30 }

 

    如何取消線程?

 1 private void cancelAcquire(Node node) {
 2     // Ignore if node doesn't exist
 3     if (node == null)
 4         return;
 5 
 6     node.thread = null;    //將線程置爲null
 7 
 8     // Skip cancelled predecessors
 9     Node pred = node.prev;
10     while (pred.waitStatus > 0)    //即waitStatus=1,爲cancelled狀態。跳過狀態爲取消的前驅節點
11         node.prev = pred = pred.prev;
12 
13     // predNext is the apparent node to unsplice. CASes below will
14     // fail if not, in which case, we lost race vs another cancel
15     // or signal, so no further action is necessary.
16     Node predNext = pred.next;
17 
18     // Can use unconditional write instead of CAS here.
19     // After this atomic step, other Nodes can skip past us.
20     // Before, we are free of interference from other threads.
21     node.waitStatus = Node.CANCELLED;    //當前節點置爲cancelled狀態
22 
23     // If we are the tail, remove ourselves.
24     if (node == tail && compareAndSetTail(node, pred)) {    //若是當前節點爲tail節點,刪除該節點
25         compareAndSetNext(pred, predNext, null);
26     } else {
27         // If successor needs signal, try to set pred's next-link
28         // so it will get one. Otherwise wake it up to propagate.
29         //
30         int ws;
31         if (pred != head &&
32             ((ws = pred.waitStatus) == Node.SIGNAL ||
33              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
34             pred.thread != null) {
35             //若是前驅節點waitStatus爲SIGNAL或者CAS更新爲SIGNAL成功,則pred釋放同步狀態時會通知後繼節點
36             //而且pred.thread不爲null,則cas將pred的後繼節點置爲node.next,
37             Node next = node.next;
38             if (next != null && next.waitStatus <= 0)
39                 compareAndSetNext(pred, predNext, next);
40         } else {
41             unparkSuccessor(node);    //喚醒後繼節點
42         }
43 
44         node.next = node; // help GC next指向本身,node從等待隊列中移除。
45     }
46 }

 

  獨佔式可超時獲取

    在響應中斷的基礎上增長了超時機制

 1 public final boolean tryAcquireNanos(int arg, long nanosTimeout)
 2         throws InterruptedException {
 3     if (Thread.interrupted())
 4         throw new InterruptedException();
 5     return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
 6 }
 7 
 8 private boolean doAcquireNanos(int arg, long nanosTimeout)
 9     throws InterruptedException {
10     long lastTime = System.nanoTime();
11     final Node node = addWaiter(Node.EXCLUSIVE);
12     boolean failed = true;
13     try {
14         for (;;) {
15             final Node p = node.predecessor();
16             if (p == head && tryAcquire(arg)) {
17                 setHead(node);
18                 p.next = null; // help GC
19                 failed = false;
20                 return true;
21             }
22             if (nanosTimeout <= 0)    //判斷是否超時,nanosTimeout<=0表示超時,若是超時則return false.
23                 return false;
24             if (shouldParkAfterFailedAcquire(p, node) &&
25                 nanosTimeout > spinForTimeoutThreshold)
26                 LockSupport.parkNanos(this, nanosTimeout);    //若是還剩餘的時間nanosTimeout>閾值(spinForTimeoutThreshold=1000納秒),則阻塞當前線程nanosTimeout納秒。當nanosTimeout<1000納秒時,則不阻塞當前線程,而是進入快速的自旋過程。緣由在於,很是短的超時等待沒法作到十分精確,若是這時再進行超時等待,相反會讓nanosTimeout的超時從總體上表現得反而不精確。所以,在超時很是短的場景下,同步器會進入無條件的快速自旋。
27             long now = System.nanoTime();    //當前時間
28             //lastTime表示上次喚醒時間,now - lastTime表示已經睡眠的時間
29             nanosTimeout -= now - lastTime;    //nanosTimeout表示還剩餘的時間,nanosTimeout>0表示表示超時時間未到
30             lastTime = now;
31             if (Thread.interrupted())
32                 throw new InterruptedException();    //拋異常
33         }
34     } finally {
35         if (failed)
36             cancelAcquire(node);
37     }
38 }

 

  總結,三種方式獲取同步狀態方式的對比,主要區別在於獲取同步狀態失敗時的處理邏輯:

 

    acquire方法直接阻塞線程,不響應中斷,只是將中斷標記置爲true,但線程依舊會阻塞在等待隊列中。

    acquireInterruptibly方法直接阻塞線程,響應中斷,當線程被中斷時,會當即返回,並拋出InterruptedException。

    tryAcquireNanos方法將線程阻塞nanosTimeout秒,如何超時還未獲取到同步狀態,則返回。同時支持響應中斷。

1 public final void acquire(int arg)    //獨佔式獲取同步狀態,若是不成功會進入同步隊列等待。
2 public final void acquireInterruptibly(int arg)    //與acquire不一樣的是,能響應中斷
3 public final boolean tryAcquireNanos(int arg, long nanosTimeout)    //增長超時機制

 

LockSupport

  在阻塞和喚醒線程時,使用了LockSupport類。LockSupport提供了一系列阻塞和喚醒線程的公共方法。底層使用unsafe提供的方法實現。

 1 void park()    //阻塞當前線程,只有調用unpark或中斷才能從park方法中返回
 2 void parkNanos(long nanos)    //超時阻塞當前線程,超時則返回
 3 void parkUntil(long deadline)    //截止時間阻塞當前線程,直到deadline
 4 
 5 void unpark(Thread thread)    //喚醒處於阻塞狀態的線程
 6 
 7 //jdk1.6中增長了如下方法,blocker表示當前線程要等待的對象。線程dump信息比使用park方法要多,方便問題排查和監控。
 8 void park(Object blocker)
 9 void parkNanos(Object blocker, long nanos)
10 void parkUntil(Object blocker, long deadline)

 

 

參考資料:

  《Java併發編程的藝術》

  《Java併發之AQS詳解》http://www.cnblogs.com/waterystone/p/4920797.html

相關文章
相關標籤/搜索