【轉載】jdk1.8 LongAdder源碼學習

本文轉自 https://blog.csdn.net/u011392897/article/details/60480108 java

LongAdder是jdk8新增的用於併發環境的計數器,目的是爲了在高併發狀況下,代替AtomicLong/AtomicInt,成爲一個用於高併發狀況下的高效的通用計數器。

高併發下計數,通常最早想到的應該是AtomicLong/AtomicInt,AtmoicXXX使用硬件級別的指令 CAS 來更新計數器的值,這樣能夠避免加鎖,機器直接支持的指令,效率也很高。可是AtomicXXX中的 CAS 操做在出現線程競爭時,失敗的線程會白白地循環一次,在併發很大的狀況下,由於每次CAS都只有一個線程能成功,競爭失敗的線程會很是多。失敗次數越多,循環次數就越多,不少線程的CAS操做愈來愈接近 自旋鎖(spin lock)。計數操做原本是一個很簡單的操做,實際須要耗費的cpu時間應該是越少越好,AtomicXXX在高併發計數時,大量的cpu時間都浪費會在 自旋 上了,這很浪費,也下降了實際的計數效率。數組

// 很簡單的一個類,這個類能夠當作是一個簡化的AtomicLong
// 經過cas操做來更新value的值
// @sun.misc.Contended是一個高端的註解,表明使用緩存行填來避免僞共享,能夠本身網上搜下,這個我就不細說了
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
 
    // Unsafe mechanics Unsafe相關的初始化
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
說LongAdder比在高併發時比AtomicLong更高效,這麼說有什麼依據呢?LongAdder是根據ConcurrentHashMap這類爲併發設計的類的基本原理——鎖分段,來實現的,它裏面維護一組按需分配的計數單元,併發計數時,不一樣的線程能夠在不一樣的計數單元上進行計數,這樣減小了線程競爭,提升了併發效率。本質上是用空間換時間的思想,不過在實際高併發狀況中消耗的空間能夠忽略不計。
如今,在處理高併發計數時,應該優先使用LongAdder,而不是繼續使用AtomicLong。固然,線程競爭很低的狀況下進行計數,使用Atomic仍是更簡單更直接,而且效率稍微高一些。
其餘狀況,好比序號生成,這種狀況下須要準確的數值,全局惟一的AtomicLong纔是正確的選擇,此時不該該使用LongAdder。
 
下面簡要分析下LongAdder的源碼,有了ConcurrentHashMap(LongAdder比較像1.6和1.7的,能夠看下1.7的)的基礎,這個類的源碼看起來也不復雜。
1、類的關係

 

 

公共父類Striped64是實現中的核心,它實現一些核心操做,處理64位數據,很容易就能轉化爲其餘基本類型,是個通用的類。二元算術運算累積,指的是你能夠給它提供一個二元算術方式,這個類按照你提供的方式進行算術計算,並保存計算結果。二元運算中第一個操做數是累積器中某個計數單元當前的值,另一個值是外部提供的。
舉幾個例子:
假設每次操做都須要把原來的數值加上某個值,那麼二元運算爲 (x, y) -> x+y,這樣累積器每次都會加上你提供的數字y,這跟LongAdder的功能基本上是同樣的;
假設每次操做都須要把原來的數值變爲它的某個倍數,那麼能夠指定二元運算爲 (x, y) -> x*y,累積器每次都會乘以你提供的數字y,y=2時就是一般所說的每次都翻一倍;
假設每次操做都須要把原來的數值變成它的5倍,再加上3,再除以2,再減去4,再乘以你給定的數,最後還要加上6,那麼二元運算爲 (x, y) -> ((x*5+3)/2 - 4)*y +6,累積器每次累積操做都會按照你說的作;
......
LongAccumulator是標準的實現類,LongAdder是特化的實現類,它的功能等價於LongAccumulator((x, y) -> x+y, 0L)。它們的區別很簡單,前者能夠進行任何二元算術操做,後者只能進行加減兩種算術操做。
Double版本是Long版本的簡單改裝,相對Long版本,主要的變化就是用Double.longBitsToDouble 和Double.doubleToRawLongBits對底層的8字節數據進行long <---> double轉換,存儲的時候使用long型,計算的時候轉化爲double型。這是由於CAS是sun.misc.Unsafe中提供的操做,只對int、long、對象類型(引用或者指針)提供了這種操做,其餘類型都須要轉化爲這三種類型才能進行CAS操做。這裏的long型也能夠認爲是8字節的原始類型,由於把它視爲long類型是無心義的。java中沒有C語言中的 void* 無類型(或者叫原始類型),只能用最接近的long類型來代替。
 

