併發-AtomicInteger源碼分析—基於CAS的樂觀鎖實現

AtomicInteger源碼分析—基於CAS的樂觀鎖實現html

 

參考:java

http://www.importnew.com/22078.htmlgit

https://www.cnblogs.com/mantu/p/5796450.htmlgithub

http://hustpawpaw.blog.163.com/blog/static/184228324201210811243127/算法

 

 

 

1. 悲觀鎖與樂觀鎖

咱們都知道,cpu是時分複用的,也就是把cpu的時間片,分配給不一樣的thread/process輪流執行,時間片與時間片之間,須要進行cpu切換,也就是會發生進程的切換。切換涉及到清空寄存器,緩存數據。而後從新加載新的thread所需數據。當一個線程被掛起時,加入到阻塞隊列,在必定的時間或條件下,在經過notify(),notifyAll()喚醒回來。在某個資源不可用的時候,就將cpu讓出,把當前等待線程切換爲阻塞狀態。等到資源(好比一個共享數據)可用了,那麼就將線程喚醒,讓他進入runnable狀態等待cpu調度。這就是典型的悲觀鎖的實現。獨佔鎖是一種悲觀鎖,synchronized就是一種獨佔鎖,它假設最壞的狀況,而且只有在確保其它線程不會形成干擾的狀況下執行,會致使其它全部須要鎖的線程掛起,等待持有鎖的線程釋放鎖。windows

可是,因爲在進程掛起和恢復執行過程當中存在着很大的開銷。當一個線程正在等待鎖時,它不能作任何事,因此悲觀鎖有很大的缺點。舉個例子,若是一個線程須要某個資源,可是這個資源的佔用時間很短,當線程第一次搶佔這個資源時,可能這個資源被佔用,若是此時掛起這個線程,可能馬上就發現資源可用,而後又須要花費很長的時間從新搶佔鎖,時間代價就會很是的高。緩存

因此就有了樂觀鎖的概念,他的核心思路就是,每次不加鎖而是假設沒有衝突而去完成某項操做,若是由於衝突失敗就重試,直到成功爲止。在上面的例子中,某個線程能夠不讓出cpu,而是一直while循環,若是失敗就重試,直到成功爲止。因此,當數據爭用不嚴重時,樂觀鎖效果更好。好比CAS就是一種樂觀鎖思想的應用。安全

2.   java中CAS的實現

CAS就是Compare and Swap的意思,比較並操做。不少的cpu直接支持CAS指令。CAS是項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知此次競爭中失敗,並能夠再次嘗試。CAS有3個操做數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改成B,不然什麼都不作。性能優化

JDK1.5中引入了底層的支持,在int、long和對象的引用等類型上都公開了CAS的操做,而且JVM把它們編譯爲底層硬件提供的最有效的方法,在運行CAS的平臺上,運行時把它們編譯爲相應的機器指令。在Java.util.concurrent.atomic包下面的全部的原子變量類型中,好比AtomicInteger,都使用了這些底層的JVM支持爲數字類型的引用類型提供一種高效的CAS操做。多線程

在CAS操做中,會出現ABA問題。就是若是V的值先由A變成B,再由B變成A,那麼仍然認爲是發生了變化,並須要從新執行算法中的步驟。有簡單的解決方案:不是更新某個引用的值,而是更新兩個值,包括一個引用和一個版本號,即便這個值由A變爲B,而後爲變爲A,版本號也是不一樣的。AtomicStampedReference和AtomicMarkableReference支持在兩個變量上執行原子的條件更新。AtomicStampedReference更新一個「對象-引用」二元組,經過在引用上加上「版本號」,從而避免ABA問題,AtomicMarkableReference將更新一個「對象引用-布爾值」的二元組。

3.  AtomicInteger的實現。

AtomicInteger 是一個支持原子操做的 Integer 類,就是保證對AtomicInteger類型變量的增長和減小操做是原子性的,不會出現多個線程下的數據不一致問題。若是不使用 AtomicInteger,要實現一個按順序獲取的 ID,就必須在每次獲取時進行加鎖操做,以免出現併發時獲取到一樣的 ID 的現象。

