【java學習筆記】LongAdder


LongAdder是JDK1.8在java.util.concurrent.atomic包下新引入的 爲了 高併發下實現高性能統計的類。

1.背景

AtomicLong是在高併發下對單一變量進行CAS操做,從而保證其原子性。java

public final long getAndAdd(long delta) {
    return unsafe.getAndAddLong(this, valueOffset, delta);
}

在Unsafe類中,若是有多個線程進入,只有一個線程能成功CAS,其餘線程都失敗。失敗的線程會重複進行下一輪的CAS,可是下一輪仍是隻有一個線程成功。數組

public final long getAndAddLong(Object o, long offset, long delta) {
    long v;
    do {
        v= this.getLongVolatile(o,offset);
    } while(!this.compareAndSwapLong(o,offset, v, v+delta));
    return v;
}

即在高併發下,AtomicLong的性能會愈來愈差勁。多線程

所以,引入了替代方案,LongAdder。併發

2.LongAdder

LongAdder是一種以空間換時間的解決方案。其內部維護了一個值base,和一個cell數組,當線程寫base有衝突時,將其寫入數組的一個cell中。將base和全部cell中的值求和就獲得最終LongAdder的值了。
Method sum() (or, equivalently, longValue()) returns the current total combined across the variables maintaining the sum.app

public long longValue() {
    return sum();
}

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

3.Striped64內部結構

LongAdder類繼承了Striped64類,其中,class Striped64維護有有 Cell的內部類,Base,Cell數組等相關成員變量。
NCPU:表示當前計算機CPU數量,用於控制cells數組長度。由於一個CPU同一時間只能執行一個線程,若是cells數組長度 大於 CPU數量,並不能提升併發數,且形成空間的浪費。
cells:存放Cell的數組。
base:在沒有發生過競爭時,數據會累加到base上。 或者,當cells擴容時,是須要將數據寫到base中的。
cellsBusy:鎖。0表示無鎖狀態,1表示其餘線程已經持有鎖。初始化cells,建立Cell,擴容cells都須要獲取鎖。less

@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;  // 當前value基於當前對象的內存偏移
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
//表示當前計算機CPU數量,控制cells數組長度
static final int NCPU = Runtime.getRuntime().availableProcessors();
transient volatile Cell[] cells;
transient volatile long base;  //在沒有發生過競爭時,數據會累加到base上, 或者 當cells擴容時,須要將數據寫到base中
transient volatile int cellsBusy; // 初始化cells或者擴容cells都須要獲取鎖,0表示無鎖狀態,1表示其餘線程已經持有鎖

4.LongAdder的add方法解析

add(long x):加上給定的x。
1.一開始只加給base,那麼此時cells必定沒有初始化,此時只會casBase,成功則返回。
2.casBase失敗,意味着多線程寫base發生競爭,進入longAccumulate(x, null, uncontended = true)重試或者初始化cells。dom

3.若是cells已經初始化過了,可是,當前線程對應下標的cell爲空,須要建立。進入longAccumulate(x, null, uncontended = true)建立對應cell。ide

4.若是cells已經初始化過了,同時,當前線程對應的cell 不爲空,cas給當前cell賦值,成功則返回。失敗,意味着當前線程對應的cell 有競爭,進入longAccumulate(x, null, uncontended = false) 重試或者擴容cells。函數

