JAVA原子組件和同步組件

前言

在使用多線程併發編程的時,常常會遇到對共享變量修改操做。此時咱們能夠選擇ConcurrentHashMap,ConcurrentLinkedQueue來進行安全地存儲數據。但若是單單是涉及狀態的修改,線程執行順序問題,使用Atomic開頭的原子組件或者ReentrantLock、CyclicBarrier之類的同步組件,會是更好的選擇,下面將一一介紹它們的原理和用法java

  • 原子組件的實現原理CAS
  • AtomicBoolean、AtomicIntegerArray等原子組件的用法、
  • 同步組件的實現原理
  • ReentrantLock、CyclicBarrier等同步組件的用法

應用場景

  • 可用來實現變量、狀態在多線程下的原子性操做
  • 可用於實現同步鎖(ReentrantLock)

原子組件

  • 原子組件的原子性操做是靠使用cas來自旋操做volatile變量實現的
  • volatile的類型變量保證變量被修改時,其餘線程都能看到最新的值
  • cas則保證value的修改操做是原子性的,不會被中斷

基本類型原子類

AtomicBoolean //布爾類型
AtomicInteger //正整型數類型
AtomicLong      //長整型類型
複製代碼
  • 使用示例
public static void main(String[] args) throws Exception {
    AtomicBoolean atomicBoolean = new AtomicBoolean(false);
    //異步線程修改atomicBoolean
    CompletableFuture<Void> future = CompletableFuture.runAsync(() ->{
        try {
            Thread.sleep(1000); //保證異步線程是在主線程以後修改atomicBoolean爲false
            atomicBoolean.set(false);
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    });
    atomicBoolean.set(true);
    future.join();
    System.out.println("boolean value is:"+atomicBoolean.get());
}
---------------輸出結果------------------
boolean value is:false
複製代碼

引用類原子類

AtomicReference
//加時間戳版本的引用類原子類
AtomicStampedReference
//至關於AtomicStampedReference,AtomicMarkableReference關心的是
//變量是否仍是原來變量,中間被修改過也無所謂
AtomicMarkableReference
複製代碼
  • AtomicReference的源碼以下,它內部定義了一個volatile V value,並藉助VarHandle(具體子類是FieldInstanceReadWrite)實現原子操做,MethodHandles會幫忙計算value在類的偏移位置,最後在VarHandle調用Unsafe.public final native boolean compareAndSetReference(Object o, long offset, Object expected, Object x)方法原子修改對象的屬性
public class AtomicReference<V> implements java.io.Serializable {
    private static final long serialVersionUID = -1848883965231344442L;
    private static final VarHandle VALUE;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            VALUE = l.findVarHandle(AtomicReference.class, "value", Object.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    private volatile V value;
    ....
複製代碼

ABA問題

  • 線程X準備將變量的值從A改成B,然而這期間線程Y將變量的值從A改成C,而後再改成A;最後線程X檢測變量值是A,並置換爲B。但實際上,A已經再也不是原來的A了
  • 解決方法,是把變量定爲惟一類型。值能夠加上版本號,或者時間戳。如加上版本號,線程Y的修改變爲A1->B2->A3,此時線程X再更新則能夠判斷出A1不等於A3
  • AtomicStampedReference的實現和AtomicReference差很少,不過它原子修改的變量是volatile Pair<V> pair;,Pair是其內部類。AtomicStampedReference能夠用來解決ABA問題
public class AtomicStampedReference<V> {
    private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }
    private volatile Pair<V> pair;
複製代碼
  • 若是咱們不關心變量在中間過程是否被修改過,而只是關心當前變量是否仍是原先的變量,則可使用AtomicMarkableReference
  • AtomicStampedReference的使用示例
public class Main {
    public static void main(String[] args) throws Exception {
        Test old = new Test("hello"), newTest = new Test("world");
        AtomicStampedReference<Test> reference = new AtomicStampedReference<>(old, 1);
        reference.compareAndSet(old, newTest,1,2);
        System.out.println("對象:"+reference.getReference().name+";版本號:"+reference.getStamp());
    }
}
class Test{
    Test(String name){ this.name = name; }
    public String name;
}
---------------輸出結果------------------
對象:world;版本號:2
複製代碼

數組原子類

AtomicIntegerArray     //整型數組
AtomicLongArray         //長整型數組
AtomicReferenceArray    //引用類型數組
複製代碼
  • 數組原子類內部會初始一個final的數組,它把整個數組當作一個對象,而後根據下標index計算法元素偏移量,再調用UNSAFE.compareAndSetReference進行原子操做。數組並沒被volatile修飾,爲了保證元素類型在不一樣線程的可見,獲取元素使用到了UNSAFEpublic native Object getReferenceVolatile(Object o, long offset)方法來獲取實時的元素值
  • 使用示例
//元素默認初始化爲0
AtomicIntegerArray array = new AtomicIntegerArray(2);
// 下標爲0的元素,期待值是0,更新值是1
array.compareAndSet(0,0,1);
System.out.println(array.get(0));
---------------輸出結果------------------
1
複製代碼

屬性原子更新類

AtomicIntegerFieldUpdater 
AtomicLongFieldUpdater
AtomicReferenceFieldUpdater
複製代碼
  • 若是操做對象是某一類型的屬性,可使用AtomicIntegerFieldUpdater原子更新,不過類的屬性須要定義成volatile修飾的變量,保證該屬性在各個線程的可見性,不然會報錯
  • 使用示例
public class Main {
    public static void main(String[] args) {
        AtomicReferenceFieldUpdater<Test,String> fieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Test.class,String.class,"name");
        Test test = new Test("hello world");
        fieldUpdater.compareAndSet(test,"hello world","siting");
        System.out.println(fieldUpdater.get(test));
        System.out.println(test.name);
    }
}
class Test{
    Test(String name){ this.name = name; }
    public volatile String name;
}
---------------輸出結果------------------
siting
siting
複製代碼

累加器

Striped64
LongAccumulator
LongAdder
//accumulatorFunction:運算規則,identity:初始值
public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity) 複製代碼
  • LongAccumulator和LongAdder都繼承於Striped64,Striped64的主要思想是和ConcurrentHashMap有點相似,分段計算,單個變量計算併發性能慢時,咱們能夠把數學運算分散在多個變量,而須要計算總值時,再一一累加起來
  • LongAdder至關於LongAccumulator一個特例實現
  • LongAccumulator的示例
public static void main(String[] args) throws Exception {
    LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
    for(int i=0;i<100000;i++){
        CompletableFuture.runAsync(() -> accumulator.accumulate(1));
    }
    Thread.sleep(1000); //等待所有CompletableFuture線程執行完成,再獲取
    System.out.println(accumulator.get());
}
---------------輸出結果------------------
100000
複製代碼

同步組件的實現原理

  • java的多數同步組件會在內部維護一個狀態值,和原子組件同樣,修改狀態值時通常也是經過cas來實現。而狀態修改的維護工做被Doug Lea抽象出AbstractQueuedSynchronizer(AQS)來實現
  • AQS的原理能夠看下以前寫的一篇文章:詳解鎖原理,synchronized、volatile+cas底層實現

同步組件

ReentrantLock、ReentrantReadWriteLock

  • ReentrantLock、ReentrantReadWriteLock都是基於AQS(AbstractQueuedSynchronizer)實現的。由於它們有公平鎖和非公平鎖的區分,所以沒直接繼承AQS,而是使用內部類去繼承,公平鎖和非公平鎖各自實現AQS,ReentrantLock、ReentrantReadWriteLock再借助內部類來實現同步
  • ReentrantLock的使用示例
ReentrantLock lock = new ReentrantLock();
if(lock.tryLock()){
    //業務邏輯
    lock.unlock();
}
複製代碼
  • ReentrantReadWriteLock的使用示例
public static void main(String[] args) throws Exception {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    if(lock.readLock().tryLock()){ //讀鎖
        //業務邏輯
        lock.readLock().unlock();
    }
    if(lock.writeLock().tryLock()){ //寫鎖
        //業務邏輯
        lock.writeLock().unlock();
    }
}
複製代碼

Semaphore實現原理和使用場景

  • Semaphore和ReentrantLock同樣,也有公平和非公平競爭鎖的策略,同樣也是經過內部類繼承AQS來實現同步
  • 通俗解釋:假設有一口井,最多有三我的的位置打水。每有一我的打水,則須要佔用一個位置。當三個位置所有佔滿時,第四我的須要打水,則要等待前三我的中一個離開打水位,才能繼續獲取打水的位置
  • 使用示例
public static void main(String[] args) throws Exception {
    Semaphore semaphore = new Semaphore(2);
    for (int i = 0; i < 3; i++)
        CompletableFuture.runAsync(() -> {
            try {
                System.out.println(Thread.currentThread().toString() + " start ");
                if(semaphore.tryAcquire(1)){
                    Thread.sleep(1000);
                    semaphore.release(1);
                    System.out.println(Thread.currentThread().toString() + " 無阻塞結束 ");
                }else {
                    System.out.println(Thread.currentThread().toString() + " 被阻塞結束 ");
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    //保證CompletableFuture 線程被執行,主線程再結束
    Thread.sleep(2000);
}
---------------輸出結果------------------
Thread[ForkJoinPool.commonPool-worker-19,5,main] start 
Thread[ForkJoinPool.commonPool-worker-5,5,main] start 
Thread[ForkJoinPool.commonPool-worker-23,5,main] start 
Thread[ForkJoinPool.commonPool-worker-23,5,main] 被阻塞結束 
Thread[ForkJoinPool.commonPool-worker-5,5,main] 無阻塞結束 
Thread[ForkJoinPool.commonPool-worker-19,5,main] 無阻塞結束 
複製代碼
  • 能夠看出三個線程,由於信號量設定爲2,第三個線程是沒法獲取信息成功的,會打印阻塞結束

CountDownLatch實現原理和使用場景

  • CountDownLatch也是靠AQS實現的同步操做
  • 通俗解釋:玩遊戲時,假如主線任務須要靠完成五個小任務,主線任務才能繼續進行時。此時能夠用CountDownLatch,主線任務阻塞等待,每完成一小任務,就done一次計數,直到五個小任務所有被執行才能觸發主線
  • 使用示例
public static void main(String[] args) throws Exception {
    CountDownLatch count = new CountDownLatch(2);
    for (int i = 0; i < 2; i++)
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
                System.out.println(" CompletableFuture over ");
                count.countDown();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    //等待CompletableFuture線程的完成
    count.await();
    System.out.println(" main over ");
}
---------------輸出結果------------------
 CompletableFuture over 
 CompletableFuture over 
 main over 
複製代碼

CyclicBarrier實現原理和使用場景

  • CyclicBarrier則是靠ReentrantLock lockCondition trip屬性來實現同步
  • 通俗解釋:CyclicBarrier須要阻塞所有線程到await狀態,而後所有線程再所有被喚醒執行。想象有一個欄杆攔住五隻羊,須要當五隻羊一塊兒站在欄杆時,欄杆纔會被拉起,此時全部的羊均可以飛跑出羊圈
  • 使用示例
public static void main(String[] args) throws Exception {
    CyclicBarrier barrier = new CyclicBarrier(2);
    CompletableFuture.runAsync(()->{
        try {
            System.out.println("CompletableFuture run start-"+ Clock.systemUTC().millis());
            barrier.await(); //須要等待main線程也執行到await狀態才能繼續執行
            System.out.println("CompletableFuture run over-"+ Clock.systemUTC().millis());
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    });
    Thread.sleep(1000);
    //和CompletableFuture線程相互等待
    barrier.await();
    System.out.println("main run over!");
}
---------------輸出結果------------------
CompletableFuture run start-1609822588881
main run over!
CompletableFuture run over-1609822589880
複製代碼

StampedLock

  • StampedLock不是藉助AQS,而是本身內部維護多個狀態值,並配合cas實現的
  • StampedLock具備三種模式:寫模式、讀模式、樂觀讀模式
  • StampedLock的讀寫鎖能夠相互轉換
//獲取讀鎖,自旋獲取,返回一個戳值
public long readLock()
//嘗試加讀鎖,不成功返回0
public long tryReadLock()
//解鎖
public void unlockRead(long stamp) 
//獲取寫鎖,自旋獲取,返回一個戳值
public long writeLock()
//嘗試加寫鎖,不成功返回0
public long tryWriteLock()
//解鎖
public void unlockWrite(long stamp)
//嘗試樂觀讀讀取一個時間戳,並配合validate方法校驗時間戳的有效性
public long tryOptimisticRead()
//驗證stamp是否有效
public boolean validate(long stamp) 複製代碼
  • 使用示例
public static void main(String[] args) throws Exception {
    StampedLock stampedLock = new StampedLock();
    long stamp = stampedLock.tryOptimisticRead();
    //判斷版本號是否生效
    if (!stampedLock.validate(stamp)) {
        //獲取讀鎖,會空轉
        stamp = stampedLock.readLock();
        long writeStamp = stampedLock.tryConvertToWriteLock(stamp);
        if (writeStamp != 0) { //成功轉爲寫鎖
            //fixme 業務操做
            stampedLock.unlockWrite(writeStamp);
        } else {
            stampedLock.unlockRead(stamp);
            //嘗試獲取寫讀
            stamp = stampedLock.tryWriteLock();
            if (stamp != 0) {
                //fixme 業務操做
                stampedLock.unlockWrite(writeStamp);
            }
        }
    }
}    
複製代碼

歡迎指正文中錯誤

參考:《2020最新Java基礎精講視頻教程和學習路線!》算法

連接:https://juejin.cn/post/691446...編程

相關文章
相關標籤/搜索