在JDK1.8的atomic中增長了一個原子操做類LongAddr,與AtomicLong相比,在效率方面有了不小的提高。java
AtomicLong的原子操做是經過CAS進行更新的,當有衝突的時候,會經過自旋的方式等待原子操做,具體:數組
public final long updateAndGet(LongUnaryOperator updateFunction) {
long prev, next;
do {
prev = get();
next = updateFunction.applyAsLong(prev);
} while (!compareAndSet(prev, next));
return next;
}
複製代碼
從上面的源碼能夠看出,在原子更新衝突比較少的時候,更新的效率仍是很快的,直接經過CAS更新,可是因爲是對同一個元素進行更新,當更新操做比較頻繁的時候,發生的衝突的機率就會增大,效率就會下降,如今問題就變爲:多線程
怎樣減小發生衝突的機率?app
在多線程環境下,線程的行爲是不可預測的,所以能夠在更新的元素上作文章,在AtomicLong中是經過對一個成員變量private volatile long value
進行原子的更新操做的,所以衝突也是圍繞該元素的,來看下Doug Lea大神是怎麼作的:dom
volatile long base
和一個int [] cell
的數組,數組中的元素都初始化爲0。在LongAddr中,將原子更新的操做分散到了基礎變量base和一個數組中,這樣就下降了發生衝突的機率,提高了更新的效率。ide
LongAddr繼承自Striped64,具體的操做是經過Striped64實現,Striped64的成員變量以下:post
public class LongAdder extends Striped64 implements Serializable {
//對應上面的數組
transient volatile Cell[] cells;
//基礎更新變量
transient volatile long base;
//同步對cells進行操做的
transient volatile int cellsBusy;
複製代碼
其中Cell的定義以下:this
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
複製代碼
@sun.misc.Contended註解是爲了解決僞共享的問題,具體不瞭解的能夠看這裏淺談僞共享。atom
Cell中只是包含了一個cas的原子操做行爲,主要是數組中的元素進行原子更新操做。spa
public void add(long x) {
Cell[] as;
long b, v;
int m;
Cell a;
//若是cell不爲空,或者cell爲空可是caseBase失敗
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
//1. cells 爲空
//2. cells不爲空,可是cells中對應本線程位置爲null
//3. cells中本線程對應的位置不爲空,可是對本位置操做case失敗
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
複製代碼
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
// 初始化Thread的種子和隨機數
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
//步驟6
for (;;) {
Cell[] as; Cell a; int n; long v;
//若是cells數據不爲空,即知足條件3.2和3.3時
if ((as = cells) != null && (n = as.length) > 0) {
//若是被hash到的cells數組位置爲null(對應條件3.2)
if ((a = as[(n - 1) & h]) == null) {
//若是此時cells數組中同步鎖爲空
if (cellsBusy == 0) {
//初始化Cell元素,初試化爲x
Cell r = new Cell(x);
//設置cellsBusy鎖,避免其餘線程同時修改cells數組
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
//初始化cells數組中本線程對應的元素
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
//恢復cellsBusy鎖
cellsBusy = 0;
}
//若是建立成功則退出,不然continue
if (created)
break;
/*若是created爲false,說明上面指定的cells數組的位置 cells[m%cells.length]已經有其它線程設置了cell了, 繼續從新開始循環。*/
continue;
}
}
//若是cellsBusy=1,說明有線程正在更改cells數組,
將collide設置爲false
collide = false;
}
/*若是被hash到的cells數組位置不爲null(對應3.3), 說明已經發生競爭,將wasUncontended設置爲true, 最後從新計算一個新的probe,而後從新執行循環。 */
else if (!wasUncontended)
wasUncontended = true;
/*若是當前線程第一次參與cells爭用的cas失敗, 這裏會嘗試將x值加到cells[m%cells.length]的value , 若是成功直接退出*/
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//若是cells數組的長度已經到了最大值(大於等於cup數量),
或者是當前cells已經作了擴容,則將collide設置爲false,
後面從新計算prob的值.*/
else if (n >= NCPU || cells != as)
collide = false;
/*若是發生了衝突collide=false,則設置其爲true; 會在最後從新計算hash值後,進入下一次for循環*/
else if (!collide)
collide = true;
/*擴容cells數組,新參與cell爭用的線程兩次均失敗, 且符合庫容條件,會執行該分支*/
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) {
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
//從新計算線程的hash值
h = advanceProbe(h);
}
//若是知足3.1 和 3.2 則初始化cells數組
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//若是以上操做都失敗了,則嘗試將值累加到base上
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
}
}
複製代碼
上面代碼寫的比較簡單,可是邏輯比較繞,主要就是同步的在base和cells數據更新之間操做,下降發生衝突的機率,提升效率。
因爲在多線程環境下,數據的更新操做是分散到base變量和cells數組中的每一個元素中的,所以每次計算結果都要把所有的數值疊加起來
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
複製代碼
基本的實現邏輯:
@sun.misc.Contended("tlr")
int threadLocalRandomProbe; //此線程的隨機數
@sun.misc.Contended("tlr")
int threadLocalRandomSecondarySeed; //此線程的隨機種子
複製代碼
//判斷該線程的隨機數是否已經初始化,若是沒有則執行localInit初始化
public static ThreadLocalRandom current() {
if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
localInit();
return instance;
}
//更新線程的種子數和隨機數
static final void localInit() {
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
int probe = (p == 0) ? 1 : p; // skip 0
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
Thread t = Thread.currentThread();
UNSAFE.putLong(t, SEED, seed);
UNSAFE.putInt(t, PROBE, probe);
}
複製代碼
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
複製代碼
完。