public void add(long x) {
        //as 表示cells引用
        //b 表示獲取的base值
        //v 表示 指望值
        //m 表示 cells 數組的長度
        //a 表示當前線程命中的cell單元格
        Cell[] as; long b, v; int m; Cell a;

        //條件一:true->表示cells已經初始化過了,當前線程應該將數據寫入到對應的cell中
        //       false->表示cells未初始化,當前全部線程應該將數據寫到base中
        //條件二:false->表示當前線程cas替換數據成功,
        //       true->表示發生競爭了,可能須要重試 或者 擴容
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            //何時會進來?
            //1.true->表示cells已經初始化過了,當前線程應該將數據寫入到對應的cell中
            //2.true->表示發生競爭了,可能須要重試 或者 擴容
            
            boolean uncontended = true; //true -> 未競爭  false->發生競爭

            //條件一:true->說明 cells 未初始化,也就是多線程寫base發生競爭了
            //       false->說明 cells 已經初始化了,當前線程應該是 找本身的cell 寫值
            //條件二:getProbe() 獲取當前線程的hash值   m表示cells長度-1 cells長度 必定是2的次方數   15= b1111
            //       true-> 說明當前線程對應下標的cell爲空,須要建立 longAccumulate 支持
            //       false-> 說明當前線程對應的cell 不爲空,說明 下一步想要將x值 添加到cell中。
            //條件三:true->表示cas失敗,意味着當前線程對應的cell 有競爭
            //       false->表示cas成功
            if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[getProbe() & m]) == null ||
                    !(uncontended = a.cas(v = a.value, v + x)))
                //都有哪些狀況會調用?
                //1.true->說明 cells 未初始化,也就是多線程寫base發生競爭了[重試|初始化cells]
                //2.true-> 說明當前線程對應下標的cell爲空,須要建立 longAccumulate 支持
                //3.true->表示cas失敗,意味着當前線程對應的cell 有競爭[重試|擴容]
                longAccumulate(x, null, uncontended);
        }
    }

5.Striped64的longAccumulate方法解析

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
根據LongAdder的add方法可知,參數x是add函數的傳入參數,即要增長的數;
LongBinaryOperator是一個接口可擴展,重寫applyAsLong方法用於處理cell中值與參數x的關係,此處傳null;
wasUncontended只有在 【cells已經初始化過了,同時,當前線程對應的cell 不爲空,cas給當前cell賦值,競爭修改失敗】的狀況下爲false,其餘爲true。高併發

第一種狀況:寫base發生競爭,此時cells沒有初始化,因此纔會寫到base,不走CASE1;
走Case2,判斷有沒有鎖,沒有鎖的話,嘗試加鎖,成功加鎖後執行初始化cells的邏輯。若是沒有拿到鎖,表示其它線程正在初始化cells,因此當前線程將值累加到base。

第二種狀況:當前線程對應下標的cell爲空,知足CASE1,到達CASE1.1中,建立一個Cell,加鎖,若是成功,對應的位置其餘線程沒有設置過cell,將建立的cell插入相應位置。

第三種狀況:當前線程對應下標的cell已經建立成功,但寫入cell時發生競爭,到達CASE1.2,wasUncontended = true,把發生競爭線程的hash值rehash。
重置後走若CASE1.1,CASE1.2均不知足,到達CASE1.3【當前線程rehash過hash值,而後新命中的cell不爲空】重試cas賦值+x一次,成功則退出。失敗,擴容意向設置成true,rehash當前線程的hash值,再到1.3重試,還失敗走CASE1.6擴容。

