Java併發——CAS原理分析

咱們知道多線程操做共享資源時,會出現三個問題:可見性、有序性以及原子性。java

通常狀況下,咱們採用synchronized同步鎖(獨佔鎖、互斥鎖),即同一時間只有一個線程可以修改共享變量,
其餘線程必須等待。可是這樣的話就至關於單線程,體現不出來多線程的優點。

那麼咱們有沒有另外一種方式來解決這三個問題呢?算法

Java中有一個volatile關鍵字,它能夠解決可見性和有序性的問題。並且若是操做的共享變量是基本數據類型,
而且同一時間只對變量進行讀取或者寫入的操做,那麼原子性問題也獲得瞭解決,就不會產生多線程問題了。

可是一般,咱們都要先讀取共享變量,而後操做共享變量,最後寫入共享變量,那麼這個時候怎麼保證整個操做的原子性呢?一種解決方式就是CAS技術。在講解這個以前,先了解兩個重要概念:悲觀鎖與樂觀鎖。數據庫

一. 悲觀鎖與樂觀鎖

  • 悲觀鎖:老是假設最壞的狀況,每次去拿數據的時候都認爲別人會修改,因此每次在拿數據的時候都會上鎖,這樣別人想拿這個數據就會阻塞直到它拿到鎖(共享資源每次只給一個線程使用,其它線程阻塞,用完後再把資源轉讓給其它線程)。傳統的關係型數據庫裏邊就用到了不少這種鎖機制,好比行鎖、表鎖等、讀鎖、寫鎖等,都是在作操做以前先上鎖。Java中Synchronized和ReentrantLock等獨佔鎖就是悲觀鎖思想的實現。
// MySQL InnoDB持經過特定的語句進行顯示鎖定
SELECT … FOR UPDATE
  • 樂觀鎖:老是假設最好的狀況,每次去拿數據的時候都認爲別人不會修改,因此不會上鎖,可是在更新的時候會判斷一下在此期間別人有沒有去更新這個數據,能夠使用版本號機制和CAS算法實現樂觀鎖適用於多讀的應用類型,這樣能夠提升吞吐量,像數據庫提供的相似於write_condition機制,其實都是提供的樂觀鎖。在Java中java.util.concurrent.atomic包下面的原子變量類就是使用了樂觀鎖的一種實現方式CAS實現的。

兩種鎖各有優缺點,不可認爲一種好於另外一種,像樂觀鎖適用於寫比較少的狀況下,即衝突真的不多發生的時候,這樣能夠省去了鎖的開銷,加大了系統的整個吞吐量。但若是常常產生衝突,上層應用會不斷的進行retry,這樣反卻是下降了性能,因此這種狀況下用悲觀鎖就比較合適。 編程

悲觀鎖會阻塞其餘線程。樂觀鎖不會阻塞其餘線程,若是發生衝突,採用死循環的方式一直重試,直到更新成功。

能夠參考《樂觀鎖、悲觀鎖,這一篇就夠了!segmentfault

二. CAS算法的實現原理

compare and swap(比較與交換),是一種有名的無鎖算法。無鎖編程,即不使用鎖的狀況下實現多線程之間的變量同步,也就是在沒有線程被阻塞的狀況下實現變量的同步,因此也叫非阻塞同步(Non-blocking Synchronization)。CAS算法包含三個值:當前內存值(V)、預期原來的值(A)以及期待更新的值(B)。安全

若是內存位置V的值與預期原值A相匹配,那麼處理器會自動將該位置值更新爲新值B,返回true。不然處理器不作任何操做,返回false,這時會不斷的重試,直到沒有衝突,更新成功。

實現CAS最重要的一點,就是比較和交換操做的一致性,不然就會產生歧義。多線程

好比當前線程比較成功後,準備更新共享變量值的時候,這個共享變量值被其餘線程更改了,那麼CAS函數必須返回false。

要實現這個需求,java中提供了Unsafe類,它提供了三個函數,分別用來操做基本類型int和long,以及引用類型Object。併發

public final native boolean compareAndSwapObject(Object obj, long valueOffset, Object expect, Object update);

    public final native boolean compareAndSwapInt(Object obj, long valueOffset, int expect, int update);

    public final native boolean compareAndSwapLong(Object obj, long valueOffset, long expect, long update);

