【併發編程】對線程安全性的理解(Atomic、sync、volatile、Lock)

線程安全性

閱讀前須要對JVM內存模型有必定了解。 java

定義算法

當多個線程訪問某個類時,無論運行時環境採用何種調度方式或者這些進程將如何交替執行,而且在主調代碼中不須要任何額外的同步或協同,這個類都能表現出正確的行爲,這個類就是線程安全的。數組

原子性-Atomic包

定義安全

提供互斥訪問,同一時刻只能有一個線程對它進行操做。bash

AtomicXXX:CAS、Unsafe.compareAndSwapInt

計數測試多線程

@Slf4j
public class CountExample2 {

    /** * 請求總數 */
    public static int clientTotal = 5000;
    /** * 同時併發執行線程數 */
    public static int threadTotal = 200;

    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++){
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e){
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count.get());
    }

    private static void add(){
        count.incrementAndGet();
    }
}
複製代碼

執行結果:併發

屢次執行結果始終是5000,由此咱們能夠認爲這個類是線程安全的。app

由線程不安全到線程安全咱們只是把countint改爲了AtomicInteger,爲了找到具體緣由咱們來看AtomicInteger的源碼。高併發

找到incrementAndGet方法性能

/** * Atomically increments by one the current value. * * @return the updated value */
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
複製代碼

incrementAndGet方法實現中使用了一個unsafe的類並調用了其getAndAddInt方法,咱們點進這個方法看一下它的實現

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}
複製代碼

這個方法裏使用了一個do-while語句,while的判斷條件調用了compareAndSwapInt方法,咱們進入這個方法看一下

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
複製代碼

能夠看到這個方法是native標識的方法,表示是Java底層的方法,不是用Java實現的。

如今回來看getAndAddInt方法,首先傳入的第一個值var1是一個對象,如計數測試中的count;第二個值var2是當前的值,如執行2 + 1這個操做,那麼當前var2就是2, 第三個參數var4就是1.

接下來看方法內部,var5是提供調用底層方法獲得的底層當前的值,若是沒有其餘線程過來處理var1這個變量時,var5的正常返回值應該是2(在上述例子的背景下),所以傳到compareAndSwapInt方法中的參數分別是:count對象,當前值2,當前從底層傳過來的2,從底層取出的值+增長量(這裏是1)。這個方法但願達到的目標是對於count這個對象,若是當前的值與底層的值相同的話就把它更新成var5 + var4的值。因爲傳入的var2var4可能會被其餘線程更改,所以這裏要判斷當前的var2和當前底層var5是否相等。經過這樣不停地循環判斷來實現指望的值與底層值徹底相同的時候才執行+1的操做覆蓋底層值。

compareAndSwapInt方法的核心思想就是CAS的核心。

AtomicLong、LongAdder

public static AtomicLong count = new AtomicLong(0);
複製代碼

count的類型改爲AtomicLong,執行幾回發現結果跟上面同樣。

LongAdderJDK8中新增的一個類,下面來使用一下

@Slf4j
@ThreadSafe
public class AtomicExample3 {

    /** * 請求總數 */
    public static int clientTotal = 5000;
    /** * 同時併發執行線程數 */
    public static int threadTotal = 200;

    public static LongAdder count = new LongAdder();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++){
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e){
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add(){
        count.increment();
    }
}
複製代碼

屢次運行測試後,咱們能夠發現這個類是線程安全的。

AtomicLong與LongAdder對比

CAS的底層實現咱們知道AtomicLong是在一個死循環中不斷嘗試修改目標值,直到修改爲功,在競爭不激烈時修改爲功的機率很高,可是競爭激烈時修改失敗的機率也會很高,在大量修改失敗時就會進行屢次的循環嘗試,所以性能會收到影響。對於普通類型的longdouble變量JVM容許將64位的讀寫操做拆分紅兩個32位的操做。

LongAdder的核心是將熱點數據分離,如將AtomicLong的內部核心數據value分離成一個數組,每一個線程訪問時經過哈希等算法映射到其中一個數字進行計數,最終的計數結果爲這個數組的求和累加。其中熱點數據value會被分離成多個單元的cell,每一個cell獨自維護內部的值,當前對象的實際值由全部cell累計合成。

這樣熱點就實現了有效分離並提升了並行度,LongAdder就至關於在AtomicLong的基礎上把單點的更新壓力分散到各個節點上,在低併發時經過對base的直接更新能夠很好地保證和Atomic的性能基本一致;在高併發時經過分散提升了性能。

可是LongAdder也有缺點,在統計時若是有併發更新可能會致使統計的數據有些偏差。

在線程競爭很低的時候使用LongAdder仍是更簡單,效率稍高一點。

在須要準確的數值如序列號生成的時候就須要AtomicLong來保證準確性。

AtomicReference、AtomicReferenceFieldUpdater

AtomicReference

