JUC解析-LongAdder

在JDK1.8的atomic中增長了一個原子操做類LongAddr,與AtomicLong相比,在效率方面有了不小的提高。java

實現原理

AtomicLong的原子操做是經過CAS進行更新的,當有衝突的時候,會經過自旋的方式等待原子操做,具體:數組

public final long updateAndGet(LongUnaryOperator updateFunction) {
    long prev, next;
    do {
      prev = get();
      next = updateFunction.applyAsLong(prev);
    } while (!compareAndSet(prev, next));
    return next;
  }
複製代碼

從上面的源碼能夠看出,在原子更新衝突比較少的時候,更新的效率仍是很快的,直接經過CAS更新,可是因爲是對同一個元素進行更新,當更新操做比較頻繁的時候,發生的衝突的機率就會增大,效率就會下降,如今問題就變爲:多線程

怎樣減小發生衝突的機率?app

在多線程環境下,線程的行爲是不可預測的,所以能夠在更新的元素上作文章,在AtomicLong中是經過對一個成員變量private volatile long value進行原子的更新操做的,所以衝突也是圍繞該元素的,來看下Doug Lea大神是怎麼作的:dom

  1. 初始化一個volatile long base和一個int [] cell的數組,數組中的元素都初始化爲0。
  2. 當線程進行原子操做的時候,優先對base變量CAS操做,若是沒有發生衝突,則更新成功,若是發生衝突,則放棄自旋的方案,進行步驟3。
  3. 對線程經過hashcode映射到cell數組中的一個元素中,而後將原子的操做更新到該數組元素中。
  4. 最後經過疊加base和cell中的每一個元素,就能夠獲取到原子操做的數值。

在LongAddr中,將原子更新的操做分散到了基礎變量base和一個數組中,這樣就下降了發生衝突的機率,提高了更新的效率。ide

具體實現

LongAddr繼承自Striped64,具體的操做是經過Striped64實現,Striped64的成員變量以下:post

public class LongAdder extends Striped64 implements Serializable {
  //對應上面的數組
  transient volatile Cell[] cells;
  //基礎更新變量
  transient volatile long base;
  //同步對cells進行操做的
  transient volatile int cellsBusy;
複製代碼

其中Cell的定義以下:this

@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
      return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
      try {
          UNSAFE = sun.misc.Unsafe.getUnsafe();
          Class<?> ak = Cell.class;
          valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
      } catch (Exception e) {
          throw new Error(e);
      }
    }
  }
複製代碼

@sun.misc.Contended註解是爲了解決僞共享的問題,具體不瞭解的能夠看這裏淺談僞共享atom

Cell中只是包含了一個cas的原子操做行爲,主要是數組中的元素進行原子更新操做。spa

更新操做

public void add(long x) {
    Cell[] as;
    long b, v;
    int m;
    Cell a;
    //若是cell不爲空,或者cell爲空可是caseBase失敗
    if ((as = cells) != null || !casBase(b = base, b + x)) {
      boolean uncontended = true;
      //1. cells 爲空
      //2. cells不爲空,可是cells中對應本線程位置爲null
      //3. cells中本線程對應的位置不爲空,可是對本位置操做case失敗
      if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
        longAccumulate(x, null, uncontended);
      }
  }
