CAS與ABA問題產生和優雅解決

本人免費整理了Java高級資料,涵蓋了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo高併發分佈式等教程,一共30G,須要本身領取。
傳送門:https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Qjava

 

獨佔鎖:是一種悲觀鎖,synchronized就是一種獨佔鎖,會致使其它全部須要鎖的線程掛起,等待持有鎖的線程釋放鎖。算法

樂觀鎖:每次不加鎖,假設沒有衝突去完成某項操做,若是由於衝突失敗就重試,直到成功爲止。數據結構

1、CAS 操做併發

樂觀鎖用到的機制就是CAS,Compare and Swap。分佈式

CAS有3個操做數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改成B,不然什麼都不作。ide

一、非阻塞算法 (nonblocking algorithms)高併發

一個線程的失敗或者掛起不該該影響其餘線程的失敗或掛起的算法。

現代的CPU提供了特殊的指令,能夠自動更新共享數據,並且可以檢測到其餘線程的干擾,而 compareAndSet() 就用這些代替了鎖定。性能

二、AtomicInteger示例this

拿出AtomicInteger來研究在沒有鎖的狀況下是如何作到數據正確性的。atom

private volatile int value;

 

在沒有鎖的機制下須要藉助volatile原語,保證線程間的數據是可見的(共享的)。

這樣才獲取變量的值的時候才能直接讀取。

public final int get() {
    return value;
}

 

而後來看看 ++i 是怎麼作到的。

public final int incrementAndGet() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}

 

在這裏採用了CAS操做,每次從內存中讀取數據而後將此數據和+1後的結果進行CAS操做,若是成功就返回結果,不然重試直到成功爲止。

而compareAndSet利用JNI來完成CPU指令的操做。

public final boolean compareAndSet(int expect, int update) {   
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

 

總體的過程就是這樣子的,利用CPU的CAS指令,同時藉助JNI來完成Java的非阻塞算法。其它原子操做都是利用相似的特性完成的。

而整個J.U.C都是創建在CAS之上的,所以對於synchronized阻塞算法,J.U.C在性能上有了很大的提高。參考資料的文章中介紹了若是利用CAS構建非阻塞計數器、隊列等數據結構。

2、ABA問題

CAS看起來很爽,可是會致使「ABA問題」。

CAS算法實現一個重要前提須要取出內存中某時刻的數據,而在下時刻比較並替換,那麼在這個時間差類會致使數據的變化。

好比說一個線程one從內存位置V中取出A,這時候另外一個線程two也從內存中取出A,而且two進行了一些操做變成了B,而後two又將V位置的數據變成A,這時候線程one進行CAS操做發現內存中仍然是A,而後one操做成功。儘管線程one的CAS操做成功,可是不表明這個過程就是沒有問題的。

若是鏈表的頭在變化了兩次後恢復了原值,可是不表明鏈表就沒有變化。所以前面提到的原子操做AtomicStampedReference/AtomicMarkableReference就頗有用了。這容許一對變化的元素進行原子操做。

在運用CAS作Lock-Free操做中有一個經典的ABA問題:

線程1準備用CAS將變量的值由A替換爲B,在此以前,線程2將變量的值由A替換爲C,又由C替換爲A,而後線程1執行CAS時發現變量的值仍然爲A,因此CAS成功。但實際上這時的現場已經和最初不一樣了,儘管CAS成功,但可能存在潛藏的問題,例以下面的例子:

現有一個用單向鏈表實現的堆棧,棧頂爲A,這時線程T1已經知道A.next爲B,而後但願用CAS將棧頂替換爲B:

head.compareAndSet(A,B);

在T1執行上面這條指令以前,線程T2介入,將A、B出棧,再pushD、C、A,此時堆棧結構以下圖,而對象B此時處於遊離狀態:

此時輪到線程T1執行CAS操做,檢測發現棧頂仍爲A,因此CAS成功,棧頂變爲B,但實際上B.next爲null,因此此時的狀況變爲:

其中堆棧中只有B一個元素,C和D組成的鏈表再也不存在於堆棧中,無緣無故就把C、D丟掉了。

以上就是因爲ABA問題帶來的隱患,各類樂觀鎖的實現中一般都會用版本戳version來對記錄或對象標記,避免併發操做帶來的問題,在Java中,AtomicStampedReference<E>也實現了這個做用,它經過包裝[E,Integer]的元組來對對象標記版本戳stamp,從而避免ABA問題,例以下面的代碼分別用AtomicInteger和AtomicStampedReference來對初始值爲100的原子整型變量進行更新,AtomicInteger會成功執行CAS操做,而加上版本戳的AtomicStampedReference對於ABA問題會執行CAS失敗:

package concur.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;

public class ABA {
    
    private static AtomicInteger atomicInt = new AtomicInteger(100);
    private static AtomicStampedReference<Integer> atomicStampedRef = 
            new AtomicStampedReference<Integer>(100, 0);
    
    public static void main(String[] args) throws InterruptedException {
        Thread intT1 = new Thread(new Runnable() {
            @Override
            public void run() {
                atomicInt.compareAndSet(100, 101);
                atomicInt.compareAndSet(101, 100);
            }
        });
        
        Thread intT2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                boolean c3 = atomicInt.compareAndSet(100, 101);
                System.out.println(c3);        //true
            }
        });
        
        intT1.start();
        intT2.start();
        intT1.join();
        intT2.join();
        
        Thread refT1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                atomicStampedRef.compareAndSet(100, 101, 
                        atomicStampedRef.getStamp(), atomicStampedRef.getStamp()+1);
                atomicStampedRef.compareAndSet(101, 100, 
                        atomicStampedRef.getStamp(), atomicStampedRef.getStamp()+1);
            }
        });
        
        Thread refT2 = new Thread(new Runnable() {
            @Override
            public void run() {
                int stamp = atomicStampedRef.getStamp();
                System.out.println("before sleep : stamp = " + stamp);    // stamp = 0
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("after sleep : stamp = " + atomicStampedRef.getStamp());//stamp = 1
                boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp+1);
                System.out.println(c3);        //false
            }
        });
        
        refT1.start();
        refT2.start();
    }

}
相關文章
相關標籤/搜索