本節咱們來研究下併發包中的Atomic類型。java
先來一把性能測試,對比一下AtomicLong(1.5出來的)、LongAdder(1.8出來的)和LongAccumulator(1.8出來的)用於簡單累加的性能。git
程序邏輯比較簡單,能夠看到咱們在最大併發10的狀況下,去作10億次操做測試:github
@Slf4j public class AccumulatorBenchmark { private StopWatch stopWatch = new StopWatch(); static final int threadCount = 100; static final int taskCount = 1000000000; static final AtomicLong atomicLong = new AtomicLong(); static final LongAdder longAdder = new LongAdder(); static final LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0L); @Test public void test() { Map<String, IntConsumer> tasks = new HashMap<>(); tasks.put("atomicLong", i -> atomicLong.incrementAndGet()); tasks.put("longAdder", i -> longAdder.increment()); tasks.put("longAccumulator", i -> longAccumulator.accumulate(1L)); tasks.entrySet().forEach(item -> benchmark(threadCount, taskCount, item.getValue(), item.getKey())); log.info(stopWatch.prettyPrint()); Assert.assertEquals(taskCount, atomicLong.get()); Assert.assertEquals(taskCount, longAdder.longValue()); Assert.assertEquals(taskCount, longAccumulator.longValue()); } private void benchmark(int threadCount, int taskCount, IntConsumer task, String name) { stopWatch.start(name); ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount); forkJoinPool.execute(() -> IntStream.rangeClosed(1, taskCount).parallel().forEach(task)); forkJoinPool.shutdown(); try { forkJoinPool.awaitTermination(1, TimeUnit.HOURS); } catch (InterruptedException e) { e.printStackTrace(); } stopWatch.stop(); } }
結果以下:
安全
和官網說的差很少,在高併發的狀況下LongAdder性能會比AtomicLong好不少。微信
在不少開源代碼中咱們有看到AtomicReference的身影,它到底是幹什麼的呢?咱們來寫一段測試程序,在這個程序中咱們定一了一個Switch類型,做爲一個開關,而後寫三個死循環的線程來測試,當開關有效的時候會持續死循環,在2秒後關閉全部的三個開關:多線程
@Slf4j public class AtomicReferenceTest { private Switch rawValue = new Switch(); private volatile Switch volatileValue = new Switch(); private AtomicReference<Switch> atomicValue = new AtomicReference<>(new Switch()); @Test public void test() throws InterruptedException { new Thread(() -> { log.info("Start:rawValue"); while (rawValue.get()) { } log.info("Done:rawValue"); }).start(); new Thread(() -> { log.info("Start:volatileValue"); while (volatileValue.get()) { } log.info("Done:volatileValue"); }).start(); new Thread(() -> { log.info("Start:atomicValue"); while (atomicValue.get().get()) { } log.info("Done:atomicValue"); }).start(); Executors.newSingleThreadScheduledExecutor().schedule(rawValue::off, 2, TimeUnit.SECONDS); Executors.newSingleThreadScheduledExecutor().schedule(volatileValue::off, 2, TimeUnit.SECONDS); Executors.newSingleThreadScheduledExecutor().schedule(atomicValue.get()::off, 2, TimeUnit.SECONDS); TimeUnit.HOURS.sleep(1); } class Switch { private boolean enable = true; public boolean get() { return enable; } public void off() { enable = false; } } }
運行程序:
能夠看到2秒後有一個開關卡住了,線程沒有退出。這是一個可見性的問題,AtomicReference以及volatile能夠確保線程對數據的更新刷新到內存。由於咱們對於開關的關閉是在另外一個定時任務線程作的,若是咱們不使用volatile或AtomicReference來定義對象,那麼對象的操做可能沒法被其它線程感知到。固然,AtomicReference除了解決可見性問題還有更多AtomicXXX提供的其它功能。併發
下面咱們來看一下AtomicInteger的compareAndSet()功能。首先說明這個程序沒有任何意義,只是測試一下功能。在這個程序裏,咱們亂序開啓10個線程,每個線程的任務就是按照次序來累加數字。咱們使用AtomicInteger的compareAndSet()來確保亂序的線程也能按照咱們要的順序操做累加。dom
@Slf4j public class AtomicIntegerTest { @Test public void test() throws InterruptedException { AtomicInteger atomicInteger = new AtomicInteger(0); List<Thread> threadList = IntStream.range(0,10).mapToObj(i-> { Thread thread = new Thread(() -> { log.debug("Wait {}->{}", i, i+1); while (!atomicInteger.compareAndSet(i, i + 1)) { try { TimeUnit.MILLISECONDS.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("Done {}->{}", i, i+1); }); thread.setName(UUID.randomUUID().toString()); return thread; }).sorted(Comparator.comparing(Thread::getName)).collect(Collectors.toList()); for (Thread thread : threadList) { thread.start(); } for (Thread thread : threadList) { thread.join(); } log.info("result:{}", atomicInteger.get()); } }
執行結果以下:高併發
11:46:30.611 [2c80b367-d80e-46b5-94f5-b7b172e79dad] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 4->5 11:46:30.611 [7bccbb54-4573-4b77-979b-840613406428] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 5->6 11:46:30.612 [c0792831-6201-4f6c-b702-79c1b798c3aa] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 9->10 11:46:30.612 [949b0c26-febb-4830-ad98-f43521ce4382] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 7->8 11:46:30.613 [ccc05b0f-11da-41fa-b8fc-59a90dfc2250] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 6->7 11:46:30.611 [037e9595-73cb-4aa1-afee-4250347746c8] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 3->4 11:46:30.611 [4f15d9ce-044e-4657-b418-4874d03e5d22] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 1->2 11:46:30.611 [3a96c35c-bc4e-45f4-aae4-9fd8611acaea] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 8->9 11:46:30.611 [94465214-27bf-4543-80e2-dbaeeb6ddc94] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 0->1 11:46:30.611 [60f9cb50-21e6-45bc-9b4d-867783ab033b] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 2->3 11:46:30.627 [94465214-27bf-4543-80e2-dbaeeb6ddc94] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 0->1 11:46:30.681 [4f15d9ce-044e-4657-b418-4874d03e5d22] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 1->2 11:46:30.681 [60f9cb50-21e6-45bc-9b4d-867783ab033b] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 2->3 11:46:30.734 [037e9595-73cb-4aa1-afee-4250347746c8] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 3->4 11:46:30.780 [2c80b367-d80e-46b5-94f5-b7b172e79dad] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 4->5 11:46:30.785 [7bccbb54-4573-4b77-979b-840613406428] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 5->6 11:46:30.785 [ccc05b0f-11da-41fa-b8fc-59a90dfc2250] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 6->7 11:46:30.787 [949b0c26-febb-4830-ad98-f43521ce4382] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 7->8 11:46:30.838 [3a96c35c-bc4e-45f4-aae4-9fd8611acaea] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 8->9 11:46:30.890 [c0792831-6201-4f6c-b702-79c1b798c3aa] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 9->10 11:46:30.890 [main] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - result:10
能夠看到,Wait的輸出是亂序的,最後Done的輸出是順序的。性能
AtomicStampedReference能夠用來解決ABA問題,什麼是ABA問題咱們看這個例子:
線程1讀取了數字以後,等待1秒,而後嘗試把1修改成3。
線程2後啓動,讀取到數字1後修改2,稍等一下又修改回1。
雖然AtomicInteger確保多個線程的原子性操做,可是沒法確保1就是原先讀取到的那個1,沒有通過別人修改。
能夠再換一個例子來講,若是咱們如今帳上有100元,要修改成200元,在修改以前帳戶已經被操做過了從100元充值到了150而後提現到了100,雖然最後仍是回到了100,可是這個時候嚴格一點的話,咱們應該認爲這個100不是原先的100,這個帳戶的版本發生了變化,若是咱們使用樂觀行鎖的話,雖然餘額都是100可是行鎖的版本確定不一致,AtomicStampedReference就是相似行樂觀鎖的概念。
@Test public void test() throws InterruptedException { AtomicInteger atomicInteger = new AtomicInteger(1); Thread thread1 = new Thread(() -> { int value = atomicInteger.get(); log.info("thread 1 read value: " + value); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if (atomicInteger.compareAndSet(value, 3)) { log.info("thread 1 update from " + value + " to 3"); } else { log.info("thread 1 update fail!"); } }); thread1.start(); Thread thread2 = new Thread(() -> { int value = atomicInteger.get(); log.info("thread 2 read value: " + value); if (atomicInteger.compareAndSet(value, 2)) { log.info("thread 2 update from " + value + " to 2"); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } value = atomicInteger.get(); log.info("thread 2 read value: " + value); if (atomicInteger.compareAndSet(value, 1)) { log.info("thread 2 update from " + value + " to 1"); } } }); thread2.start(); thread1.join(); thread2.join(); }
看下運行結果:
11:56:20.373 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 1 11:56:20.381 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 1 to 2 11:56:20.373 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 read value: 1 11:56:20.483 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 2 11:56:20.484 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 2 to 1 11:56:21.386 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 update from 1 to 3
下面咱們使用AtomicStampedReference來修復這個問題:
@Test public void test2() throws InterruptedException { AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1); Thread thread1 = new Thread(() -> { int[] stampHolder = new int[1]; int value = atomicStampedReference.get(stampHolder); int stamp = stampHolder[0]; log.info("thread 1 read value: " + value + ", stamp: " + stamp); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if (atomicStampedReference.compareAndSet(value, 3, stamp, stamp + 1)) { log.info("thread 1 update from " + value + " to 3"); } else { log.info("thread 1 update fail!"); } }); thread1.start(); Thread thread2 = new Thread(() -> { int[] stampHolder = new int[1]; int value = atomicStampedReference.get(stampHolder); int stamp = stampHolder[0]; log.info("thread 2 read value: " + value + ", stamp: " + stamp); if (atomicStampedReference.compareAndSet(value, 2, stamp, stamp + 1)) { log.info("thread 2 update from " + value + " to 2"); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } value = atomicStampedReference.get(stampHolder); stamp = stampHolder[0]; log.info("thread 2 read value: " + value + ", stamp: " + stamp); if (atomicStampedReference.compareAndSet(value, 1, stamp, stamp + 1)) { log.info("thread 2 update from " + value + " to 1"); } value = atomicStampedReference.get(stampHolder); stamp = stampHolder[0]; log.info("thread 2 read value: " + value + ", stamp: " + stamp); } }); thread2.start(); thread1.join(); thread2.join(); }
運行結果以下:
11:59:11.946 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 1, stamp: 1 11:59:11.951 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 1 to 2 11:59:11.946 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 read value: 1, stamp: 1 11:59:12.053 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 2, stamp: 2 11:59:12.053 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 2 to 1 11:59:12.053 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 1, stamp: 3 11:59:12.954 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 update fail!
能夠看到,如今咱們修改數據的時候不只僅是拿着值來修改了,還要提供版本號,讀取數據的時候能夠讀取到數據以及版本號。這樣的話,雖然數值不變,可是線程2通過兩次修改後數據的版本從1變爲了3,回過頭來線程1再要拿着版本號1來修改數據的話必然失敗。
本文比較短,咱們再來看網友以前問的一個有意思的問題,程序以下。
@Slf4j public class InterestingProblem { int a = 1; int b = 1; void add() { a++; b++; } void compare() { if (a < b) log.info("a:{},b:{},{}", a, b, a>b); } @Test public void test() throws InterruptedException { new Thread(() -> { while (true) add(); }).start(); new Thread(() -> { while (true) compare(); }).start(); TimeUnit.MILLISECONDS.sleep(100); } }
這位網友是這麼問的,他說見鬼了,不但能看到日誌輸出,並且我發現以前判斷過一次a<b,以後輸出a>b竟然是成立的,結果裏能夠看到true,JVM出現Bug了可能:
他以爲a和b不是靜態的,爲啥會出現併發問題呢因而問了同事:
這位網友實際上是沒有搞清楚多線程狀況下,可見性問題、原子性問題解決的事情,同事也把各類併發的概念混淆在一塊兒了。
咱們這麼來看這段代碼,這段代碼裏一個線程不斷操做a和b進行累加操做,一個線程判斷a和b,而後輸出結果。出現這個問題的緣由本質上是由於a<b是三步操做,取a,取b以及比較,不是原子性的,在整個過程當中可能穿插了add線程的操做a和b。若是先獲取a,而後a++ b++,而後獲取b,這個時候a<b,若是先a++,而後獲取a,獲取b,最後b++,這個時候a>b。咱們來看一下compare()方法的字節碼,能夠很明顯看到a
b的比較分明是4行指令,咱們不能以代碼行數來判斷操做是不是原子的,不是原子意味着操做過程當中可能被穿插了其它線程的其它代碼:
0 aload_0 1 getfield #2 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.a> 4 aload_0 5 getfield #3 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.b> 8 if_icmpge 67 (+59) 11 getstatic #4 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.log> 14 ldc #5 <a:{},b:{},{}> 16 iconst_3 17 anewarray #6 <java/lang/Object> 20 dup 21 iconst_0 22 aload_0 23 getfield #2 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.a> 26 invokestatic #7 <java/lang/Integer.valueOf> 29 aastore 30 dup 31 iconst_1 32 aload_0 33 getfield #3 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.b> 36 invokestatic #7 <java/lang/Integer.valueOf> 39 aastore 40 dup 41 iconst_2 42 aload_0 43 getfield #2 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.a> 46 aload_0 47 getfield #3 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.b> 50 if_icmple 57 (+7) 53 iconst_1 54 goto 58 (+4) 57 iconst_0 58 invokestatic #8 <java/lang/Boolean.valueOf> 61 aastore 62 invokeinterface #9 <org/slf4j/Logger.info> count 3 67 return
因此這位網友的理解有幾個問題:
咱們再來看看他三位同事的說法:
因此要進行簡單修復這個問題的話就是爲add()和compare()都加上synchronized關鍵字,除了這個鎖的方式有沒有其它方式呢?你能夠想一想。
本文簡單測試了一下java.util.concurrent.atomic包下面的一些經常使用Atomic操做類,最後分享了一個網友的問題和疑惑,但願文本對你有用。
一樣,代碼見個人Github,歡迎clone後本身把玩,歡迎點贊。
歡迎關注個人微信公衆號:隨緣主人的園子