CAS機制

 

#####################################################################java

咱們知道多線程操做共享資源時,會出現三個問題:可見性、有序性以及原子性。安全

通常狀況下,咱們採用synchronized同步鎖(獨佔鎖、互斥鎖),即同一時間只有一個線程可以修改共享變量,其餘線程必須等待。可是這樣的話就至關於單線程,體現不出來多線程的優點。多線程

那麼咱們有沒有另外一種方式來解決這三個問題呢?併發

在上一章中,咱們提到了一個volatile關鍵字,它能夠解決可見性和有序性的問題。並且若是操做的共享變量是基本數據類型,而且同一時間只對變量進行讀取或者寫入的操做,那麼原子性問題也獲得瞭解決,就不會產生多線程問題了。框架

可是一般,咱們都要先讀取共享變量,而後操做共享變量,最後寫入共享變量,那麼這個時候怎麼保證整個操做的原子性呢?一種解決方式就是CAS技術。
CAS(Compare and Swap)即比較並交換。在講解這個以前,先了解兩個重要概念:悲觀鎖與樂觀鎖。ide

一. 悲觀鎖與樂觀鎖

  1. 悲觀鎖: 假定會發生併發衝突,即共享資源會被某個線程更改。因此當某個線程獲取共享資源時,會阻止別的線程獲取共享資源。也稱獨佔鎖或者互斥鎖,例如java中的synchronized同步鎖。
  2. 樂觀鎖: 假設不會發生併發衝突,只有在最後更新共享資源的時候會判斷一下在此期間有沒有別的線程修改了這個共享資源。若是發生衝突就重試,直到沒有衝突,更新成功。CAS就是一種樂觀鎖實現方式。

悲觀鎖會阻塞其餘線程。樂觀鎖不會阻塞其餘線程,若是發生衝突,採用死循環的方式一直重試,直到更新成功。函數

二. CAS的實現原理

CAS的原理很簡單,包含三個值當前內存值(V)、預期原來的值(A)以及期待更新的值(B)。ui

若是內存位置V的值與預期原值A相匹配,那麼處理器會自動將該位置值更新爲新值B,返回true。不然處理器不作任何操做,返回false。this

實現CAS最重要的一點,就是比較和交換操做的一致性,不然就會產生歧義。編碼

好比當前線程比較成功後,準備更新共享變量值的時候,這個共享變量值被其餘線程更改了,那麼CAS函數必須返回false。

要實現這個需求,java中提供了Unsafe類,它提供了三個函數,分別用來操做基本類型int和long,以及引用類型Object

public final native boolean compareAndSwapObject (Object obj, long valueOffset, Object expect, Object update); public final native boolean compareAndSwapInt (Object obj, long valueOffset, int expect, int update); public final native boolean compareAndSwapLong (Object obj, long valueOffset, long expect, long update); 

參數的意義:

  1. obj 和 valueOffset:表示這個共享變量的內存地址。這個共享變量是obj對象的一個成員屬性,valueOffset表示這個共享變量在obj類中的內存偏移量。因此經過這兩個參數就能夠直接在內存中修改和讀取共享變量值。
  2. expect: 表示預期原來的值。
  3. update: 表示期待更新的值。

接下來咱們來看看java併發框架下的atomic包是如何使用CAS的。

三. JUC併發框架下的原子類(atomic)

調用JUC併發框架下原子類的方法時,不須要考慮多線程問題。那麼咱們分析它是怎麼解決多線程問題的。以AtomicInteger類爲例

3.1 成員變量

// 經過它來實現CAS操做的。由於是int類型,因此調用它的compareAndSwapInt方法 private static final Unsafe unsafe = Unsafe.getUnsafe(); // value這個共享變量在AtomicInteger對象上內存偏移量, // 經過它直接在內存中修改value的值,compareAndSwapInt方法中須要這個參數 private static final long valueOffset; // 經過靜態代碼塊,在AtomicInteger類加載時就會調用 static { try { // 經過unsafe類,獲取value變量在AtomicInteger對象上內存偏移量 valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } // 共享變量,AtomicInteger就保證了對它多線程操做的安全性。 // 使用volatile修飾,解決了可見性和有序性問題。 private volatile int value; 

有三個重要的屬性:

  1. unsafe: 經過它實現CAS操做,由於共享變量是int類型,因此調用compareAndSwapInt方法。
  2. valueOffset: 共享變量value在AtomicInteger對象上內存偏移量
  3. value: 共享變量,使用volatile修飾,解決了可見性和有序性問題。

3.2 重要方法

3.2.1 get與set方法

// 直接讀取。由於是volatile關鍵子修飾的,老是能看到(任意線程)對這個volatile變量最新的寫入 public final int get() { return value; } // 直接寫入。由於是volatile關鍵子修飾的,因此它修改value變量也會當即被別的線程讀取到。 public final void set(int newValue) { value = newValue; } 

