JAVA 中的 CAS

原文地址:www.xilidou.com/2018/02/01/…java

CAS 是現代操做系統,解決併發問題的一個重要手段,最近在看 eureka 的源碼的時候。遇到了不少 CAS 的操做。今天就係統的回顧一下 Java 中的CAS。linux

閱讀這篇文章你將會了解到:spring

  • 什麼是 CAS
  • CAS 實現原理是什麼?
  • CAS 在現實中的應用
    • 自旋鎖
    • 原子類型
    • 限流器
  • CAS 的缺點

什麼是 CAS

CAS: 全稱Compare and swap,字面意思:」比較並交換「,一個 CAS 涉及到如下操做:編程

咱們假設內存中的原數據V,舊的預期值A,須要修改的新值B。安全

  1. 比較 A 與 V 是否相等。(比較)
  2. 若是比較相等,將 B 寫入 V。(交換)
  3. 返回操做是否成功。

當多個線程同時對某個資源進行CAS操做,只能有一個線程操做成功,可是並不會阻塞其餘線程,其餘線程只會收到操做失敗的信號。可見 CAS 實際上是一個樂觀鎖。微信

CAS 是怎麼實現的

跟隨AtomInteger的代碼咱們一路往下,就能發現最終調用的是 sum.misc.Unsafe 這個類。看名稱 Unsafe 就是一個不安全的類,這個類是利用了 Java 的類和包在可見性的的規則中的一個恰到好到處的漏洞。Unsafe 這個類爲了速度,在Java的安全標準上作出了必定的妥協。多線程

再往下尋找咱們發現 Unsafe的compareAndSwapInt 是 Native 的方法:架構

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
複製代碼

也就是說,這幾個 CAS 的方法應該是使用了本地的方法。因此這幾個方法的具體實現須要咱們本身去 jdk 的源碼中搜索。併發

因而我下載一個 OpenJdk 的源碼繼續向下探索,咱們發如今 /jdk9u/hotspot/src/share/vm/unsafe.cpp 中有這樣的代碼:框架

{CC "compareAndSetInt",   CC "(" OBJ "J""I""I"")Z",  FN_PTR(Unsafe_CompareAndSetInt)},
複製代碼

這個涉及到,JNI 的調用,感興趣的同窗能夠自行學習。咱們搜索 Unsafe_CompareAndSetInt後發現:

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) {
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *)index_oop_from_field_offset_long(p, offset);

  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
} UNSAFE_END
複製代碼

最終咱們終於看到了核心代碼 Atomic::cmpxchg

繼續向底層探索,在文件java/jdk9u/hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.hpp有這樣的代碼:

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value, cmpxchg_memory_order order) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}
複製代碼

咱們經過文件名能夠知道,針對不一樣的操做系統,JVM 對於 Atomic::cmpxchg 應該有不一樣的實現。因爲咱們服務基本都是使用的是64位linux,因此咱們就看看linux_x86 的實現。

咱們繼續看代碼:

  • __asm__ 的意思是這個是一段內嵌彙編代碼。也就是在 C 語言中使用匯編代碼。
  • 這裏的 volatile和 JAVA 有一點相似,但不是爲了內存的可見性,而是告訴編譯器對訪問該變量的代碼就再也不進行優化。
  • LOCK_IF_MP(%4) 的意思就比較簡單,就是若是操做系統是多線程的,那就增長一個 LOCK。
  • cmpxchgl 就是彙編版的「比較並交換」。可是咱們知道比較並交換,有三個步驟,不是原子的。因此在多核狀況下加一個 LOCK,由CPU硬件保證他的原子性。
  • 咱們再看看 LOCK 是怎麼實現的呢?咱們去Intel的官網上看看,能夠知道LOCK在的早期實現是直接將 cup 的總線阻塞,這樣的實現可見效率是很低下的。後來優化爲X86 cpu 有鎖定一個特定內存地址的能力,當這個特定內存地址被鎖定後,它就能夠阻止其餘的系統總線讀取或修改這個內存地址。

關於 CAS 的底層探索咱們就到此爲止。咱們總結一下 JAVA 的 cas 是怎麼實現的:

  • java 的 cas 利用的的是 unsafe 這個類提供的 cas 操做。
  • unsafe 的cas 依賴了的是 jvm 針對不一樣的操做系統實現的 Atomic::cmpxchg
  • Atomic::cmpxchg 的實現使用了彙編的 cas 操做,並使用 cpu 硬件提供的 lock信號保證其原子性

CAS 的應用

瞭解了 CAS 的原理咱們繼續就看看 CAS 的應用:

自旋鎖

public class SpinLock {

  private AtomicReference<Thread> sign =new AtomicReference<>();

  public void lock(){
    Thread current = Thread.currentThread();
    while(!sign .compareAndSet(null, current)){
    }
  }

  public void unlock (){
    Thread current = Thread.currentThread();
    sign .compareAndSet(current, null);
  }
}
複製代碼

所謂自旋鎖,我以爲這個名字至關的形象,在lock()的時候,一直while()循環,直到 cas 操做成功爲止。

