AtomicInteger源碼分析—基於CAS的樂觀鎖實現html
參考:java
http://www.importnew.com/22078.htmlgit
https://www.cnblogs.com/mantu/p/5796450.htmlgithub
http://hustpawpaw.blog.163.com/blog/static/184228324201210811243127/算法
咱們都知道,cpu是時分複用的,也就是把cpu的時間片,分配給不一樣的thread/process輪流執行,時間片與時間片之間,須要進行cpu切換,也就是會發生進程的切換。切換涉及到清空寄存器,緩存數據。而後從新加載新的thread所需數據。當一個線程被掛起時,加入到阻塞隊列,在必定的時間或條件下,在經過notify(),notifyAll()喚醒回來。在某個資源不可用的時候,就將cpu讓出,把當前等待線程切換爲阻塞狀態。等到資源(好比一個共享數據)可用了,那麼就將線程喚醒,讓他進入runnable狀態等待cpu調度。這就是典型的悲觀鎖的實現。獨佔鎖是一種悲觀鎖,synchronized就是一種獨佔鎖,它假設最壞的狀況,而且只有在確保其它線程不會形成干擾的狀況下執行,會致使其它全部須要鎖的線程掛起,等待持有鎖的線程釋放鎖。windows
可是,因爲在進程掛起和恢復執行過程當中存在着很大的開銷。當一個線程正在等待鎖時,它不能作任何事,因此悲觀鎖有很大的缺點。舉個例子,若是一個線程須要某個資源,可是這個資源的佔用時間很短,當線程第一次搶佔這個資源時,可能這個資源被佔用,若是此時掛起這個線程,可能馬上就發現資源可用,而後又須要花費很長的時間從新搶佔鎖,時間代價就會很是的高。緩存
因此就有了樂觀鎖的概念,他的核心思路就是,每次不加鎖而是假設沒有衝突而去完成某項操做,若是由於衝突失敗就重試,直到成功爲止。在上面的例子中,某個線程能夠不讓出cpu,而是一直while循環,若是失敗就重試,直到成功爲止。因此,當數據爭用不嚴重時,樂觀鎖效果更好。好比CAS就是一種樂觀鎖思想的應用。安全
CAS就是Compare and Swap的意思,比較並操做。不少的cpu直接支持CAS指令。CAS是項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知此次競爭中失敗,並能夠再次嘗試。CAS有3個操做數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改成B,不然什麼都不作。性能優化
JDK1.5中引入了底層的支持,在int、long和對象的引用等類型上都公開了CAS的操做,而且JVM把它們編譯爲底層硬件提供的最有效的方法,在運行CAS的平臺上,運行時把它們編譯爲相應的機器指令。在Java.util.concurrent.atomic包下面的全部的原子變量類型中,好比AtomicInteger,都使用了這些底層的JVM支持爲數字類型的引用類型提供一種高效的CAS操做。多線程
在CAS操做中,會出現ABA問題。就是若是V的值先由A變成B,再由B變成A,那麼仍然認爲是發生了變化,並須要從新執行算法中的步驟。有簡單的解決方案:不是更新某個引用的值,而是更新兩個值,包括一個引用和一個版本號,即便這個值由A變爲B,而後爲變爲A,版本號也是不一樣的。AtomicStampedReference和AtomicMarkableReference支持在兩個變量上執行原子的條件更新。AtomicStampedReference更新一個「對象-引用」二元組,經過在引用上加上「版本號」,從而避免ABA問題,AtomicMarkableReference將更新一個「對象引用-布爾值」的二元組。
AtomicInteger 是一個支持原子操做的 Integer 類,就是保證對AtomicInteger類型變量的增長和減小操做是原子性的,不會出現多個線程下的數據不一致問題。若是不使用 AtomicInteger,要實現一個按順序獲取的 ID,就必須在每次獲取時進行加鎖操做,以免出現併發時獲取到一樣的 ID 的現象。
接下來經過源代碼來看AtomicInteger具體是如何實現的原子操做。
首先看incrementAndGet() 方法,下面是具體的代碼。
1
2
3
4
5
6
7
8
|
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1 ;
if (compareAndSet(current, next))
return next;
}
}
|
經過源碼,能夠知道,這個方法的作法爲先獲取到當前的 value 屬性值,而後將 value 加 1,賦值給一個局部的 next 變量,然而,這兩步都是非線程安全的,可是內部有一個死循環,不斷去作compareAndSet操做,直到成功爲止,也就是修改的根本在compareAndSet方法裏面,compareAndSet()方法的代碼以下:
1
2
3
|
public final boolean compareAndSet( int expect, int update) {
return unsafe.compareAndSwapInt( this , valueOffset, expect, update);
}
|
compareAndSet()方法調用的compareAndSwapInt()方法的聲明以下,是一個native方法。
publicfinal native boolean compareAndSwapInt(Object var1, long var2, int var4, intvar5);
compareAndSet 傳入的爲執行方法時獲取到的 value 屬性值,next 爲加 1 後的值, compareAndSet所作的爲調用 Sun 的 UnSafe 的 compareAndSwapInt 方法來完成,此方法爲 native 方法,compareAndSwapInt 基於的是CPU 的 CAS指令來實現的。因此基於 CAS 的操做可認爲是無阻塞的,一個線程的失敗或掛起不會引發其它線程也失敗或掛起。而且因爲 CAS 操做是 CPU 原語,因此性能比較好。
相似的,還有decrementAndGet()方法。它和incrementAndGet()的區別是將 value 減 1,賦值給next 變量。
AtomicInteger中還有getAndIncrement() 和getAndDecrement() 方法,他們的實現原理和上面的兩個方法徹底相同,區別是返回值不一樣,前兩個方法返回的是改變以後的值,即next。而這兩個方法返回的是改變以前的值,即current。還有不少的其餘方法,就不列舉了。
下面以具體的例子分析下AtomicInteger的實現:
計數器(Counter),採用Java裏比較方便的鎖機制synchronized關鍵字,初步的代碼以下:
1 public class Counter {
2 private int value;
3
4 public synchronized int getValue() {
5 return value;
6 }
7
8 public synchronized int increment() {
9 return ++value;
10 }
11
12 public synchronized int decrement() {
13 return --value;
14 }
15 }
synchronized關鍵字是基於阻塞的鎖機制,也就是說當一個線程擁有鎖的時候,訪問同一資源的其它線程須要等待,直到該線程釋放鎖,這也就咱們前面所說的悲觀鎖。這裏會有些問題:首先,若是被阻塞的線程優先級很高很重要怎麼辦?其次,若是得到鎖的線程一直不釋放鎖怎麼辦?(這種狀況是很是糟糕的)。還有一種狀況,若是有大量的線程來競爭資源,那CPU將會花費大量的時間和資源來處理這些競爭(事實上CPU的主要工做並不是這些),同時,還有可能出現一些例如死鎖之類的狀況,最後,其實鎖機制是一種比較粗糙,粒度比較大的機制,相對於像計數器這樣的需求有點兒過於笨重,所以,對於這種需求咱們期待一種更合適、更高效的線程安全機制。
package TestAtomicInteger;
import java.util.concurrent.atomic.AtomicInteger;
class MyThread implements Runnable {
// static int i = 0;
static AtomicInteger ai= new AtomicInteger( 0 );
public void run() {
for ( int m = 0 ; m < 1000000 ; m++) {
ai.getAndIncrement();
}
}
};
public class TestAtomicInteger {
public static void main(String[] args) throws InterruptedException {
MyThread mt = new MyThread();
Thread t1 = new Thread(mt);
Thread t2 = new Thread(mt);
t1.start();
t2.start();
Thread.sleep( 500 );
System.out.println(MyThread.ai.get());
}
}
|
能夠發現結果都是2000000,也就是說AtomicInteger是線程安全的。
下面咱們就以模擬CAS機制來實現Counter的例子:
CAS類:
1 public class SimpleCAS {
2 private volatile int value;
3 public synchronized int getValue(){
4 return value;
5 }
6 public synchronized boolean comperaAndSwap(int expectedValue,int newValue){
7 int oldValue = value;
8 if(oldValue == expectedValue){
9 value = newValue;
10 return true;
11 }else{
12 return false;
13 }
14 }
15 }
CASCounter類:
1 public class CASCounter {
2 private SimpleCAS cas;
3 public int getValue(){
4 return cas.getValue();
5 }
6 public int increment(){
7 int olevalue = cas.getValue();
8 for (; ;) {
9 if(cas.comperaAndSwap(olevalue, olevalue+1)){
10 return cas.getValue();
11 }
12 }
13
14 }
15 }
上面的模擬不是CSA的真正實現,其實咱們在語言層面是沒有作任何同步的操做的,你們也能夠看到源碼沒有任何鎖加在上面,可它爲何是線程安全的呢?這就是Atomic包下這些類的奧祕:語言層面不作處理,咱們將其交給硬件—CPU和內存,利用CPU的多處理能力,實現硬件層面的阻塞,再加上volatile變量的特性便可實現基於原子操做的線程安全。因此說,CAS並非無阻塞,只是阻塞並不是在語言、線程方面,而是在硬件層面,因此無疑這樣的操做會更快更高效!
總結一下,AtomicInteger基於衝突檢測的樂觀併發策略。 能夠想象,這種樂觀在線程數目很是多的狀況下,失敗的機率會指數型增長。
AtomicInteger等對象出現的目的主要是爲了解決在多線程環境下變量計數的問題,例如經常使用的i++,i--操做,它們不是線程安全的,AtomicInteger引入後,就沒必要在進行i++和i--操做時,進行加鎖操做,在咱們平常工做中,有不少業務場景須要在多線程環境下進行變量的計數:訂單數統計、訪問量統計、累計相應時長統計等。
demo 源碼:https://github.com/mantuliu/javaAdvance
下面咱們先分析一下AtomicInteger的源代碼。經過源碼分析咱們知道,AtomicInteger的核心就是一個CAS算法(CompareAndSwap),比較並交換算法,此算法是由unsafe的底層代碼實現,它是一個原子的操做,原理就是:若是內存中的實際值與update值相同,則將實際值更新爲expect值,反之則返回失敗,由上層系統循環獲取實際值後,再次調用此CAS算法:
/*
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
/*
*
*
*
*
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent.atomic;
import sun.misc.Unsafe;
/**
* An {@code int} value that may be updated atomically. See the
* {@link java.util.concurrent.atomic} package specification for
* description of the properties of atomic variables. An
* {@code AtomicInteger} is used in applications such as atomically
* incremented counters, and cannot be used as a replacement for an
* {@link java.lang.Integer}. However, this class does extend
* {@code Number} to allow uniform access by tools and utilities that
* deal with numerically-based classes.
*
* @since 1.5
* @author Doug Lea
*/
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;//value值的偏移地址
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;//實際的值
/**
* Creates a new AtomicInteger with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicInteger(int initialValue) {
value = initialValue;//初始化
}
/**
* Creates a new AtomicInteger with initial value {@code 0}.
*/
public AtomicInteger() {
}
/**
* Gets the current value.
*
* @return the current value
*/
public final int get() {
return value;//返回value值
}
/**
* Sets to the given value.
*
* @param newValue the new value
*/
public final void set(int newValue) {
value = newValue;//設置新值,由於沒有判斷oldvalue,因此此操做是非線程安全的
}
/**
* Eventually sets to the given value.
*
* @param newValue the new value
* @since 1.6
*/
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);//與set操做效果同樣,只是採用的是unsafe對象中經過偏移地址來設置值的方式
}
/**
* Atomically sets to the given value and returns the old value.
*
* @param newValue the new value
* @return the previous value
*/
public final int getAndSet(int newValue) {//原子操做,設置新值,返回老值
for (;;) {
int current = get();
if (compareAndSet(current, newValue))//經過CAS算法,比較current的值和實際值是否一致,若是一致則設置爲newValue
return current;
}
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return true if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* <p>May <a href="package-summary.html#Spurious">fail spuriously</a>
* and does not provide ordering guarantees, so is only rarely an
* appropriate alternative to {@code compareAndSet}.
*
* @param expect the expected value
* @param update the new value
* @return true if successful.
*/
public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final int getAndIncrement() {//i++操做
for (;;) {
int current = get();//獲取當前值
int next = current + 1;//當前值+1
if (compareAndSet(current, next))//比較current值和實際的值是否一致,如不一致,則繼續循環
return current;
}
}
/**
* Atomically decrements by one the current value.
*
* @return the previous value
*/
public final int getAndDecrement() {
for (;;) {
int current = get();
int next = current - 1;
if (compareAndSet(current, next))
return current;
}
}
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the previous value
*/
public final int getAndAdd(int delta) {//例如:當咱們統計接口的響應時間時,能夠利用此方法
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return current;
}
}
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
/**
* Atomically decrements by one the current value.
*
* @return the updated value
*/
public final int decrementAndGet() {
for (;;) {
int current = get();
int next = current - 1;
if (compareAndSet(current, next))
return next;
}
}
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the updated value
*/
public final int addAndGet(int delta) {
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return next;
}
}
/**
* Returns the String representation of the current value.
* @return the String representation of the current value.
*/
public String toString() {
return Integer.toString(get());
}
public int intValue() {
return get();
}
public long longValue() {
return (long)get();
}
public float floatValue() {
return (float)get();
}
public double doubleValue() {
return (double)get();
}
}
下面,咱們爲四種狀況(同步關鍵字、ReentrantLock公平鎖和非公平鎖、AtomicInteger)作一下性能對比分析,當咱們看到上面的代碼分析後,咱們判斷AtomicInteger應該比加鎖的方式快,可是實驗的結果代表,AtomicInteger只比ReentrantLock加公平鎖的狀況快幾十倍,比其它兩種方式略慢一些。
四個demo都用100個線程來循環模擬下單60秒鐘:
demo Lesson8SyncIntPerform:在使用同步關鍵字加鎖的狀況下100個線程循環下單數爲:677337556
demo Lesson8SyncIntPerform:在使用同步關鍵字加鎖的狀況下100個線程循環下單數爲:755994691
demo Lesson8AtomicIntPerform:在使用AtomicInteger的狀況下100個線程循環下單數爲:562359607
demo Lesson8AtomicIntPerform:在使用AtomicInteger的狀況下100個線程循環下單數爲:575367967
demo Lesson8LockIntPerform:在使用ReentrantLock加非公平鎖的狀況下100個線程循環下單數爲:857239882
demo Lesson8LockIntPerform:在使用ReentrantLock加非公平鎖的狀況下100個線程循環下單數爲:860364303
demo Lesson8LockFairIntPerform:在使用ReentrantLock加公平鎖的狀況下100個線程循環下單數爲:19153640
demo Lesson8LockFairIntPerform:在使用ReentrantLock加公平鎖的狀況下100個線程循環下單數爲:19076567
上面的實驗結果代表,在jdk1.6及後續的版本中(本實驗的jdk版本是1.7,操做系統爲windows操做系統),已經對於synchronized關鍵字的性能優化了不少,已經和ReentrantLock的性能差很少,加鎖的效果比不加鎖時使用AtomicInteger性能效果還要略好一些,可是公平鎖的性能明顯下降,其它三種狀況下的性能是公平鎖性能的幾十倍以上,這和公平鎖每次試圖佔有鎖時,都必須先要進等待隊列,按照FIFO的順序去獲取鎖,所以在咱們的實驗情景下,使用公平鎖的線程進行了頻繁切換,而頻繁切換線程,性能必然會降低的厲害,這也告誡了咱們在實際的開發過程當中,在須要使用公平鎖的情景下,務必要考慮線程的切換頻率。
在運用CAS作Lock-Free操做中有一個經典的ABA問題:
線程1準備用CAS將變量的值由A替換爲B,在此以前,線程2將變量的值由A替換爲C,又由C替換爲A,而後線程1執行CAS時發現變量的值仍然爲A,因此CAS成功。但實際上這時的現場已經和最初不一樣了,儘管CAS成功,但可能存在潛藏的問題,例以下面的例子:
現有一個用單向鏈表實現的堆棧,棧頂爲A,這時線程T1已經知道A.next爲B,而後但願用CAS將棧頂替換爲B:
head.compareAndSet(A,B);
在T1執行上面這條指令以前,線程T2介入,將A、B出棧,再pushD、C、A,此時堆棧結構以下圖,而對象B此時處於遊離狀態:
此時輪到線程T1執行CAS操做,檢測發現棧頂仍爲A,因此CAS成功,棧頂變爲B,但實際上B.next爲null,因此此時的狀況變爲:
其中堆棧中只有B一個元素,C和D組成的鏈表再也不存在於堆棧中,無緣無故就把C、D丟掉了。
以上就是因爲ABA問題帶來的隱患,各類樂觀鎖的實現中一般都會用版本戳version來對記錄或對象標記,避免併發操做帶來的問題,在Java中,AtomicStampedReference<E>也實現了這個做用,它經過包裝[E,Integer]的元組來對對象標記版本戳stamp,從而避免ABA問題,例以下面的代碼分別用AtomicInteger和AtomicStampedReference來對初始值爲100的原子整型變量進行更新,AtomicInteger會成功執行CAS操做,而加上版本戳的AtomicStampedReference對於ABA問題會執行CAS失敗:
public class Test {
private static AtomicInteger atomicInt = new AtomicInteger(100);
private static AtomicStampedReference atomicStampedRef = new AtomicStampedReference(100, 0);
public static void main(String[] args) throws InterruptedException {
Thread intT1 = new Thread(new Runnable() {
@Override
public void run() {
atomicInt.compareAndSet(100, 101);
atomicInt.compareAndSet(101, 100);
}
});
Thread intT2 = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
boolean c3 = atomicInt.compareAndSet(100, 101);
System.out.println(c3); // true
}
});
intT1.start();
intT2.start();
intT1.join();
intT2.join();
Thread refT1 = new Thread(new Runnable() {
@Override
public void run()
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
atomicStampedRef.compareAndSet(100, 101, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
atomicStampedRef.compareAndSet(101, 100, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
}
});
Thread refT2 = new Thread(new Runnable() {
@Override
public void run() {
int stamp = atomicStampedRef.getStamp();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
}
boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);
System.out.println(c3); // false
}
});
refT1.start();
refT2.start();
}
}