接下來經過源代碼來看AtomicInteger具體是如何實現的原子操做。

首先看incrementAndGet() 方法,下面是具體的代碼。

1
2
3
4
5
6
7
8
public final int incrementAndGet() { 
        for (;;) { 
            int current = get(); 
            int next = current + 1
            if (compareAndSet(current, next)) 
                return next; 
        
    }

經過源碼,能夠知道,這個方法的作法爲先獲取到當前的 value 屬性值,而後將 value 加 1,賦值給一個局部的 next 變量,然而,這兩步都是非線程安全的,可是內部有一個死循環,不斷去作compareAndSet操做,直到成功爲止,也就是修改的根本在compareAndSet方法裏面,compareAndSet()方法的代碼以下:

1
2
3
public final boolean compareAndSet(int expect, int update) { 
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update); 
    }

compareAndSet()方法調用的compareAndSwapInt()方法的聲明以下,是一個native方法。

publicfinal native boolean compareAndSwapInt(Object var1, long var2, int var4, intvar5);

compareAndSet 傳入的爲執行方法時獲取到的 value 屬性值,next 爲加 1 後的值, compareAndSet所作的爲調用 Sun 的 UnSafe 的 compareAndSwapInt 方法來完成,此方法爲 native 方法,compareAndSwapInt 基於的是CPU 的 CAS指令來實現的。因此基於 CAS 的操做可認爲是無阻塞的,一個線程的失敗或掛起不會引發其它線程也失敗或掛起。而且因爲 CAS 操做是 CPU 原語,因此性能比較好。

相似的,還有decrementAndGet()方法。它和incrementAndGet()的區別是將 value 減 1,賦值給next 變量。

AtomicInteger中還有getAndIncrement() 和getAndDecrement() 方法,他們的實現原理和上面的兩個方法徹底相同,區別是返回值不一樣,前兩個方法返回的是改變以後的值,即next。而這兩個方法返回的是改變以前的值,即current。還有不少的其餘方法,就不列舉了。

下面以具體的例子分析下AtomicInteger的實現:

  計數器(Counter),採用Java裏比較方便的鎖機制synchronized關鍵字,初步的代碼以下:

複製代碼
 1 public class Counter {
 2     private int value;  
 3       
 4     public synchronized int getValue() {  
 5         return value;  
 6     }  
 7   
 8     public synchronized int increment() {  
 9         return ++value;  
10     }  
11   
12     public synchronized int decrement() {  
13         return --value;  
14     }  
15 }
複製代碼

  synchronized關鍵字是基於阻塞的鎖機制,也就是說當一個線程擁有鎖的時候,訪問同一資源的其它線程須要等待,直到該線程釋放鎖,這也就咱們前面所說的悲觀鎖。這裏會有些問題:首先,若是被阻塞的線程優先級很高很重要怎麼辦?其次,若是得到鎖的線程一直不釋放鎖怎麼辦?(這種狀況是很是糟糕的)。還有一種狀況,若是有大量的線程來競爭資源,那CPU將會花費大量的時間和資源來處理這些競爭(事實上CPU的主要工做並不是這些),同時,還有可能出現一些例如死鎖之類的狀況,最後,其實鎖機制是一種比較粗糙,粒度比較大的機制,相對於像計數器這樣的需求有點兒過於笨重,所以,對於這種需求咱們期待一種更合適、更高效的線程安全機制。

package TestAtomicInteger;
 
import java.util.concurrent.atomic.AtomicInteger;
 
class MyThread implements Runnable {
//    static  int i = 0;
     static AtomicInteger ai=new AtomicInteger(0);
      
 
    public void run() {
        for (int m = 0; m < 1000000; m++) {
            ai.getAndIncrement();
        }
    }
};
 