四個實現類的區別就上面這兩句話,這裏只講LongAdder一個類。緩存

 

2、核心實現Striped64多線程

四個類的核心實現都在Striped64中,這個類使用分段的思想,來儘可能平攤併發壓力。相似1.7及之前版本的ConcurrentHashMap.Segment,Striped64中使用了一個叫Cell的類,是一個普通的二元算術累積單元,線程也是經過hash取模操做映射到一個Cell上進行累積。爲了加快取模運算效率,也把Cell數組的大小設置爲2^n,同時大量使用Unsafe提供的底層操做。基本的實現桶1.7的ConcurrentHashMap很是像,並且更簡單。
 
一、累積單元Cell
看到這裏我想了一個看似簡單的問題:既然Cell這麼簡單,只有一個long型變量,爲何不直接用long value?
首先聲明下,Unsafe提供的操做很強大,也能對數組的元素進行volatile讀寫,同時數組計算某個元素的offset偏移量自己就很簡單,所以volatile、cas這種站不住腳。這個問題下面一點再進行解答。
// 很簡單的一個類,這個類能夠當作是一個簡化的AtomicLong
// 經過cas操做來更新value的值
// @sun.misc.Contended是一個高端的註解,表明使用緩存行填來避免僞共享,能夠本身網上搜下,這個我就不細說了
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
 
    // Unsafe mechanics Unsafe相關的初始化
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

 

二、Striped64主體代碼併發

abstract class Striped64 extends Number {
    @sun.misc.Contended static final class Cell { ... }
 
    /** Number of CPUS, to place bound on table size */
    static final int NCPU = Runtime.getRuntime().availableProcessors();
 
    // cell數組,長度同樣要是2^n,能夠類比爲jdk1.7的ConcurrentHashMap中的segments數組
    transient volatile Cell[] cells;
 
    // 累積器的基本值,在兩種狀況下會使用:
    // 一、沒有遇到併發的狀況,直接使用base,速度更快;
    // 二、多線程併發初始化table數組時,必需要保證table數組只被初始化一次,所以只有一個線程可以競爭成功,這種狀況下競爭失敗的線程會嘗試在base上進行一次累積操做
    transient volatile long base;
 
    // 自旋標識,在對cells進行初始化,或者後續擴容時,須要經過CAS操做把此標識設置爲1(busy,忙標識,至關於加鎖),取消busy時能夠直接使用cellsBusy = 0,至關於釋放鎖
    transient volatile int cellsBusy;
 
    Striped64() {
    }
 
    // 使用CAS更新base的值
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
 
