什麼是CAS
全稱:Compare and Set ,也就是先比較再設置的意思
也稱爲:Compare and swap,先比較並交換
流程
併發包(j.u.c)下的應用示例
以原子包下的AtomicInteger類進行源碼分析CAS
//建立一個原子類的對象,並進行++ 操做,從0開始
AtomicInteger atomicInteger = new AtomicInteger();
atomicInteger.incrementAndGet();
複製代碼
看看內部進行了哪些操做
AtomicInteger 內部的結構
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// CAS的核心類,因爲java方法沒法直接訪問底層系統,須要經過本地(native)方法來訪問
//Unsafe至關於一個後門,基於該類能夠直接操做特定內存的數據
private static final Unsafe unsafe = Unsafe.getUnsafe();
//表示該變量值在內存中的偏移地址,由於Unsafe就是根據內存偏移地址獲取數據的
//偏移量的理解:內存中存數數據的方式:一個存儲數據的 實際地址=段首地址+偏移量
//對應的現實中 家庭地址= 小區地址+門牌號
private static final long valueOffset;
//使用volatile保證了多線程之間的內存可見性
private volatile int value;
// 建立對象的時候 就會將valueOffset的值獲取到
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value" ));
} catch (Exception ex) { throw new Error(ex); }
}
/**
* 有參構造
*/
public AtomicInteger(int initialValue) {
value = initialValue;
}
/**
* 無參構造
*/
public AtomicInteger () {
}
複製代碼
/**
*
*/
public final int incrementAndGet () {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
/**
*
* @param var1 : AtomicInteger對象
* @param var2 : valueOffset
* @param var4 : 固定值 1
* @Description :將value進行自增,而且返回自增的值
* @Author Licy
* @Date 2019/6/14 20:03
* @return int
*
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
//能夠當作compareAndSwapInt(obj, offset, expect, update)
} while (!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
複製代碼
具體流程
實際存儲的值是放在value中的
類加載的時候,還獲取了unsafe實例
定義了valueOffset,而且在靜態代碼塊中初始化了改值,當靜態代碼塊執行的時候,獲取的就是value的偏移量
getAndAddInt函數中 var5獲取的就是valueOffset所表明的具體的值,也就是value的值
compareAndSwapInt函數的意思:若是obj內的value和expect相等,就證實沒有其餘線程改變過這個變量,那麼久更新他爲update,若是這一步CAS沒有成功,那就採用自旋的方式繼續進行CAS操做
比較和設置乍一看是兩個步驟,其實在JNI裏面藉助了cpu的指令完成,保證其原子問題
底層原理
CAS底層使用JNI調用C代碼實現的,若是你有Hotspot源碼,那麼在Unsafe.cpp裏能夠找到它的實現:
所產生的問題
一、ABA的問題
CAS須要在操做值的時候檢查下值有沒有發生變化,若是沒有發生變化則更新,可是若是一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,可是實際上卻變化了。這就是CAS的ABA問題
常見的解決思路是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那麼A-B-A 就會變成1A-2B-3A。
目前在JDK的atomic包裏提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法做用是首先檢查當前引用是否等於預期引用,而且當前標誌是否等於預期標誌,若是所有相等,則以原子方式將該引用和該標誌的值設置爲給定的更新值。
二、循環時間開銷大
若是CAS不成功,則會原地自旋,若是長時間自旋會給CPU帶來很是大的執行開銷。
不過 在jdk8中已經解決此問題
Java 8對CAS機制的優化
java.util.concurrency.atomic.LongAdder
大致的流程
public class LongAdder extends Striped64 implements Serializable {
//.....
}
abstract class Striped64 extends Number {
/**
*
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;
//
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value" ));
} catch (Exception e) {
throw new Error(e);
}
}
}
}
複製代碼
LongAdder繼承了Striped64類
Striped64類中維護了一個懶加載數據Cell[]和一個額外的base實例域
數據的大小是2的N次方,使用每一個線程Thread內部的哈希值訪問
使用註解Contended修飾是解決false sharing(僞共享的問題),解決不一樣操做系統位數緩存行大小不同,防止cell數據發生僞共享的狀況
public class LongAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
public LongAdder () {
}
public void add(long x) {
// as是Striped64中的cells屬性
// b是Striped64中的base屬性
// v是當前線程hash 到的Cell中存儲的值
// m是cells的長度減1,hash 時做爲掩碼使用
// a是當前線程hash 到的Cell
Cell[] as; long b, v; int m; Cell a;
// 條件1:cells不爲空,說明出現過競爭,cells已經建立
// 條件2:cas操做base失敗,說明其它線程先一步修改了base,正在出現競爭
if ((as = cells) != null || !casBase(b = base, b + x)) {
// true 表示當前競爭還不激烈
// false 表示競爭激烈,多個線程hash 到同一個Cell,可能要擴容
boolean uncontended = true ;
// 條件1:cells爲空,說明正在出現競爭,上面是從條件2過來的
// 條件2:應該不會出現
// 條件3:當前線程所在的Cell爲空,說明當前線程尚未更新過Cell,應初始化一個Cell
// 條件4:更新當前線程所在的Cell失敗,說明如今競爭很激烈,多個線程hash 到了同一個Cell,應擴容
if (as == null || (m = as.length - 1) < 0 ||
// getProbe()方法返回的是線程中的threadLocalRandomProbe字段
// 它是經過隨機數生成的一個值,對於一個肯定的線程這個值是固定的
// 除非刻意修改它
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 調用Striped64中的方法處理
longAccumulate(x, null, uncontended);
}
}
/**
* Equivalent to {@code add(1)}.
*/
public void increment () {
add(1L);
}
/**
* Equivalent to {@code add(-1)}.
*/
public void decrement () {
add(-1L);
}
複製代碼
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 存儲線程的probe值
int h;
// 若是getProbe()方法返回0,說明隨機數未初始化
if ((h = getProbe()) == 0) {
// 強制初始化
ThreadLocalRandom.current(); // force initialization
// 從新獲取probe值
h = getProbe();
// 都未初始化,確定還不存在競爭激烈
wasUncontended = true ;
}
// 是否發生碰撞
boolean collide = false ; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// cells已經初始化過
if ((as = cells) != null && (n = as.length) > 0) {
// 當前線程所在的Cell未初始化
if ((a = as[(n - 1) & h]) == null) {
// 當前無其它線程在建立或擴容cells,也沒有線程在建立Cell
if (cellsBusy == 0) { // Try to attach new Cell
// 新建一個Cell,值爲當前須要增長的值
Cell r = new Cell(x); // Optimistically create
// 再次檢測cellsBusy,並嘗試更新它爲1
// 至關於當前線程加鎖
if (cellsBusy == 0 && casCellsBusy()) {
// 是否建立成功
boolean created = false ;
try { // Recheck under lock
Cell[] rs; int m, j;
// 從新獲取cells,並找到當前線程hash 到cells數組中的位置
// 這裏必定要從新獲取cells,由於as並不在鎖定範圍內
// 有可能已經擴容了,這裏要從新獲取
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 把上面新建的Cell放在cells的j位置處
rs[j] = r;
// 建立成功
created = true ;
}
} finally {
// 至關於釋放鎖
cellsBusy = 0;
}
// 建立成功了就返回
// 值已經放在新建的Cell裏面了
if (created)
break ;
continue ; // Slot is now non-empty
}
}
// 標記當前未出現衝突
collide = false ;
}
// 當前線程所在的Cell不爲空,且更新失敗了
// 這裏簡單地設爲true ,至關於簡單地自旋一次
// 經過下面的語句修改線程的probe再從新嘗試
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true ; // Continue after rehash
// 再次嘗試CAS更新當前線程所在Cell的值,若是成功了就返回
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break ;
// 若是cells數組的長度達到了CPU核心數,或者cells擴容了
// 設置collide爲false 並經過下面的語句修改線程的probe再從新嘗試
else if (n >= NCPU || cells != as)
collide = false ; // At max size or stale
// 上上個elseif都更新失敗了,且上個條件不成立,說明出現衝突了
else if (!collide)
collide = true ;
// 明確出現衝突了,嘗試佔有鎖,並擴容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 檢查是否有其它線程已經擴容過了
if (cells == as) { // Expand table unless stale
// 新數組爲原數組的兩倍
Cell[] rs = new Cell[n << 1];
// 把舊數組元素拷貝到新數組中
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 從新賦值cells爲新數組
cells = rs;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
// 已解決衝突
collide = false ;
// 使用擴容後的新數組從新嘗試
continue ; // Retry with expanded table
}
// 更新失敗或者達到了CPU核心數,從新生成probe,並重試
h = advanceProbe(h);
}
// 未初始化過cells數組,嘗試佔有鎖並初始化cells數組
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 是否初始化成功
boolean init = false ;
try { // Initialize table
// 檢測是否有其它線程初始化過
if (cells == as) {
// 新建一個大小爲2的Cell數組
Cell[] rs = new Cell[2];
// 找到當前線程hash 到數組中的位置並建立其對應的Cell
rs[h & 1] = new Cell(x);
// 賦值給cells數組
cells = rs;
// 初始化成功
init = true ;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
// 初始化成功直接返回
// 由於增長的值已經同時建立到Cell中了
if (init)
break ;
}
// 若是有其它線程在初始化cells數組中,就嘗試更新base
// 若是成功了就返回
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break ; // Fall back on using base
}
}
複製代碼
首先有一個base的值,剛開始多線程來不停的累加數值,都是對base進行累加的
好比剛開始累加成了5,接着發現併發更新的線程數量過多,就會開始實行分段CAS機制
分段CSA機制就是在內部有一個Cell數組,每一個數組是一個數據的分段,這是讓大量的線程分別去對不一樣Cell內部的value值進行CAS的累加操做,這樣就把CAS計算壓力分散到了不一樣的Cell分段數值中,獎勵了多線程併發更新同一個數值時出現的無限循環的問題
並且該類內部也實現了自動分段遷移的機制,就是若是某個Cell的value執行CAS失敗了,那麼就會自動去找另一個Cell分段內的value值進行CAS操做
若是你要從LongAdder中獲取當前累加的總值,就會把base值和全部Cell分段數值加起來返回給你。