由於value變量是volatile關鍵字修飾的,它老是能讀取(任意線程)對這個volatile變量最新的寫入。它修改value變量也會當即被別的線程讀取到。

3.2.2 compareAndSet方法

// 若是value變量的當前值(內存值)等於指望值(expect),那麼就把update賦值給value變量,返回true。 // 若是value變量的當前值(內存值)不等於指望值(expect),就什麼都不作,返回false。 // 這個就是CAS操做,使用unsafe.compareAndSwapInt方法,保證整個操做過程的原子性 public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } 

經過調用unsafe的compareAndSwapInt方法實現CAS函數的。可是CAS函數只能保證比較並交換操做的原子性,可是更新操做並不必定會執行。好比咱們想讓共享變量value自增。
共享變量value自增是三個操做,1.讀取value值,2.計算value+1的值,3.將value+1的值賦值給value。分析這三個操做:

  1. 讀取value值,由於value變量是volatile關鍵字修飾的,可以讀取到任意線程對它最後一次修改的值,因此沒問題。
  2. 計算value+1的值:這個時候就有問題了,可能在計算這個值的時候,其餘線程更改了value值,由於沒有加同步鎖,因此其餘線程能夠更改value值。
  3. 將value+1的值賦值給value: 使用CAS函數,若是返回false,說明在當前線程讀取value值到調用CAS函數方法前,共享變量被其餘線程修改了,那麼value+1的結果值就不是咱們想要的了,由於要從新計算。

3.2.3 getAndAddInt方法

