閱讀前須要對JVM內存模型有必定了解。 java
定義算法
當多個線程訪問某個類時,無論運行時環境採用何種調度方式或者這些進程將如何交替執行,而且在主調代碼中不須要任何額外的同步或協同,這個類都能表現出正確的行爲,這個類就是線程安全的。數組
定義安全
提供互斥訪問,同一時刻只能有一個線程對它進行操做。bash
計數測試多線程
@Slf4j
public class CountExample2 {
/** * 請求總數 */
public static int clientTotal = 5000;
/** * 同時併發執行線程數 */
public static int threadTotal = 200;
public static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++){
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e){
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count.get());
}
private static void add(){
count.incrementAndGet();
}
}
複製代碼
執行結果:併發
屢次執行結果始終是5000
,由此咱們能夠認爲這個類是線程安全的。app
由線程不安全到線程安全咱們只是把count
從int
改爲了AtomicInteger
,爲了找到具體緣由咱們來看AtomicInteger
的源碼。高併發
找到incrementAndGet
方法性能
/** * Atomically increments by one the current value. * * @return the updated value */
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
複製代碼
在incrementAndGet
方法實現中使用了一個unsafe
的類並調用了其getAndAddInt
方法,咱們點進這個方法看一下它的實現
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
複製代碼
這個方法裏使用了一個do-while
語句,while
的判斷條件調用了compareAndSwapInt
方法,咱們進入這個方法看一下
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
複製代碼
能夠看到這個方法是native
標識的方法,表示是Java
底層的方法,不是用Java
實現的。
如今回來看getAndAddInt
方法,首先傳入的第一個值var1
是一個對象,如計數測試中的count
;第二個值var2
是當前的值,如執行2 + 1
這個操做,那麼當前var2
就是2, 第三個參數var4
就是1.
接下來看方法內部,var5
是提供調用底層方法獲得的底層當前的值,若是沒有其餘線程過來處理var1
這個變量時,var5
的正常返回值應該是2(在上述例子的背景下),所以傳到compareAndSwapInt
方法中的參數分別是:count
對象,當前值2,當前從底層傳過來的2,從底層取出的值+增長量(這裏是1)。這個方法但願達到的目標是對於count
這個對象,若是當前的值與底層的值相同的話就把它更新成var5 + var4
的值。因爲傳入的var2
和var4
可能會被其餘線程更改,所以這裏要判斷當前的var2
和當前底層var5
是否相等。經過這樣不停地循環判斷來實現指望的值與底層值徹底相同的時候才執行+1的操做覆蓋底層值。
compareAndSwapInt
方法的核心思想就是CAS
的核心。
public static AtomicLong count = new AtomicLong(0);
複製代碼
把count
的類型改爲AtomicLong
,執行幾回發現結果跟上面同樣。
LongAdder
是JDK8
中新增的一個類,下面來使用一下
@Slf4j
@ThreadSafe
public class AtomicExample3 {
/** * 請求總數 */
public static int clientTotal = 5000;
/** * 同時併發執行線程數 */
public static int threadTotal = 200;
public static LongAdder count = new LongAdder();
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++){
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e){
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add(){
count.increment();
}
}
複製代碼
屢次運行測試後,咱們能夠發現這個類是線程安全的。
由CAS
的底層實現咱們知道AtomicLong
是在一個死循環中不斷嘗試修改目標值,直到修改爲功,在競爭不激烈時修改爲功的機率很高,可是競爭激烈時修改失敗的機率也會很高,在大量修改失敗時就會進行屢次的循環嘗試,所以性能會收到影響。對於普通類型的long
和double
變量JVM
容許將64位的讀寫操做拆分紅兩個32位的操做。
LongAdder
的核心是將熱點數據分離,如將AtomicLong
的內部核心數據value
分離成一個數組,每一個線程訪問時經過哈希等算法映射到其中一個數字進行計數,最終的計數結果爲這個數組的求和累加。其中熱點數據value
會被分離成多個單元的cell
,每一個cell
獨自維護內部的值,當前對象的實際值由全部cell
累計合成。
這樣熱點就實現了有效分離並提升了並行度,LongAdder
就至關於在AtomicLong
的基礎上把單點的更新壓力分散到各個節點上,在低併發時經過對base
的直接更新能夠很好地保證和Atomic
的性能基本一致;在高併發時經過分散提升了性能。
可是LongAdder
也有缺點,在統計時若是有併發更新可能會致使統計的數據有些偏差。
在線程競爭很低的時候使用LongAdder
仍是更簡單,效率稍高一點。
在須要準確的數值如序列號生成的時候就須要AtomicLong
來保證準確性。
AtomicReference
AtomicReference
和AtomicInteger
很是相似,不一樣之處就在於AtomicInteger
是對整數的封裝,底層採用的是compareAndSwapInt
實現CAS
,比較的是數值是否相等,而AtomicReference
則對應普通的對象引用,底層使用的是compareAndSwapObject
實現CAS
,比較的是兩個對象的地址是否相等。也就是它能夠保證你在修改對象引用時的線程安全性。
引用類型的賦值是原子的。雖然虛擬機規範中說64位操做能夠不是原子性的,能夠分爲兩個32位的原子操做,可是目前商用的虛擬機幾乎都實現了64位原子操做。
首先咱們來看一下AtomicReference
源碼中的compareAndSet
方法
/** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
複製代碼
能夠看到這個方法的底層也是使用CAS實現,這個方法的做用是噹噹前值與第一個參數值相等時將其更新爲第二個參數的值。
@Slf4j
@ThreadSafe
public class AtomicExample4 {
private static AtomicReference<Integer> count = new AtomicReference<>(0);
public static void main(String[] args) {
count.compareAndSet(0, 2);// 2
count.compareAndSet(0, 1);// no
count.compareAndSet(1, 3);// no
count.compareAndSet(2, 4);// 4
count.compareAndSet(3, 5);// no
log.info("count:{}", count);
}
}
複製代碼
執行結果:
執行過程當中count
的值已在註釋中標出。
AtomicReferenceFieldUpdater
這裏以AtomicIntegerFieldUpdater
爲例
@Slf4j
@ThreadSafe
public class AtomicExample5 {
@Getter
public volatile int count = 100;
private static AtomicIntegerFieldUpdater<AtomicExample5> updater = AtomicIntegerFieldUpdater
.newUpdater(AtomicExample5.class, "count");
public static void main(String[] args) {
AtomicExample5 example5 = new AtomicExample5();
if (updater.compareAndSet(example5, 100, 120)){
log.info("update success, {}", example5.getCount());
}
}
}
複製代碼
ABA問題:指在CAS操做的時候其餘線程將變量值A改爲了B可是又改回了A,當線程使用指望值A與當前變量比較的時候發現當前變量沒有變,因而CAS就將A值進行了交換操做。
解決思路:每次變量更新時把變量版本號加1.
看一下AtomicStampReference
源碼中是怎麼實現的
/** * Atomically sets the value of both the reference and stamp * to the given update values if the * current reference is {@code ==} to the expected reference * and the current stamp is equal to the expected stamp. * * @param expectedReference the expected value of the reference * @param newReference the new value for the reference * @param expectedStamp the expected value of the stamp * @param newStamp the new value for the stamp * @return {@code true} if successful */
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
複製代碼
這裏的compareAndSet
方法與以前的區別是加入了stamp
值的比較,用法與以前相同。
下面來看AtomicBoolean
類中的compareAndSet
方法
/** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */
public final boolean compareAndSet(boolean expect, boolean update) {
int e = expect ? 1 : 0;
int u = update ? 1 : 0;
return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}
複製代碼
根據這個方法寫一個例子
@Slf4j
@ThreadSafe
public class AtomicExample6 {
private static AtomicBoolean isHappened = new AtomicBoolean(false);
/** * 請求總數 */
public static int clientTotal = 5000;
/** * 同時併發執行線程數 */
public static int threadTotal = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++){
executorService.execute(() -> {
try {
semaphore.acquire();
test();
semaphore.release();
} catch (Exception e){
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("isHappened:{}", isHappened.get());
}
private static void test() {
if (isHappened.compareAndSet(false, true)){
log.info("execute");
}
}
}
複製代碼
輸出結果
由結果能夠知道盡管循環執行了5000
次可是日誌只輸出了1
次,緣由是compareAndSet
是原子性操做,它能保證從false
變成true
只會執行一次。
這個方法可讓一段代碼只執行一次,不會重複執行。
能保證同一時間只有一個線程進行操做的除了Atomic包以外還有鎖。
Java中的鎖主要有如下兩種:
1.synchronized:依賴JVM
2.Lock:依賴特殊的CPU指令,代碼實現。
複製代碼
synchronized
修飾對象:
1.代碼塊:做用範圍大括號括起來的代碼,做用於調用的對象。
2.方法:做用範圍整個方法,做用於調用的對象,稱爲同步方法。
3.靜態方法:做用範圍整個靜態方法,做用於全部對象。
4.類:做用範圍括號括起來的部分,做用於全部對象。
複製代碼
測試修飾代碼塊
@Slf4j
public class SynchronizedExample1 {
/** * 修飾一個代碼塊 */
public void test1(int j){
synchronized (this){
for (int i = 0; i < 10; i++){
log.info("test1 {} - {}", j, i);
}
}
}
/** * 修飾一個方法 */
public synchronized void test2(){
for (int i = 0; i < 10; i++){
log.info("test2 - {}", i);
}
}
public static void main(String[] args) {
SynchronizedExample1 example1 = new SynchronizedExample1();
SynchronizedExample1 example2 = new SynchronizedExample1();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
example1.test1(1);
});
executorService.execute(() -> {
example2.test1(2);
});
}
}
複製代碼
運行結果
這個結果就驗證了同步代碼塊做用於當前對象,不一樣對象間是互不影響的。
測試修飾方法
public static void main(String[] args) {
SynchronizedExample1 example1 = new SynchronizedExample1();
SynchronizedExample1 example2 = new SynchronizedExample1();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
example1.test2(1);
});
executorService.execute(() -> {
example2.test2(2);
});
}
複製代碼
運行結果於上面類似,代表修飾方法時也是做用於調用對象的,不一樣對象間互不影響。
注意:當子類繼承父類時,父類中帶
synchronized
的方法在子類中不能帶synchronized
。若是子類也想使用synchronized
,則須要在方法上顯式聲明synchronized
。
測試修飾靜態方法
/** * 修飾一個靜態方法 */
public static synchronized void test2(int j){
for (int i = 0; i < 10; i++){
log.info("test2 {} - {}", j, i);
}
}
public static void main(String[] args) {
SynchronizedExample2 example1 = new SynchronizedExample2();
SynchronizedExample2 example2 = new SynchronizedExample2();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
example1.test2(1);
});
executorService.execute(() -> {
example2.test2(2);
});
}
複製代碼
運行結果
從這個結果能夠知道修飾靜態方法時是做用於所有對象的,即一個對象執行完後才能執行第二個,不能同步進行。
測試修飾類
/** * 修飾一個類 */
public static void test1(int j){
synchronized (SynchronizedExample2.class){
for (int i = 0; i < 10; i++){
log.info("test1 {} - {}", j, i);
}
}
}
public static void main(String[] args) {
SynchronizedExample2 example1 = new SynchronizedExample2();
SynchronizedExample2 example2 = new SynchronizedExample2();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
example1.test1(1);
});
executorService.execute(() -> {
example2.test1(2);
});
}
複製代碼
運行結果於上面相同,於預期一致。
只需在以前的add
方法前加上synchronized
修飾便可。
private synchronized static void add(){
count++;
}
複製代碼
執行結果始終是5000
synchronized:不可中斷鎖,適合競爭不激烈,可讀性好。
Lock:可中斷鎖,多樣化同步,競爭激烈時能維持常態。
Atomic:競爭激烈時能維持常態,比Lock性能好,但只能同步一個值。
定義:一個線程對主內存的修改能夠及時地被其餘線程觀察到。
致使共享變量在線程間不可見的緣由:
1.線程交叉執行
2.重排序結合線程交叉執行
3.共享變量更新後的值沒有在工做內存與主內存間及時更新
JMM關於synchronized的兩條規定:
1.線程解鎖前,必須把共享變量的最新值刷新到主內存。
2.線程加鎖時,將清空工做內存中共享變量的值,從而使用共享變量時須要從主內存中從新讀取最新值。
經過加入內存屏障和禁止重排序優化來實現。
實現方法:
對volatile變量寫操做時,會在寫操做後加入一條store屏障指令,將本地內存中的共享變量值刷新到主內存。
對volatile變量讀操做時,會在讀操做前加入一條load屏障指令,從主內存中讀取共享變量。
示意圖:
這些過程都是在CPU指令級別進行操做,咱們在使用時直接使用volatile修飾須要的地方便可。
使用條件:
1.對變量的寫操做不依賴於當前值。
2.該變量沒有包含在具備其餘變量的不變的式子中。
定義
Java內存模型中容許編譯器和處理器對指令進行重排序,可是重排序過程不會影響到單線程程序的執行,卻會影響到多線程併發執行的正確性。
來自《深刻理解Java虛擬機》
1.程序次序原則:一個線程內,按照代碼順序,書寫在前面的操做先行發生於書寫在後面的操做。可能會發生重排序,可是重排序不會影響到最終結果,所以看起來仍是順序執行的。
2.鎖定規則:一個unLock操做先行發生於後面對同一個鎖的lock操做。
3.volatile變量規則:對一個變量的寫操做先行發生於後面對這個變量的讀操做。
4.傳遞規則:若是操做A先行發生於B,而操做B又先行發生於C,則能夠得出操做A先行發生於C.
5.線程啓動規則:Thread對象的start()方法先行發生於此線程的每個動做。
6.線程中斷規則:對線程的interrupt()方法的調用先行發生於被中斷線程的代碼檢測到中斷事件的發生。
7.線程終結規則:線程中全部的操做都先行發生於線程的終止檢測,咱們能夠經過Thread.join()方法結束、Thread.isAlive()的返回值手段檢測線程是否已終止運行。
8.對象終結規則:一個對象的初始化完成先行發生於他的finalize()方法的開始。
若是兩個操做的執行次序沒法從happens-before原則中推導出來就不能保證它們的有序性,虛擬機就能夠隨意地對它們進行重排序。
Written by Autu.
2019.7.11