在[高併發Java 一] 前言中已經提到了無鎖的概念,因爲在jdk源碼中有大量的無鎖應用,因此在這裏介紹下無鎖。 java
CAS算法的過程是這樣:它包含3個參數CAS(V,E,N)。V表示要更新的變量,E表示預期值,N表示新值。僅當V
值等於E值時,纔會將V的值設爲N,若是V值和E值不一樣,則說明已經有其餘線程作了更新,則當前線程什麼
都不作。最後,CAS返回當前V的真實值。CAS操做是抱着樂觀的態度進行的,它老是認爲本身能夠成功完成
操做。當多個線程同時使用CAS操做一個變量時,只有一個會勝出,併成功更新,其他均會失敗。失敗的線程
不會被掛起,僅是被告知失敗,而且容許再次嘗試,固然也容許失敗的線程放棄操做。基於這樣的原理,CAS
操做即時沒有鎖,也能夠發現其餘線程對當前線程的干擾,並進行恰當的處理。 算法
咱們會發現,CAS的步驟太多,有沒有可能在判斷V和E相同後,正要賦值時,切換了線程,更改了值。形成了數據不一致呢? 設計模式
事實上,這個擔憂是多餘的。CAS整一個操做過程是一個原子操做,它是由一條CPU指令完成的。 數組
CAS的CPU指令是cmpxchg 安全
指令代碼以下: 多線程
/* accumulator = AL, AX, or EAX, depending on whether a byte, word, or doubleword comparison is being performed */ if(accumulator == Destination) { ZF = 1; Destination = Source; } else { ZF = 0; accumulator = Destination; }目標值和寄存器裏的值相等的話,就設置一個跳轉標誌,而且把原始數據設到目標裏面去。若是不等的話,就不設置跳轉標誌了。
Java當中提供了不少無鎖類,下面來介紹下無鎖類。 併發
咱們已經知道,無鎖比阻塞效率要高得多。咱們來看看Java是如何實現這些無鎖類的。 dom
AtomicInteger和Integer同樣,都繼承與Number類 ide
public class AtomicInteger extends Number implements java.io.Serializable
AtomicInteger裏面有不少CAS操做,典型的有: 高併發
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }這裏來解釋一下unsafe.compareAndSwapInt方法,他的意思是,對於this這個類上的偏移量爲valueOffset的變量值若是與指望值expect相同,那麼把這個變量的值設爲update。
其實偏移量爲valueOffset的變量就是value
static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } }咱們此前說過,CAS是有可能會失敗的,可是失敗的代價是很小的,因此通常的實現都是在一個無限循環體內,直到成功爲止。
public final int getAndIncrement() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; } }
從類名就可知,Unsafe操做是非安全的操做,好比:
前面已經提到了AtomicInteger,固然還有AtomicBoolean,AtomicLong等等,都大同小異。
這裏要介紹的是AtomicReference。
AtomicReference是一種模板類
public class AtomicReference<V> implements java.io.Serializable它能夠用來封裝任意類型的數據。
好比String
package test; import java.util.concurrent.atomic.AtomicReference; public class Test { public final static AtomicReference<String> atomicString = new AtomicReference<String>("hosee"); public static void main(String[] args) { for (int i = 0; i < 10; i++) { final int num = i; new Thread() { public void run() { try { Thread.sleep(Math.abs((int)Math.random()*100)); } catch (Exception e) { e.printStackTrace(); } if (atomicString.compareAndSet("hosee", "ztk")) { System.out.println(Thread.currentThread().getId() + "Change value"); }else { System.out.println(Thread.currentThread().getId() + "Failed"); } }; }.start(); } } }結果:
10Failed 13Failed 9Change value 11Failed 12Failed 15Failed 17Failed 14Failed 16Failed 18Failed能夠看到只有一個線程可以修改值,而且後面的線程都不能再修改。
咱們會發現CAS操做仍是有一個問題的
好比以前的AtomicInteger的incrementAndGet方法
public final int incrementAndGet() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return next; } }假設當前value=1當某線程int current = get()執行後,切換到另外一個線程,這個線程將1變成了2,而後又一個線程將2又變成了1。此時再切換到最開始的那個線程,因爲value仍等於1,因此仍是能執行CAS操做,固然加法是沒有問題的,若是有些狀況,對數據的狀態敏感時,這樣的過程就不被容許了。
此時就須要AtomicStampedReference類。
其內部實現一個Pair類來封裝值和時間戳。
private static class Pair<T> { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } }
這個類的主要思想是加入時間戳來標識每一次改變。
//比較設置 參數依次爲:指望值 寫入新值 指望時間戳 新時間戳 public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); }當指望值等於當前值,而且指望時間戳等於如今的時間戳時,才寫入新值,而且更新新的時間戳。
場景背景是,某公司給餘額少的用戶免費充值,可是每一個用戶只能充值一次。
package test; import java.util.concurrent.atomic.AtomicStampedReference; public class Test { static AtomicStampedReference<Integer> money = new AtomicStampedReference<Integer>( 19, 0); public static void main(String[] args) { for (int i = 0; i < 3; i++) { final int timestamp = money.getStamp(); new Thread() { public void run() { while (true) { while (true) { Integer m = money.getReference(); if (m < 20) { if (money.compareAndSet(m, m + 20, timestamp, timestamp + 1)) { System.out.println("充值成功,餘額:" + money.getReference()); break; } } else { break; } } } }; }.start(); } new Thread() { public void run() { for (int i = 0; i < 100; i++) { while (true) { int timestamp = money.getStamp(); Integer m = money.getReference(); if (m > 10) { if (money.compareAndSet(m, m - 10, timestamp, timestamp + 1)) { System.out.println("消費10元,餘額:" + money.getReference()); break; } }else { break; } } try { Thread.sleep(100); } catch (Exception e) { // TODO: handle exception } } }; }.start(); } }解釋下代碼,有3個線程在給用戶充值,當用戶餘額少於20時,就給用戶充值20元。有100個線程在消費,每次消費10元。用戶初始有9元,當使用AtomicStampedReference來實現時,只會給用戶充值一次,由於每次操做使得時間戳+1。運行結果:
充值成功,餘額:39 消費10元,餘額:29 消費10元,餘額:19 消費10元,餘額:9若是使用AtomicReference<Integer>或者 Atomic Integer來實現就會形成屢次充值。
充值成功,餘額:39 消費10元,餘額:29 消費10元,餘額:19 充值成功,餘額:39 消費10元,餘額:29 消費10元,餘額:19 充值成功,餘額:39 消費10元,餘額:29
public final boolean compareAndSet(int i, int expect, int update) { return compareAndSetRaw(checkedByteOffset(i), expect, update); }
它的內部只是封裝了一個普通的array
private final int[] array;裏面有意思的是運用了二進制數的前導零來算數組中的偏移量。
shift = 31 - Integer.numberOfLeadingZeros(scale);前導零的意思就是好比8位表示12,00001100,那麼前導零就是1前面的0的個數,就是4。
具體偏移量如何計算,這裏就再也不作介紹了。
AtomicIntegerFieldUpdater類的主要做用是讓普通變量也享受原子操做。
就好比本來有一個變量是int型,而且不少地方都應用了這個變量,可是在某個場景下,想讓int型變成AtomicInteger,可是若是直接改類型,就要改其餘地方的應用。AtomicIntegerFieldUpdater就是爲了解決這樣的問題產生的。
package test; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; public class Test { public static class V{ int id; volatile int score; public int getScore() { return score; } public void setScore(int score) { this.score = score; } } public final static AtomicIntegerFieldUpdater<V> vv = AtomicIntegerFieldUpdater.newUpdater(V.class, "score"); public static AtomicInteger allscore = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { final V stu = new V(); Thread[] t = new Thread[10000]; for (int i = 0; i < 10000; i++) { t[i] = new Thread() { @Override public void run() { if(Math.random()>0.4) { vv.incrementAndGet(stu); allscore.incrementAndGet(); } } }; t[i].start(); } for (int i = 0; i < 10000; i++) { t[i].join(); } System.out.println("score="+stu.getScore()); System.out.println("allscore="+allscore); } }上述代碼將score使用 AtomicIntegerFieldUpdater變成 AtomicInteger。保證了線程安全。
這裏使用allscore來驗證,若是score和allscore數值相同,則說明是線程安全的。
小說明:
系列: