前幾天刷朋友圈的時候,看到一段話:若是如今我是傻逼,那麼我如今無論怎麼努力,也仍是傻逼,由於我如今的傻逼是由之前決定的,如今努力,是爲了讓之後的本身再也不傻逼。話糙理不糙,若是妄想如今努力一下,立刻就再也不傻逼,那是不可能的,須要積累,須要沉澱,才能慢慢的再也不傻逼。mysql
好了,雞湯喝完。程序員
今天咱們的內容是CAS以及原子操做類應用與源碼淺析,還會利用CAS來完成一個單例模式,還涉及到僞共享等。由於CAS是併發框架的基石,因此至關重要,這篇博客是一個長文,請作好準備。面試
說到CAS,不得不提到兩個專業詞語:悲觀鎖,樂觀鎖。咱們先來看看什麼是悲觀鎖,什麼是樂觀鎖。算法
第一次看到悲觀鎖,樂觀鎖的時候,應該是在應付面試,看面試題的時候。有這麼一個例子:如何避免多線程對數據庫中的同一條記錄進行修改。sql
若是是mysql數據庫,利用for update關鍵字+事務。這樣的效果就是當A線程走到for update的時候,會把指定的記錄上鎖,而後B線程過來,就只能等待,A線程修改完數據以後,提交事務,鎖就被釋放了,這個時候B線程終於能夠繼續作他的事情了。悲觀鎖每每是互斥的:只有我一我的能夠進來,其餘人都給我等着。這麼作是至關影響性能的。數據庫
在數據表中加一個版本號的字段:version,這個字段不須要程序員手動維護,是數據庫主動維護的,每次修改數據,version都會發生更改。數組
當version如今是1:緩存
樂觀鎖其實不能叫鎖,它沒有鎖的概念。安全
在Java中,也有悲觀鎖,樂觀鎖的概念,悲觀鎖的典型表明就是Synchronized,而樂觀鎖的典型表明就是今天要說的CAS。而說CAS以前,先要說下原子操做類,由於CAS是原子操做類的基石,咱們先要看看原子操做類的強大之處,從而產生探究CAS的興趣。bash
咱們先來看看原子操做類的應用。在Java中提供了不少原子操做類,好比AtomicInteger,其中有一個自增方法。
public class Main {
public static void main(String[] args) {
Thread[] threads = new Thread[20];
AtomicInteger atomicInteger = new AtomicInteger();
for (int i = 0; i < 20; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
atomicInteger.incrementAndGet();
}
});
threads[i].start();
}
join(threads);
System.out.println("x=" + atomicInteger.get());
}
private static void join(Thread[] threads) {
for (int i = 0; i < 20; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
複製代碼
運行結果:
這就是原子操做類的神奇之處了,在高併發的狀況下,這種方法會比Synchronized更有優點,畢竟Synchronized關鍵字會讓代碼串行化,失去了多線程優點。
咱們再來看個案例:
若是有一個需求,一個字段的初始值爲0,開三個線程:
public static void main(String[] args) {
AtomicInteger atomicInteger=new AtomicInteger();
new Thread(() -> {
if(!atomicInteger.compareAndSet(0,100)){
System.out.println("0-100:失敗");
}
}).start();
new Thread(() -> {
try {
Thread.sleep(500);////注意這裏睡了一下子,目的是讓第三個線程先執行判斷的操做,從而讓第三個線程修改失敗
} catch (InterruptedException e) {
e.printStackTrace();
}
if(!atomicInteger.compareAndSet(100,50)){
System.out.println("100-50:失敗");
}
}).start();
new Thread(() -> {
if(!atomicInteger.compareAndSet(50,60)){
System.out.println("50-60:失敗");
}
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
複製代碼
運行結果也是同樣的:
這個例子好像沒有什麼意思啊,甚至有點無聊,爲何要舉這個例子呢,由於在這裏,我所調用的方法compareAndSet,首字母就是CAS,並且傳遞了兩個參數,這兩個參數是在原生CAS操做中必需要傳遞的,離原生的CAS操做更近一些。
既然原子操做類那麼牛逼,咱們頗有必要探究下原子操做類的基石:CAS。
CAS的全稱是Compare And Swap,即比較交換,固然還有一種說法:Compare And Set,調用原生CAS操做須要肯定三個值:
其中,要更新的字段(變量)有時候會被拆分紅兩個參數:1.實例 2.偏移地址。
也許你看到這裏,會以爲雲裏霧裏,不知道我在說什麼,不要緊,繼續硬着頭皮看下去。
咱們先來看看compareAndSet的源碼。
首先,調用這個方法須要傳遞兩個參數,一個是預期值,一個是新值,這個預期值就至關於數據庫樂觀鎖版本號的概念,新值就是咱們但願修改的值(是值,不是字段)。咱們來看看這個方法的內部實現:
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
複製代碼
調用了unsafe下的compareAndSwapInt方法,除了傳遞了咱們傳到此方法的兩個參數以外,又傳遞了兩個參數,這兩個參數就是我上面說的實例和偏移地址,this表明是當前類的實例,即AtomicInteger類的實例,這個偏移地址又是什麼鬼呢,說的簡單點,就是肯定咱們須要修改的字段在實例的哪一個位置。知道了實例,知道了咱們的須要修改的字段是在實例的哪一個位置,就能夠肯定這個字段了。不過,這個肯定的過程不是在Java中作的,而是在更底層作的。
偏移地址是在本類的靜態代碼塊中得到的:
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
複製代碼
unsafe.objectFieldOffset接收的是Field類型的參數,獲得的就是對應字段的偏移地址了,這裏就是得到value字段在本類,即AtomicInteger中的偏移地址。
咱們在來看看value字段的定義:
private volatile int value;
複製代碼
volatile是爲了保證內存的可見性。
你們確定想一探究竟compareAndSwapInt和objectFieldOffset這兩個方法中作了什麼事情,很遺憾,我的水平有限,目前尚未能力去探究,只知道這種寫法是JNI,會調用到C或者C++,最終會把對應的指令發送給CPU,這是能夠保證原子性的。
咱們能夠看下這兩個方法的定義:
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public native long objectFieldOffset(Field var1);
複製代碼
這兩個方法被native標記了。
咱們來爲compareAndSwapInt方法作一個比較形象的解釋:
當咱們執行compareAndSwapInt方法,傳入10和100,Java會和更底層進行通訊:老鐵,我給你了字段的所屬實例和偏移地址,你幫我看下這個字段的值是否是10,若是是10的話,你就改爲100,而且返回true,若是不是的話,不用修改,返回false把。
其中比較的過程就是compare,修改的值的過程就是swap,由於是把舊值替換成新值,因此咱們把這樣的操做稱爲CAS。
咱們再來看看incrementAndGet的源碼。
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
複製代碼
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
複製代碼
incrementAndGet方法會調到用getAndAddInt方法,這裏有三個參數:
getAndAddInt方法內部有一個while循環,循環體內部根據實例和偏移地址得到對應的值,這裏先稱爲A,再來看看while裏面的判斷內容,JDK和更底層進行通信:嘿,我把實例和偏移地址給你,你幫我看下這個值是否是A,若是是的話,幫我修改爲A+1,返回true,若是不是的話,返回false吧。
這裏要思考一個問題:爲何須要while循環?
好比同時有兩個線程執行到了getIntVolatile方法,拿到的值都是10,其中線程A執行native方法,修改爲功,可是線程B就修改失敗了啊,由於CAS操做是能夠保證原子性的,因此線程B只能苦逼的再一次循環,這一次拿到的值是11,又去執行native方法,修改爲功。
像這樣的while循環,有一個高大上的稱呼:CAS自旋。
讓咱們試想一下,若是如今併發真的很高很高,會出現什麼事情?大量的線程在進行CAS自旋,這太浪費CPU了吧。因此在Java8以後,對原子操做類進行了必定的優化,這個咱們後面再說。
可能你們對於原子操做類的底層實現,仍是比較迷茫,仍是不知道unsafe下面的方法究竟是什麼意思,畢竟剛纔只是簡單的讀了下代碼,俗話說「紙上得來終覺淺,絕知此事要躬行」,因此咱們須要本身調用下unsafe下面的方法,來加深理解。
Unsafe:不安全的,既然有這樣的命名,說明這個類是比較危險的,Java官方也不推薦咱們直接操做Unsafe類,可是畢竟如今是學習階段,寫寫demo而已,只要不是發佈到生產環境,又有什麼關係呢?
Unsafe下面的方法仍是比較多的,咱們選擇幾個方法來看下,最終咱們會利用這幾個方法來完成一個demo。
objectFieldOffset:接收一個Field類型的數據,返回偏移地址。 compareAndSwapInt:比較交換,接收四個參數:實例,偏移地址,預期值,新值。 getIntVolatile:得到值,支持Volatile,接收兩個參數:實例,偏移地址。
這三個方法在上面的源碼淺析中,已經出現過了,也進行了必定的解釋,這裏再解釋一下,就是爲了加深印象,我在學習CAS的時候,也是反覆的看博客,看源碼,忽然恍然大悟。咱們須要用這三個方法來完成一個demo:寫一個原子操做自增的方法,自增的值能夠自定義,沒錯,這個方法上面我已經分析過了。下面直接放出代碼:
public class MyAtomicInteger {
private volatile int value;
private static long offset;//偏移地址
private static Unsafe unsafe;
static {
try {
Field theUnsafeField = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafeField.setAccessible(true);
unsafe = (Unsafe) theUnsafeField.get(null);
Field field = MyAtomicInteger.class.getDeclaredField("value");
offset = unsafe.objectFieldOffset(field);//得到偏移地址
} catch (Exception e) {
e.printStackTrace();
}
}
public void increment(int num) {
int tempValue;
do {
tempValue = unsafe.getIntVolatile(this, offset);//拿到值
} while (!unsafe.compareAndSwapInt(this, offset, tempValue, value + num));//CAS自旋
}
public int get() {
return value;
}
}
複製代碼
public class Main {
public static void main(String[] args) {
Thread[] threads = new Thread[20];
MyAtomicInteger atomicInteger = new MyAtomicInteger();
for (int i = 0; i < 20; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
atomicInteger.increment(1);
}
});
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("x=" + atomicInteger.get());
}
}
複製代碼
運行結果:
你可能會有疑問,爲何須要用反射來獲取theUnsafe,其實這是JDK爲了保護咱們,讓咱們沒法方便的得到unsafe,若是咱們和JDK同樣來得到unsafe會報錯:
@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");//若是咱們也以getUnsafe來得到theUnsafe,會拋出異常
} else {
return theUnsafe;
}
}
複製代碼
對的,你沒看錯,我也沒寫錯,用CAS也能夠完成單例模式,雖然在正常開發中,不會有人用CAS來完成單例模式,可是是檢驗是否學會CAS的一個很好的題目。
public class Singleton {
private Singleton() {
}
private static AtomicReference<Singleton> singletonAtomicReference = new AtomicReference<>();
public static Singleton getInstance() {
while (true) {
Singleton singleton = singletonAtomicReference.get();// 得到singleton
if (singleton != null) {// 若是singleton不爲空,就返回singleton
return singleton;
}
// 若是singleton爲空,建立一個singleton
singleton = new Singleton();
// CAS操做,預期值是NULL,新值是singleton
// 若是成功,返回singleton
// 若是失敗,進入第二次循環,singletonAtomicReference.get()就不會爲空了
if (singletonAtomicReference.compareAndSet(null, singleton)) {
return singleton;
}
}
}
}
複製代碼
註釋寫的已經比較清楚了,能夠對着註釋,再好好理解一下。
compareAndSet方法,上面已經寫過一個demo,你們能夠也試着分析下源碼,我就再也不分析了,我之因此要再次提到compareAndSet方法,是爲了引出一個問題。
假設有三個步驟:
請仔細看,這三個步驟作的事情,一個變量剛開始是150,修改爲了50,後來又被修改爲了150!(又改回去了),最後若是這個變量是150,再改爲90。這就是CAS中ABA的問題。
第三步,判斷這個值是不是150,有兩種不一樣的需求:
針對於第二個需求,咱們能夠用AtomicStampedReference來解決這個問題,AtomicStampedReference支持泛型,其中有一個stamp的概念。下面直接貼出代碼:
public static void main(String[] args) {
try {
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<Integer>(150, 0);
Thread thread1 = new Thread(() -> {
Integer oldValue = atomicStampedReference.getReference();
int stamp = atomicStampedReference.getStamp();
if (atomicStampedReference.compareAndSet(oldValue, 50, 0, stamp + 1)) {
System.out.println("150->50 成功:" + (stamp + 1));
}
});
thread1.start();
Thread thread2 = new Thread(() -> {
try {
Thread.sleep(1000);//睡一下子,是爲了保證線程1 執行完畢
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer oldValue = atomicStampedReference.getReference();
int stamp = atomicStampedReference.getStamp();
if (atomicStampedReference.compareAndSet(oldValue, 150, stamp, stamp + 1)) {
System.out.println("50->150 成功:" + (stamp + 1));
}
});
thread2.start();
Thread thread3 = new Thread(() -> {
try {
Thread.sleep(2000);//睡一下子,是爲了保證線程1,線程2 執行完畢
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer oldValue = atomicStampedReference.getReference();
int stamp = atomicStampedReference.getStamp();
if (atomicStampedReference.compareAndSet(oldValue, 90, 0, stamp + 1)) {
System.out.println("150->90 成功:" + (stamp + 1));
}
});
thread3.start();
thread1.join();
thread2.join();
thread3.join();
System.out.println("如今的值是" + atomicStampedReference.getReference() + ";stamp是" + atomicStampedReference.getStamp());
} catch (Exception e) {
e.printStackTrace();
}
}
複製代碼
在進行incrementAndGet源碼解析的時候,說到一個問題:在高併發之下,N多線程進行自旋競爭同一個字段,這無疑會給CPU形成必定的壓力,因此在Java8中,提供了更完善的原子操做類:LongAdder。
咱們簡單的說下它作了下什麼優化,它內部維護了一個數組Cell[]和base,Cell裏面維護了value,在出現競爭的時候,JDK會根據算法,選擇一個Cell,對其中的value進行操做,若是仍是出現競爭,會換一個Cell再次嘗試,最終把Cell[]裏面的value和base相加,獲得最終的結果。
由於其中的代碼比較複雜,我就選擇幾個比較重要的問題,帶着問題去看源碼:
這是LongAdder類的UML圖:
add方法:
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null || !casBase(b = base, b + x)) {//第一行
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||//第二行
(c = cs[getProbe() & m]) == null ||//第三行
!(uncontended = c.cas(v = c.value, v + x)))//第四行
longAccumulate(x, null, uncontended);//第五行
}
}
複製代碼
第一行: ||判斷,前者是判斷cs=cells是否【不爲空】,後者是判斷CAS是否【不成功】 。 casBase作什麼了?
final boolean casBase(long cmp, long val) {
return BASE.compareAndSet(this, cmp, val);
}
複製代碼
這個比較簡單,就是調用compareAndSet方法,判斷是否成功:
再回到第一行,總體解釋下這個判斷:若是cell[]已經被初始化了,或者有競爭,纔會進入到第二行代碼。若是沒有競爭,也沒有初始化,就不會進入到第二行代碼。
這就回答了第二個問題:若是沒有競爭,只會對base進行操做,是從這裏看出來的。
第二行代碼: ||判斷,前者判斷cs是否【爲NULL】,後者判斷(cs的長度-1)是否【大於0】。這兩個判斷,應該都是判斷Cell[]是否初始化的。若是沒有初始化,會進入第五行代碼。
第三行代碼: 若是cell進行了初始化,經過【getProbe() & m】算法獲得一個數字,判斷cs[數字]是否【爲NULL】,而且把cs[數字]賦值給了c,若是【爲NULL】,會進入第五行代碼。 咱們須要簡單的看下getProbe() 中作了什麼:
static final int getProbe() {
return (int) THREAD_PROBE.get(Thread.currentThread());
}
private static final VarHandle THREAD_PROBE;
複製代碼
咱們只要知道這個算法是根據THREAD_PROBE算出來的便可。
第四行代碼: 對c進行了CAS操做,看是否成功,而且把返回值賦值給uncontended,若是當前沒有競爭,就會成功,若是當前有競爭,就會失敗,在外面有一個!(),因此CAS失敗了,會進入第五行代碼。須要注意的是,這裏已是對Cell元素進行操做了。
第五行代碼: 這方法內部很是複雜,咱們先看下方法的總體:
有三個if: 1.判斷cells是否被初始化了,若是被初始化了,進入這個if。
這裏面又包含了6個if,真可怕,可是在這裏,咱們不用所有關注,由於咱們的目標是解決上面提出來的問題。
咱們仍是先總體看下:
第一個判斷:根據算法,拿出cs[]中的一個元素,而且賦值給c,而後判斷是否【爲NULL】,若是【爲NULL】,進入這個if。
if (cellsBusy == 0) { // 若是cellsBusy==0,表明如今「不忙」,進入這個if
Cell r = new Cell(x); //建立一個Cell
if (cellsBusy == 0 && casCellsBusy()) {//再次判斷cellsBusy ==0,加鎖,這樣只有一個線程能夠進入這個if
//把建立出來Cell元素加入到Cell[]
try {
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
break done;
}
} finally {
cellsBusy = 0;//表明如今「不忙」
}
continue; // Slot is now non-empty
}
}
collide = false;
複製代碼
這就對第一個問題進行了補充,初始化Cell[]的時候,其中一個元素是NULL,這裏對這個爲NULL的元素進行了初始化,也就是隻有用到了這個元素,纔去初始化。
第六個判斷:判斷cellsBusy是否爲0,而且加鎖,若是成功,進入這個if,對Cell[]進行擴容。
try {
if (cells == cs) // Expand table unless stale
cells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue;
複製代碼
這就回答了第五個問題的一半:擴容Cell[]的時候,利用CAS加了鎖,因此保證線程的安全性。
那麼第四個問題呢?首先你要注意,最外面是一個for (;;)死循環,只有break了,才終止循環。
一開始collide爲false,在第三個if中,對cell進行CAS操做,若是成功,就break了,因此咱們須要假設它是失敗的,進入第四個if,第四個if中會判斷Cell[]的長度是否大於CPU核心數, 若是小於核心數,會進入第五個判斷,這個時候collide爲false,會進入這個if,把collide改成true,表明有衝突,而後跑到advanceProbe方法,生成一個新的THREAD_PROBE,再次循環。若是在第三個if中,CAS仍是失敗,再次判斷Cell[]的長度是否大於核心數,若是小於核心數,會進入第五個判斷,這個時候collide爲true,因此不會進入第五個if中去了,這樣就進入了第六個判斷,進行擴容。是否是很複雜。
簡單的來講,Cell[]擴容的時機是:當Cell[]的長度小於CPU核心數,而且已經兩次Cell CAS失敗了。
2.前面兩個判斷很好理解,主要看第三個判斷:
final boolean casCellsBusy() {
return CELLSBUSY.compareAndSet(this, 0, 1);
}
複製代碼
cas設置CELLSBUSY爲1,能夠理解爲加了個鎖,由於立刻就要進行初始化了。
try { // Initialize table
if (cells == cs) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
break done;
}
} finally {
cellsBusy = 0;
}
複製代碼
初始化Cell[],能夠看到長度爲2,根據算法,對其中的一個元素進行初始化,也就是此時Cell[]的長度爲2,可是裏面有一個元素仍是NULL,如今只是對其中一個元素進行了初始化,最終把cellsBusy修改爲了0,表明如今「不忙了」。
這就回答了 第一個問題:當出現競爭,且Cell[]尚未被初始化的時候,會初始化Cell[]。 第四個問題:初始化的規則是建立長度爲2的數組,可是隻會初始化其中一個元素,另一個元素爲NULL。 第五個問題的一半:在對Cell[]進行初始化的時候,是利用CAS加了鎖,因此能夠保證線程安全。
3.若是上面的都失敗了,對base進行CAS操做。
若是你們跟着我一塊兒在看源碼,會發現一個可能之前歷來也沒有見過的註解:
這個註解是幹什麼的?Contended是用來解決僞共享的。
好了,又引出來一個知識盲區,僞共享爲什麼物。
咱們知道CPU和內存之間的關係:當CPU須要一個數據,會先去緩存中找,若是緩存中沒有,會去內存找,找到了,就把數據複製到緩存中,下次直接去緩存中取出便可。
可是這種說法,並不完善,在緩存中的數據,是以緩存行的形式存儲的,什麼意思呢?就是一個緩存行可能不止一個數據。假如一個緩存行的大小是64字節,CPU去內存中取數據,會把臨近的64字節的數據都取出來,而後複製到緩存。
這對於單線程,是一種優化。試想一下,若是CPU須要A數據,把臨近的BCDE數據都從內存中取出來,而且放入緩存了,CPU若是再須要BCDE數據,就能夠直接去緩存中取了。
但在多線程下就有劣勢了,由於同一緩存行的數據,同時只能被一個線程讀取,這就叫僞共享了。
有沒有辦法能夠解決這問題呢?聰明的開發者想到了一個辦法:若是緩存行的大小是64字節,我能夠加上一些冗餘字段來填充到64字節。
好比我只須要一個long類型的字段,如今我再加上6個long類型的字段做爲填充,一個long佔8字節,如今是7個long類型的字段,也就是56字節,另外對象頭也佔8個字節,正好64字節,正好夠一個緩存行。
可是這種辦法不夠優雅,因此在Java8中推出了@jdk.internal.vm.annotation.Contended註解,來解決僞共享的問題。可是若是開發者想用這個註解, 須要添加 JVM 參數,具體參數我在這裏就不說了,由於我沒有親測過。
這一章的篇幅至關長,幾乎涵蓋了CAS中大部分常見的問題。
併發框架,是很是難學的,由於在開發中,不多會真正用到併發方面的知識,可是併發對於提升程序的性能,吞吐量是很是有效的手段,因此併發是值得花時間去學習,去研究的。