複製代碼
  1. 若是cells數組爲空,則直接對基礎變量base執行cas操做casBase,若是casBase執行失敗,則說明有衝突,執行步驟3。
  2. 若是cells數組不爲空,則此時爲了下降衝突的機率,優先對cells數組進行更新操做,直接執行步驟3。
  3. 執行子邏輯判斷: 3.1. 若是cells爲空,直接執行步驟4。 3.2. 若是cells不爲空,可是本線程對應的cells數組中的元素爲null,則進行步驟4。 3.3. 若是cells不爲空,而且本線程對應的數組中的元素不爲null,則對該元素進行cas操做,若是執行成功返回,執行失敗則進行步驟4。
  4. 執行更新子邏輯,具體參照源碼註解
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
  int h;
  // 初始化Thread的種子和隨機數
  if ((h = getProbe()) == 0) {
      ThreadLocalRandom.current(); // force initialization
      h = getProbe();
      wasUncontended = true;
  }
  boolean collide = false;    // True if last slot nonempty
  //步驟6
  for (;;) {
    Cell[] as; Cell a; int n; long v;
    //若是cells數據不爲空,即知足條件3.2和3.3時
    if ((as = cells) != null && (n = as.length) > 0) {
      //若是被hash到的cells數組位置爲null(對應條件3.2)
      if ((a = as[(n - 1) & h]) == null) {
        //若是此時cells數組中同步鎖爲空
        if (cellsBusy == 0) {
          //初始化Cell元素,初試化爲x
          Cell r = new Cell(x);
          //設置cellsBusy鎖,避免其餘線程同時修改cells數組
          if (cellsBusy == 0 && casCellsBusy()) {
            boolean created = false;
            try {
              //初始化cells數組中本線程對應的元素
              Cell[] rs; int m, j;
              if ((rs = cells) != null &&
                (m = rs.length) > 0 &&
                rs[j = (m - 1) & h] == null) {
                  rs[j] = r;
                  created = true;
              }
            } finally {
              //恢復cellsBusy鎖
              cellsBusy = 0;
            }
            //若是建立成功則退出,不然continue
            if (created)
              break;
            /*若是created爲false,說明上面指定的cells數組的位置 cells[m%cells.length]已經有其它線程設置了cell了, 繼續從新開始循環。*/
            continue; 
          }
        }
        //若是cellsBusy=1,說明有線程正在更改cells數組,
        將collide設置爲false
        collide = false;
      }
      /*若是被hash到的cells數組位置不爲null(對應3.3), 說明已經發生競爭,將wasUncontended設置爲true, 最後從新計算一個新的probe,而後從新執行循環。 */
      else if (!wasUncontended)       
        wasUncontended = true;        
      /*若是當前線程第一次參與cells爭用的cas失敗, 這裏會嘗試將x值加到cells[m%cells.length]的value , 若是成功直接退出*/
      else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                  fn.applyAsLong(v, x))))
        break;
      //若是cells數組的長度已經到了最大值(大於等於cup數量),
      或者是當前cells已經作了擴容,則將collide設置爲false,
      後面從新計算prob的值.*/
      else if (n >= NCPU || cells != as)
        collide = false; 
      /*若是發生了衝突collide=false,則設置其爲true; 會在最後從新計算hash值後,進入下一次for循環*/
      else if (!collide)
        collide = true;
      /*擴容cells數組,新參與cell爭用的線程兩次均失敗, 且符合庫容條件,會執行該分支*/
      else if (cellsBusy == 0 && casCellsBusy()) {
        try {
            if (cells == as) {      
              Cell[] rs = new Cell[n << 1];
              for (int i = 0; i < n; ++i)
                rs[i] = as[i];
              cells = rs;
            }
        } finally {
          cellsBusy = 0;
        }
        collide = false;
        continue;                   
      }
      //從新計算線程的hash值
      h = advanceProbe(h);
    } 
    //若是知足3.1 和 3.2 則初始化cells數組
    else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
      boolean init = false;
      try {                           
        if (cells == as) {
          Cell[] rs = new Cell[2];
          rs[h & 1] = new Cell(x);
          cells = rs;
          init = true;
        }
      } finally {
        cellsBusy = 0;
      }
      if (init)
        break;
    }
    //若是以上操做都失敗了,則嘗試將值累加到base上
    else if (casBase(v = base, ((fn == null) ? v + x :
                            fn.applyAsLong(v, x))))
      break;                  
  }
}

複製代碼

上面代碼寫的比較簡單,可是邏輯比較繞,主要就是同步的在base和cells數據更新之間操做,下降發生衝突的機率,提升效率。

獲取結果

因爲在多線程環境下,數據的更新操做是分散到base變量和cells數組中的每一個元素中的,所以每次計算結果都要把所有的數值疊加起來

public long sum() {
  Cell[] as = cells; Cell a;
  long sum = base;
  if (as != null) {
    for (int i = 0; i < as.length; ++i) {
      if ((a = as[i]) != null)
        sum += a.value;
    }
  }
  return sum;
}
複製代碼

線程hash數值的計算

基本的實現邏輯:

  1. 在Thread線程類中保存有兩個變量:
@sun.misc.Contended("tlr")
int threadLocalRandomProbe;  //此線程的隨機數

@sun.misc.Contended("tlr")
int threadLocalRandomSecondarySeed; //此線程的隨機種子
複製代碼
  1. ThreadLocalRandom類會更新每一個線程類種子,而且根據種子計算出該線程的隨機數,這樣線程之間就不會存在隨機數的同步成本,提升效率。
//判斷該線程的隨機數是否已經初始化,若是沒有則執行localInit初始化
  public static ThreadLocalRandom current() {
    if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
      localInit();
    return instance;
  }
  //更新線程的種子數和隨機數
  static final void localInit() {
    int p = probeGenerator.addAndGet(PROBE_INCREMENT);
    int probe = (p == 0) ? 1 : p; // skip 0
    long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
    Thread t = Thread.currentThread();
    UNSAFE.putLong(t, SEED, seed);
    UNSAFE.putInt(t, PROBE, probe);
  }
複製代碼
  1. 若是發生衝突,能夠對Thread進行rehash,具體以下:
static final int advanceProbe(int probe) {
    probe ^= probe << 13;   // xorshift
    probe ^= probe >>> 17;
    probe ^= probe << 5;
    UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
    return probe;
  }
複製代碼

完。

相關文章
相關標籤/搜索