02.併發多線程-CAS

什麼是CAS

  • 全稱:Compare and Set ,也就是先比較再設置的意思
  • 也稱爲:Compare and swap,先比較並交換

流程

image

併發包(j.u.c)下的應用示例

以原子包下的AtomicInteger類進行源碼分析CAS
//建立一個原子類的對象,並進行++ 操做,從0開始
   AtomicInteger atomicInteger = new AtomicInteger();
    atomicInteger.incrementAndGet();
複製代碼
  • 看看內部進行了哪些操做
  • AtomicInteger 內部的結構
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;
    
    // CAS的核心類,因爲java方法沒法直接訪問底層系統,須要經過本地(native)方法來訪問
    //Unsafe至關於一個後門,基於該類能夠直接操做特定內存的數據
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    //表示該變量值在內存中的偏移地址,由於Unsafe就是根據內存偏移地址獲取數據的
    //偏移量的理解:內存中存數數據的方式:一個存儲數據的 實際地址=段首地址+偏移量 
    //對應的現實中  家庭地址= 小區地址+門牌號
    private static final long valueOffset;
    //使用volatile保證了多線程之間的內存可見性
    private volatile int value;
    
    
       // 建立對象的時候  就會將valueOffset的值獲取到  
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    
 
    
    
     /**
     * 有參構造
     */
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    /**
     * 無參構造
     */
    public AtomicInteger() {
    }

複製代碼
  • 怎麼保證每次自增都是原子性的
/**
     *
     */
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
    
    
           
       
       /**
         *
         * @param var1 : AtomicInteger對象
         * @param var2 : valueOffset
         * @param var4 : 固定值 1
         * @Description :將value進行自增,而且返回自增的值
         * @Author Licy
         * @Date  2019/6/14 20:03
         * @return int
         *
         */
    
    public final int getAndAddInt(Object var1, long var2, int var4) {
     int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
     
        //能夠當作compareAndSwapInt(obj, offset, expect, update)
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}
複製代碼
具體流程
  • 實際存儲的值是放在value中的
  • 類加載的時候,還獲取了unsafe實例
  • 定義了valueOffset,而且在靜態代碼塊中初始化了改值,當靜態代碼塊執行的時候,獲取的就是value的偏移量
  • getAndAddInt函數中 var5獲取的就是valueOffset所表明的具體的值,也就是value的值
  • compareAndSwapInt函數的意思:若是obj內的value和expect相等,就證實沒有其餘線程改變過這個變量,那麼久更新他爲update,若是這一步CAS沒有成功,那就採用自旋的方式繼續進行CAS操做
  • 比較和設置乍一看是兩個步驟,其實在JNI裏面藉助了cpu的指令完成,保證其原子問題

底層原理

  • CAS底層使用JNI調用C代碼實現的,若是你有Hotspot源碼,那麼在Unsafe.cpp裏能夠找到它的實現:

所產生的問題

一、ABA的問題
  • CAS須要在操做值的時候檢查下值有沒有發生變化,若是沒有發生變化則更新,可是若是一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,可是實際上卻變化了。這就是CAS的ABA問題
  • 常見的解決思路是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那麼A-B-A 就會變成1A-2B-3A。
  • 目前在JDK的atomic包裏提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法做用是首先檢查當前引用是否等於預期引用,而且當前標誌是否等於預期標誌,若是所有相等,則以原子方式將該引用和該標誌的值設置爲給定的更新值。
二、循環時間開銷大
  • 若是CAS不成功,則會原地自旋,若是長時間自旋會給CPU帶來很是大的執行開銷。
  • 不過 在jdk8中已經解決此問題

Java 8對CAS機制的優化

java.util.concurrency.atomic.LongAdder
  • java8新增的一個類,提供原子累計值的方法
大致的流程

image

public class LongAdder extends Striped64 implements Serializable {
//.....
}


abstract class Striped64 extends Number {


    /**
     * 
     */
    transient volatile Cell[] cells;

    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    transient volatile long base;


//
@sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
}
複製代碼
  • LongAdder繼承了Striped64類
  • Striped64類中維護了一個懶加載數據Cell[]和一個額外的base實例域
  • 數據的大小是2的N次方,使用每一個線程Thread內部的哈希值訪問
  • 使用註解Contended修飾是解決false sharing(僞共享的問題),解決不一樣操做系統位數緩存行大小不同,防止cell數據發生僞共享的狀況
public class LongAdder extends Striped64 implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;
    public LongAdder() {
    }

 
     
public void add(long x) {
    // as是Striped64中的cells屬性
    // b是Striped64中的base屬性
    // v是當前線程hash到的Cell中存儲的值
    // m是cells的長度減1,hash時做爲掩碼使用
    // a是當前線程hash到的Cell
    Cell[] as; long b, v; int m; Cell a;
    // 條件1:cells不爲空,說明出現過競爭,cells已經建立
    // 條件2:cas操做base失敗,說明其它線程先一步修改了base,正在出現競爭
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // true表示當前競爭還不激烈
        // false表示競爭激烈,多個線程hash到同一個Cell,可能要擴容
        boolean uncontended = true;
        // 條件1:cells爲空,說明正在出現競爭,上面是從條件2過來的
        // 條件2:應該不會出現
        // 條件3:當前線程所在的Cell爲空,說明當前線程尚未更新過Cell,應初始化一個Cell
        // 條件4:更新當前線程所在的Cell失敗,說明如今競爭很激烈,多個線程hash到了同一個Cell,應擴容
        if (as == null || (m = as.length - 1) < 0 ||
            // getProbe()方法返回的是線程中的threadLocalRandomProbe字段
            // 它是經過隨機數生成的一個值,對於一個肯定的線程這個值是固定的
            // 除非刻意修改它
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            // 調用Striped64中的方法處理
            longAccumulate(x, null, uncontended);
    }
}


    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }
複製代碼
final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
    // 存儲線程的probe值
    int h;
    // 若是getProbe()方法返回0,說明隨機數未初始化
    if ((h = getProbe()) == 0) {
        // 強制初始化
        ThreadLocalRandom.current(); // force initialization
        // 從新獲取probe值
        h = getProbe();
        // 都未初始化,確定還不存在競爭激烈
        wasUncontended = true;
    }
    // 是否發生碰撞
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // cells已經初始化過
        if ((as = cells) != null && (n = as.length) > 0) {
            // 當前線程所在的Cell未初始化
            if ((a = as[(n - 1) & h]) == null) {
                // 當前無其它線程在建立或擴容cells,也沒有線程在建立Cell
                if (cellsBusy == 0) {       // Try to attach new Cell
                    // 新建一個Cell,值爲當前須要增長的值
                    Cell r = new Cell(x);   // Optimistically create
                    // 再次檢測cellsBusy,並嘗試更新它爲1
                    // 至關於當前線程加鎖
                    if (cellsBusy == 0 && casCellsBusy()) {
                        // 是否建立成功
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            // 從新獲取cells,並找到當前線程hash到cells數組中的位置
                            // 這裏必定要從新獲取cells,由於as並不在鎖定範圍內
                            // 有可能已經擴容了,這裏要從新獲取
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                // 把上面新建的Cell放在cells的j位置處
                                rs[j] = r;
                                // 建立成功
                                created = true;
                            }
                        } finally {
                            // 至關於釋放鎖
                            cellsBusy = 0;
                        }
                        // 建立成功了就返回
                        // 值已經放在新建的Cell裏面了
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                // 標記當前未出現衝突
                collide = false;
            }
            // 當前線程所在的Cell不爲空,且更新失敗了
            // 這裏簡單地設爲true,至關於簡單地自旋一次
            // 經過下面的語句修改線程的probe再從新嘗試
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // 再次嘗試CAS更新當前線程所在Cell的值,若是成功了就返回
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // 若是cells數組的長度達到了CPU核心數,或者cells擴容了
            // 設置collide爲false並經過下面的語句修改線程的probe再從新嘗試
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            // 上上個elseif都更新失敗了,且上個條件不成立,說明出現衝突了
            else if (!collide)
                collide = true;
            // 明確出現衝突了,嘗試佔有鎖,並擴容
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    // 檢查是否有其它線程已經擴容過了
                    if (cells == as) {      // Expand table unless stale
                        // 新數組爲原數組的兩倍
                        Cell[] rs = new Cell[n << 1];
                        // 把舊數組元素拷貝到新數組中
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        // 從新賦值cells爲新數組
                        cells = rs;
                    }
                } finally {
                    // 釋放鎖
                    cellsBusy = 0;
                }
                // 已解決衝突
                collide = false;
                // 使用擴容後的新數組從新嘗試
                continue;                   // Retry with expanded table
            }
            // 更新失敗或者達到了CPU核心數,從新生成probe,並重試
            h = advanceProbe(h);
        }
        // 未初始化過cells數組,嘗試佔有鎖並初始化cells數組
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // 是否初始化成功
            boolean init = false;
            try {                           // Initialize table
                // 檢測是否有其它線程初始化過
                if (cells == as) {
                    // 新建一個大小爲2的Cell數組
                    Cell[] rs = new Cell[2];
                    // 找到當前線程hash到數組中的位置並建立其對應的Cell
                    rs[h & 1] = new Cell(x);
                    // 賦值給cells數組
                    cells = rs;
                    // 初始化成功
                    init = true;
                }
            } finally {
                // 釋放鎖
                cellsBusy = 0;
            }
            // 初始化成功直接返回
            // 由於增長的值已經同時建立到Cell中了
            if (init)
                break;
        }
        // 若是有其它線程在初始化cells數組中,就嘗試更新base
        // 若是成功了就返回
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

複製代碼
  • 首先有一個base的值,剛開始多線程來不停的累加數值,都是對base進行累加的
  • 好比剛開始累加成了5,接着發現併發更新的線程數量過多,就會開始實行分段CAS機制
  • 分段CSA機制就是在內部有一個Cell數組,每一個數組是一個數據的分段,這是讓大量的線程分別去對不一樣Cell內部的value值進行CAS的累加操做,這樣就把CAS計算壓力分散到了不一樣的Cell分段數值中,獎勵了多線程併發更新同一個數值時出現的無限循環的問題
  • 並且該類內部也實現了自動分段遷移的機制,就是若是某個Cell的value執行CAS失敗了,那麼就會自動去找另一個Cell分段內的value值進行CAS操做
  • 若是你要從LongAdder中獲取當前累加的總值,就會把base值和全部Cell分段數值加起來返回給你。
相關文章
相關標籤/搜索