    // 使用CAS將cells自旋標識更新爲1
    // 更新爲0時能夠不用CAS,直接使用cellsBusy就行
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }
 
    // 下面這兩個方法是ThreadLocalRandom中的方法,不過由於包訪問關係,這裏又從新寫一遍
 
    // probe翻譯過來是探測/探測器/探針這些,很差理解,它是ThreadLocalRandom裏面的一個屬性,
    // 不過並不影響對Striped64的理解,這裏能夠把它理解爲線程自己的hash值
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
 
    // 至關於rehash,從新算一遍線程的hash值
    static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }
 
    /**
     * 核心方法的實現,此方法建議在外部進行一次CAS操做(cell != null時嘗試CAS更新base值,cells != null時,CAS更新hash值取模後對應的cell.value)
     * @param x the value 前面我說的二元運算中的第二個操做數,也就是外部提供的那個操做數
     * @param fn the update function, or null for add (this convention avoids the need for an extra field or function in LongAdder).
     *     外部提供的二元算術操做,實例持有而且只能有一個,生命週期內保持不變,null表明LongAdder這種特殊可是最經常使用的狀況,能夠減小一次方法調用
     * @param wasUncontended false if CAS failed before call 若是爲false,代表調用者預先調用的一次CAS操做都失敗了
     */
    final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
        int h;
        // 這個if至關於給線程生成一個非0的hash值
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false; // True if last slot nonempty 若是hash取模映射獲得的Cell單元不是null,則爲true,此值也能夠看做是擴容意向,感受這個更好理解
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) { // cells已經被初始化了
                if ((a = as[(n - 1) & h]) == null) { // hash取模映射獲得的Cell單元還爲null(爲null表示尚未被使用)
                    if (cellsBusy == 0) {       // Try to attach new Cell 若是沒有線程正在執行擴容
                        Cell r = new Cell(x);   // Optimistically create 先建立新的累積單元
                        if (cellsBusy == 0 && casCellsBusy()) { // 嘗試加鎖
                            boolean created = false;
                            try {               // Recheck under lock 在有鎖的狀況下再檢測一遍以前的判斷
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 考慮別的線程可能執行了擴容,這裏從新賦值從新判斷
                                    rs[j] = r; // 對沒有使用的Cell單元進行累積操做(第一次賦值至關因而累積上一個操做數,求和時再和base執行一次運算就獲得實際的結果)
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0; 清空自旋標識,釋放鎖
                            }
                            if (created) // 若是本來爲null的Cell單元是由本身進行第一次累積操做,那麼任務已經完成了,因此能夠退出循環
                                break;
                            continue;           // Slot is now non-empty 不是本身進行第一次累積操做,重頭再來
                        }
                    }
                    collide = false; // 執行這一句是由於cells被加鎖了,不能往下繼續執行第一次的賦值操做(第一次累積),因此還不能考慮擴容
                }
                else if (!wasUncontended) // CAS already known to fail 前面一次CAS更新a.value(進行一次累積)的嘗試已經失敗了,說明已經發生了線程競爭
                    wasUncontended = true; // Continue after rehash 狀況失敗標識,後面去從新算一遍線程的hash值
                else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // 嘗試CAS更新a.value(進行一次累積) ------ 標記爲分支A
                    break; // 成功了就完成了累積任務,退出循環
                else if (n >= NCPU || cells != as) // cell數組已是最大的了,或者中途發生了擴容操做。由於NCPU不必定是2^n,因此這裏用 >=
                    collide = false; // At max size or stale 長度n是遞增的,執行到了這個分支,說明n >= NCPU會永遠爲true,下面兩個else if就永遠不會被執行了,也就永遠不會再進行擴容
                                     // CPU可以並行的CAS操做的最大數量是它的核心數(CAS在x86中對應的指令是cmpxchg,多核須要經過鎖緩存來保證總體原子性),當n >= NCPU時,再出現幾個線程映射到同一個Cell致使CAS競爭的狀況,那就真不關擴容的事了,徹底是hash值的鍋了
                else if (!collide) // 映射到的Cell單元不是null,而且嘗試對它進行累積時,CAS競爭失敗了,這時候把擴容意向設置爲true
                                   // 下一次循環若是仍是跟這一次同樣,說明競爭很嚴重,那麼就真正擴容
                    collide = true; // 把擴容意向設置爲true,只有這裏纔會給collide賦值爲true,也只有執行了這一句,纔可能執行後面一個else if進行擴容
                else if (cellsBusy == 0 && casCellsBusy()) { // 最後再考慮擴容,能到這一步說明競爭很激烈,嘗試加鎖進行擴容 ------ 標記爲分支B
                    try {
                        if (cells == as) {      // Expand table unless stale 檢查下是否被別的線程擴容了(CAS更新鎖標識,處理不了ABA問題,這裏再檢查一遍)
                            Cell[] rs = new Cell[n << 1]; // 執行2倍擴容
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0; // 釋放鎖
                    }
                    collide = false; // 擴容意向爲false
                    continue; // Retry with expanded table 擴容後重頭再來
                }
                h = advanceProbe(h); // 從新給線程生成一個hash值,下降hash衝突,減小映射到同一個Cell致使CAS競爭的狀況
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // cells沒有被加鎖,而且它沒有被初始化,那麼就嘗試對它進行加鎖,加鎖成功進入這個else if
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) { // CAS避免不了ABA問題,這裏再檢測一次,若是仍是null,或者空數組,那麼就執行初始化
                        Cell[] rs = new Cell[2]; // 初始化時只建立兩個單元
                        rs[h & 1] = new Cell(x); // 對其中一個單元進行累積操做,另外一個無論,繼續爲null
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0; // 清空自旋標識,釋放鎖
                }
                if (init) // 若是某個本來爲null的Cell單元是由本身進行第一次累積操做,那麼任務已經完成了,因此能夠退出循環
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // cells正在進行初始化時,嘗試直接在base上進行累加操做
                break;                          // Fall back on using base 直接在base上進行累積操做成功了,任務完成,能夠退出循環了
        }
    }
 
    // double的不講,更long的邏輯基本上是同樣的
    final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended);
 
    // Unsafe mechanics Unsafe初始化
    private static final sun.misc.Unsafe UNSAFE;
    private static final long BASE;
    private static final long CELLSBUSY;
    private static final long PROBE;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> sk = Striped64.class;
            BASE = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("base"));
            CELLSBUSY = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("cellsBusy"));
            Class<?> tk = Thread.class;
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
 
}

 