AtomicInteger 的 incrementAndGet()

public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }
複製代碼

與自旋鎖有殊途同歸之妙,就是一直while,直到操做成功爲止。

令牌桶限流器

所謂令牌桶限流器,就是系統以恆定的速度向桶內增長令牌。每次請求前從令牌桶裏面獲取令牌。若是獲取到令牌就才能夠進行訪問。當令牌桶內沒有令牌的時候,拒絕提供服務。咱們來看看 eureka 的限流器是如何使用 CAS 來維護多線程環境下對 token 的增長和分發的。

public class RateLimiter {

    private final long rateToMsConversion;

    private final AtomicInteger consumedTokens = new AtomicInteger();
    private final AtomicLong lastRefillTime = new AtomicLong(0);

    @Deprecated
    public RateLimiter() {
        this(TimeUnit.SECONDS);
    }

    public RateLimiter(TimeUnit averageRateUnit) {
        switch (averageRateUnit) {
            case SECONDS:
                rateToMsConversion = 1000;
                break;
            case MINUTES:
                rateToMsConversion = 60 * 1000;
                break;
            default:
                throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported");
        }
    }

    //提供給外界獲取 token 的方法
    public boolean acquire(int burstSize, long averageRate) {
        return acquire(burstSize, averageRate, System.currentTimeMillis());
    }

    public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) {
        if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go
            return true;
        }

        //添加token
        refillToken(burstSize, averageRate, currentTimeMillis);

        //消費token
        return consumeToken(burstSize);
    }

    private void refillToken(int burstSize, long averageRate, long currentTimeMillis) {
        long refillTime = lastRefillTime.get();
        long timeDelta = currentTimeMillis - refillTime;

        //根據頻率計算須要增長多少 token
        long newTokens = timeDelta * averageRate / rateToMsConversion;
        if (newTokens > 0) {
            long newRefillTime = refillTime == 0
                    ? currentTimeMillis
                    : refillTime + newTokens * rateToMsConversion / averageRate;

            // CAS 保證有且僅有一個線程進入填充
            if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) {
                while (true) {
                    int currentLevel = consumedTokens.get();
                    int adjustedLevel = Math.min(currentLevel, burstSize); // In case burstSize decreased
                    int newLevel = (int) Math.max(0, adjustedLevel - newTokens);
                    // while true 直到更新成功爲止
                    if (consumedTokens.compareAndSet(currentLevel, newLevel)) {
                        return;
                    }
                }
            }
        }
    }

    private boolean consumeToken(int burstSize) {
        while (true) {
            int currentLevel = consumedTokens.get();
            if (currentLevel >= burstSize) {
                return false;
            }

            // while true 直到沒有token 或者 獲取到爲止
            if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
                return true;
            }
        }
    }

    public void reset() {
        consumedTokens.set(0);
        lastRefillTime.set(0);
    }
}

複製代碼

因此梳理一下 CAS 在令牌桶限流器的做用。就是保證在多線程狀況下,不阻塞線程的填充token 和消費token。

概括

經過上面的三個應用咱們概括一下 CAS 的應用場景:

  • CAS 的使用可以避免線程的阻塞。
  • 多數狀況下咱們使用的是 while true 直到成功爲止。

CAS 缺點

  1. ABA 的問題,就是一個值從A變成了B又變成了A,使用CAS操做不能發現這個值發生變化了,處理方式是可使用攜帶相似時間戳的版本AtomicStampedReference
  2. 性能問題,咱們使用時大部分時間使用的是 while true 方式對數據的修改,直到成功爲止。優點就是相應極快,但當線程數不停增長時,性能降低明顯,由於每一個線程都須要執行,佔用CPU時間。

總結

CAS 是整個編程重要的思想之一。整個計算機的實現中都有CAS的身影。微觀上看彙編的 CAS 是實現操做系統級別的原子操做的基石。從編程語言角度來看 CAS 是實現多線程非阻塞操做的基石。宏觀上看,在分佈式系統中,咱們可使用 CAS 的思想利用相似Redis的外部存儲,也能實現一個分佈式鎖。

從某個角度來講架構就將微觀的實現放大,或者底層思想就是將宏觀的架構進行微縮。計算機的思想是想通的,因此說了解底層的實現能夠提高架構能力,提高架構的能力一樣可加深對底層實現的理解。計算機知識浩如煙海,可是套路有限。抓住基礎的幾個套路突破,從思想和思惟的角度學習計算機知識。不要將本身的精力花費在不停的追求新技術的腳步上,跟隨‘start guide line’只能寫一個demo,所得也就是一個demo而已。

停下腳步,回顧基礎和經典或許對於技術的提高更大一些。

但願這篇文章對你們有所幫助。

徒手擼框架系列文章地址:

徒手擼框架--高併發環境下的請求合併

徒手擼框架--實現IoC

徒手擼框架--實現Aop

歡迎關注個人微信公衆號

二維碼
相關文章
相關標籤/搜索