CAS(Compare And Swap),即比較並交換。是解決多線程並行狀況下使用鎖形成性能損耗的一種機制,CAS操做包含三個操做數——內存位置(V)、預期原值(A)和新值(B)。若是內存位置的值與預期原值相匹配,那麼處理器會自動將該位置值更新爲新值。不然,處理器不作任何操做。不管哪一種狀況,它都會在CAS指令以前返回該位置的值。CAS有效地說明了「我認爲位置V應該包含值A;若是包含該值,則將B放到這個位置;不然,不要更改該位置,只告訴我這個位置如今的值便可。html
java.util.concurrent.atomic
包下的類大可能是使用CAS操做來實現的(eg. AtomicInteger.java
,AtomicBoolean
,AtomicLong
)。下面以 AtomicInteger.java
的部分實現來大體講解下這些原子類的實現。java
1 public class AtomicInteger extends Number implements java.io.Serializable { 2 private static final long serialVersionUID = 6214790243416807050L; 3 4 // setup to use Unsafe.compareAndSwapInt for updates 5 private static final Unsafe unsafe = Unsafe.getUnsafe(); 6 7 private volatile int value;// 初始int大小 8 // 省略了部分代碼... 9 10 // 帶參數構造函數,可設置初始int大小 11 public AtomicInteger(int initialValue) { 12 value = initialValue; 13 } 14 // 不帶參數構造函數,初始int大小爲0 15 public AtomicInteger() { 16 } 17 18 // 獲取當前值 19 public final int get() { 20 return value; 21 } 22 23 // 設置值爲 newValue 24 public final void set(int newValue) { 25 value = newValue; 26 } 27 28 //返回舊值,並設置新值爲 newValue 29 public final int getAndSet(int newValue) { 30 /** 31 * 這裏使用for循環不斷經過CAS操做來設置新值 32 * CAS實現和加鎖實現的關係有點相似樂觀鎖和悲觀鎖的關係 33 * */ 34 for (;;) { 35 int current = get(); 36 if (compareAndSet(current, newValue)) 37 return current; 38 } 39 } 40 41 // 原子的設置新值爲update, expect爲指望的當前的值 42 public final boolean compareAndSet(int expect, int update) { 43 return unsafe.compareAndSwapInt(this, valueOffset, expect, update); 44 } 45 46 // 獲取當前值current,並設置新值爲current+1 47 public final int getAndIncrement() { 48 for (;;) { 49 int current = get(); 50 int next = current + 1; 51 if (compareAndSet(current, next)) 52 return current; 53 } 54 } 55 56 // 此處省略部分代碼,餘下的代碼大體實現原理都是相似的 57 }
2、AQS多線程
AQS(AbstractQueuedSynchronizer
),AQS是JDK下提供的一套用於實現基於FIFO等待隊列的阻塞鎖和相關的同步器的一個同步框架。這個抽象類被設計爲做爲一些可用原子int值來表示狀態的同步器的基類。若是你有看過相似 CountDownLatch
類的源碼實現,會發現其內部有一個繼承了 AbstractQueuedSynchronizer
的內部類 Sync
。框架
如上所述,AQS管理一個關於狀態信息的單一整數,該整數能夠表現任何狀態。好比, Semaphore
用它來表現剩餘的許可數,ReentrantLock
用它來表現擁有它的線程已經請求了多少次鎖;FutureTask
用它來表現任務的狀態(還沒有開始、運行、完成和取消)ide
它維護了一個volatile int state(表明共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。這裏volatile是核心關鍵詞,具體volatile的語義,在此不述。state的訪問方式有三種:函數
AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。性能
不一樣的自定義同步器爭用共享資源的方式也不一樣。自定義同步器在實現時只須要實現共享資源state的獲取與釋放方式便可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現如下幾種方法:ui
以ReentrantLock爲例,state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。固然,釋放鎖以前,A線程本身是能夠重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。this
再以CountDownLatch以例,任務分爲N個子線程去執行,state也初始化爲N(注意N要與線程個數一致)。這N個子線程是並行執行的,每一個子線程執行完後countDown()一次,state會CAS減1。等到全部子線程都執行完後(即state=0),會unpark()主調用線程,而後主調用線程就會從await()函數返回,繼續後餘動做。atom
通常來講,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種便可。但AQS也支持自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。
下面以 CountDownLatch
舉例說明基於AQS實現同步器, CountDownLatch
用同步狀態持有當前計數,countDown
方法調用 release從而致使計數器遞減;當計數器爲0時,解除全部線程的等待;await
調用acquire,若是計數器爲0,acquire
會當即返回,不然阻塞。一般用於某任務須要等待其餘任務都完成後才能繼續執行的情景。源碼以下:
1 public class CountDownLatch { 2 /** 3 * 基於AQS的內部Sync 4 * 使用AQS的state來表示計數count. 5 */ 6 private static final class Sync extends AbstractQueuedSynchronizer { 7 private static final long serialVersionUID = 4982264981922014374L; 8 9 Sync(int count) { 10 // 使用AQS的getState()方法設置狀態 11 setState(count); 12 } 13 14 int getCount() { 15 // 使用AQS的getState()方法獲取狀態 16 return getState(); 17 } 18 19 // 覆蓋在共享模式下嘗試獲取鎖 20 protected int tryAcquireShared(int acquires) { 21 // 這裏用狀態state是否爲0來表示是否成功,爲0的時候能夠獲取到返回1,不然不能夠返回-1 22 return (getState() == 0) ? 1 : -1; 23 } 24 25 // 覆蓋在共享模式下嘗試釋放鎖 26 protected boolean tryReleaseShared(int releases) { 27 // 在for循環中Decrement count直至成功; 28 // 當狀態值即count爲0的時候,返回false表示 signal when transition to zero 29 for (;;) { 30 int c = getState(); 31 if (c == 0) 32 return false; 33 int nextc = c-1; 34 if (compareAndSetState(c, nextc)) 35 return nextc == 0; 36 } 37 } 38 } 39 40 private final Sync sync; 41 42 // 使用給定計數值構造CountDownLatch 43 public CountDownLatch(int count) { 44 if (count < 0) throw new IllegalArgumentException("count < 0"); 45 this.sync = new Sync(count); 46 } 47 48 // 讓當前線程阻塞直到計數count變爲0,或者線程被中斷 49 public void await() throws InterruptedException { 50 sync.acquireSharedInterruptibly(1); 51 } 52 53 // 阻塞當前線程,除非count變爲0或者等待了timeout的時間。當count變爲0時,返回true 54 public boolean await(long timeout, TimeUnit unit) 55 throws InterruptedException { 56 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 57 } 58 59 // count遞減 60 public void countDown() { 61 sync.releaseShared(1); 62 } 63 64 // 獲取當前count值 65 public long getCount() { 66 return sync.getCount(); 67 } 68 69 public String toString() { 70 return super.toString() + "[Count = " + sync.getCount() + "]"; 71 } 72 }