public class TestAtomicInteger {
    public static void main(String[] args) throws InterruptedException {
        MyThread mt = new MyThread();
 
        Thread t1 = new Thread(mt);
        Thread t2 = new Thread(mt);
        t1.start();
        t2.start();
        Thread.sleep(500);
        System.out.println(MyThread.ai.get());
    }
}

  能夠發現結果都是2000000,也就是說AtomicInteger是線程安全的。

  下面咱們就以模擬CAS機制來實現Counter的例子:

   CAS類:

複製代碼
 1 public class SimpleCAS {
 2     private volatile int value;
 3     public synchronized int getValue(){
 4         return value;  
 5     } 
 6     public synchronized boolean comperaAndSwap(int expectedValue,int newValue){
 7         int oldValue = value;
 8         if(oldValue == expectedValue){
 9             value = newValue;
10             return true;
11         }else{
12             return false;
13         }
14     }
15 }
複製代碼

  CASCounter類:

複製代碼
 1 public class CASCounter {
 2     private SimpleCAS cas;  
 3     public int getValue(){
 4         return cas.getValue();
 5     }
 6     public int increment(){
 7         int olevalue = cas.getValue();
 8         for (; ;) {
 9             if(cas.comperaAndSwap(olevalue, olevalue+1)){
10                 return cas.getValue();
11             }
12         }
13          
14     }
15 }
複製代碼

  上面的模擬不是CSA的真正實現,其實咱們在語言層面是沒有作任何同步的操做的,你們也能夠看到源碼沒有任何鎖加在上面,可它爲何是線程安全的呢?這就是Atomic包下這些類的奧祕:語言層面不作處理,咱們將其交給硬件—CPU和內存,利用CPU的多處理能力,實現硬件層面的阻塞,再加上volatile變量的特性便可實現基於原子操做的線程安全。因此說,CAS並非無阻塞,只是阻塞並不是在語言、線程方面,而是在硬件層面,因此無疑這樣的操做會更快更高效!

  總結一下,AtomicInteger基於衝突檢測的樂觀併發策略。 能夠想象,這種樂觀在線程數目很是多的狀況下,失敗的機率會指數型增長。

 

 

 

 

 

AtomicInteger等對象出現的目的主要是爲了解決在多線程環境下變量計數的問題,例如經常使用的i++,i--操做,它們不是線程安全的,AtomicInteger引入後,就沒必要在進行i++和i--操做時,進行加鎖操做,在咱們平常工做中,有不少業務場景須要在多線程環境下進行變量的計數:訂單數統計、訪問量統計、累計相應時長統計等。

demo 源碼:https://github.com/mantuliu/javaAdvance

    下面咱們先分析一下AtomicInteger的源代碼。經過源碼分析咱們知道,AtomicInteger的核心就是一個CAS算法(CompareAndSwap),比較並交換算法,此算法是由unsafe的底層代碼實現,它是一個原子的操做,原理就是:若是內存中的實際值與update值相同,則將實際值更新爲expect值,反之則返回失敗,由上層系統循環獲取實際值後,再次調用此CAS算法:

複製代碼
/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 */

