Hystrix須要根據過去一段時間內失敗的請求次數來判斷是否打開熔斷開關,因此它會維護一個時間窗口,並不斷向該窗口中累加失敗請求次數,在多線程環境下通常會使用AtomicLong,可是Hystrix中使用的是LongAdder。查了一下,發如今Hystrix,Guava,JDK8中都有這個類,應該是Java8中才加到標準庫中,其餘庫要兼容老版本只能本身複製一份了。Hystrix和Java8中的LongAdder具體實現有細微差異,不過總體思路是同樣的,下面的分析都是以jdk爲準的。html
爲何Hystrix使用LongAdder而不是AtomicLong呢?在LongAdder的Java doc中有java
This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.
在存在高度競爭的條件下,LongAdder的性能會遠遠好於AtomicLong,不過會消耗更多空間。高度競爭固然是指在多線程條件下。程序員
咱們知道AtomicLong是經過cas來更新值的,按理說是很快的,LongAdder爲何會比它更快,是還有其餘什麼更快的手段嗎?先無論這些,直接實驗一下,看是否是真的更快。編程
public class TestAtomic { private static final int TASK_NUM = 1000; private static final int INCREMENT_PER_TASK = 10000; private static final int REPEAT = 10; private static long l = 0; public static void main(String[] args) throws Exception { repeatWithStatics(REPEAT, () -> testAtomicLong()); repeatWithStatics(REPEAT, () -> testLongAdder()); repeatWithStatics(REPEAT, () -> testLong()); } public static void testAtomicLong() { AtomicLong al = new AtomicLong(0); execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> al.incrementAndGet())); } public static void testLong() { l = 0; execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> l++)); } public static void testLongAdder() { LongAdder adder = new LongAdder(); execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> adder.add(1))); } public static void repeatWithStatics(int n, Runnable runnable) { long[] elapseds = new long[n]; ntimes(n).forEach(x -> { long start = System.currentTimeMillis(); runnable.run(); long end = System.currentTimeMillis(); elapseds[x] = end - start; }); System.out.printf("total: %d, %s\n", Arrays.stream(elapseds).sum(), Arrays.toString(elapseds)); } private static void execute(int n, Runnable task) { try { CountDownLatch latch = new CountDownLatch(n); ExecutorService service = Executors.newFixedThreadPool(100); Runnable taskWrapper = () -> { task.run(); latch.countDown(); }; service.invokeAll(cloneTask(n, taskWrapper)); latch.await(); service.shutdown(); } catch (Exception e) {} } private static Collection<Callable<Void>> cloneTask(int n, Runnable task) { return ntimes(n).mapToObj(x -> new Callable<Void>() { @Override public Void call() throws Exception { task.run(); return null; } }).collect(Collectors.toList()); } private static void repeat(int n, Runnable runnable) { ntimes(n).forEach(x -> runnable.run()); } private static IntStream ntimes(int n) { return IntStream.range(0, n); } }
上面是用1000個併發任務,每一個任務對數據累加10000次,每一個實驗測試10次。數組
輸出:緩存
total: 1939, [258, 196, 200, 174, 186, 178, 204, 189, 185, 169]
多線程
total: 613, [57, 45, 47, 53, 69, 61, 80, 67, 64, 70]
併發
total: 1131, [85, 67, 77, 81, 280, 174, 108, 67, 99, 93]
app
從上往下依次是AtomicLong, LongAdder, long。less
從結果能看到LongAdder確實性能高於AtomicLong,不過還有一個讓我很是吃驚的結果,就是LongAdder居然比直接累加long還快(固然直接累加long最終獲得的結果是錯誤的,由於沒有同步),這個有些反常識了,其實這裏涉及到了一些隱藏的問題,就是cache的false sharing,由於平時編程時不太會關注cache這些,因此碰到這個結果會出乎預料,詳細的解釋在後面的第三節。
先來分析一下LongAdder爲何會比AtomicLong快,是否是用到了什麼比cas還快的東西。
LongAdder的父類Striped64的註釋中已經將整個類的設計講的很清楚的了,類中主要維護兩個值,一個long型的base屬性,一個Cell數組,它們值的和纔是真正的結果。Cell是對long的一個包裝,爲何將long包裝起來,猜想有兩個緣由:1)能夠在類中添加padding數據,避免false sharing,2)包裝起來纔好使用cas。
LongAdder.add的流程簡單描述就是,先嚐試經過cas修改base,成功則返回,失敗則根據當前線程hash值從Cell數組中選擇一個Cell,而後向Cell中add數據。Cell數組是動態增加的,而且是用時才初始化的,這是爲了不佔用過多空間。
看到註釋大概能猜到爲何快了,LongAdder仍然用的cas,快是由於在高度競爭的條件下,對一個值進行修改,衝突的機率很高,須要不斷cas,致使時間浪費在循環上,若是將一個值拆分爲多個值,分散壓力,那麼性能就會有所提升。
下面來看源碼,進入LongAdder的add方法:
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }
上面先對base進行cas操做,而後判斷Cell數組是否爲空,不爲空則根據當前線程probe值(相似hash值)選擇Cell並進行cas,都不成功進入longAccumulate方法。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // (1) if ((a = as[(n - 1) & h]) == null) { // (1.1) if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // (1.2) break; else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { // (1.3) try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // (2) boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // (3) break; // Fall back on using base } }
Cell數組不爲空時進入分支(1),若是根據當前線程hash得到的Cell爲null,則進入(1.1)開始實例化該Cell,不然進入(1.2)對Cell進行cas,不成功的話表示衝突比較多,開始進入(1.3)對Cell數組擴容了,cellsBusy是用cas實現的一個spinlock;
Cell數組爲空且獲取到cellsBusy時進入分支(2),開始初始化Cell數組;
分支(1)和(2)都進不去,沒辦法,只能再次對base進行cas。
上面只是對源碼作了粗略的分析,詳細的每一個分支的含義我也不知道,不過這些咱們都不須要較真去弄的很是清楚,畢竟世界上只有一個Doug Lea,咱們只須要知道LongAdder是怎麼比AtomicLong快的就行,實際就是用多個long來分擔壓力,一羣人到十個盤子裏夾菜固然比到一個盤子裏夾菜衝突小。
知道了原理,那咱們就本身來實現一個簡陋的LongAdder。
public class MyLong { private static final int LEN = 2 << 5; private AtomicLong[] atomicLongs = new AtomicLong[LEN]; public MyLong() { for (int i = 0; i < LEN; ++i) { atomicLongs[i] = new AtomicLong(0); } } public void add(long l) { atomicLongs[hash(Thread.currentThread()) & (LEN - 1)].addAndGet(l); } public void increment() { add(1); } public long get() { return Arrays.stream(atomicLongs).mapToLong(al -> al.get()).sum(); } // 從HashMap裏抄過來的 private static final int hash(Object key) { int h; return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); } }
在最上面的TestAtomic類中加上方法:
public static void main(String[] args) throws Exception { repeatWithStatics(REPEAT, () -> testAtomicLong()); repeatWithStatics(REPEAT, () -> testLongAdder()); repeatWithStatics(REPEAT, () -> testLong()); repeatWithStatics(REPEAT, () -> testMyLong()); } public static void testMyLong() { MyLong myLong = new MyLong(); execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> myLong.increment())); }
輸出:
total: 1907, [176, 211, 192, 182, 195, 173, 199, 229, 184, 166] total: 641, [67, 50, 45, 53, 73, 58, 80, 63, 69, 83] total: 947, [90, 82, 70, 72, 87, 78, 136, 107, 77, 148] total: 670, [81, 80, 73, 67, 57, 94, 62, 49, 57, 50]
能夠看到性能比AtomicLong好多了。
前面解釋了LongAdder比AtomicLong快,可是爲何它還會比long快?解答這個問題以前要先介紹僞共享的概念。
在計算機中尋址是以字節爲單位,可是cache從內存中複製數據是以行爲單位的,一個行會包含多個字節,通常爲64字節,每一個CPU有本身的L一、L2 cache,如今有兩個變量x、y在同一行中,若是CPU1修改x,緩存一致性要求數據修改須要立刻反應到其餘對應副本上,CPU2 cache對應行從新刷新,而後CPU2才能訪問y,若是CPU1一直修改x,CPU2一直訪問y,那麼CPU2得一直等到cache刷新後才能訪問y,帶來性能降低,產生這個問題的緣由有兩方面:1)x、y位於同一行,2)兩個CPU會頻繁的訪問這兩個數據,若是這兩個條件其中一個不成立,那就不會產生問題。更多關於僞共享的概念參考僞共享(False Sharing)和(false sharing(wiki)。
既然這個問題出現了,那確定是有解決辦法的。通常就是添加padding數據,來將x、y隔開,讓它們不會位於同一行中。
Java中的話,在Java7以前須要手動添加padding數據,後來JEP 142提案提出應該爲程序員提供某種方式來標明哪些字段是會存在緩存競爭的,而且虛擬機可以根據這些標識來避免這些字段位於同一行中,程序員不用再手動填充padding數據。
@Contended就是應JEP 142而生的,在字段或類上標準該註解,就表示編譯器或虛擬機須要在這些數據周圍添加padding數據。Java8的僞共享和緩存行填充--@Contended註釋中詳細解釋了@Contended註解的使用方法,在百度或者谷歌上搜索 jep 142 site:mail.openjdk.java.net
能找到不少@Contended相關資料。
下面實驗一下來觀察false sharing:
public class TestContended { private static int NCPU = Runtime.getRuntime().availableProcessors(); private static ForkJoinPool POOL = new ForkJoinPool(NCPU); private static int INCREMENT_PER_TASK = 1000000; private static final int REPEAT = 10; private static long l = 0; private static long l1 = 0; private static long l2 = 0; private static long cl1 = 0; private static volatile long q0, q1, q2, q3, q4, q5, q6; private static long cl2 = 0; public static void main(String[] args) { repeatWithStatics(REPEAT, () -> testLongWithSingleThread()); repeatWithStatics(REPEAT, () -> testLong()); repeatWithStatics(REPEAT, () -> testTwoLong()); repeatWithStatics(REPEAT, () -> testTwoContendedLong()); } public static void testLongWithSingleThread() { repeat(2 * INCREMENT_PER_TASK, () -> l++); } public static void testLong() { asyncExecute2Task(() -> repeat(INCREMENT_PER_TASK, () -> l++), () -> repeat(INCREMENT_PER_TASK, () -> l++)); } public static void testTwoLong() { asyncExecute2Task(() -> repeat(INCREMENT_PER_TASK, () -> l1++), () -> repeat(INCREMENT_PER_TASK, () -> l2++)); } public static void testTwoContendedLong() { asyncExecute2Task(() -> repeat(INCREMENT_PER_TASK, () -> cl1++), () -> repeat(INCREMENT_PER_TASK, () -> cl2++)); } public static void repeatWithStatics(int n, Runnable runnable) { long[] elapseds = new long[n]; ntimes(n).forEach(x -> { long start = System.currentTimeMillis(); runnable.run(); long end = System.currentTimeMillis(); elapseds[x] = end - start; }); System.out.printf("total: %d, %s\n", Arrays.stream(elapseds).sum(), Arrays.toString(elapseds)); } private static void asyncExecute2Task(Runnable task1, Runnable task2) { try { CompletableFuture.runAsync(task1, POOL) .thenCombine(CompletableFuture.runAsync(task2, POOL), (r1, r2) -> 0).get(); } catch (Exception e) {} } private static void repeat(int n, Runnable runnable) { ntimes(n).forEach(x -> runnable.run()); } private static IntStream ntimes(int n) { return IntStream.range(0, n); } }
不知道爲何我用不了@Contended註解,即便啓動參數加上-XX:-RestrictContended也不行,因此只能手工添加padding數據。目前緩存行大小通常爲64字節(也能夠經過CPUZ來查看),也就是填充7個long就能夠將兩個long型數據隔離在兩個緩存行中了。
輸出:
total: 16, [9, 5, 1, 0, 0, 0, 0, 1, 0, 0] total: 232, [35, 35, 33, 24, 25, 23, 13, 15, 15, 14] total: 148, [17, 15, 14, 16, 14, 15, 13, 17, 12, 15] total: 94, [8, 8, 8, 8, 15, 9, 10, 11, 8, 9]
從上往下依次爲:1)單線程累加一個long;2)兩個線程累加一個long;3)兩個線程累加兩個long,這兩個long位於同一緩存行中;4)兩個線程累加兩個long,且它們位於不一樣緩存行中。
從上面的結果看,padding仍是頗有效的。結果2相比於1,不只會有線程切換代價還會有false sharing問題,對於純計算型任務線程個數不要超過CPU個數。不過有一點想不通的是,結果2和3爲何差距這麼大。
以上轉自公司同事「yuanzhongcheng」的分享