LongAdder解析

 對LongAdder的最初瞭解是從Coolshell上的一篇文章中得到的,可是一直都沒有深刻的瞭解過其實現,只知道它相較於AtomicLong來講,更加適合寫多讀少的併發情景。今天,咱們就研究一下LongAdder的原理,探究一下它如此高效的緣由。算法

基本原理和思想

 Java有不少併發控制機制,好比說以AQS爲基礎的鎖或者以CAS爲原理的自旋鎖。不瞭解AQS的朋友能夠閱讀我以前的AQS源碼解析文章。通常來講,CAS適合輕量級的併發操做,也就是併發量並很少,並且等待時間不長的狀況,不然就應該使用普通鎖,進入阻塞狀態,避免CPU空轉。shell

 因此,若是你有一個Long類型的值會被多線程修改,那麼使用CAS進行併發控制比較好,可是若是你是須要鎖住一些資源,而後進行數據庫操做,那麼仍是使用阻塞鎖比較好。數據庫

 第一種狀況下,咱們通常都使用AtomicLongAtomicLong是經過無限循環不停的採起CAS的方法去設置內部的value,直到成功爲止。那麼當併發數比較多或出現更新熱點時,就會致使CAS的失敗機率變高,重試次數更多,越多的線程重試,CAS失敗的機率越高,造成惡性循環,從而下降了效率。編程

 而LongAdder的原理就是下降對value更新的併發數,也就是將對單一value的變動壓力分散到多個value值上,下降單個value的「熱度」。數組

 咱們知道LongAdder的大體原理以後,再來詳細的瞭解一下它的具體實現,其中也有不少值得借鑑的併發編程的技巧。bash

LongAdder的成員變量

LongAdderStriped64的子類,其有三個比較重要的成員函數,在以後的函數分析中須要使用到,這裏先說明一下。多線程

// CPU的數量
static final int NCPU = Runtime.getRuntime().availableProcessors();
// Cell對象的數組,長度通常是2的指數
transient volatile Cell[] cells;
// 基礎value值,當併發較低時,只累加該值
transient volatile long base;
// 建立或者擴容Cells數組時使用的自旋鎖變量
transient volatile int cellsBusy;

複製代碼

cellsLongAdder的父類Striped64中的Cell數組類型的成員變量。每一個Cell對象中都包含一個value值,並提供對這個value值的CAS操做。併發

static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
}
複製代碼

Add操做

 咱們首先來看一下LongAdderadd函數,其會屢次嘗試CAS操做將值進行累加,若是成功了就直接返回,失敗則繼續執行。代碼比較複雜,並且涉及的狀況比較多,咱們就以梳理歷次嘗試CAS操做爲主線,講清楚這些CAS操做的前提條件和場景。app

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // 當cells數組爲null時,會進行第一次cas操做嘗試。
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null || 
            !(uncontended = a.cas(v = a.value, v + x)))
            // 當cells數組不爲null,而且經過getProbe() & m
            // 定位的Cell對象不爲null時進行第二次CAS操做。
            // 若是執行不成功,則進入longAccumulate函數。
            longAccumulate(x, null, uncontended); 
    }
}
複製代碼

 當併發量較少時,cell數組還沒有初始化,因此只調用casBase函數,對base變量進行CAS累加。dom

第一個CAS操做

 咱們來看一下casBase函數相關的源碼吧。咱們能夠認爲變量base就是第一個value值,也是基礎value變量。先調用casBase函數來cas一下base變量,若是成功了,就不須要在進行下面比較複雜的算法,

final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
複製代碼

 當併發量逐漸提升時,casBase函數會失敗。若是cells數組爲null或爲空,就直接調用longAccumulate方法。由於cells爲null或在爲空,說明cells未初始化,因此調用longAccumulate進行初始化。不然繼續判斷。  若是cells中已經初始化,就繼續進行後續判斷。咱們先來理解一下getProbe() & m的這個操做吧,能夠把這個操做看成一次計算"hash"值,而後將cells中這個位置的Cell對象賦值給變量a。若是變量a不爲null,那麼就調用該對象的cas方法去設置其value值。若是a爲null,或在cas賦值發生衝突,那麼調用longAccumulate方法。

第二個CAS操做

LongAccumulate方法

longAccumulate函數比較複雜,帶有個人註釋的代碼已經貼在了文章後邊,這裏咱們就只講一下其中比較關鍵的一些技巧和思想。

 首先,咱們都知道只有當對base的cas操做失敗以後,LongAdder才引入Cell數組.因此在longAccumulate中就是對Cell數組進行操做,分別涉及了數組的初始化,擴容和設置某個位置的Cell對象等操做。

 在這段代碼中,關於cellBusy的cas操做構成了一個SpinLock,這就是經典的SpinLock的編程技巧,你們能夠學習一下。

 咱們先來看一下longAccumulate的主體代碼,首先是一個無限for循環,而後根據cells數組的狀態來判斷是要進行cells數組的初始化,仍是進行對象添加或者擴容。