參數的意義:框架

  • obj 和 valueOffset:表示這個共享變量的內存地址。這個共享變量是obj對象的一個成員屬性,valueOffset表示這個共享變量在obj類中的內存偏移量。因此經過這兩個參數就能夠直接在內存中修改和讀取共享變量值。
  • expect: 表示預期原來的值。
  • update: 表示期待更新的值。

接下來咱們來看看Java併發框架下的atomic包是如何使用CAS的。ide

三. JUC併發框架下的原子類(atomic)

調用JUC併發框架下原子類的方法時,不須要考慮多線程問題。那麼咱們分析它是怎麼解決多線程問題的。以AtomicInteger類爲例。

3.1 成員變量

// 經過它來實現CAS操做的。由於是int類型,因此調用它的compareAndSwapInt方法
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    // value這個共享變量在AtomicInteger對象上內存偏移量,
    // 經過它直接在內存中修改value的值,compareAndSwapInt方法中須要這個參數
    private static final long valueOffset;

    // 經過靜態代碼塊,在AtomicInteger類加載時就會調用
    static {
        try {
            // 經過unsafe類,獲取value變量在AtomicInteger對象上內存偏移量
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    // 共享變量,AtomicInteger就保證了對它多線程操做的安全性。
    // 使用volatile修飾,解決了可見性和有序性問題。
    private volatile int value;

有三個重要的屬性:

  • unsafe: 經過它實現CAS操做,由於共享變量是int類型,因此調用compareAndSwapInt方法。
  • valueOffset: 共享變量value在AtomicInteger對象上內存偏移量
  • value: 共享變量,使用volatile修飾,解決了可見性和有序性問題。

3.2 重要方法

3.2.1 get與set方法

// 直接讀取。由於是volatile關鍵子修飾的,老是能看到(任意線程)對這個volatile變量最新的寫入
    public final int get() {
        return value;
    }

    // 直接寫入。由於是volatile關鍵子修飾的,因此它修改value變量也會當即被別的線程讀取到。
    public final void set(int newValue) {
        value = newValue;
    }

由於value變量是volatile關鍵字修飾的,它老是能讀取(任意線程)對這個volatile變量最新的寫入。它修改value變量也會當即被別的線程讀取到。

3.2.2 compareAndSet方法

// 若是value變量的當前值(內存值)等於指望值(expect),那麼就把update賦值給value變量,返回true。
    // 若是value變量的當前值(內存值)不等於指望值(expect),就什麼都不作,返回false。
    // 這個就是CAS操做,使用unsafe.compareAndSwapInt方法,保證整個操做過程的原子性
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

經過調用unsafe的compareAndSwapInt方法實現CAS函數的。可是CAS函數只能保證比較並交換操做的原子性,可是更新操做並不必定會執行。好比咱們想讓共享變量value自增。
共享變量value自增是三個操做,1.讀取value值,2.計算value+1的值,3.將value+1的值賦值給value。分析這三個操做:

  • 讀取value值,由於value變量是volatile關鍵字修飾的,可以讀取到任意線程對它最後一次修改的值,因此沒問題。
  • 計算value+1的值:這個時候就有問題了,可能在計算這個值的時候,其餘線程更改了value值,由於沒有加同步鎖,因此其餘線程能夠更改value值。
  • 將value+1的值賦值給value: 使用CAS函數,若是返回false,說明在當前線程讀取value值到調用CAS函數方法前,共享變量被其餘線程修改了,那麼value+1的結果值就不是咱們想要的了,由於要從新計算。

3.2.3 getAndAddInt方法

public final int getAndAddInt(Object obj, long valueOffset, int var) {
        int expect;
        // 利用循環,直到更新成功才跳出循環。
        do {
            // 獲取value的最新值
            expect = this.getIntVolatile(obj, valueOffset);
            // expect + var表示須要更新的值,若是compareAndSwapInt返回false,說明value值被其餘線程更改了。
            // 那麼就循環重試,再次獲取value最新值expect,而後再計算須要更新的值expect + var。直到更新成功
        } while(!this.compareAndSwapInt(obj, valueOffset, expect, expect + var));

        // 返回當前線程在更改value成功後的,value變量原先值。並非更改後的值
        return expect;
    }

這個方法在Unsafe類中,利用do_while循環,先利用當前值,計算更新值,而後經過compareAndSwapInt方法設置value變量,若是compareAndSwapInt方法返回失敗,表示value變量的值被別的線程更改了,因此循環獲取value變量最新值,再經過compareAndSwapInt方法設置value變量。直到設置成功。跳出循環,返回更新前的值。

// 將value的值當前值的基礎上加1,並返回當前值
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

    // 將value的值當前值的基礎上加-1,並返回當前值
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }

   
    // 將value的值當前值的基礎上加delta,並返回當前值
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }

    
    // 將value的值當前值的基礎上加1,並返回更新後的值(即當前值加1)
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

    // 將value的值當前值的基礎上加-1,並返回更新後的值(即當前值加-1)
    public final int decrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
    }

    // 將value的值當前值的基礎上加delta,並返回更新後的值(即當前值加delta)
    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
    }

