CAS和AQS

1、CAS

CAS(Compare And Swap),即比較並交換。是解決多線程並行狀況下使用鎖形成性能損耗的一種機制,CAS操做包含三個操做數——內存位置(V)、預期原值(A)和新值(B)。若是內存位置的值與預期原值相匹配,那麼處理器會自動將該位置值更新爲新值。不然,處理器不作任何操做。不管哪一種狀況,它都會在CAS指令以前返回該位置的值。CAS有效地說明了「我認爲位置V應該包含值A;若是包含該值,則將B放到這個位置;不然,不要更改該位置,只告訴我這個位置如今的值便可。html

 

CAS典型應用

 

java.util.concurrent.atomic 包下的類大可能是使用CAS操做來實現的(eg. AtomicInteger.java,AtomicBoolean,AtomicLong)。下面以 AtomicInteger.java 的部分實現來大體講解下這些原子類的實現。java

 1 public class AtomicInteger extends Number implements java.io.Serializable {
 2     private static final long serialVersionUID = 6214790243416807050L;
 3  
 4     // setup to use Unsafe.compareAndSwapInt for updates
 5     private static final Unsafe unsafe = Unsafe.getUnsafe();
 6  
 7     private volatile int value;// 初始int大小
 8     // 省略了部分代碼...
 9  
10     // 帶參數構造函數,可設置初始int大小
11     public AtomicInteger(int initialValue) {
12         value = initialValue;
13     }
14     // 不帶參數構造函數,初始int大小爲0
15     public AtomicInteger() {
16     }
17  
18     // 獲取當前值
19     public final int get() {
20         return value;
21     }
22  
23     // 設置值爲 newValue
24     public final void set(int newValue) {
25         value = newValue;
26     }
27  
28     //返回舊值,並設置新值爲 newValue
29     public final int getAndSet(int newValue) {
30         /**
31         * 這裏使用for循環不斷經過CAS操做來設置新值
32         * CAS實現和加鎖實現的關係有點相似樂觀鎖和悲觀鎖的關係
33         * */
34         for (;;) {
35             int current = get();
36             if (compareAndSet(current, newValue))
37                 return current;
38         }
39     }
40  
41     // 原子的設置新值爲update, expect爲指望的當前的值
42     public final boolean compareAndSet(int expect, int update) {
43         return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
44     }
45  
46     // 獲取當前值current,並設置新值爲current+1
47     public final int getAndIncrement() {
48         for (;;) {
49             int current = get();
50             int next = current + 1;
51             if (compareAndSet(current, next))
52                 return current;
53         }
54     }
55  
56     // 此處省略部分代碼,餘下的代碼大體實現原理都是相似的
57 }
View Code

 

2、AQS多線程

AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用於實現基於FIFO等待隊列的阻塞鎖和相關的同步器的一個同步框架。這個抽象類被設計爲做爲一些可用原子int值來表示狀態的同步器的基類。若是你有看過相似 CountDownLatch 類的源碼實現,會發現其內部有一個繼承了 AbstractQueuedSynchronizer 的內部類 Sync框架

 

http://www.javashuo.com/article/p-xmimshdu-bh.html

AQS用法

如上所述,AQS管理一個關於狀態信息的單一整數,該整數能夠表現任何狀態。好比, Semaphore 用它來表現剩餘的許可數,ReentrantLock 用它來表現擁有它的線程已經請求了多少次鎖;FutureTask 用它來表現任務的狀態(還沒有開始、運行、完成和取消)ide

框架

  它維護了一個volatile int state(表明共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。這裏volatile是核心關鍵詞,具體volatile的語義,在此不述。state的訪問方式有三種:函數

  • getState()
  • setState()
  • compareAndSetState()

  AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。性能

  不一樣的自定義同步器爭用共享資源的方式也不一樣。自定義同步器在實現時只須要實現共享資源state的獲取與釋放方式便可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現如下幾種方法:ui

  • isHeldExclusively():該線程是否正在獨佔資源。只有用到condition才須要去實現它。
  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。

  以ReentrantLock爲例,state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。固然,釋放鎖以前,A線程本身是能夠重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。this

  再以CountDownLatch以例,任務分爲N個子線程去執行,state也初始化爲N(注意N要與線程個數一致)。這N個子線程是並行執行的,每一個子線程執行完後countDown()一次,state會CAS減1。等到全部子線程都執行完後(即state=0),會unpark()主調用線程,而後主調用線程就會從await()函數返回,繼續後餘動做。atom

  通常來講,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種便可。但AQS也支持自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。

下面以 CountDownLatch 舉例說明基於AQS實現同步器, CountDownLatch 用同步狀態持有當前計數,countDown方法調用 release從而致使計數器遞減;當計數器爲0時,解除全部線程的等待;await調用acquire,若是計數器爲0,acquire 會當即返回,不然阻塞。一般用於某任務須要等待其餘任務都完成後才能繼續執行的情景。源碼以下:

 1 public class CountDownLatch {
 2     /**
 3      * 基於AQS的內部Sync
 4      * 使用AQS的state來表示計數count.
 5      */
 6     private static final class Sync extends AbstractQueuedSynchronizer {
 7         private static final long serialVersionUID = 4982264981922014374L;
 8  
 9         Sync(int count) {
10             // 使用AQS的getState()方法設置狀態
11             setState(count);
12         }
13  
14         int getCount() {
15             // 使用AQS的getState()方法獲取狀態
16             return getState();
17         }
18  
19         // 覆蓋在共享模式下嘗試獲取鎖
20         protected int tryAcquireShared(int acquires) {
21             // 這裏用狀態state是否爲0來表示是否成功,爲0的時候能夠獲取到返回1,不然不能夠返回-1
22             return (getState() == 0) ? 1 : -1;
23         }
24  
25         // 覆蓋在共享模式下嘗試釋放鎖
26         protected boolean tryReleaseShared(int releases) {
27             // 在for循環中Decrement count直至成功;
28             // 當狀態值即count爲0的時候,返回false表示 signal when transition to zero
29             for (;;) {
30                 int c = getState();
31                 if (c == 0)
32                     return false;
33                 int nextc = c-1;
34                 if (compareAndSetState(c, nextc))
35                     return nextc == 0;
36             }
37         }
38     }
39  
40     private final Sync sync;
41  
42     // 使用給定計數值構造CountDownLatch
43     public CountDownLatch(int count) {
44         if (count < 0) throw new IllegalArgumentException("count < 0");
45         this.sync = new Sync(count);
46     }
47  
48     // 讓當前線程阻塞直到計數count變爲0,或者線程被中斷
49     public void await() throws InterruptedException {
50         sync.acquireSharedInterruptibly(1);
51     }
52  
53     // 阻塞當前線程,除非count變爲0或者等待了timeout的時間。當count變爲0時,返回true
54     public boolean await(long timeout, TimeUnit unit)
55         throws InterruptedException {
56         return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
57     }
58  
59     // count遞減
60     public void countDown() {
61         sync.releaseShared(1);
62     }
63  
64     // 獲取當前count值
65     public long getCount() {
66         return sync.getCount();
67     }
68  
69     public String toString() {
70         return super.toString() + "[Count = " + sync.getCount() + "]";
71     }
72 }
View Code
相關文章
相關標籤/搜索