併發編程第六天----LongAdder源碼深度解析

簡介

AtomicLong 經過 CAS 提供了非阻塞的原子性操做,性能比使用同步鎖好多了。可是在高併發狀況下,大量線程爭奪同一個原子變量,只有一個線程的 CAS 能操做成功,其餘線程會不停地 CAS 自旋,極度浪費 CPU 資源。java

爲了解決這個問題,JDK8 提供了一個類 LongAdder把一個變量分紅多個變量,讓一樣多的線程去競爭多個資源,就解決了性能問題。數組

LongAdder 在內部維護了多個 Cell 原子變量,另外,多個線程在爭奪同一個 Cell 原子變量時若是失敗了,並非在當前 Cell 變量上一直嘗試,而是嘗試對其餘 Cell 變量進行 CAS 操做。最後,在獲取 LongAdder 當前值時,是把全部 Cell 變量的 value 值累加後再加上 base 返回的。下面咱們來看看 LongAdder 的使用及源碼。安全


使用

public class LongAdderTest {

    static LongAdder longAdder = new LongAdder();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                for (int j = 0; j < 10; j++) {
                    longAdder.add(10);
                }
            }).start();
        }

        Thread.sleep(2000); // 保證前面的線程執行完

        System.out.println(longAdder.sum());
    }
}
複製代碼

毫無疑問,輸出的結果確定是 1000,LongAdder 是線程安全的。markdown


源碼分析

Striped64

LongAdder 是繼承自 Striped64,咱們來看看 Striped64 的主要成員變量。多線程

abstract class Striped64 extends Number {
    
    // 大小是 2 的 n 次方
    transient volatile Cell[] cells;
    
    // 基礎值
    transient volatile long base;
    
    // 一個標識,狀態只有 0 和 1,爲 1 表示 cells 數組在初始化或者擴容
    transient volatile int cellsBusy;
    
    @sun.misc.Contended static final class Cell {
        
        volatile long value;
        
        Cell(long x) { value = x; }
        
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe 機制
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                // 獲取 Cell 實例中變量 value 的內存偏移量
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    
    // Unsafe 機制 
    private static final sun.misc.Unsafe UNSAFE;
    // 記錄 Striped64 實例中變量 base 的偏移量
    private static final long BASE;
    
    // 記錄 Striped64 實例中變量 cellsBusy 的偏移量
    private static final long CELLSBUSY;
    
     // 記錄 Thread 實例中變量 threadLocalRandomProbe 的偏移量,用於計算訪問下標
    private static final long PROBE;
    
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> sk = Striped64.class;
            BASE = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("base"));
            CELLSBUSY = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("cellsBusy"));
            Class<?> tk = Thread.class;
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
   
}
複製代碼

Unsafe 機制如今應該很是熟悉了,就是用於獲取變量在實例中的偏移量,用於 CAS 操做。併發

@sun.misc.Contended 能夠解決僞共享問題, Cell 內部有一個聲明爲 volatile 的變量,並經過 CAS 更新該值,保證了更新操做的原子性。CellAtomicLong 的優化。保證了線程操做 Cell 元素的原子性。app

Cell 數組的大小必定爲 2 的 n 次方。dom

threadLocalRandomProbe 用來計算當前線程訪問 Cell 數組的下標。ide

cellsBusy 做爲標識鎖變量,狀態只有 0 和 1 ,爲 1 表示 Cell 數組正在初始化或者擴容,其餘線程則不能進行初始化或者擴容高併發


LongAdder 源碼分析

咱們應該關注下面幾個問題:

  1. 當前線程應該訪問 Cell 數組裏的哪一個元素。
  2. 如何初始化 Cell 數組。
  3. Cell 數組什麼時候擴容
  4. 線程訪問分配的 Cell 元素有衝突時怎麼辦

sum()

public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) { // 若是 Cell 數組爲空,則直接返回 base
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
複製代碼

累加全部 Cell 內部的 value 值,而後再累加上 base。因爲沒有對 Cell 數組進行加鎖,因此在累加的過程當中,可能有其餘線程對 Cell 數組的內容進行了修改,返回的值可能不是很精確。


add()

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {   // ( 1 )
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||    // ( 2 )
            (a = as[getProbe() & m]) == null ||     // ( 3 )
            !(uncontended = a.cas(v = a.value, v + x)))  // ( 4 )
            longAccumulate(x, null, uncontended); // ( 5 )
    }
}
複製代碼

( 1 ) 判斷 cells 是否爲 null,若是爲 null,則直接在 base 上進行累加,此時相似 AtomicLong,若是 cells 不爲 null 或者 CAS 操做失敗,則進行下面的操做。剛開始併發線程較少時,全部的累加操做都是對 base 變量進行,當某個線程第一次 CAS 操做失敗時,則進行初始化 Cell 數組

