從 LongAdder 中窺見併發組件的設計思路

原文地址java

最近在看阿里的 Sentinel 的源碼的時候。發現使用了一個類 LongAdder 來在併發環境中計數。這個時候就提出了疑問,JDK 中已經有 AtomicLong 了,爲啥還要使用 LongAdder ? AtomicLong 已是基於 CAS 的無鎖結構,已經有很好的並發表現了,爲啥還要用 LongAdder ?因而趕快找來源碼一探究竟。git

AtomicLong 的缺陷

你們能夠閱讀我以前寫的 JAVA 中的 CAS 詳細瞭解 AtomicLong 的實現原理。須要注意的一點是,AtomicLong 的 Add() 是依賴自旋不斷的 CAS 去累加一個 Long 值。若是在競爭激烈的狀況下,CAS 操做不斷的失敗,就會有大量的線程不斷的自旋嘗試 CAS 會形成 CPU 的極大的消耗。github

LongAdder 解決方案

經過閱讀 LongAdder 的 Javadoc 咱們瞭解到:數組

This class is usually preferable to {@link AtomicLong} when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.微信

大概意思就是,LongAdder 功能相似 AtomicLong ,在低併發狀況下兩者表現差很少,在高併發狀況下 LongAdder 的表現就會好不少。併發

LongAdder 到底用了什麼黑科技能作到高性比 AtomicLong 還要好呢呢?對於一樣的一個 add() 操做,上文說到 AtomicLong 只對一個 Long 值進行 CAS 操做。而 LongAdder 是針對 Cell 數組的某個 Cell 進行 CAS 操做 ,把線程的名字的 hash 值,做爲 Cell 數組的下標,而後對 Cell[i] 的 long 進行 CAS 操做。簡單粗暴的分散了高併發下的競爭壓力。app

LongAdder 的實現細節

雖然原理簡單粗暴,可是代碼寫得卻至關細緻和精巧。less

java.util.concurrent.atomic 包下面咱們能夠看到 LongAdder 的源碼。首先看 add() 方法的源碼。dom

public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
複製代碼

看到這個 add() 方法,首先須要瞭解 Cell 是什麼?ide

Cell 是 java.util.concurrent.atomicStriped64 的一個內部類。

@sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // unsafe 機制
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
複製代碼

首先 Cell 被 @sun.misc.Contended 修飾。意思是讓Java編譯器和JRE運行時來決定如何填充。不理解沒關係,不影響理解。

其實一個 Cell 的本質就是一個 volatile 修飾的 long 值,且這個值可以進行 cas 操做。

回到咱們的 add() 方法。

這裏涉及四個額外的方法 casBase() , getProbe() , a.cas() , longAccumulate();

咱們看名字就知道 casBase() 和 a.cas() 都是對參數的 cas 操做。

getProbe() 的做用,就是根據當前線程 hash 出一個 int 值。

longAccumlate() 的做用比較複雜,以後咱們會講解。

因此這個 add() 操做概括之後就是:

  1. 若是 cells 數組不爲空,對參數進行 casBase 操做,若是 casBase 操做失敗。多是競爭激烈,進入第二步。
  2. 若是 cells 爲空,直接進入 longAccumulate();
  3. m = cells 數組長度減一,若是數組長度小於 1,則進入 longAccumulate()
  4. 若是都沒有知足以上條件,則對當前線程進行某種 hash 生成一個數組下標,對下標保存的值進行 cas 操做。若是操做失敗,則說明競爭依然激烈,則進入 longAccumulate().

可見,操做的核心思想仍是基於 cas。可是 cas 失敗後,並非傻乎乎的自旋,而是逐漸升級。升級的 cas 都無論用了則進入 longAccumulate() 這個方法。

下面就開始揭開 longAccumulate 的神祕面紗。

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            //若是操做的cell 爲空,double check 新建 cell
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }

                // cas 失敗 繼續循環
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash

                // 若是 cell cas 成功 break
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;

                // 若是 cell 的長度已經大於等於 cpu 的數量,擴容意義不大,就不用標記衝突,重試
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                // 獲取鎖,上鎖擴容,將衝突標記爲否,繼續執行 
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                // 無法獲取鎖,重散列,嘗試其餘槽
                h = advanceProbe(h);
            }

            // 獲取鎖,初始化 cell 數組
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }

            // 表未被初始化,可能正在初始化,回退使用 base。
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
複製代碼

longAccumulate 看上去比較複雜。咱們慢慢分析。

回憶一下,什麼狀況會進入到這個 longAccumulate 方法中,

  • cell[] 數組爲空,
  • cell[i] 數據的某個下標元素爲空,
  • casBase 失敗,
  • a.cas 失敗,
  • cell.length - 1 < 0

在 longAccumulate 中有幾個標記位,咱們也先理解一下

  • cellsBusy cells 的操做標記位,若是正在修改、新建、操做 cells 數組中的元素會,會將其 cas 爲 1,不然爲0。
  • wasUncontended 表示 cas 是否失敗,若是失敗則考慮操做升級。
  • collide 是否衝突,若是衝突,則考慮擴容 cells 的長度。

整個 for(;;) 死循環,都是以 cas 操做成功而了結。不然則會修改上述描述的幾個標記位,從新進入循環。

因此整個循環包括以下幾種狀況:

  1. cells 不爲空

    1. 若是 cell[i] 某個下標爲空,則 new 一個 cell,並初始化值,而後退出
    2. 若是 cas 失敗,繼續循環
    3. 若是 cell 不爲空,且 cell cas 成功,退出
    4. 若是 cell 的數量,大於等於 cpu 數量或者已經擴容了,繼續重試。(擴容沒意義)
    5. 設置 collide 爲 true。
    6. 獲取 cellsBusy 成功就對 cell 進行擴容,獲取 cellBusy 失敗則從新 hash 再重試。
  2. cells 爲空且獲取到 cellsBusy ,init cells 數組,而後賦值退出。

  3. cellsBusy 獲取失敗,則進行 baseCas ,操做成功退出,不成功則重試。

至此 longAccumulate 就分析完了。之因此這個方法那麼複雜,我認爲有兩個緣由

  1. 是由於併發環境下要考慮各類操做的原子性,因此對於鎖都進行了 double check。
  2. 操做都是逐步升級,以最小的代價實現功能。

最後說說 LongAddr 的 sum() 方法,這個就很簡單了。

public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
複製代碼

就是遍歷 cell 數組,累加 value 就行。LongAdder 餘下的方法就比較簡單,沒有什麼能夠討論的了。

LongAdder VS AtomicLong

看上去 LongAdder 性能全面超越了 AtomicLong。爲何 jdk 1.8 中仍是保留了 AtomicLong 的實現呢?

其實咱們能夠發現,LongAdder 使用了一個 cell 列表去承接併發的 cas,以提高性能,可是 LongAdder 在統計的時候若是有併發更新,可能致使統計的數據有偏差。

若是用於自增 id 的生成,就不適合使用 LongAdder 了。這個時候使用 AtomicLong 就是一個明智的選擇。

而在 Sentinel 中 LongAdder 承擔的只是統計任務,且容許偏差。

總結

LongAdder 使用了一個比較簡單的原理,解決了 AtomicLong 類,在極高競爭下的性能問題。可是 LongAdder 的具體實現卻很是精巧和細緻,分散競爭,逐步升級競爭的解決方案,至關漂亮,值得咱們細細品味。

歡迎關注個人微信公衆號:

二維碼
相關文章
相關標籤/搜索