public final int getAndAddInt(Object obj, long valueOffset, int var) { int expect; // 利用循環,直到更新成功才跳出循環。 do { // 獲取value的最新值 expect = this.getIntVolatile(obj, valueOffset); // expect + var表示須要更新的值,若是compareAndSwapInt返回false,說明value值被其餘線程更改了。 // 那麼就循環重試,再次獲取value最新值expect,而後再計算須要更新的值expect + var。直到更新成功 } while(!this.compareAndSwapInt(obj, valueOffset, expect, expect + var)); // 返回當前線程在更改value成功後的,value變量原先值。並非更改後的值 return expect; } 

這個方法在Unsafe類中,利用do_while循環,先利用當前值,計算更新值,而後經過compareAndSwapInt方法設置value變量,若是compareAndSwapInt方法返回失敗,表示value變量的值被別的線程更改了,因此循環獲取value變量最新值,再經過compareAndSwapInt方法設置value變量。直到設置成功。跳出循環,返回更新前的值。

// 將value的值當前值的基礎上加1,並返回當前值 public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } // 將value的值當前值的基礎上加-1,並返回當前值 public final int getAndDecrement() { return unsafe.getAndAddInt(this, valueOffset, -1); } // 將value的值當前值的基礎上加delta,並返回當前值 public final int getAndAdd(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta); } // 將value的值當前值的基礎上加1,並返回更新後的值(即當前值加1) public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } // 將value的值當前值的基礎上加-1,並返回更新後的值(即當前值加-1) public final int decrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, -1) - 1; } // 將value的值當前值的基礎上加delta,並返回更新後的值(即當前值加delta) public final int addAndGet(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta) + delta; } 

都是利用unsafe.getAndAddInt方法實現的。

四.重要示例

import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; class Data { AtomicInteger num; public Data(int num) { this.num = new AtomicInteger(num); } public int getAndDecrement() { return num.getAndDecrement(); } } class MyRun implements Runnable { private Data data; // 用來記錄全部賣出票的編號 private List<Integer> list; private CountDownLatch latch; public MyRun(Data data, List<Integer> list, CountDownLatch latch) { this.data = data; this.list = list; this.latch = latch; } @Override public void run() { try { action(); } finally { // 釋放latch共享鎖 latch.countDown(); } } // 進行買票操做,注意這裏沒有使用data.num>0做爲判斷條件,直到賣完線程退出。 // 那麼作會致使這兩處使用了共享變量data.num,那麼作多線程同步時,就要考慮更多條件。 // 這裏只for循環了5次,表示每一個線程只賣5張票,並將全部賣出去編號存入list集合中。 public void action() { for (int i = 0; i < 5; i++) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } int newNum = data.getAndDecrement(); System.out.println("線程"+Thread.currentThread().getName()+" num=="+newNum); list.add(newNum); } } } public class ThreadTest { public static void startThread(Data data, String name, List<Integer> list,CountDownLatch latch) { Thread t = new Thread(new MyRun(data, list, latch), name); t.start(); } public static void main(String[] args) { // 使用CountDownLatch來讓主線程等待子線程都執行完畢時,才結束 CountDownLatch latch = new CountDownLatch(6); long start = System.currentTimeMillis(); // 這裏用併發list集合 List<Integer> list = new CopyOnWriteArrayList(); Data data = new Data(30); startThread(data, "t1", list, latch); startThread(data, "t2", list, latch); startThread(data, "t3", list, latch); startThread(data, "t4", list, latch); startThread(data, "t5", list, latch); startThread(data, "t6", list, latch); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } // 處理一下list集合,進行排序和翻轉 Collections.sort(list); Collections.reverse(list); System.out.println(list); long time = System.currentTimeMillis() - start; // 輸出一共花費的時間 System.out.println("\n主線程結束 time=="+time); } } 

結果輸出

線程t1 num==30 線程t2 num==29 線程t3 num==28 線程t5 num==26 線程t4 num==27 線程t6 num==25 線程t1 num==24 線程t2 num==23 線程t6 num==22 線程t4 num==19 線程t5 num==20 線程t3 num==21 線程t2 num==18 線程t3 num==13 線程t5 num==14 線程t1 num==15 線程t6 num==17 線程t4 num==16 線程t2 num==12 線程t1 num==9 線程t5 num==10 線程t3 num==11 線程t4 num==7 線程t6 num==8 線程t5 num==5 線程t4 num==4 線程t1 num==6 線程t2 num==3 線程t3 num==2 線程t6 num==1 [30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1] 主線程結束 time==58

咱們使用AtomicInteger,代替同步鎖來解決多線程安全的。

######################################################################

一、舉例說明。

 

假設如今線程A和線程B同時執行getAndAdd操做:

1.AtomicInteger裏面的value原始值爲3,即主內存中AtomicInteger的value爲3,根據Java內存模型,線程A和線程B各自持有一份value的副本,值爲3。
2.線程A經過getIntVolatile(var1, var2)方法獲取到value值3,線程切換,線程A掛起。
3.線程B經過getIntVolatile(var1, var2)方法獲取到value值3,並利用compareAndSwapInt方法比較內存值也爲3,比較成功,修改內存值爲2,線程切換,線程B掛起。
4.線程A恢復,利用compareAndSwapInt方法比較,發現手裏的值3和內存值2不一致,此時value正在被另一個線程修改,線程A不能修改value值。
5.線程的compareAndSwapInt實現,循環判斷,從新獲取value值,由於value是volatile變量,因此線程對它的修改,線程A老是可以看到。線程A繼續利用compareAndSwapInt進行比較並替換,直到compareAndSwapInt修改爲功返回true。
整個過程當中,利用CAS保證了對於value的修改的線程安全性。

不過因爲CAS編碼確實稍微複雜,並且jdk做者自己也不但願你直接使用unsafe(後面會講到)來進行代碼的編寫,因此若是不能深入理解CAS以及unsafe仍是要慎用,使用一些別人已經實現好的無鎖類或者框架就行了。

二、CAS問題。

CAS雖然很高效的解決原子操做,可是CAS仍然存在三大問題。

(1)ABA問題。

   循環時間長開銷大和只能保證一個共享變量的原子操做ABA問題。由於CAS須要在操做值的時候檢查下值有沒有發生變化,若是沒有發生變化則更新,可是若是一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,可是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那麼A-B-A 就會變成1A-2B-3A。從Java1.5開始JDK的atomic包裏提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法做用是首先檢查當前引用是否等於預期引用,而且當前標誌是否等於預期標誌,若是所有相等,則以原子方式將該引用和該標誌的值設置爲給定的更新值。

2)循環時間長開銷大

   自旋CAS若是長時間不成功,會給CPU帶來很是大的執行開銷。若是JVM能支持處理器提供的pause指令那麼效率會有必定的提高,pause指令有兩個做用,第一它能夠延遲流水線執行指令(de-pipeline),使CPU不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零。第二它能夠避免在退出循環的時候因內存順序衝突(memory order violation)而引發CPU流水線被清空(CPU pipeline flush),從而提升CPU的執行效率。

(3)只能保證對一個共享變量的原子操做。

   當對一個共享變量執行操做時,咱們可使用循環CAS的方式來保證原子操做,可是對多個共享變量操做時,循環CAS就沒法保證操做的原子性,這個時候就能夠用鎖,或者有一個取巧的辦法,就是把多個共享變量合併成一個共享變量來操做。好比有兩個共享變量i=2,j=a,合併一下ij=2a,而後用CAS來操做ij。從Java1.5開始JDK提供了AtomicReference類來保證引用對象之間的原子性,你能夠把多個變量放在一個對象裏來進行CAS操做。

##########################################################################

 

##########################################################################

相關文章
相關標籤/搜索