final void longAccumulate(long x, LongBinaryOperator fn,
                             boolean wasUncontended) {
       int h;
       if ((h = getProbe()) == 0) { 
           //獲取PROBE變量,探針變量,與當前運行的線程相關,不一樣線程不一樣
           ThreadLocalRandom.current(); 
       //初始化PROBE變量,和getProbe都使用Unsafe類提供的原子性操做。
           h = getProbe();
           wasUncontended = true;
       }
       boolean collide = false;
       for (;;) { //cas經典無限循環,不斷嘗試
           Cell[] as; Cell a; int n; long v;
           if ((as = cells) != null && (n = as.length) > 0) { 
           // cells不爲null,而且數組size大於0,表示cells已經初始化了
           // 初始化Cell對象並設置到數組中或者進行數組擴容
           }
           else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
           //cells數組未初始化,得到cellsBusy lock,進行cells數組的初始化
           // cells數組初始化操做
           }
          //若是初始化數組失敗了,那就再次嘗試一下直接cas base變量,
          // 若是成功了就直接返回,這是最後一個進行CAS操做的地方。
           else if (casBase(v = base, ((fn == null) ? v + x :
                                       fn.applyAsLong(v, x))))
               break;
       }
   }
複製代碼

 進行Cell數組代碼以下所示,它首先調用casCellsBusy函數獲取了cellsBusy‘鎖’,而後進行數組的初始化操做,最後將cellBusy'鎖'釋放掉。

// 注意在進入這段代碼以前已經casCellsBusy得到cellsBusy這個鎖變量了。
boolean init = false;
try {
    if (cells == as) {
        Cell[] rs = new Cell[2];
        rs[h & 1] = new Cell(x); //設置x的值爲cell對象的value值
        cells = rs;
        init = true;
    }
} finally {
    cellsBusy = 0;
}
if (init)
    break;
複製代碼

第三個CAS操做

 若是Cell數組已經初始化過了,那麼就進行Cell數組的設置或者擴容。這部分代碼有一系列的if else的判斷,若是前一個條件不成立,纔會進入下一條判斷。

 首先,當Cell數組中對應位置的cell對象爲null時,代表該位置的Cell對象須要進行初始化,因此使用casCellsBusy函數獲取'鎖',而後初始化Cell對象,而且設置進cells數組,最後釋放掉'鎖'。

 當Cell數組中對應位置的cell對象不爲null,則直接調用其cas操做進行累加。

 當上述操做都失敗後,認爲多個線程在對同一個位置的Cell對象進行操做,這個Cell對象是一個「熱點」,因此Cell數組須要進行擴容,將熱點分散。

if ((a = as[(n - 1) & h]) == null) { //經過與操做計算出來須要操做的Cell對象的座標
    if (cellsBusy == 0) { //volatile 變量,用來實現spinLock,來在初始化和resize cells數組時使用。
    //當cellsBusy爲0時,表示當前能夠對cells數組進行操做。 
        Cell r = new Cell(x);//將x值直接賦值給Cell對象
        if (cellsBusy == 0 && casCellsBusy()) {//若是這個時候cellsBusy仍是0
        //就cas將其設置爲非0,若是成功了就是得到了spinLock的鎖.能夠對cells數組進行操做.
        //若是失敗了,就會再次執行一次循環
            boolean created = false;
            try {
                Cell[] rs; int m, j;
                //判斷cells是否已經初始化,而且要操做的位置上沒有cell對象.
                if ((rs = cells) != null &&
                    (m = rs.length) > 0 &&
                    rs[j = (m - 1) & h] == null) {
                    rs[j] = r;&emsp;//將以前建立的值爲x的cell對象賦值到cells數組的響應位置.
                    created = true;
                }
            } finally {
                //經典的spinLock編程技巧,先得到鎖,而後try finally將鎖釋放掉
                //將cellBusy設置爲0就是釋放鎖.
                cellsBusy = 0;
            }
            if (created)
                break;&emsp;//若是建立成功了,就是使用x建立了新的cell對象,也就是新建立了一個分擔熱點的value
            continue; 
        }
    }
    collide = false; //未發生碰撞
}
else if (!wasUncontended)//是否已經發生過一次cas操做失敗
    wasUncontended = true; //設置成true,以便第二次進入下一個else if 判斷
else if (a.cas(v = a.value, ((fn == null) ? v + x :
                            fn.applyAsLong(v, x))))
    &emsp;//fn是操做類型,若是是空,就是相加,因此讓a這個cell對象中的value值和x相加,而後在cas設置,若是成果
    //就直接返回
    break;
else if (n >= NCPU || cells != as)
&emsp;&emsp;//若是cells數組的大小大於系統的可得到處理器數量或在as再也不和cells相等.
    collide = false;
else if (!collide)
    collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
&emsp;&emsp;//再次得到cellsBusy這個spinLock,對數組進行resize
    try {
        if (cells == as) {//要再次檢測as是否等於cells以避免其餘線程已經對cells進行了操做.
            Cell[] rs = new Cell[n << 1]; //擴容一倍
            for (int i = 0; i < n; ++i)
                rs[i] = as[i];
            cells = rs;//賦予cells一個新的數組對象
        }
    } finally {
        cellsBusy = 0;
    }
    collide = false;
    continue;
}
h = advanceProbe(h);//因爲使用當前探針變量沒法操做成功,因此從新設置一個,再次嘗試
複製代碼

第四個CAS操做

後記

 本篇文章寫的不是很好,我寫完以後又看了一遍coolshell上的關於LongAdder的文章,感受本身沒有人家寫的那麼簡潔明瞭。我對代碼細節的註釋和投入太多了。其實不少代碼你們均可以看懂,並不須要大量的代碼片斷加註釋。之後要注意一下。以後會接着研究一下JUC包中的其餘類,但願你們多多關注。

相關文章
相關標籤/搜索