/*
 *
 *
 *
 *
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent.atomic;
import sun.misc.Unsafe;

/**
 * An {@code int} value that may be updated atomically.  See the
 * {@link java.util.concurrent.atomic} package specification for
 * description of the properties of atomic variables. An
 * {@code AtomicInteger} is used in applications such as atomically
 * incremented counters, and cannot be used as a replacement for an
 * {@link java.lang.Integer}. However, this class does extend
 * {@code Number} to allow uniform access by tools and utilities that
 * deal with numerically-based classes.
 *
 * @since 1.5
 * @author Doug Lea
*/
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;//value值的偏移地址

    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;//實際的值

    /**
     * Creates a new AtomicInteger with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicInteger(int initialValue) {
        value = initialValue;//初始化
    }

    /**
     * Creates a new AtomicInteger with initial value {@code 0}.
     */
    public AtomicInteger() {
    }

    /**
     * Gets the current value.
     *
     * @return the current value
     */
    public final int get() {
        return value;//返回value值
    }

    /**
     * Sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(int newValue) {
        value = newValue;//設置新值,由於沒有判斷oldvalue,因此此操做是非線程安全的
    }

    /**
     * Eventually sets to the given value.
     *
     * @param newValue the new value
     * @since 1.6
     */
    public final void lazySet(int newValue) {
        unsafe.putOrderedInt(this, valueOffset, newValue);//與set操做效果同樣,只是採用的是unsafe對象中經過偏移地址來設置值的方式
    }

    /**
     * Atomically sets to the given value and returns the old value.
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final int getAndSet(int newValue) {//原子操做,設置新值,返回老值
        for (;;) {
            int current = get();
            if (compareAndSet(current, newValue))//經過CAS算法,比較current的值和實際值是否一致,若是一致則設置爲newValue
                return current;
        }
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * <p>May <a href="package-summary.html#Spurious">fail spuriously</a>
     * and does not provide ordering guarantees, so is only rarely an
     * appropriate alternative to {@code compareAndSet}.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful.
     */
    public final boolean weakCompareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final int getAndIncrement() {//i++操做
        for (;;) {
            int current = get();//獲取當前值
            int next = current + 1;//當前值+1
            if (compareAndSet(current, next))//比較current值和實際的值是否一致,如不一致,則繼續循環
                return current;
        }
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the previous value
     */
    public final int getAndDecrement() {
        for (;;) {
            int current = get();
            int next = current - 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the previous value
     */
    public final int getAndAdd(int delta) {//例如:當咱們統計接口的響應時間時,能夠利用此方法
        for (;;) {
            int current = get();
            int next = current + delta;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the updated value
     */
    public final int decrementAndGet() {
        for (;;) {
            int current = get();
            int next = current - 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public final int addAndGet(int delta) {
        for (;;) {
            int current = get();
            int next = current + delta;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Returns the String representation of the current value.
     * @return the String representation of the current value.
     */
    public String toString() {
        return Integer.toString(get());
    }


    public int intValue() {
        return get();
    }

    public long longValue() {
        return (long)get();
    }

    public float floatValue() {
        return (float)get();
    }

    public double doubleValue() {
        return (double)get();
    }

}
複製代碼

      下面,咱們爲四種狀況(同步關鍵字、ReentrantLock公平鎖和非公平鎖、AtomicInteger)作一下性能對比分析,當咱們看到上面的代碼分析後,咱們判斷AtomicInteger應該比加鎖的方式快,可是實驗的結果代表,AtomicInteger只比ReentrantLock加公平鎖的狀況快幾十倍,比其它兩種方式略慢一些。

     四個demo都用100個線程來循環模擬下單60秒鐘:

demo Lesson8SyncIntPerform:在使用同步關鍵字加鎖的狀況下100個線程循環下單數爲:677337556

demo Lesson8SyncIntPerform:在使用同步關鍵字加鎖的狀況下100個線程循環下單數爲:755994691

demo Lesson8AtomicIntPerform:在使用AtomicInteger的狀況下100個線程循環下單數爲:562359607

demo Lesson8AtomicIntPerform:在使用AtomicInteger的狀況下100個線程循環下單數爲:575367967

demo Lesson8LockIntPerform:在使用ReentrantLock加非公平鎖的狀況下100個線程循環下單數爲:857239882

demo Lesson8LockIntPerform:在使用ReentrantLock加非公平鎖的狀況下100個線程循環下單數爲:860364303

demo Lesson8LockFairIntPerform:在使用ReentrantLock加公平鎖的狀況下100個線程循環下單數爲:19153640

demo Lesson8LockFairIntPerform:在使用ReentrantLock加公平鎖的狀況下100個線程循環下單數爲:19076567

   上面的實驗結果代表,在jdk1.6及後續的版本中(本實驗的jdk版本是1.7,操做系統爲windows操做系統),已經對於synchronized關鍵字的性能優化了不少,已經和ReentrantLock的性能差很少,加鎖的效果比不加鎖時使用AtomicInteger性能效果還要略好一些,可是公平鎖的性能明顯下降,其它三種狀況下的性能是公平鎖性能的幾十倍以上,這和公平鎖每次試圖佔有鎖時,都必須先要進等待隊列,按照FIFO的順序去獲取鎖,所以在咱們的實驗情景下,使用公平鎖的線程進行了頻繁切換,而頻繁切換線程,性能必然會降低的厲害,這也告誡了咱們在實際的開發過程當中,在須要使用公平鎖的情景下,務必要考慮線程的切換頻率。

 

 

 

AtomicStampedReference解決ABA問題

在運用CAS作Lock-Free操做中有一個經典的ABA問題:

線程1準備用CAS將變量的值由A替換爲B,在此以前,線程2將變量的值由A替換爲C,又由C替換爲A,而後線程1執行CAS時發現變量的值仍然爲A,因此CAS成功。但實際上這時的現場已經和最初不一樣了,儘管CAS成功,但可能存在潛藏的問題,例以下面的例子:

AtomicStampedReference解決ABA問題 - 木瓜仙人 - 木瓜仙人

現有一個用單向鏈表實現的堆棧,棧頂爲A,這時線程T1已經知道A.next爲B,而後但願用CAS將棧頂替換爲B:

head.compareAndSet(A,B);

在T1執行上面這條指令以前,線程T2介入,將A、B出棧,再pushD、C、A,此時堆棧結構以下圖,而對象B此時處於遊離狀態:

AtomicStampedReference解決ABA問題 - 木瓜仙人 - 木瓜仙人

此時輪到線程T1執行CAS操做,檢測發現棧頂仍爲A,因此CAS成功,棧頂變爲B,但實際上B.next爲null,因此此時的狀況變爲:

AtomicStampedReference解決ABA問題 - 木瓜仙人 - 木瓜仙人

其中堆棧中只有B一個元素,C和D組成的鏈表再也不存在於堆棧中,無緣無故就把C、D丟掉了。

以上就是因爲ABA問題帶來的隱患,各類樂觀鎖的實現中一般都會用版本戳version來對記錄或對象標記,避免併發操做帶來的問題,在Java中,AtomicStampedReference<E>也實現了這個做用,它經過包裝[E,Integer]的元組來對對象標記版本戳stamp,從而避免ABA問題,例以下面的代碼分別用AtomicInteger和AtomicStampedReference來對初始值爲100的原子整型變量進行更新,AtomicInteger會成功執行CAS操做,而加上版本戳的AtomicStampedReference對於ABA問題會執行CAS失敗:

 

public class Test {

    private static AtomicInteger atomicInt = new AtomicInteger(100);

    private static AtomicStampedReference atomicStampedRef = new AtomicStampedReference(100, 0);

 

    public static void main(String[] args) throws InterruptedException {

       Thread intT1 = new Thread(new Runnable() {

           @Override

           public void run() {

              atomicInt.compareAndSet(100, 101);

              atomicInt.compareAndSet(101, 100);

           }

       });

 

       Thread intT2 = new Thread(new Runnable() {

           @Override

           public void run() {

              try {

                  TimeUnit.SECONDS.sleep(1);

              } catch (InterruptedException e) {

              }

              boolean c3 = atomicInt.compareAndSet(100, 101);

              System.out.println(c3); // true

           }

       });

 

       intT1.start();

       intT2.start();

       intT1.join();

       intT2.join();

 

       Thread refT1 = new Thread(new Runnable() {

           @Override

           public void run()

              try {

                  TimeUnit.SECONDS.sleep(1);

              } catch (InterruptedException e) {

              }

              atomicStampedRef.compareAndSet(100, 101, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);

              atomicStampedRef.compareAndSet(101, 100, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);

           }

       });

 

       Thread refT2 = new Thread(new Runnable() {

           @Override

           public void run() {

              int stamp = atomicStampedRef.getStamp();

              try {

                  TimeUnit.SECONDS.sleep(2);

              } catch (InterruptedException e) {

              }

              boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);

              System.out.println(c3); // false

           }

       });

 

       refT1.start();

       refT2.start();

    }

}

相關文章
相關標籤/搜索