爲何Hystrix使用LongAdder而不是AtomicLong呢?LongAdder的Java doc中有這樣一段說明:
This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.
/**
 * Compares the throughput of {@link AtomicLong}, {@link LongAdder} and an unsynchronized
 * {@code long} when many threads increment one shared counter.
 *
 * <p>Each test submits {@code TASK_NUM} tasks to a fixed pool of {@code POOL_SIZE} threads; every
 * task increments the counter {@code INCREMENT_PER_TASK} times. Each test is run {@code REPEAT}
 * times and the elapsed milliseconds are printed. The plain {@code long} test is deliberately
 * racy: its final value is wrong and it only serves as a timing reference.
 */
public class TestAtomic {
    private static final int TASK_NUM = 1000;
    private static final int INCREMENT_PER_TASK = 10000;
    private static final int REPEAT = 10;
    /** Number of worker threads used by {@link #execute}. */
    private static final int POOL_SIZE = 100;

    /** Counter for {@link #testLong()}; intentionally neither volatile nor synchronized. */
    private static long l = 0;

    public static void main(String[] args) throws Exception {
        repeatWithStatics(REPEAT, () -> testAtomicLong());
        repeatWithStatics(REPEAT, () -> testLongAdder());
        repeatWithStatics(REPEAT, () -> testLong());
    }

    /** All tasks increment one shared AtomicLong. */
    public static void testAtomicLong() {
        AtomicLong al = new AtomicLong(0);
        execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> al.incrementAndGet()));
    }

    /** All tasks increment the static field {@code l} without synchronization (lost updates expected). */
    public static void testLong() {
        l = 0;
        execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> l++));
    }

    /** All tasks increment one shared LongAdder. */
    public static void testLongAdder() {
        LongAdder adder = new LongAdder();
        execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> adder.add(1)));
    }

    /**
     * Runs {@code runnable} {@code n} times and prints the total and per-run elapsed time in
     * milliseconds, formatted as {@code total: <sum>, [<t0>, <t1>, ...]}.
     */
    public static void repeatWithStatics(int n, Runnable runnable) {
        long[] elapseds = new long[n];
        ntimes(n).forEach(x -> {
            // nanoTime is monotonic and intended for measuring intervals; currentTimeMillis is
            // wall-clock time and can jump (e.g. on clock adjustments).
            long start = System.nanoTime();
            runnable.run();
            elapseds[x] = (System.nanoTime() - start) / 1_000_000;
        });
        System.out.printf("total: %d, %s\n", Arrays.stream(elapseds).sum(), Arrays.toString(elapseds));
    }

    /**
     * Runs {@code n} copies of {@code task} on a fresh fixed thread pool and waits for all of them.
     *
     * @throws IllegalStateException if any copy of {@code task} throws
     */
    private static void execute(int n, Runnable task) {
        ExecutorService service = Executors.newFixedThreadPool(POOL_SIZE);
        try {
            // invokeAll returns only after every task has completed (normally or exceptionally),
            // so no extra latch is needed. A latch counted down after task.run() would never
            // reach zero if a task threw, blocking the caller forever.
            for (java.util.concurrent.Future<Void> f : service.invokeAll(cloneTask(n, task))) {
                f.get(); // already done; rethrows a task failure wrapped in ExecutionException
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of silently discarding it.
            Thread.currentThread().interrupt();
        } catch (java.util.concurrent.ExecutionException e) {
            throw new IllegalStateException("benchmark task failed", e.getCause());
        } finally {
            // Always release the pool's (non-daemon) worker threads, even on failure.
            service.shutdown();
        }
    }

    /** Returns {@code n} callables that each run {@code task} once and return {@code null}. */
    private static Collection<Callable<Void>> cloneTask(int n, Runnable task) {
        return ntimes(n)
                .mapToObj(x -> Executors.<Void>callable(task, null))
                .collect(Collectors.toList());
    }

    /** Runs {@code runnable} {@code n} times on the calling thread. */
    private static void repeat(int n, Runnable runnable) {
        ntimes(n).forEach(x -> runnable.run());
    }

    /** Returns the stream {@code 0, 1, ..., n - 1}. */
    private static IntStream ntimes(int n) {
        return IntStream.range(0, n);
    }
}
total: 1939, [258, 196, 200, 174, 186, 178, 204, 189, 185, 169]
total: 613, [57, 45, 47, 53, 69, 61, 80, 67, 64, 70]
total: 1131, [85, 67, 77, 81, 280, 174, 108, 67, 99, 93]
從上往下依次是AtomicLong、LongAdder、long的耗時(單位:毫秒)。
從結果能看到LongAdder的性能確實高於AtomicLong。不過還有一個讓我非常吃驚的結果:LongAdder居然比直接累加long還快(當然,直接累加long最終得到的結果是錯誤的,因爲沒有同步)。這有些反常識,其實這裏涉及到一個隱藏的問題:多個CPU頻繁寫同一個cache line時的緩存一致性開銷。這裏所有線程寫的都是同一個變量l;後面第三節討論的false sharing是同一類問題,只不過那裏是不同的變量恰好位於同一個cache line。因爲平時編程時不太會關注cache,所以碰到這個結果會出乎預料,詳細的解釋在後面的第三節。
LongAdder的父類Striped64的註釋中已經把整個類的設計講得很清楚了。類中主要維護兩個值:一個long型的base屬性和一個Cell數組,它們的值之和纔是真正的結果。Cell是對long的一個包裝。爲何要將long包裝起來?猜測有兩個原因:1)可以在類中添加padding數據,避免false sharing(JDK 8中Cell類上就標註了@sun.misc.Contended);2)包裝起來纔好使用CAS。
/**
 * Adds the given value.
 *
 * <p>Fast path: while the {@code cells} table has not been created, try a single CAS on
 * {@code base} via {@code casBase}. If the table exists, or that CAS fails, try a CAS on the
 * cell selected by {@code getProbe() & m}. Any miss is handed to {@code longAccumulate}: no
 * table, an empty table, an empty slot, or a failed cell CAS.
 *
 * @param x the value to add
 */
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // base is only touched while cells is null; a failed casBase is treated as contention.
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // Stays true unless the cell CAS below is actually attempted and fails; this tells
        // longAccumulate whether this thread has already lost a race on its cell.
        boolean uncontended = true;
        // m = length - 1 is used as an index mask (the table length is presumably always a
        // power of two; longAccumulate creates it with 2 slots and doubles it).
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            // fn == null means plain addition in longAccumulate.
            longAccumulate(x, null, uncontended);
    }
}
/**
 * Slow path of {@code add}. Retries until {@code x} has been applied either to a cell of the
 * {@code cells} table or to {@code base}. Along the way it may create the table, attach a
 * missing cell, or double the table.
 *
 * <p>{@code cellsBusy} acts as a lock word. It is acquired with {@code casCellsBusy()}, which
 * is only attempted after reading 0, and released by writing 0 in a {@code finally} block. The
 * table is only created, resized, or given a new cell while this lock is held.
 *
 * @param x the value to apply
 * @param fn the combining function, or {@code null} for plain addition ({@code v + x})
 * @param wasUncontended {@code false} if the caller already tried a CAS on its cell and it failed
 */
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        // Probe 0 means this thread has no hash yet. ThreadLocalRandom.current() is called only
        // for its side effect, and the probe is then re-read.
        // NOTE(review): presumably the probe is non-zero afterwards — confirm in ThreadLocalRandom.
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false; // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) { // (1) the cells table exists
            if ((a = as[(n - 1) & h]) == null) { // (1.1) this thread's slot is empty
                if (cellsBusy == 0) { // Try to attach new Cell
                    Cell r = new Cell(x); // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try { // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue; // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended) // CAS already known to fail
                wasUncontended = true; // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x)))) // (1.2) CAS on this thread's existing cell
                break;
            else if (n >= NCPU || cells != as)
                collide = false; // At max size or stale (NCPU presumably the CPU count)
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) { // (1.3) repeated collisions: double the table
                try {
                    if (cells == as) { // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue; // Retry with expanded table
            }
            // Reached by every branch above that neither broke nor continued: move to another
            // slot for the next attempt (presumably a pseudo-random rehash of the probe).
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // (2) no table yet, lock acquired
            boolean init = false;
            try { // Initialize table
                if (cells == as) {
                    // Initial size 2; (1.3) only ever doubles it, so (length - 1) stays a
                    // valid index mask.
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x)))) // (3) no table and lock unavailable: try base
            break; // Fall back on using base
    }
}
上面只是對源碼做了粗略的分析,每一個分支的詳細含義我也不清楚,不過我們不需要較真去弄得非常清楚,畢竟世界上只有一個Doug Lea。我們只需要知道LongAdder是怎麼比AtomicLong快的就行:實際上就是用多個long來分擔壓力,一羣人到十個盤子裏夾菜當然比到一個盤子裏夾菜衝突小。按照這個思路,下面自己實現一個簡化版的MyLong並加入對比:
/**
 * A simple striped counter. Updates are spread over {@code LEN} independent atomic slots chosen
 * by a hash of the updating thread, so threads that hash to different slots do not CAS the same
 * memory location. {@link #get()} returns the sum of all slots.
 *
 * <p>Slots are placed {@code PAD} longs (64 bytes) apart, so two slots can never share a 64-byte
 * cache line. Storing the slots as separately allocated, back-to-back {@code AtomicLong} objects
 * would likely put several of them on one line and reintroduce false sharing.
 */
public class MyLong {
    /** Number of slots (2 << 5 = 64); must be a power of two because {@link #add} masks with LEN - 1. */
    private static final int LEN = 2 << 5;
    /** Distance between consecutive slots, in longs: 8 * 8 bytes = 64 bytes. */
    private static final int PAD = 8;

    // Slot i lives at index i * PAD; the indexes in between are padding and never written.
    private final java.util.concurrent.atomic.AtomicLongArray slots =
            new java.util.concurrent.atomic.AtomicLongArray(LEN * PAD);

    public MyLong() {
        // AtomicLongArray elements start at 0, so no per-slot initialization is needed.
    }

    /** Atomically adds {@code l} to the slot selected for the current thread. */
    public void add(long l) {
        int slot = hash(Thread.currentThread()) & (LEN - 1);
        slots.addAndGet(slot * PAD, l);
    }

    /** Equivalent to {@code add(1)}. */
    public void increment() {
        add(1);
    }

    /**
     * Returns the sum of all slots. This is not an atomic snapshot: updates that happen while the
     * slots are being summed may or may not be included.
     */
    public long get() {
        long sum = 0;
        for (int i = 0; i < LEN; ++i) {
            sum += slots.get(i * PAD);
        }
        return sum;
    }

    // Copied from HashMap: XORs the high 16 bits of hashCode into the low 16 bits, so the
    // LEN - 1 mask also depends on the high bits.
    private static int hash(Object key) {
        int h;
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
    }
}
/** Benchmarks, in order, AtomicLong, LongAdder, plain long and MyLong, each REPEAT times. */
public static void main(String[] args) throws Exception {
    Runnable[] benchmarks = {
        () -> testAtomicLong(),
        () -> testLongAdder(),
        () -> testLong(),
        () -> testMyLong(),
    };
    for (Runnable benchmark : benchmarks) {
        repeatWithStatics(REPEAT, benchmark);
    }
}

/** Increments one shared MyLong from TASK_NUM tasks, INCREMENT_PER_TASK times each. */
public static void testMyLong() {
    MyLong counter = new MyLong();
    Runnable incrementMany = () -> repeat(INCREMENT_PER_TASK, counter::increment);
    execute(TASK_NUM, incrementMany);
}
total: 1907, [176, 211, 192, 182, 195, 173, 199, 229, 184, 166] total: 641, [67, 50, 45, 53, 73, 58, 80, 63, 69, 83] total: 947, [90, 82, 70, 72, 87, 78, 136, 107, 77, 148] total: 670, [81, 80, 73, 67, 57, 94, 62, 49, 57, 50]
在計算機中尋址是以字節爲單位的,但是cache從內存中複製數據是以行(cache line)爲單位的,一行包含多個字節,一般爲64字節。每個CPU有自己的L1、L2 cache。現在假設兩個變量x、y位於同一行中:如果CPU1修改了x,緩存一致性協議會使CPU2 cache中對應的行失效,CPU2需要重新加載這一行之後才能訪問y。如果CPU1一直修改x,CPU2一直訪問y,那麼CPU2就得反覆等待cache行重新加載後才能訪問y,從而帶來性能下降。產生這個問題的原因有兩方面:1)x、y位於同一行;2)兩個CPU會頻繁地訪問這兩個數據(且至少有一方在寫)。如果這兩個條件中有一個不成立,就不會產生問題。更多關於僞共享的概念,可以參考《僞共享(False Sharing)》一文和維基百科的False sharing詞條。
在Java中,Java 8之前(即Java 7及更早版本)需要手動添加padding數據。後來JEP 142提出,應該爲程序員提供某種方式來標明哪些字段會存在緩存競爭,並且虛擬機能夠根據這些標識來避免這些字段位於同一行中,這樣程序員就不用再手動填充padding數據了。
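@Contended就是應JEP 142而生的:在字段或類上標註該註解,就表示虛擬機需要在這些數據周圍添加padding數據。注意,JDK 8中該註解是sun.misc.Contended,默認只對JDK內部的類生效,用戶代碼需要加上JVM參數-XX:-RestrictContended纔會生效(JDK 9之後移到了jdk.internal.vm.annotation.Contended)。《Java8的僞共享和緩存行填充--@Contended註釋》一文中詳細解釋了@Contended註解的使用方法;也可以在百度或者谷歌上搜索 jep 142 site:mail.openjdk.java.net,查看OpenJDK郵件列表中的相關討論。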
下面實驗一下來觀察false sharing:
/**
 * Measures how cache-line sharing affects two threads that each increment a counter
 * {@code INCREMENT_PER_TASK} times:
 * <ol>
 *   <li>{@link #testLongWithSingleThread()}: the calling thread alone does all
 *       2 * INCREMENT_PER_TASK increments of {@code l};
 *   <li>{@link #testLong()}: two threads increment the same field {@code l};
 *   <li>{@link #testTwoLong()}: two threads increment {@code l1} and {@code l2}, which are
 *       declared next to each other;
 *   <li>{@link #testTwoContendedLong()}: two threads increment {@code cl1} and {@code cl2}, which
 *       are declared with manual padding between them.
 * </ol>
 * Each test runs {@code REPEAT} times and the elapsed milliseconds are printed. The counters are
 * updated without synchronization, so their values are meaningless; only the timing matters.
 */
public class TestContended {
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    private static final ForkJoinPool POOL = new ForkJoinPool(NCPU);
    private static final int INCREMENT_PER_TASK = 1000000;
    private static final int REPEAT = 10;

    // Benchmark counters: plain (non-volatile, unsynchronized) longs. Concurrent updates race by
    // design, and the final values are never printed.
    private static long l = 0;
    private static long l1 = 0;
    private static long l2 = 0;

    private static long cl1 = 0;
    // Manual padding: seven unused longs (56 bytes) declared between cl1 and cl2. If the JVM keeps
    // these static fields in declaration order, cl1 and cl2 are 64 bytes apart and cannot share a
    // 64-byte cache line. The JLS does not guarantee field layout, so verify this on the JVM used.
    private static volatile long q0, q1, q2, q3, q4, q5, q6;
    private static long cl2 = 0;

    public static void main(String[] args) {
        repeatWithStatics(REPEAT, () -> testLongWithSingleThread());
        repeatWithStatics(REPEAT, () -> testLong());
        repeatWithStatics(REPEAT, () -> testTwoLong());
        repeatWithStatics(REPEAT, () -> testTwoContendedLong());
    }

    /** Baseline: the calling thread performs all 2 * INCREMENT_PER_TASK increments of {@code l}. */
    public static void testLongWithSingleThread() {
        repeat(2 * INCREMENT_PER_TASK, () -> l++);
    }

    /** Two pool tasks increment the same field {@code l} concurrently. */
    public static void testLong() {
        asyncExecute2Task(() -> repeat(INCREMENT_PER_TASK, () -> l++),
                () -> repeat(INCREMENT_PER_TASK, () -> l++));
    }

    /** Two pool tasks increment {@code l1} and {@code l2} respectively. */
    public static void testTwoLong() {
        asyncExecute2Task(() -> repeat(INCREMENT_PER_TASK, () -> l1++),
                () -> repeat(INCREMENT_PER_TASK, () -> l2++));
    }

    /** Two pool tasks increment the padded fields {@code cl1} and {@code cl2} respectively. */
    public static void testTwoContendedLong() {
        asyncExecute2Task(() -> repeat(INCREMENT_PER_TASK, () -> cl1++),
                () -> repeat(INCREMENT_PER_TASK, () -> cl2++));
    }

    /**
     * Runs {@code runnable} {@code n} times and prints the total and per-run elapsed time in
     * milliseconds, formatted as {@code total: <sum>, [<t0>, <t1>, ...]}.
     */
    public static void repeatWithStatics(int n, Runnable runnable) {
        long[] elapseds = new long[n];
        ntimes(n).forEach(x -> {
            // nanoTime is monotonic and intended for measuring intervals; currentTimeMillis is
            // wall-clock time and can jump.
            long start = System.nanoTime();
            runnable.run();
            elapseds[x] = (System.nanoTime() - start) / 1_000_000;
        });
        System.out.printf("total: %d, %s\n", Arrays.stream(elapseds).sum(), Arrays.toString(elapseds));
    }

    /**
     * Runs both tasks on {@code POOL} and waits until both have finished.
     *
     * @throws java.util.concurrent.CompletionException if either task throws
     */
    private static void asyncExecute2Task(Runnable task1, Runnable task2) {
        // Both futures are created (and so submitted) before waiting, letting the tasks run in
        // parallel. join() rethrows a task failure instead of discarding it, and, unlike get(),
        // it does not throw the checked InterruptedException.
        CompletableFuture.allOf(
                        CompletableFuture.runAsync(task1, POOL),
                        CompletableFuture.runAsync(task2, POOL))
                .join();
    }

    /** Runs {@code runnable} {@code n} times on the calling thread. */
    private static void repeat(int n, Runnable runnable) {
        ntimes(n).forEach(x -> runnable.run());
    }

    /** Returns the stream {@code 0, 1, ..., n - 1}. */
    private static IntStream ntimes(int n) {
        return IntStream.range(0, n);
    }
}
total: 16, [9, 5, 1, 0, 0, 0, 0, 1, 0, 0] total: 232, [35, 35, 33, 24, 25, 23, 13, 15, 15, 14] total: 148, [17, 15, 14, 16, 14, 15, 13, 17, 12, 15] total: 94, [8, 8, 8, 8, 15, 9, 10, 11, 8, 9]
從上面的結果看,padding還是很有效的。結果2相比於1,不僅會有線程切換的代價,還有兩個線程反覆爭用同一個cache line的開銷。兩個線程寫的是同一個變量l,嚴格來說這是真共享而非false sharing。對於純計算型任務,線程個數不要超過CPU個數。不過有一點想不通的是,結果2和3爲何差距這麼大。