AtomicReferenceAtomicInteger很是相似,不一樣之處就在於AtomicInteger是對整數的封裝,底層採用的是compareAndSwapInt實現CAS,比較的是數值是否相等,而AtomicReference則對應普通的對象引用,底層使用的是compareAndSwapObject實現CAS,比較的是兩個對象的地址是否相等。也就是它能夠保證你在修改對象引用時的線程安全性。

引用類型的賦值是原子的。雖然虛擬機規範中說64位操做能夠不是原子性的,能夠分爲兩個32位的原子操做,可是目前商用的虛擬機幾乎都實現了64位原子操做。

首先咱們來看一下AtomicReference源碼中的compareAndSet方法

/** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */
public final boolean compareAndSet(V expect, V update) {
    return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
複製代碼

能夠看到這個方法的底層也是使用CAS實現,這個方法的做用是噹噹前值與第一個參數值相等時將其更新爲第二個參數的值。

@Slf4j
@ThreadSafe
public class AtomicExample4 {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        count.compareAndSet(0, 2);// 2
        count.compareAndSet(0, 1);// no
        count.compareAndSet(1, 3);// no
        count.compareAndSet(2, 4);// 4
        count.compareAndSet(3, 5);// no
        log.info("count:{}", count);
    }
}
複製代碼

執行結果:

執行過程當中count的值已在註釋中標出。

AtomicReferenceFieldUpdater

這裏以AtomicIntegerFieldUpdater爲例

@Slf4j
@ThreadSafe
public class AtomicExample5 {

    @Getter
    public volatile int count = 100;

    private static AtomicIntegerFieldUpdater<AtomicExample5> updater = AtomicIntegerFieldUpdater
            .newUpdater(AtomicExample5.class, "count");

    public static void main(String[] args) {
        AtomicExample5 example5 = new AtomicExample5();

        if (updater.compareAndSet(example5, 100, 120)){
            log.info("update success, {}", example5.getCount());
        }
    }
}
複製代碼

AtomicStampReference: CAS 的 ABA 問題

ABA問題:指在CAS操做的時候其餘線程將變量值A改爲了B可是又改回了A,當線程使用指望值A與當前變量比較的時候發現當前變量沒有變,因而CAS就將A值進行了交換操做。

解決思路:每次變量更新時把變量版本號加1.

看一下AtomicStampReference源碼中是怎麼實現的

/** * Atomically sets the value of both the reference and stamp * to the given update values if the * current reference is {@code ==} to the expected reference * and the current stamp is equal to the expected stamp. * * @param expectedReference the expected value of the reference * @param newReference the new value for the reference * @param expectedStamp the expected value of the stamp * @param newStamp the new value for the stamp * @return {@code true} if successful */
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}
複製代碼

這裏的compareAndSet方法與以前的區別是加入了stamp值的比較,用法與以前相同。

下面來看AtomicBoolean類中的compareAndSet方法

/** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */
public final boolean compareAndSet(boolean expect, boolean update) {
    int e = expect ? 1 : 0;
    int u = update ? 1 : 0;
    return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}
複製代碼

根據這個方法寫一個例子

@Slf4j
@ThreadSafe
public class AtomicExample6 {

    private static AtomicBoolean isHappened = new AtomicBoolean(false);

    /** * 請求總數 */
    public static int clientTotal = 5000;
    /** * 同時併發執行線程數 */
    public static int threadTotal = 200;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++){
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                } catch (Exception e){
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("isHappened:{}", isHappened.get());
    }

    private static void test() {
        if (isHappened.compareAndSet(false, true)){
            log.info("execute");
        }
    }

}
複製代碼

輸出結果

由結果能夠知道盡管循環執行了5000次可是日誌只輸出了1次,緣由是compareAndSet是原子性操做,它能保證從false變成true只會執行一次。

這個方法可讓一段代碼只執行一次,不會重複執行。

能保證同一時間只有一個線程進行操做的除了Atomic包以外還有鎖。

Java中的鎖主要有如下兩種:

1.synchronized:依賴JVM

2.Lock:依賴特殊的CPU指令,代碼實現。
複製代碼

synchronized

修飾對象:

1.代碼塊:做用範圍大括號括起來的代碼,做用於調用的對象。

2.方法:做用範圍整個方法,做用於調用的對象,稱爲同步方法。

3.靜態方法:做用範圍整個靜態方法,做用於全部對象。

4.類:做用範圍括號括起來的部分,做用於全部對象。
複製代碼

測試修飾代碼塊

@Slf4j
public class SynchronizedExample1 {

    /** * 修飾一個代碼塊 */
    public void test1(int j){
        synchronized (this){
            for (int i = 0; i < 10; i++){
                log.info("test1 {} - {}", j, i);
            }
        }
    }

    /** * 修飾一個方法 */
    public synchronized void test2(){
        for (int i = 0; i < 10; i++){
            log.info("test2 - {}", i);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample1 example1 = new SynchronizedExample1();
        SynchronizedExample1 example2 = new SynchronizedExample1();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            example1.test1(1);
        });
        executorService.execute(() -> {
            example2.test1(2);
        });
    }
}
複製代碼

運行結果

這個結果就驗證了同步代碼塊做用於當前對象,不一樣對象間是互不影響的。

