(1)java8中爲何要新增LongAdder?java
(2)LongAdder的實現方式?數組
(3)LongAdder與AtomicLong的對比?多線程
LongAdder是java8中新增的原子類,在多線程環境中,它比AtomicLong性能要高出很多,特別是寫多的場景。app
它是怎麼實現的呢?讓咱們一塊兒來學習吧。less
LongAdder的原理是,在最初無競爭時,只更新base的值,當有多線程競爭時經過分段的思想,讓不一樣的線程更新不一樣的段,最後把這些段相加就獲得了完整的LongAdder存儲的值。dom
LongAdder繼承自Striped64抽象類,Striped64中定義了Cell內部類和各重要屬性。ide
// Striped64中的內部類,使用@sun.misc.Contended註解,說明裏面的值消除僞共享 @sun.misc.Contended static final class Cell { // 存儲元素的值,使用volatile修飾保證可見性 volatile long value; Cell(long x) { value = x; } // CAS更新value的值 final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe實例 private static final sun.misc.Unsafe UNSAFE; // value字段的偏移量 private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
Cell類使用@sun.misc.Contended註解,說明是要避免僞共享的。源碼分析
使用Unsafe的CAS更新value的值,其中value的值使用volatile修飾,保證可見性。性能
關於Unsafe的介紹請查看【死磕 java魔法類之Unsafe解析】。學習
關於僞共享的介紹請查看【雜談 什麼是僞共享(false sharing)?】。
// 這三個屬性都在Striped64中 // cells數組,存儲各個段的值 transient volatile Cell[] cells; // 最初無競爭時使用的,也算一個特殊的段 transient volatile long base; // 標記當前是否有線程在建立或擴容cells,或者在建立Cell // 經過CAS更新該值,至關因而一個鎖 transient volatile int cellsBusy;
最初無競爭或有其它線程在建立cells數組時使用base更新值,有過競爭時使用cells更新值。
最初無競爭是指一開始沒有線程之間的競爭,但也有多是多線程在操做,只是這些線程沒有同時去更新base的值。
有過競爭是指只要出現過競爭無論後面有沒有競爭都使用cells更新值,規則是不一樣的線程hash到不一樣的cell上去更新,減小競爭。
add(x)方法是LongAdder的主要方法,使用它可使LongAdder中存儲的值增長x,x可爲正可爲負。
public void add(long x) { // as是Striped64中的cells屬性 // b是Striped64中的base屬性 // v是當前線程hash到的Cell中存儲的值 // m是cells的長度減1,hash時做爲掩碼使用 // a是當前線程hash到的Cell Cell[] as; long b, v; int m; Cell a; // 條件1:cells不爲空,說明出現過競爭,cells已經建立 // 條件2:cas操做base失敗,說明其它線程先一步修改了base,正在出現競爭 if ((as = cells) != null || !casBase(b = base, b + x)) { // true表示當前競爭還不激烈 // false表示競爭激烈,多個線程hash到同一個Cell,可能要擴容 boolean uncontended = true; // 條件1:cells爲空,說明正在出現競爭,上面是從條件2過來的 // 條件2:應該不會出現 // 條件3:當前線程所在的Cell爲空,說明當前線程尚未更新過Cell,應初始化一個Cell // 條件4:更新當前線程所在的Cell失敗,說明如今競爭很激烈,多個線程hash到了同一個Cell,應擴容 if (as == null || (m = as.length - 1) < 0 || // getProbe()方法返回的是線程中的threadLocalRandomProbe字段 // 它是經過隨機數生成的一個值,對於一個肯定的線程這個值是固定的 // 除非刻意修改它 (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) // 調用Striped64中的方法處理 longAccumulate(x, null, uncontended); } }
(1)最初無競爭時只更新base;
(2)直到更新base失敗時,建立cells數組;
(3)當多個線程競爭同一個Cell比較激烈時,可能要擴容;
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 存儲線程的probe值 int h; // 若是getProbe()方法返回0,說明隨機數未初始化 if ((h = getProbe()) == 0) { // 強制初始化 ThreadLocalRandom.current(); // force initialization // 從新獲取probe值 h = getProbe(); // 都未初始化,確定還不存在競爭激烈 wasUncontended = true; } // 是否發生碰撞 boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; // cells已經初始化過 if ((as = cells) != null && (n = as.length) > 0) { // 當前線程所在的Cell未初始化 if ((a = as[(n - 1) & h]) == null) { // 當前無其它線程在建立或擴容cells,也沒有線程在建立Cell if (cellsBusy == 0) { // Try to attach new Cell // 新建一個Cell,值爲當前須要增長的值 Cell r = new Cell(x); // Optimistically create // 再次檢測cellsBusy,並嘗試更新它爲1 // 至關於當前線程加鎖 if (cellsBusy == 0 && casCellsBusy()) { // 是否建立成功 boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; // 從新獲取cells,並找到當前線程hash到cells數組中的位置 // 這裏必定要從新獲取cells,由於as並不在鎖定範圍內 // 有可能已經擴容了,這裏要從新獲取 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 把上面新建的Cell放在cells的j位置處 rs[j] = r; // 建立成功 created = true; } } finally { // 至關於釋放鎖 cellsBusy = 0; } // 建立成功了就返回 // 值已經放在新建的Cell裏面了 if (created) break; continue; // Slot is now non-empty } } // 標記當前未出現衝突 collide = false; } // 當前線程所在的Cell不爲空,且更新失敗了 // 這裏簡單地設爲true,至關於簡單地自旋一次 // 經過下面的語句修改線程的probe再從新嘗試 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // 再次嘗試CAS更新當前線程所在Cell的值,若是成功了就返回 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // 若是cells數組的長度達到了CPU核心數,或者cells擴容了 // 設置collide爲false並經過下面的語句修改線程的probe再從新嘗試 else if (n >= NCPU || cells != as) collide = false; // At max size or stale // 上上個elseif都更新失敗了,且上個條件不成立,說明出現衝突了 else if (!collide) collide = true; // 明確出現衝突了,嘗試佔有鎖,並擴容 else if (cellsBusy == 0 && casCellsBusy()) { try { // 檢查是否有其它線程已經擴容過了 if (cells == as) { // Expand table unless stale // 新數組爲原數組的兩倍 Cell[] rs = new Cell[n << 1]; // 把舊數組元素拷貝到新數組中 for (int i = 0; i < n; ++i) rs[i] = as[i]; // 從新賦值cells爲新數組 cells = rs; } } finally { // 釋放鎖 cellsBusy = 0; } // 已解決衝突 collide = false; // 使用擴容後的新數組從新嘗試 continue; // Retry with expanded table } // 更新失敗或者達到了CPU核心數,從新生成probe,並重試 h = advanceProbe(h); } // 未初始化過cells數組,嘗試佔有鎖並初始化cells數組 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 是否初始化成功 boolean init = false; try { // Initialize table // 檢測是否有其它線程初始化過 if (cells == as) { // 新建一個大小爲2的Cell數組 Cell[] rs = new Cell[2]; // 找到當前線程hash到數組中的位置並建立其對應的Cell rs[h & 1] = new Cell(x); // 賦值給cells數組 cells = rs; // 初始化成功 init = true; } } finally { // 釋放鎖 cellsBusy = 0; } // 初始化成功直接返回 // 由於增長的值已經同時建立到Cell中了 if (init) break; } // 若是有其它線程在初始化cells數組中,就嘗試更新base // 若是成功了就返回 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
(1)若是cells數組未初始化,當前線程會嘗試佔有cellsBusy鎖並建立cells數組;
(2)若是當前線程嘗試建立cells數組時,發現有其它線程已經在建立了,就嘗試更新base,若是成功就返回;
(3)經過線程的probe值找到當前線程應該更新cells數組中的哪一個Cell;
(4)若是當前線程所在的Cell未初始化,就佔有佔有cellsBusy鎖並在相應的位置建立一個Cell;
(5)嘗試CAS更新當前線程所在的Cell,若是成功就返回,若是失敗說明出現衝突;
(5)當前線程更新Cell失敗後並非當即擴容,而是嘗試更新probe值後再重試一次;
(6)若是在重試的時候仍是更新失敗,就擴容;
(7)擴容時當前線程佔有cellsBusy鎖,並把數組容量擴大到兩倍,再遷移原cells數組中元素到新數組中;
(8)cellsBusy在建立cells數組、建立Cell、擴容cells數組三個地方用到;
sum()方法是獲取LongAdder中真正存儲的值的大小,經過把base和全部段相加獲得。
public long sum() { Cell[] as = cells; Cell a; // sum初始等於base long sum = base; // 若是cells不爲空 if (as != null) { // 遍歷全部的Cell for (int i = 0; i < as.length; ++i) { // 若是所在的Cell不爲空,就把它的value累加到sum中 if ((a = as[i]) != null) sum += a.value; } } // 返回sum return sum; }
能夠看到sum()方法是把base和全部段的值相加獲得,那麼,這裏有一個問題,若是前面已經累加到sum上的Cell的value有修改,不是就無法計算到了麼?
答案確實如此,因此LongAdder能夠說不是強一致性的,它是最終一致性的。
直接上代碼:
public class LongAdderVSAtomicLongTest { public static void main(String[] args){ testAtomicLongVSLongAdder(1, 10000000); testAtomicLongVSLongAdder(10, 10000000); testAtomicLongVSLongAdder(20, 10000000); testAtomicLongVSLongAdder(40, 10000000); testAtomicLongVSLongAdder(80, 10000000); } static void testAtomicLongVSLongAdder(final int threadCount, final int times){ try { System.out.println("threadCount:" + threadCount + ", times:" + times); long start = System.currentTimeMillis(); testLongAdder(threadCount, times); System.out.println("LongAdder elapse:" + (System.currentTimeMillis() - start) + "ms"); long start2 = System.currentTimeMillis(); testAtomicLong(threadCount, times); System.out.println("AtomicLong elapse:" + (System.currentTimeMillis() - start2) + "ms"); } catch (InterruptedException e) { e.printStackTrace(); } } static void testAtomicLong(final int threadCount, final int times) throws InterruptedException { AtomicLong atomicLong = new AtomicLong(); List<Thread> list = new ArrayList<>(); for (int i=0;i<threadCount;i++){ list.add(new Thread(() -> { for (int j = 0; j<times; j++){ atomicLong.incrementAndGet(); } })); } for (Thread thread : list){ thread.start(); } for (Thread thread : list){ thread.join(); } } static void testLongAdder(final int threadCount, final int times) throws InterruptedException { LongAdder longAdder = new LongAdder(); List<Thread> list = new ArrayList<>(); for (int i=0;i<threadCount;i++){ list.add(new Thread(() -> { for (int j = 0; j<times; j++){ longAdder.add(1); } })); } for (Thread thread : list){ thread.start(); } for (Thread thread : list){ thread.join(); } } }
運行結果以下:
threadCount:1, times:10000000 LongAdder elapse:158ms AtomicLong elapse:64ms threadCount:10, times:10000000 LongAdder elapse:206ms AtomicLong elapse:2449ms threadCount:20, times:10000000 LongAdder elapse:429ms AtomicLong elapse:5142ms threadCount:40, times:10000000 LongAdder elapse:840ms AtomicLong elapse:10506ms threadCount:80, times:10000000 LongAdder elapse:1369ms AtomicLong elapse:20482ms
能夠看到當只有一個線程的時候,AtomicLong反而性能更高,隨着線程愈來愈多,AtomicLong的性能急劇降低,而LongAdder的性能影響很小。
(1)LongAdder經過base和cells數組來存儲值;
(2)不一樣的線程會hash到不一樣的cell上去更新,減小了競爭;
(3)LongAdder的性能很是高,最終會達到一種無競爭的狀態;
在longAccumulate()方法中有個條件是n >= NCPU
就不會走到擴容邏輯了,而n是2的倍數,那是否是表明cells數組最大隻能達到大於等於NCPU的最小2次方?
答案是明確的。由於同一個CPU核心同時只會運行一個線程,而更新失敗了說明有兩個不一樣的核心更新了同一個Cell,這時會從新設置更新失敗的那個線程的probe值,這樣下一次它所在的Cell很大機率會發生改變,若是運行的時間足夠長,最終會出現同一個核心的全部線程都會hash到同一個Cell(大機率,但不必定全在一個Cell上)上去更新,因此,這裏cells數組中長度並不須要太長,達到CPU核心數足夠了。
好比,筆者的電腦是8核的,因此這裏cells的數組最大隻會到8,達到8就不會擴容了。
歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。