本文轉自 https://blog.csdn.net/u011392897/article/details/60480108 java
高併發下計數,通常最早想到的應該是AtomicLong/AtomicInt,AtmoicXXX使用硬件級別的指令 CAS 來更新計數器的值,這樣能夠避免加鎖,機器直接支持的指令,效率也很高。可是AtomicXXX中的 CAS 操做在出現線程競爭時,失敗的線程會白白地循環一次,在併發很大的狀況下,由於每次CAS都只有一個線程能成功,競爭失敗的線程會很是多。失敗次數越多,循環次數就越多,不少線程的CAS操做愈來愈接近 自旋鎖(spin lock)。計數操做原本是一個很簡單的操做,實際須要耗費的cpu時間應該是越少越好,AtomicXXX在高併發計數時,大量的cpu時間都浪費會在 自旋 上了,這很浪費,也下降了實際的計數效率。數組
// 很簡單的一個類,這個類能夠當作是一個簡化的AtomicLong // 經過cas操做來更新value的值 // @sun.misc.Contended是一個高端的註解,表明使用緩存行填來避免僞共享,能夠本身網上搜下,這個我就不細說了 @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics Unsafe相關的初始化 private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
四個實現類的區別就上面這兩句話,這裏只講LongAdder一個類。緩存
2、核心實現Striped64多線程
// 很簡單的一個類,這個類能夠當作是一個簡化的AtomicLong // 經過cas操做來更新value的值 // @sun.misc.Contended是一個高端的註解,表明使用緩存行填來避免僞共享,能夠本身網上搜下,這個我就不細說了 @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics Unsafe相關的初始化 private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
二、Striped64主體代碼併發
abstract class Striped64 extends Number { @sun.misc.Contended static final class Cell { ... } /** Number of CPUS, to place bound on table size */ static final int NCPU = Runtime.getRuntime().availableProcessors(); // cell數組,長度同樣要是2^n,能夠類比爲jdk1.7的ConcurrentHashMap中的segments數組 transient volatile Cell[] cells; // 累積器的基本值,在兩種狀況下會使用: // 一、沒有遇到併發的狀況,直接使用base,速度更快; // 二、多線程併發初始化table數組時,必需要保證table數組只被初始化一次,所以只有一個線程可以競爭成功,這種狀況下競爭失敗的線程會嘗試在base上進行一次累積操做 transient volatile long base; // 自旋標識,在對cells進行初始化,或者後續擴容時,須要經過CAS操做把此標識設置爲1(busy,忙標識,至關於加鎖),取消busy時能夠直接使用cellsBusy = 0,至關於釋放鎖 transient volatile int cellsBusy; Striped64() { } // 使用CAS更新base的值 final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } // 使用CAS將cells自旋標識更新爲1 // 更新爲0時能夠不用CAS,直接使用cellsBusy就行 final boolean casCellsBusy() { return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); } // 下面這兩個方法是ThreadLocalRandom中的方法,不過由於包訪問關係,這裏又從新寫一遍 // probe翻譯過來是探測/探測器/探針這些,很差理解,它是ThreadLocalRandom裏面的一個屬性, // 不過並不影響對Striped64的理解,這裏能夠把它理解爲線程自己的hash值 static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); } // 至關於rehash,從新算一遍線程的hash值 static final int advanceProbe(int probe) { probe ^= probe << 13; // xorshift probe ^= probe >>> 17; probe ^= probe << 5; UNSAFE.putInt(Thread.currentThread(), PROBE, probe); return probe; } /** * 核心方法的實現,此方法建議在外部進行一次CAS操做(cell != null時嘗試CAS更新base值,cells != null時,CAS更新hash值取模後對應的cell.value) * @param x the value 前面我說的二元運算中的第二個操做數,也就是外部提供的那個操做數 * @param fn the update function, or null for add (this convention avoids the need for an extra field or function in LongAdder). * 外部提供的二元算術操做,實例持有而且只能有一個,生命週期內保持不變,null表明LongAdder這種特殊可是最經常使用的狀況,能夠減小一次方法調用 * @param wasUncontended false if CAS failed before call 若是爲false,代表調用者預先調用的一次CAS操做都失敗了 */ final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; // 這個if至關於給線程生成一個非0的hash值 if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty 若是hash取模映射獲得的Cell單元不是null,則爲true,此值也能夠看做是擴容意向,感受這個更好理解 for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // cells已經被初始化了 if ((a = as[(n - 1) & h]) == null) { // hash取模映射獲得的Cell單元還爲null(爲null表示尚未被使用) if (cellsBusy == 0) { // Try to attach new Cell 若是沒有線程正在執行擴容 Cell r = new Cell(x); // Optimistically create 先建立新的累積單元 if (cellsBusy == 0 && casCellsBusy()) { // 嘗試加鎖 boolean created = false; try { // Recheck under lock 在有鎖的狀況下再檢測一遍以前的判斷 Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 考慮別的線程可能執行了擴容,這裏從新賦值從新判斷 rs[j] = r; // 對沒有使用的Cell單元進行累積操做(第一次賦值至關因而累積上一個操做數,求和時再和base執行一次運算就獲得實際的結果) created = true; } } finally { cellsBusy = 0; 清空自旋標識,釋放鎖 } if (created) // 若是本來爲null的Cell單元是由本身進行第一次累積操做,那麼任務已經完成了,因此能夠退出循環 break; continue; // Slot is now non-empty 不是本身進行第一次累積操做,重頭再來 } } collide = false; // 執行這一句是由於cells被加鎖了,不能往下繼續執行第一次的賦值操做(第一次累積),因此還不能考慮擴容 } else if (!wasUncontended) // CAS already known to fail 前面一次CAS更新a.value(進行一次累積)的嘗試已經失敗了,說明已經發生了線程競爭 wasUncontended = true; // Continue after rehash 狀況失敗標識,後面去從新算一遍線程的hash值 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // 嘗試CAS更新a.value(進行一次累積) ------ 標記爲分支A break; // 成功了就完成了累積任務,退出循環 else if (n >= NCPU || cells != as) // cell數組已是最大的了,或者中途發生了擴容操做。由於NCPU不必定是2^n,因此這裏用 >= collide = false; // At max size or stale 長度n是遞增的,執行到了這個分支,說明n >= NCPU會永遠爲true,下面兩個else if就永遠不會被執行了,也就永遠不會再進行擴容 // CPU可以並行的CAS操做的最大數量是它的核心數(CAS在x86中對應的指令是cmpxchg,多核須要經過鎖緩存來保證總體原子性),當n >= NCPU時,再出現幾個線程映射到同一個Cell致使CAS競爭的狀況,那就真不關擴容的事了,徹底是hash值的鍋了 else if (!collide) // 映射到的Cell單元不是null,而且嘗試對它進行累積時,CAS競爭失敗了,這時候把擴容意向設置爲true // 下一次循環若是仍是跟這一次同樣,說明競爭很嚴重,那麼就真正擴容 collide = true; // 把擴容意向設置爲true,只有這裏纔會給collide賦值爲true,也只有執行了這一句,纔可能執行後面一個else if進行擴容 else if (cellsBusy == 0 && casCellsBusy()) { // 最後再考慮擴容,能到這一步說明競爭很激烈,嘗試加鎖進行擴容 ------ 標記爲分支B try { if (cells == as) { // Expand table unless stale 檢查下是否被別的線程擴容了(CAS更新鎖標識,處理不了ABA問題,這裏再檢查一遍) Cell[] rs = new Cell[n << 1]; // 執行2倍擴容 for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; // 釋放鎖 } collide = false; // 擴容意向爲false continue; // Retry with expanded table 擴容後重頭再來 } h = advanceProbe(h); // 從新給線程生成一個hash值,下降hash衝突,減小映射到同一個Cell致使CAS競爭的狀況 } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // cells沒有被加鎖,而且它沒有被初始化,那麼就嘗試對它進行加鎖,加鎖成功進入這個else if boolean init = false; try { // Initialize table if (cells == as) { // CAS避免不了ABA問題,這裏再檢測一次,若是仍是null,或者空數組,那麼就執行初始化 Cell[] rs = new Cell[2]; // 初始化時只建立兩個單元 rs[h & 1] = new Cell(x); // 對其中一個單元進行累積操做,另外一個無論,繼續爲null cells = rs; init = true; } } finally { cellsBusy = 0; // 清空自旋標識,釋放鎖 } if (init) // 若是某個本來爲null的Cell單元是由本身進行第一次累積操做,那麼任務已經完成了,因此能夠退出循環 break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // cells正在進行初始化時,嘗試直接在base上進行累加操做 break; // Fall back on using base 直接在base上進行累積操做成功了,任務完成,能夠退出循環了 } } // double的不講,更long的邏輯基本上是同樣的 final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended); // Unsafe mechanics Unsafe初始化 private static final sun.misc.Unsafe UNSAFE; private static final long BASE; private static final long CELLSBUSY; private static final long PROBE; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> sk = Striped64.class; BASE = UNSAFE.objectFieldOffset (sk.getDeclaredField("base")); CELLSBUSY = UNSAFE.objectFieldOffset (sk.getDeclaredField("cellsBusy")); Class<?> tk = Thread.class; PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe")); } catch (Exception e) { throw new Error(e); } } }
看完這個在來看看第一點中我提的問題:既然Cell這麼簡單,爲何不直接用long value?app
public class LongAdder extends Striped64 implements Serializable { // 構造方法,什麼也不作,直接使用默認值,base = 0, cells = null public LongAdder() { } // add方法,根據父類的longAccumulate方法的要求,這裏要進行一次CAS操做 // (雖然這裏有兩個CAS,可是第一個CAS成功了就不會執行第二個,要執行第二個,第一個就被「短路」了不會被執行) // 在線程競爭不激烈時,這樣作更快 public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } } public void increment() { add(1L); } public void decrement() { add(-1L); } // 返回累加的和,也就是「當前時刻」的計數值 // 此返回值可能不是絕對準確的,由於調用這個方法時還有其餘線程可能正在進行計數累加, // 方法的返回時刻和調用時刻不是同一個點,在有併發的狀況下,這個值只是近似準確的計數值 // 高併發時,除非全局加鎖,不然得不到程序運行中某個時刻絕對準確的值,可是全局加鎖在高併發狀況下是下下策 // 在不少的併發場景中,計數操做並非核心,這種狀況下容許計數器的值出現一點誤差,此時可使用LongAdder // 在必須依賴準確計數值的場景中,應該本身處理而不是使用通用的類 public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; } // 重置計數器,只應該在明確沒有併發的狀況下調用,能夠用來避免從新new一個LongAdder public void reset() { Cell[] as = cells; Cell a; base = 0L; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) a.value = 0L; } } } // 至關於sum()後再調用reset() public long sumThenReset() { Cell[] as = cells; Cell a; long sum = base; base = 0L; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) { sum += a.value; a.value = 0L; } } } return sum; } // 其餘的不說了 }