( 2 ) 、 ( 3 ) 決定當前線程應該訪問 Cell 數組裏的哪一個元素 (解決了問題 1 ) ,經過 getProbe() & m 計算的,getProbe 用於獲取當前線程的 ThreadLocalRandomProbe 的值, m 是 Cell 數組的長度。若是訪問的 Cell 數組元素爲 null,則執行代碼 ( 5 )。

若是該 Cell 元素存在則執行代碼 ( 4 ),經過 CAS 更新 Cell 元素的 value 值,若是更新失敗則執行代碼 ( 5 ),而且 uncontended 的值爲 false。


longAccumulate()

longAccumulate 是涉及處理初始化擴容解決衝突的方法,在 Striped64 類中。咱們能在裏面找到問題 2 、 3 、 4 的答案。

int h;  // 記錄當前線程的 threadLocalRandomProbe 的值
    if ((h = getProbe()) == 0) { 
        ThreadLocalRandom.current(); // 初始化當前線程的 threadLocalRandomProbe 的值
        h = getProbe();
        wasUncontended = true;
    }
複製代碼

最開始是判斷當前線程的 threadLocalRandomProbe 值是否爲 0 ,爲 0 則初始化 threadLocalRandomProbe。這個變量用於計算當前線程應該被分配到 Cell 數組的哪一個下標。


boolean collide = false;            
        for (;;) {  // 無限循環
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
               // cells 不爲 null 時
               ...
            }
            // cells 爲 null 則進行初始化
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           
                    if (cells == as) {
                        Cell[] rs = new Cell[2]; // 初始化大小爲2
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            
            // 爲了能找到一個空閒的 Cell,從新計算 hash 值
            h = advanceProbe(h); 
        }
複製代碼

Cell 數組爲 null 時,則進行初始化。 初始化時經過 casCellsBusy()cellsBusy 的值設爲 1,這時其餘線程就不能進行擴容或者初始化了

初始化 Cell 數組的大小爲 2,並經過 h & 1 計算出當前線程訪問的 Cell 元素,並進行賦值操做。(回答了問題 2 )


boolean collide = false;  // 爲 true 表示衝突
        
        for (;;) { // 無限循環
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) { // 當前線程訪問的 cell 元素爲 null
                
                    if (cellsBusy == 0) { 
                        Cell r = new Cell(x);   // 初始化 Cell 元素 (採用了延遲加載,用到時才初始化 Cell)
                        if (cellsBusy == 0 && casCellsBusy()) { // 獲取鎖變量
                            boolean created = false;
                            try {               
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           
                        }
                    }
                   
                    collide = false;
                }
                
                else if (!wasUncontended)       // 以前 CAS 更新失敗
                    wasUncontended = true;      
                
                // 當前線程訪問的 Cell 元素存在了,則進行 CAS 加操做 
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                    
                // NCPU 表明當前機器的 CPU 個數,Cell 數組的元素不能超過 NCPU
                else if (n >= NCPU || cells != as)   ( 7 )
                    collide = false;           
                
                
                else if (!collide)    ( 8 )
                    collide = true;
                    
                // 擴容操做
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   
                }
                
                // 爲了能找到一個空閒的 Cell,從新計算 hash 值
                h = advanceProbe(h);   ( 9 )
            }
        }
複製代碼

( 7 ) 、 ( 8 ) 都不符合條件時,纔會執行擴容操做。即:當前 Cell 數組的個數小於 CPU 的個數 而且 多個線程間發生了衝突。

( 7 ) 處表示只有當前 Cell 數組的長度小於 CPU 的個數,才能擴容。爲何要作這樣的限制呢 :只有當每一個 CPU 運行一個線程時纔會使多線程的效果最佳,也就是當 Cell 數組元素的個數與 CPU 個數一致時,每一個 Cell 元素都用一個 CPU 處理,效果最佳。

( 8 ) 表示多個線程訪問了 Cell 數組中的同一個元素,或者多個線程嘗試獲取 cellsBusy 鎖變量 致使了衝突。

擴容時先獲取 cellsBusy 鎖變量,而後大小擴充爲原來的兩倍再複製原來的變量到新數組。(解決了問題 3)。

( 9 ) 處對 CAS 失敗的線程從新計算 threadLocalRandomProb , 減小衝突機會。(解決了問題 4


總結

LongAccumulate() 源碼錶面複雜,可是隻要圍繞文章開頭題的四個問題去看,也不是很難。

相關文章
相關標籤/搜索