都是利用unsafe.getAndAddInt方法實現的。

四.重要示例

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class Data {
    AtomicInteger num;

    public Data(int num) {
        this.num = new AtomicInteger(num);
    }

    public int getAndDecrement() {
        return num.getAndDecrement();
    }
}

class MyRun implements Runnable {

    private Data data;
    /**
     * 用來記錄全部賣出票的編號
     */
    private List<Integer> list;
    private CountDownLatch latch;

    public MyRun(Data data, List<Integer> list, CountDownLatch latch) {
        this.data = data;
        this.list = list;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            action();
        }  finally {
            // 釋放latch共享鎖
            latch.countDown();
        }
    }

    /**
     * 進行買票操做,注意這裏沒有使用data.num>0做爲判斷條件,直到賣完線程退出。
     * 那麼作會致使這兩處使用了共享變量data.num,那麼作多線程同步時,就要考慮更多條件。
     * 這裏只for循環了5次,表示每一個線程只賣5張票,並將全部賣出去編號存入list集合中。
     */
    public void action() {
        for (int i = 0; i < 5; i++) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int newNum = data.getAndDecrement();

            System.out.println("線程"+Thread.currentThread().getName()+"  num=="+newNum);
            list.add(newNum);
        }
    }
}

public class ThreadTest {
    public static void startThread(Data data, String name, List<Integer> list,CountDownLatch latch) {
        Thread t = new Thread(new MyRun(data, list, latch), name);
        t.start();
    }

    public static void main(String[] args) {
        // 使用CountDownLatch來讓主線程等待子線程都執行完畢時,才結束
        CountDownLatch latch = new CountDownLatch(6);

        long start = System.currentTimeMillis();
        // 這裏用併發list集合
        List<Integer> list = new CopyOnWriteArrayList<>();
        Data data = new Data(30);
        startThread(data, "t1", list, latch);
        startThread(data, "t2", list, latch);
        startThread(data, "t3", list, latch);
        startThread(data, "t4", list, latch);
        startThread(data, "t5", list, latch);
        startThread(data, "t6", list, latch);

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 處理一下list集合,進行排序和翻轉
        Collections.sort(list);
        Collections.reverse(list);
        System.out.println(list);

        long time = System.currentTimeMillis() - start;
        // 輸出一共花費的時間
        System.out.println("\n主線程結束 time=="+time);
    }
}

結果輸出

線程t2  num==30
線程t1  num==25
線程t5  num==29
線程t6  num==26
線程t4  num==28
線程t3  num==27
線程t4  num==24
線程t2  num==19
線程t1  num==20
線程t3  num==22
線程t5  num==21
線程t6  num==23
線程t5  num==17
線程t1  num==14
線程t6  num==13
線程t3  num==15
線程t2  num==18
線程t4  num==16
線程t4  num==10
線程t1  num==7
線程t6  num==12
線程t3  num==8
線程t2  num==9
線程t5  num==11
線程t5  num==6
線程t1  num==1
線程t6  num==2
線程t2  num==3
線程t4  num==4
線程t3  num==5
[30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1]

主線程結束 time==57

咱們使用AtomicInteger,代替同步鎖來解決多線程安全的。

相關文章
相關標籤/搜索