Java原子類中CAS的底層實現

Java原子類中CAS的底層實現

從Java到c++到彙編, 深刻講解cas的底層原理.html

介紹原理前, 先來一個Demo

以AtomicBoolean類爲例.先來一個調用cas的demo.java

主線程在for語句裏cas忙循環, 直到cas操做成功返回true爲止.linux

而新開的一個縣城new Thread 會在4秒後,將flag設置爲true, 爲了讓主線程可以設置成功.(由於cas的預期值是true, 而flag被初始化爲了false)ios

現象就是主線程一直在跑for循環. 4秒後, 主線程將會設置成功, 而後輸出時間差, 而後終止for循環.c++

public class TestAtomicBoolean {
    public static void main(String[] args) {
        AtomicBoolean flag = new AtomicBoolean(false);

        long start = System.currentTimeMillis();

        new Thread(()->{
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            flag.set(true);
        }).start();

        for(;;){
            if(flag.compareAndSet(true,false)){
                System.out.println(System.currentTimeMillis() - start);
                System.out.println("inner loop OK!");
                break;
            }
        }
    }
}

這裏只是舉了一個例子, 也許這個例子也不太恰當, 本文只是列出了這個api的調用方法而已, 重點在於介紹compareAndSet()方法的底層原理.windows

Java級源碼AtomicBoolean.java

發現AtomicBoolean的compareAndSet()調用的是unsafe裏的compareAndSwapInt()方法.api

Java級源碼Unsafe.java

有的同窗可能好奇, 其中的unsafe是怎麼來的.緩存

在AtomicBoolean類中的靜態成員變量:多線程

若是還要細究Unsafe.getUnsafe()是怎麼實現的話....那麼我再貼一份Unsafe類裏的getUnsafe的代碼:函數

首先, 在Unsafe類裏, 本身就有了一個本身的實例.(並且是單例的)

而後Unsafe類裏的getUnsafe()方法會進行檢查, 最終會return這個單例 theUnsafe.

剛剛跑去取介紹了getUnsafe()方法...接下來繼續講解cas...

剛纔說到了AtomicBoolean類裏的compareAndSet()方法內部其實調用了Unsafe類裏的compareAndSwapInt()方法.

Unsafe類裏的compareAndSwapInt源碼以下:

(OpenJDK8的源碼里路徑: openjdk/jdk/src/share/classes/sun/misc/Unsafe.java)

發現這裏是一段native方法.說明繼續看源碼的話, 從這裏就開始脫離Java語言了....

c++級源碼Unsafe.cpp

本源碼在OpenJDK8裏的路徑爲: openjdk/hotspot/src/share/vm/prims/unsafe.cpp

(這裏臨時跑題一下:   若是說要細究 UNSAFE_ENTRY 是什麼的話...UNSAFE_ENTRY 就是 JVM_ENTRY, 而 JVM_ENTRY 在interfaceSupport.hpp裏面定義了, jni相關.若是想看的話, 源碼路徑在OpenJDK8中的路徑是這個: 

openjdk/hotspot/src/share/vm/runtime/interfaceSupport.hpp)

回到本文的主題cas....上面截圖的這段代碼, 看後面那一句return, 發現其中的使用到的是Atomic下的cmpxchg()方法.

c++級源碼atomic.cpp

本段源碼對應OpenJDK8的路徑是這個: openjdk/hotspot/src/share/vm/runtime/atomic.cpp

其中的cmpxchg爲核心內容. 可是這句代碼根據操做系統和處理器的不一樣, 使用不一樣的底層代碼. 

而atomic.inline.hpp裏聲明以下:

可見 ...不一樣不一樣操做系統, 不一樣的處理器, 都要走不一樣的cmpxchg()方法的實現.

我們接下來以其中的linux操做系統 x86處理器爲例 , atomic_linux_x86.inline.hpp

彙編級源碼atomic_linux_x86.inline.hpp

OpenJDK中路徑以下: openjdk/hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp

看到了__asm__, 說明c++要開始內聯彙編了,說明繼續看代碼的話, 將會是彙編語言.

這是一段內聯彙編:

其中 __asm__ volatile 指示了編譯器不要改動優化後面的彙編語句, 若是進行了優化(優化是爲了減小訪問內存, 直接經過緩存, 加快取讀速度), 那麼就在這段函數的週期內, 某幾個變量就至關於常亮了, 其值可能會與內存中真實的值有差別.


2018.6.7更新: 10天沒更新了, 因爲實習結束, 這幾天手頭沒電腦, 並且租房火車回學校等各類問題繁雜...

瞭解彙編指令cmpxchg

環境

