AtomicLong
經過 CAS 提供了非阻塞的原子性操做,性能比使用同步鎖好多了。可是在高併發狀況下,大量線程爭奪同一個原子變量,只有一個線程的 CAS 能操做成功,其餘線程會不停地 CAS 自旋,極度浪費 CPU 資源。java
爲了解決這個問題,JDK8 提供了一個類 LongAdder
。把一個變量分紅多個變量,讓一樣多的線程去競爭多個資源,就解決了性能問題。數組
LongAdder
在內部維護了多個 Cell
原子變量,另外,多個線程在爭奪同一個 Cell
原子變量時若是失敗了,並非在當前 Cell
變量上一直嘗試,而是嘗試對其餘 Cell
變量進行 CAS 操做。最後,在獲取 LongAdder
當前值時,是把全部 Cell
變量的 value 值累加後再加上 base 返回的。下面咱們來看看 LongAdder
的使用及源碼。安全
public class LongAdderTest { static LongAdder longAdder = new LongAdder(); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { new Thread(()->{ for (int j = 0; j < 10; j++) { longAdder.add(10); } }).start(); } Thread.sleep(2000); // 保證前面的線程執行完 System.out.println(longAdder.sum()); } } 複製代碼
毫無疑問,輸出的結果確定是 1000,LongAdder
是線程安全的。markdown
LongAdder
是繼承自 Striped64
,咱們來看看 Striped64
的主要成員變量。多線程
abstract class Striped64 extends Number { // 大小是 2 的 n 次方 transient volatile Cell[] cells; // 基礎值 transient volatile long base; // 一個標識,狀態只有 0 和 1,爲 1 表示 cells 數組在初始化或者擴容 transient volatile int cellsBusy; @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe 機制 private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; // 獲取 Cell 實例中變量 value 的內存偏移量 valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } } // Unsafe 機制 private static final sun.misc.Unsafe UNSAFE; // 記錄 Striped64 實例中變量 base 的偏移量 private static final long BASE; // 記錄 Striped64 實例中變量 cellsBusy 的偏移量 private static final long CELLSBUSY; // 記錄 Thread 實例中變量 threadLocalRandomProbe 的偏移量,用於計算訪問下標 private static final long PROBE; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> sk = Striped64.class; BASE = UNSAFE.objectFieldOffset (sk.getDeclaredField("base")); CELLSBUSY = UNSAFE.objectFieldOffset (sk.getDeclaredField("cellsBusy")); Class<?> tk = Thread.class; PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe")); } catch (Exception e) { throw new Error(e); } } } 複製代碼
Unsafe 機制如今應該很是熟悉了,就是用於獲取變量在實例中的偏移量,用於 CAS 操做。併發
@sun.misc.Contended
能夠解決僞共享問題, Cell
內部有一個聲明爲 volatile
的變量,並經過 CAS 更新該值,保證了更新操做的原子性。Cell
是 AtomicLong
的優化。保證了線程操做 Cell
元素的原子性。app
Cell
數組的大小必定爲 2 的 n 次方。dom
threadLocalRandomProbe
用來計算當前線程訪問 Cell
數組的下標。ide
cellsBusy
做爲標識鎖變量,狀態只有 0 和 1 ,爲 1 表示 Cell
數組正在初始化或者擴容,其餘線程則不能進行初始化或者擴容。高併發
咱們應該關注下面幾個問題:
Cell
數組裏的哪一個元素。Cell
數組。Cell
數組什麼時候擴容Cell
元素有衝突時怎麼辦public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { // 若是 Cell 數組爲空,則直接返回 base for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; } 複製代碼
累加全部 Cell
內部的 value 值,而後再累加上 base。因爲沒有對 Cell
數組進行加鎖,因此在累加的過程當中,可能有其餘線程對 Cell
數組的內容進行了修改,返回的值可能不是很精確。
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { // ( 1 ) boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || // ( 2 ) (a = as[getProbe() & m]) == null || // ( 3 ) !(uncontended = a.cas(v = a.value, v + x))) // ( 4 ) longAccumulate(x, null, uncontended); // ( 5 ) } } 複製代碼
( 1 ) 判斷 cells 是否爲 null,若是爲 null,則直接在 base 上進行累加,此時相似 AtomicLong
,若是 cells 不爲 null 或者 CAS 操做失敗,則進行下面的操做。剛開始併發線程較少時,全部的累加操做都是對 base 變量進行,當某個線程第一次 CAS 操做失敗時,則進行初始化 Cell
數組。
( 2 ) 、 ( 3 ) 決定當前線程應該訪問 Cell
數組裏的哪一個元素 (解決了問題 1 ) ,經過 getProbe() & m
計算的,getProbe
用於獲取當前線程的 ThreadLocalRandomProbe
的值, m 是 Cell
數組的長度。若是訪問的 Cell
數組元素爲 null,則執行代碼 ( 5 )。
若是該 Cell
元素存在則執行代碼 ( 4 ),經過 CAS 更新 Cell
元素的 value 值,若是更新失敗則執行代碼 ( 5 ),而且 uncontended
的值爲 false。
longAccumulate
是涉及處理初始化、擴容、解決衝突的方法,在 Striped64
類中。咱們能在裏面找到問題 2 、 3 、 4 的答案。
int h; // 記錄當前線程的 threadLocalRandomProbe 的值 if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // 初始化當前線程的 threadLocalRandomProbe 的值 h = getProbe(); wasUncontended = true; } 複製代碼
最開始是判斷當前線程的 threadLocalRandomProbe
值是否爲 0 ,爲 0 則初始化 threadLocalRandomProbe
。這個變量用於計算當前線程應該被分配到 Cell
數組的哪一個下標。
boolean collide = false; for (;;) { // 無限循環 Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // cells 不爲 null 時 ... } // cells 爲 null 則進行初始化 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { if (cells == as) { Cell[] rs = new Cell[2]; // 初始化大小爲2 rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } // 爲了能找到一個空閒的 Cell,從新計算 hash 值 h = advanceProbe(h); } 複製代碼
Cell
數組爲 null 時,則進行初始化。 初始化時經過 casCellsBusy()
將 cellsBusy
的值設爲 1,這時其餘線程就不能進行擴容或者初始化了。
初始化 Cell
數組的大小爲 2,並經過 h & 1
計算出當前線程訪問的 Cell
元素,並進行賦值操做。(回答了問題 2 )
boolean collide = false; // 爲 true 表示衝突 for (;;) { // 無限循環 Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { // 當前線程訪問的 cell 元素爲 null if (cellsBusy == 0) { Cell r = new Cell(x); // 初始化 Cell 元素 (採用了延遲加載,用到時才初始化 Cell) if (cellsBusy == 0 && casCellsBusy()) { // 獲取鎖變量 boolean created = false; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; } } collide = false; } else if (!wasUncontended) // 以前 CAS 更新失敗 wasUncontended = true; // 當前線程訪問的 Cell 元素存在了,則進行 CAS 加操做 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // NCPU 表明當前機器的 CPU 個數,Cell 數組的元素不能超過 NCPU else if (n >= NCPU || cells != as) ( 7 ) collide = false; else if (!collide) ( 8 ) collide = true; // 擴容操做 else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; } // 爲了能找到一個空閒的 Cell,從新計算 hash 值 h = advanceProbe(h); ( 9 ) } } 複製代碼
( 7 ) 、 ( 8 ) 都不符合條件時,纔會執行擴容操做。即:當前 Cell
數組的個數小於 CPU 的個數 而且 多個線程間發生了衝突。
( 7 ) 處表示只有當前 Cell
數組的長度小於 CPU 的個數,才能擴容。爲何要作這樣的限制呢 :只有當每一個 CPU 運行一個線程時纔會使多線程的效果最佳,也就是當 Cell
數組元素的個數與 CPU 個數一致時,每一個 Cell
元素都用一個 CPU 處理,效果最佳。
( 8 ) 表示多個線程訪問了 Cell
數組中的同一個元素,或者多個線程嘗試獲取 cellsBusy
鎖變量 致使了衝突。
擴容時先獲取 cellsBusy
鎖變量,而後大小擴充爲原來的兩倍再複製原來的變量到新數組。(解決了問題 3)。
( 9 ) 處對 CAS 失敗的線程從新計算 threadLocalRandomProb
, 減小衝突機會。(解決了問題 4)
LongAccumulate()
源碼錶面複雜,可是隻要圍繞文章開頭題的四個問題去看,也不是很難。