注意:CASE1.4要求cells數組長度不能超過cpu數量,由於一個CPU同一時間只能執行一個線程,若是cells數組長度 大於 CPU數量,並不能提升併發數,且形成空間的浪費。

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
        //h 表示線程hash值
        int h;
        //條件成立:說明當前線程 還未分配hash值; getProbe()獲取當前線程的Hash值
        if ((h = getProbe()) == 0) {
            //給當前線程分配hash值
            ThreadLocalRandom.current(); // force initialization
            //取出當前線程的hash值 賦值給h
            h = getProbe();
            //爲何? 由於默認狀況下 當前線程hash爲0, 確定是寫入到了 cells[0] 位置。 不把它當作一次真正的競爭
            wasUncontended = true;
        }

        //表示擴容意向 false 必定不會擴容,true 可能會擴容。
        boolean collide = false;                // True if last slot nonempty

        //自旋
        for (;;) {
            //as 表示cells引用
            //a 表示當前線程命中的cell
            //n 表示cells數組長度
            //v 表示 指望值
            Cell[] as; Cell a; int n; long v;

            //CASE1: 表示cells已經初始化了,當前線程應該將數據寫入到對應的cell中
            if ((as = cells) != null && (n = as.length) > 0) {
                // 如下兩種狀況會進入Case1:
                //2.true-> 說明當前線程對應下標的cell爲空,須要建立 longAccumulate 支持
                //3.true->表示cas失敗,意味着當前線程對應的cell 有競爭[重試|擴容]

                //CASE1.1:true->表示當前線程對應的下標位置的cell爲null,須要建立new Cell
                if ((a = as[(n - 1) & h]) == null) {

                    //true->表示當前鎖 未被佔用  false->表示鎖被佔用
                    if (cellsBusy == 0) {       // Try to attach new Cell

                        //拿當前的x建立Cell
                        Cell r = new Cell(x);   // Optimistically create

                        //條件一:true->表示當前鎖 未被佔用  false->表示鎖被佔用
                        //條件二:true->表示當前線程獲取鎖成功  false->當前線程獲取鎖失敗..
                        if (cellsBusy == 0 && casCellsBusy()) {
                            //是否建立成功 標記
                            boolean created = false;
                            try {               // Recheck under lock
                                //rs 表示當前cells 引用
                                //m 表示cells長度
                                //j 表示當前線程命中的下標
                                Cell[] rs; int m, j;

                                //條件一 條件二 恆成立
                                //rs[j = (m - 1) & h] == null 爲了防止其它線程初始化過該位置,而後當前線程再次初始化該位置
                                //致使丟失數據
                                if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    //擴容意向 強制改成了false
                    collide = false;
                }
                // CASE1.2:
                // wasUncontended:只有cells初始化以後,而且當前線程 競爭修改失敗,纔會是false
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //CASE 1.3:當前線程rehash過hash值,而後新命中的cell不爲空
                //true -> 寫成功,退出循環
                //false -> 表示rehash以後命中的新的cell 也有競爭 重試1次 再重試1次
                else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                    break;
                //CASE 1.4:
                //條件一:n >= NCPU true->擴容意向 改成false,表示不擴容了  false-> 說明cells數組還能夠擴容
                //條件二:cells != as true->其它線程已經擴容過了,當前線程rehash以後重試便可
                else if (n >= NCPU || cells != as)
                    //擴容意向 改成false,表示不擴容了
                    collide = false;            // At max size or stale
                //CASE 1.5:
                //!collide = true 設置擴容意向 爲true 可是不必定真的發生擴容
                else if (!collide)
                    collide = true;
                //CASE 1.6:真正擴容的邏輯
                //條件一:cellsBusy == 0 true->表示當前無鎖狀態,當前線程能夠去競爭這把鎖
                //條件二:casCellsBusy true->表示當前線程 獲取鎖 成功,能夠執行擴容邏輯
                // false->表示當前時刻有其它線程在作擴容相關的操做。
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        //cells == as
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        //釋放鎖
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //重置當前線程Hash值
                h = advanceProbe(h);
            }
            //CASE2:前置條件cells還未初始化 as 爲null
            //條件一:true 表示當前未加鎖
            //條件二:cells == as?由於其它線程可能會在你給as賦值以後修改了 cells
            //條件三:true 表示獲取鎖成功 會把cellsBusy = 1,false 表示其它線程正在持有這把鎖
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    //cells == as? 防止其它線程已經初始化了,當前線程再次初始化 致使丟失數據
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //CASE3:
            //1.當前cellsBusy加鎖狀態,表示其它線程正在初始化cells,因此當前線程將值累加到base
            //2.cells被其它線程初始化後,當前線程須要將數據累加到base
            else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

6.總結

官方文檔是這樣介紹的
This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.
LongAdder在多個線程更新一個用於收集統計信息的而不是追求同步的公共和的狀況下,是優於AtomicLong類的。在併發度小,低競爭狀況下,兩個類具備類似的性能。可是在高爭用狀況下,LongAdder的預期吞吐量要高得多,代價是更高的空間消耗。

最後,咱們再來看一下sum方法的註釋
Returns the current sum. The returned value is NOT an atomic snapshot; invocation in the absence of concurrent updates returns an accurate result, but concurrent updates that occur while the sum is being calculated might not be incorporated.
sum方法返回值只是一個接近值,並非一個準確值。它在計算總和時,併發的更新並不會被合併在內。

總結:

  • LongAdder是一種以空間換時間的解決方案,其在高併發,競爭大的狀況下性能更優。
  • 可是,sum方法拿到的只是接近值,追求最終一致性。若是業務場景追求高精度,高準確性,用AtomicLong。
相關文章
相關標籤/搜索