懟代碼以前...先把環境介紹一下...彙編的指令細節方便會有所不一樣, 好比__asm__  和asm不必定同樣, 又好比 asm(...)  和  asm{...}  的括號問題, 又好比 彙編指令的首操做數和第2操做數的含義是顛倒的, 也就是`mov 操做數1, 操做數2` , 要改寫成`mov  操做數2, 操做數1`...反正很亂..我也不專門搞彙編, 我只能保證我這裏使用g++進行編譯能正常運行.(linux下或者mac下用g++編譯是沒問題的, windows就不知道了...)

內聯彙編格式

有些同窗可能會對上面突如其來的彙編語句感到迷茫...

先來一下內聯彙編的語法格式:

asm volatile("Instruction List"

: Output

: Input

: Clobber/Modify);

其中的Output和Input你們很好理解, 可是Clobber/Modify是什麼意思呢:

(下面關於Clobber/Modify的內容引用自:https://blog.csdn.net/dlh0313/article/details/52172833)
有時候,當你想通知GCC當前內聯彙編語句可能會對某些寄存器或內存進行修改,但願GCC在編譯時可以將這一點考慮進去;那麼你就能夠在Clobber/Modify部分聲明這些寄存器或內存

你們能夠看看上面圖片中openJDK中的彙編代碼, 裏面的Clobber/Modify部分是"cc"和"memory", memory好理解, 就是內存. 那麼"cc"是什麼呢?

當一個內聯彙編中包含影響標誌寄存器eflags的條件,那麼也須要在Clobber/Modify部分中使用"cc"來向GCC聲明這一點

內聯彙編的簡單例子

接下來展現一個簡單的c++內聯彙編的程序: a是1000, 經過彙編來給b變量也賦值爲1000

#include<iostream>

using namespace std;

int main() {
    int a = 1000, b = 0;

    asm("movl %1,%%eax\n"
        "movl %%eax,%0\n"
    : "=r"(b)
    : "r" (a)
    : "%eax");
    cout << "a := " << a << endl;
    cout << "b := " << b << endl;
    return 0;
}

首先在c++裏定義了a=1000, b=0;

而後看asm裏的第一行冒號, 這裏表示Output    ` : "=r"(b) `, b是第0個參數, 等號(=)表示當前輸出表達式的屬性爲只寫, r表示寄存器

而後看asm裏的第一行冒號, 這裏表示Input        `: "r" (a)`, a是第1個參數, r表示寄存器.

而後看asm裏的第一行指令   `movl %1,%%eax\n`     , 將第0個參數的值(變量a)傳入到寄存器eax中

而後看asm裏的第二行指令   `movl %%eax,%0\n`     , 將寄存器eax的值傳給第一個參數(變量b)

接下來使用c++的cout進行輸出, 查看是否賦值成功, 結果以下:

再來一個簡單彙編例子

用匯編給a變量賦值爲100

#include<iostream>

using namespace std;

int main() {
    int a = 0;

    asm("movl $100,%%eax\n"
        "movl %%eax,%0\n"
    : "=r"(a)
    : /*no input*/
    : "%eax", "memory");
    cout << "a := " << a << endl;
    return 0;
}

(固然若是用的熟練, 就不須要使用兩句mov來實現這個功能. 能夠直接 movl $100, %0 ,就能夠實現把100賦值給a變量了. 這裏只是爲了演示使用寄存器, 因此先給寄存器eax存上100, 再把寄存器的值賦值給a, 用寄存器eax作了一箇中間的臨時存儲)

程序運行結果以下:

內聯彙編cmpxchg(cmpxchg比對成功)

(有同窗不會編譯運行g++下的cpp, 我順便也在這裏作一個示範吧.....)

#include<iostream>

using namespace std;

int main() {
    int cpp_eax = 0;
    int cpp_ebx = 0;
    int expect = 2222;
    int target = 8888;
    asm("movl    %2,    %%eax \n"  /*將2222存入到eax寄存器中*/
        "movl    %3,    %%ebx \n"  /*將8888存入到ebx寄存器中*/
        "cmpxchg %%ebx, %2    \n"  /*若是變量expect的值與寄存器eax的值相等(成功), 那麼ebx的值就賦給expect*/
        "movl    %%eax, %0    \n"  /*將寄存器eax的值賦值給變量cpp_eax*/
        "movl    %%ebx, %1    \n"  /*將寄存器ebx的值賦值給變量cpp_ebx*/
    :"=r"(cpp_eax), "=r"(cpp_ebx), "+r"(expect)   /*等於號表示可寫, 加號表示可寫可讀*/
    :"r"(target)
    /*cpp_eax是%0, cpp_ebx是%1, expect是%2, target是%3*/
    :"%eax", "%ebx", "memory", "cc");
    cout << "eax := " << cpp_eax << endl;
    cout << "ebx := " << cpp_ebx << endl;
    cout << "expect := " << expect << endl;
    cout << "target := " << target << endl;
}

 運行方式和結果以下:

內聯彙編cmpxchg(cmpxchg比對失敗)

若是cmpxchg指令中的第二個操做數與寄存器eax進行比對, 發現值不同的時候, 就會比對失敗, 那麼expect的值就會賦值給eax, 而expect的值保持不變.

#include<iostream>

using namespace std;

int main() {
    int cpp_eax = 0;
    int cpp_ebx = 0;
    int expect = 2222;
    int target = 8888;
    asm("movl    $77,   %%eax \n"  /*將字面量77存入到eax寄存器中*/
        "movl    %3,    %%ebx \n"  /*將8888存入到ebx寄存器中*/
        "cmpxchg %%ebx, %2    \n"  /*若是變量expect的值與寄存器eax的值不相等(失敗), 那麼expect的值就賦給eax*/
        "movl    %%eax, %0    \n"  /*將寄存器eax的值賦值給變量cpp_eax*/
        "movl    %%ebx, %1    \n"  /*將寄存器ebx的值賦值給變量cpp_ebx*/
    :"=r"(cpp_eax), "=r"(cpp_ebx), "+r"(expect)   /*等於號表示可寫, 加號表示可寫可讀*/
    :"r"(target)
    /*cpp_eax是%0, cpp_ebx是%1, expect是%2, target是%3*/
    :"%eax", "%ebx", "memory", "cc");
    cout << "eax := " << cpp_eax << endl;
    cout << "ebx := " << cpp_ebx << endl;
    cout << "expect := " << expect << endl;
    cout << "target := " << target << endl;
}

 

 cmpxchg指令結論

     1.指令格式: CMPXCHG 操做數1 (8位/16位/32位寄存器),  操做數2(能夠是任意寄存器或者內存memory)

 

     2.指令做用: 將累加器AL/AX/EAX(也就是%eax)中的值與第2操做數(目的操做數)比較,若是相等,第首操做數(源操做數)的值裝載到第2操做數,zf置1。若是不等, 第2操做數的值裝載到AL/AX/EAX並將zf清0

 

     3.該指令只能用於486及其後繼機型。

lock前綴

本段的文字理論介紹引用自(http://www.weixianmanbu.com/article/736.html):

 在單處理器系統中是不須要加lock的,由於可以在單條指令中完成的操做均可以認爲是原子操做,中斷只能發生在指令與指令之間。

 在多處理器系統中,因爲系統中有多個處理器在獨立的運行,即便在能單條指令中完成的操做也可能受到干擾。

在全部的 X86 CPU 上都具備鎖定一個特定內存地址的能力,當這個特定內存地址被鎖定後,它就能夠阻止其餘的系統總線讀取或修改這個內存地址。這種能力是經過 LOCK 指令前綴再加上下面的彙編指令來實現的。當使用 LOCK 指令前綴時,它會使 CPU 宣告一個 LOCK# 信號,這樣就能確保在多處理器系統或多線程競爭的環境下互斥地使用這個內存地址。當指令執行完畢,這個鎖定動做也就會消失。

#include<iostream>

using namespace std;

int main() {
    int cpp_eax = 0;
    int cpp_ebx = 0;
    int expect = 2222;
    int target = 8888;

    __asm__ __volatile__("movl    $2222, %%eax \n"  /*將字面量2222存入到eax寄存器中*/
                         "movl    %3   , %%ebx \n"  /*將8888存入到ebx寄存器中*/
                 "lock;" "cmpxchg %%ebx, %2    \n"  /*若是變量expect的值與寄存器eax的值不相等(失敗), 那麼expect的值就賦給eax*/
                         "movl    %%eax, %0    \n"  /*將寄存器eax的值賦值給變量cpp_eax*/
                         "movl    %%ebx, %1    \n"  /*將寄存器ebx的值賦值給變量cpp_ebx*/
    :"=r"(cpp_eax), "=r"(cpp_ebx), "=m"(expect)     /*等於號表示可寫, 加號表示可寫可讀*/
    :"r"(target)
    /*cpp_eax是%0, cpp_ebx是%1, expect是%2, target是%3*/
    :"%eax", "%ebx", "memory", "cc");
    cout << "eax := " << cpp_eax << endl;
    cout << "ebx := " << cpp_ebx << endl;
    cout << "expect := " << expect << endl;
    cout << "target := " << target << endl;
}

 結果以下:

注意, 若是加lock前綴的話, 指令的第2操做數必須是存儲在內存中的, 語句格式是這樣`  lock; cmpxchg 操做數1(寄存器), 操做數2(內存) ` .因此, expect做爲output的時候是這樣聲明的: "=m"(expect) 

 若是不是操做的內存, 那麼會報異常. 操做數2必須是內存...在這裏卡了好久

相關文章
相關標籤/搜索