測試修飾方法

public static void main(String[] args) {
    SynchronizedExample1 example1 = new SynchronizedExample1();
    SynchronizedExample1 example2 = new SynchronizedExample1();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
        example1.test2(1);
    });
    executorService.execute(() -> {
        example2.test2(2);
    });
}
複製代碼

運行結果於上面類似,代表修飾方法時也是做用於調用對象的,不一樣對象間互不影響。

注意:當子類繼承父類時,父類中帶synchronized的方法在子類中不能帶synchronized。若是子類也想使用synchronized,則須要在方法上顯式聲明synchronized

測試修飾靜態方法

/** * 修飾一個靜態方法 */
public static synchronized void test2(int j){
    for (int i = 0; i < 10; i++){
        log.info("test2 {} - {}", j, i);
    }
}

public static void main(String[] args) {
    SynchronizedExample2 example1 = new SynchronizedExample2();
    SynchronizedExample2 example2 = new SynchronizedExample2();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
        example1.test2(1);
    });
    executorService.execute(() -> {
        example2.test2(2);
    });
}
複製代碼

運行結果

從這個結果能夠知道修飾靜態方法時是做用於所有對象的,即一個對象執行完後才能執行第二個,不能同步進行。

測試修飾類

/** * 修飾一個類 */
public static void test1(int j){
    synchronized (SynchronizedExample2.class){
        for (int i = 0; i < 10; i++){
            log.info("test1 {} - {}", j, i);
        }
    }
}

public static void main(String[] args) {
    SynchronizedExample2 example1 = new SynchronizedExample2();
    SynchronizedExample2 example2 = new SynchronizedExample2();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
        example1.test1(1);
    });
    executorService.execute(() -> {
        example2.test1(2);
    });
}
複製代碼

運行結果於上面相同,於預期一致。

使用synchronized保證計數安全

只需在以前的add方法前加上synchronized修飾便可。

private synchronized static void add(){
    count++;
}
複製代碼

執行結果始終是5000

原子性-對比

  • synchronized:不可中斷鎖,適合競爭不激烈,可讀性好。

  • Lock:可中斷鎖,多樣化同步,競爭激烈時能維持常態。

  • Atomic:競爭激烈時能維持常態,比Lock性能好,但只能同步一個值。

可見性

定義:一個線程對主內存的修改能夠及時地被其餘線程觀察到。

致使共享變量在線程間不可見的緣由:

  • 1.線程交叉執行

  • 2.重排序結合線程交叉執行

  • 3.共享變量更新後的值沒有在工做內存與主內存間及時更新

可見性 - synchronized

JMM關於synchronized的兩條規定:

  • 1.線程解鎖前,必須把共享變量的最新值刷新到主內存。

  • 2.線程加鎖時,將清空工做內存中共享變量的值,從而使用共享變量時須要從主內存中從新讀取最新值。

可見性 - volatile

經過加入內存屏障和禁止重排序優化來實現。

實現方法:

對volatile變量寫操做時,會在寫操做後加入一條store屏障指令,將本地內存中的共享變量值刷新到主內存。

對volatile變量讀操做時,會在讀操做前加入一條load屏障指令,從主內存中讀取共享變量。

示意圖:

這些過程都是在CPU指令級別進行操做,咱們在使用時直接使用volatile修飾須要的地方便可。

使用條件:

  • 1.對變量的寫操做不依賴於當前值。

  • 2.該變量沒有包含在具備其餘變量的不變的式子中。

有序性

定義

Java內存模型中容許編譯器和處理器對指令進行重排序,可是重排序過程不會影響到單線程程序的執行,卻會影響到多線程併發執行的正確性。

happens-before原則

來自《深刻理解Java虛擬機》

  • 1.程序次序原則:一個線程內,按照代碼順序,書寫在前面的操做先行發生於書寫在後面的操做。可能會發生重排序,可是重排序不會影響到最終結果,所以看起來仍是順序執行的。

  • 2.鎖定規則:一個unLock操做先行發生於後面對同一個鎖的lock操做。

  • 3.volatile變量規則:對一個變量的寫操做先行發生於後面對這個變量的讀操做。

  • 4.傳遞規則:若是操做A先行發生於B,而操做B又先行發生於C,則能夠得出操做A先行發生於C.

  • 5.線程啓動規則:Thread對象的start()方法先行發生於此線程的每個動做。

  • 6.線程中斷規則:對線程的interrupt()方法的調用先行發生於被中斷線程的代碼檢測到中斷事件的發生。

  • 7.線程終結規則:線程中全部的操做都先行發生於線程的終止檢測,咱們能夠經過Thread.join()方法結束、Thread.isAlive()的返回值手段檢測線程是否已終止運行。

  • 8.對象終結規則:一個對象的初始化完成先行發生於他的finalize()方法的開始。

若是兩個操做的執行次序沒法從happens-before原則中推導出來就不能保證它們的有序性,虛擬機就能夠隨意地對它們進行重排序。

Written by Autu.

2019.7.11

相關文章
相關標籤/搜索