看完這個在來看看第一點中我提的問題:既然Cell這麼簡單,爲何不直接用long value?app

先看看我特別標明的兩個分支:分支A是用CAS更新對應的cell.value,是個寫操做,分支B是進行擴容。
ConcurrentHashMap中,擴容和寫操做是會嚴格處理的,在一個分段鎖管轄區內,不會出現擴容和寫操做併發:1.6和1.7的擴容操做都是在put內部執行的,put自己就會加鎖,所以擴容進行時會阻塞對同一個Segment的寫操做;1.8中擴容時,put/remove等方法若是遇見正在其餘線程正在執行擴容,會去幫助擴容,擴容完成了以後纔會去嘗試加鎖執行真正的寫操做。
雖然B分支會進行」加鎖「,可是A操做跟cellsBusy無關,」加鎖「並不由止A操做的執行。AB兩個分支是不互斥的, 所以Striped64這裏會出現A分支的寫操做,和B分支擴容操做併發執行的狀況。
那麼問題是:爲何這麼併發執行沒問題?
仔細看看A操做,就明白了。A操做使用CAS更新Cell對象中的某個屬性,並不改變數組持有的Cell對象的引用,擴容操做進行的是數組持有的Cell對象引用的複製,複製後引用指向的仍是原來的那個Cell對象。
舉個例子就是,舊的cell數組,叫做old,old[1] = cellA,cellA.value = 1,擴容後的新數組,叫做new,任然有new[1] = cellA。A分支實際上執行的是cellA.value = 2,不管分支A和B怎麼併發執行,執行完成後新數組都能看到分支A對Cell的改變,擴容先後實際上數組持有的是同一羣Cell對象。
這下就知道爲何不直接用long變量代替Cell對象了吧。long[]進行復制時,兩個數組完徹底全分離了,A分支直接做用在舊數組上,B分支擴容後,看不到串行復制執行後對舊數組同一位置的改變。舉個例子就是,old[1]=10,A分支要把old[1]更新爲11,這時候B分支已經複製到old[5]了,A分支執行完成後,B分支建立的新數組new[1]可能仍是10(無論是多少,反正沒記錄A分支的操做),這樣A分支的操做就被遺失了,程序會有問題。
下面簡單畫了個示意圖,能夠看看。



3、LongAdder
看完了Striped64的講解,這部分就很簡單了,只是一些簡單的封裝。
public class LongAdder extends Striped64 implements Serializable {
 
    // 構造方法,什麼也不作,直接使用默認值,base = 0, cells = null
    public LongAdder() {
    }
 
    // add方法,根據父類的longAccumulate方法的要求,這裏要進行一次CAS操做
    // (雖然這裏有兩個CAS,可是第一個CAS成功了就不會執行第二個,要執行第二個,第一個就被「短路」了不會被執行)
    // 在線程競爭不激烈時,這樣作更快
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
 
    public void increment() {
        add(1L);
    }
 
    public void decrement() {
        add(-1L);
    }
 
    // 返回累加的和,也就是「當前時刻」的計數值
    // 此返回值可能不是絕對準確的,由於調用這個方法時還有其餘線程可能正在進行計數累加,
    //     方法的返回時刻和調用時刻不是同一個點,在有併發的狀況下,這個值只是近似準確的計數值
    // 高併發時,除非全局加鎖,不然得不到程序運行中某個時刻絕對準確的值,可是全局加鎖在高併發狀況下是下下策
    // 在不少的併發場景中,計數操做並非核心,這種狀況下容許計數器的值出現一點誤差,此時可使用LongAdder
    // 在必須依賴準確計數值的場景中,應該本身處理而不是使用通用的類
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
 
    // 重置計數器,只應該在明確沒有併發的狀況下調用,能夠用來避免從新new一個LongAdder
    public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }
 
    // 至關於sum()後再調用reset()
    public long sumThenReset() {
        Cell[] as = cells; Cell a;
        long sum = base;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }
 
    // 其餘的不說了
}

 

 
簡單總結下:
這個類是jdk1.8新增的類,目的是爲了提供一個通用的,更高效的用於併發場景的計數器。能夠網上搜下一些關於LongAdder的性能測試,有不少現成的,我本身就不寫了。
jdk1.8的ConcurrentHashMap中,沒有再使用Segment,使用了一個簡單的仿造LongAdder實現的計數器,這樣可以保證計數效率不低於使用Segment的效率。
相關文章
相關標籤/搜索