昨天簡單的看了看Unsafe的使用,今天咱們看看JUC中的原子類是怎麼使用Unsafe的,以及分析一下其中的原理!java
一.簡單使用AtomicLong編程
還記的上一篇博客中咱們使用了volatile關鍵字修飾了一個int類型的變量,而後兩個線程,分別對這個變量進行10000次+1操做,最後結果不是20000,如今咱們改爲AtomicLong以後,你會發現結果始終都是20000了!有興趣的能夠試試,代碼以下數組
package com.example.demo.study; import java.util.concurrent.atomic.AtomicLong; public class Study0127 { //這是一個全局變量,注意,這裏使用了一個原子類AtomicLong public AtomicLong num = new AtomicLong(); //每次調用這個方法,都會對全局變量加一操做,執行10000次 public void sum() { for (int i = 0; i < 10000; i++) { //使用了原子類的incrementAndGet方法,其實就是把num++封裝成原子操做 num.incrementAndGet(); System.out.println("當前num的值爲num= "+ num); } } public static void main(String[] args) throws InterruptedException { Study0127 demo = new Study0127(); //下面就是新建兩個線程,分別調用一次sum方法 new Thread(new Runnable() { @Override public void run() { demo.sum(); } }).start(); new Thread(new Runnable() { @Override public void run() { demo.sum(); } }).start(); } }
二.走近AtomicLong類緩存
在java中JDK 1.5以後,就出現了一個包,簡稱JUC併發包,全稱就是java.util .concurrent,其中咱們應該據說過一個類ConcurrentHashMap,這個map挺有意思的,有興趣能夠看看源碼!還有不少併發時候須要使用的類好比AtomicInteger,AtomicLong,AtomicBoolean等等,其實都差很少,此次咱們就簡單看看AtomicLong,其餘的幾個類也差很少多線程
public class AtomicLong extends Number implements java.io.Serializable { //獲取Unsafe對象,上篇博客說了咱們本身的類中不能使用這種方式的緣由,可是官方的這個類爲何能夠這樣獲取呢?由於本類AtomicLong //就是在rt.jar包下面,本類就是用Bootstrap類加載的,因此就能夠用這種方式 private static final Unsafe unsafe = Unsafe.getUnsafe(); //value這個字段的偏移量 private static final long valueOffset; //判斷jvm是否支持long類型的CAS操做 static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8(); private static native boolean VMSupportsCS8(); static { try { valueOffset = unsafe.objectFieldOffset (AtomicLong.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } //這裏用了volatile使的多線程下可見性,必定要分清楚原子性和可見性啊 private volatile long value; //兩個構造器很少說 public AtomicLong() { } public AtomicLong(long initialValue) { value = initialValue; }
而後咱們看看AtomicLong的+1操做,能夠看到使用的仍是unsafe這個類,只須要看看getAndAddLong方法就能夠了併發
方法getAndAddLong裏面就是進行了CAS操做,能夠當作若是同時有多個線程都調用incrementAndGet方法進行+1,那麼同一時間只有一個線程會去進行操做,而其餘的會不斷的使用CAS去嘗試+1,每次嘗試的時候都會去主內存中獲取最新的值;app
public final long getAndAddLong(Object o, long offset, long delta) { long v; do {
//這個方法就是從新獲取主內存的值,由於使用了volatile修飾了那個變量,因此緩存就沒用了 v = getLongVolatile(o, offset); //這裏就是一個dowhile無限循環,多個線程不斷的調用compareAndSwapLong方法去設置值,其實就是CAS,沒什麼特別好說的吧,
//當某個線程CAS成功就跳出這個循環,不然就一直在循環不斷的嘗試,這也是CAS和線程阻塞的區別 } while (!compareAndSwapLong(o, offset, v, v + delta)); return v; }
//這個CAS方法看不到,c實現的 public final native boolean compareAndSwapLong(Object o, long offset,long expected,long x);
有興趣的能夠看看AtomicLong的其餘方法,不少都同樣,CAS是核心less
三.CAS的不足以及認識LongAdderdom
從上面的例子中,咱們能夠知道在多線程下使用AtomicLong類的時候,同一個時刻使用那個共享變量的只能是一個線程,其餘的線程都是在無限循環,這種循環也是須要消耗性能的,若是線程比較多,不少的線程都在各自的無限循環中,或者叫作多個線程都在自旋;每一個線程都在自旋無數次真的是比較坑,比較消耗性能,咱們能夠想辦法自旋必定的次數,線程就結束運行了,有興趣的能夠了解一下自旋鎖,其實就是這麼一個原理,很容易,哈哈哈!jvm
在JDK8以後,提供了一個更好的類取代AtomicLong,那就是LongAdder,上面說過同一時間只有一個線程在使用那個共享變量,其餘的線程都在自旋,那麼若是能夠把這個共享變量拆開成多個部分,那麼是否是能夠多個線程同時能夠去操做呢?而後操做完以後再綜合起來,有點分治法的思想,分而治之,最後綜合起來。
那麼咱們怎麼把那個共享變量拆成多個部分呢?
在LongAdder中是這樣處理的,把那個變量拆成一個base(這個是long類型的,初始值爲0)和一個Cell(這個裏面封裝了一個long類型的值,初始值爲0),每一個線程只會去競爭不少Cell就好了,最後把多個Cell中的值和base累加起來就是最終結果;並且一個線程若是沒有競爭到Cell以後不會傻傻的自旋,直接想辦法去競爭下一個Cell;
下圖所示
四.簡單使用LongAdder
用法其實和AtomicLong差很少,有興趣的能夠試試,最後的結果始終都是20000
package com.example.demo.study; import java.util.concurrent.atomic.LongAdder; public class Study0127 { //這裏使用LongAdder類 public LongAdder num = new LongAdder(); //每次調用這個方法,都會對全局變量加一操做,執行10000次 public void sum() { for (int i = 0; i < 10000; i++) { //LongAdder類的自增操做,至關於i++ num.increment(); System.out.println("當前num的值爲num= "+ num); } } public static void main(String[] args) throws InterruptedException { Study0127 demo = new Study0127(); //下面就是新建兩個線程,分別調用一次sum方法 new Thread(new Runnable() { @Override public void run() { demo.sum(); } }).start(); new Thread(new Runnable() { @Override public void run() { demo.sum(); } }).start(); } }
五.走進LongAdder
從上面能夠看到base只能是一個,而Cell可能有多個,並且Cell太多了也是很佔內存的,因此一開始的時候不會建立Cell,只有在須要時才建立,也叫作惰性加載。
咱們能夠知道LongAdder是繼承自Striped64這個類的
而Striped64類中有三個字段,cells數組用於存放多個Cell,一個是base很少說,還有一個cellsBusy用來實現自旋鎖,狀態只能是0或1(0表示Cell數組沒有被初始化和擴容,也沒有正在建立Cell元素,反之則爲1),在建立Cell,初始化Cell數組或者擴容Cell數組的時候,就會用到這個字段,保證同一時刻只有一個線程能夠進行其中之一的操做。
1.咱們簡單看看Cell的結構
從下面代碼中能夠很清楚的看到所謂的Cell就是對一個long類型變量的CAS操做
@sun.misc.Contended //這個註解的做用是爲了不僞共享,至於什麼僞共享,後面有機會再說說 static final class Cell { //每一個Cell類中就是這個聲明的變量後期要進行累加的 volatile long value; //構造函數 Cell(long x) { value = x; } //Unsafe對象 private static final sun.misc.Unsafe UNSAFE; //value的偏移量 private static final long valueOffset; //這個靜態代碼塊中就是獲取Unsafe對象和偏移量的 static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } //CAS操做,沒什麼好說的 final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } }
2.LongAdder類自增方法increment()
咱們能夠看到increment()方法其實就是調用了add方法,咱們須要關注add方法幹了一些什麼;
public void add(long x) { Cell[] as; long b, v; int m; Cell a; //這裏的cells是父類Striped64中的,不爲空的話就保存到as中,而後調用casBase方法,就是CAS給base更新爲base+x,也就是每次都新增x, //在這裏因爲add(1L)傳入的參數是1,也就是每次就是加一 //若是CAS成功以後就不說了,就完成操做了,若是CAS失敗,則進入到裏面去 if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; //這個if判斷條件賊長,咱們把這幾個條件分爲1,2,3,4部分,前三部分都是用於決定線程應該訪問Cell數組中哪個Cell元素,最後一個部分用於更新Cell的值 //若是第1,2,3部分都不知足,也就是說Cell數組存在並且已經找到了肯定的Cell元素,那就到第四部分,更新對應的Cell中的值(在Cell類中的cas方法已經看過了) //若是第1,2,3部分知足其中一個,那也就是說Cell數組根本就不存在或者線程找不到對應的Cell,就執行longAccumulate方法 if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) //後面仔細看看這個方法,這是對Cell數組的初始化和擴容,頗有意思 longAccumulate(x, null, uncontended); } } //一個簡單的CAS操做 final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); }
對於上面的,有興趣的能夠看看是怎麼找到指定的Cell的,在上面的a = as[getProbe() & m]中,其中m=數組的長度-1,其實這裏也是一個取餘的運算,而getProbe()這個方法是用於獲取當前線程的threadLocalRandomProb(當前本地線程探測值,初始值爲0),其實也就是一個隨機數啊,而後對數組的長度取餘獲得的就是對應的數組的索引,首次調用這個方法是數組的第一個元素,若是數組的第一個元素爲null,那麼就說明沒有找到對應的Cell;
對於取餘運算,舉個簡單的例子吧,我也有點忘記了,好比隨機數9要對4進行取餘,咱們能夠9&(4-1)=9&3=1001&0011=1,利用位運算取餘瞭解一下;
如今咱們重點看看longAccumulate方法,代碼比較長,單獨提取出來看看
3.longAccumulate方法
//此方法是對Cell數組的初始化和擴容,注意有個形參LongBinaryOperator,這是JDK8新增的函數式編程的接口,函數簽名爲(T,T)->T,這裏傳進來的是null final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; //初始化當前線程的threadLocalRandomProbd的值,也就是生成一個隨機數 if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; //這裏表示初始化完畢了 if ((as = cells) != null && (n = as.length) > 0) { //這裏表示隨機數和數組大小取餘,獲得的結果就是當前線程要匹配到的Cell元素的索引,若是索引對應在Cell數組中的元素爲null,就新增一個Cell對象扔進去 if ((a = as[(n - 1) & h]) == null) { //cellsBusy爲0,表示當前Cell沒有進行擴容、初始化操做或者正在建立Cell等操做,那麼當前線程能夠對這個Cell數組隨心所欲 if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create //看下面的Cell數組初始化,說的很清楚,主要是設置cellsBusy爲1,而後將當前線程匹配到的Cell設置爲新建立的Cell對象 if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { //將cellsBusy重置爲0,表示此時其餘線程又能夠對Cell數組隨心所欲了 cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //Cell元素存在就執行CAS更新Cell中的值,這裏fn是形參爲null else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; //當Cell數組元素個數大於CPU的個數 else if (n >= NCPU || cells != as) collide = false; // At max size or stale //是否有衝突 else if (!collide) collide = true; //擴容Cell數組,和上面兩個else if一塊兒看 //若是當前Cell數組元素沒有達到CPU個數並且有衝突就新型擴容,擴容的數量是原來的兩倍Cell[] rs = new Cell[n << 1];,爲何要和CPU個數比較呢? //由於當Cell數組元素和CPU個數相同的時候,效率是最高的,由於每個線程都是一個CPU來執行,再來修改其中其中一個Cell中的值 //這裏仍是利用cellsBusy這個字段,在下面初始化Cell數組中的用法同樣,就很少說了 else if (cellsBusy == 0 && casCellsBusy()) { try { //這裏就是新建一個數組是原來的兩倍,而後將原來數組的元素複製到新的數組,再改變原來的cells的引用指向新的數組 if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { //使用完就重置爲0 cellsBusy = 0; } collide = false; continue; // Retry with expanded table } //這裏的做用是當線程找了很久,發現全部Cell個數已經和CPU個數相同了,而後匹配到的Cell正在被其餘線程使用 //因而爲了找到一個空閒的Cell,因而要從新計算hash值 h = advanceProbe(h); } //初始化Cell數組 //記得上面好像說過cellsBusy這個字段是能是0或者是1,當時0的時候,說明Cell數組沒有初始化和擴容,也沒有正在建立Cell元素, //反之則爲1,而casCellsBusy()方法就是用CAS將cellsBusy的值從0修改成1,表示當前線程正在初始化Cell數組,其餘線程就不能進行擴容操做了 //若是一個線程在初始化這個Cell數組,其餘線程在擴容的時候,看上面擴容,也會執行casCellsBusy()方法進行CAS操做,會失敗,由於指望的值是1,而不是0 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { //這裏首先新建一個容量爲2的數組,而後用隨機數h&1,也就是隨機數對數組的容量取餘的方式獲得索引,而後初始化數組中每一個Cell元素 Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { //初始化完成以後要把這個字段重置爲0,表示此時其餘線程就又能夠對這個Cell進行擴容了 cellsBusy = 0; } if (init) break; }
//將base更新爲base+x,表示base會逐漸累加Cell數組中每個Cell中的值 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
其實longAccumulate方法就是表示多線程的時候對Cell數組的初始化,添加Cell元素還有擴容操做,還有就是當一個線程匹配到了Cell元素,發現其餘線程正在使用就會從新計算隨機數,而後繼續匹配其餘的Cell元素去了,沒什麼特別難的吧!別看這個方法很長,就是作這幾個操做
六.總結
這一篇核心就是CAS,咱們簡單的說了一下原子操做類AtomicLong的自增,可是當線程不少的狀況下,使用CAS有很大的缺點,就是同一時間是會有一個線程在執行,其餘全部線程都在自旋,自旋會消耗性能,因而可使用JDK提供的一個LongAdder類代替,這個類的做用就是將AtomicLong中的值優化爲了一個base和一個Cell數組,多線程去競爭的時候,假設線程個數個CPU個數相同,那麼此時每個線程都有單獨的一個CPU去運行,而後單獨的匹配到Cell數組中的某個元素,若是沒有匹配到那麼會對這個Cell數組進行初始化操做;若是匹配到的Cell數組中的元素正在使用,那麼久判斷是否能夠新建一個Cell丟數組裏面去,若是數組已經滿了,並且數組數量小於CPU個數,那麼久進行擴容;擴容結束後,仍是匹配到的Cell數組中的位置正在使用,那麼就是衝突,就會從新計算,經過一個新的隨機數和數組的取餘,獲得一個新的索引,再去訪問該對應的Cell數組的位置。。。。
仔細看看仍是挺有意思的啊!