AtomicInteger的CAS算法淺析 自旋鎖淺析

  以前淺析過自旋鎖(自旋鎖淺析),咱們知道它的實現原理就是CAS算法。CAS(Compare and Swap)即比較並交換,做爲著名的無鎖算法,它也是樂觀鎖的實現方式之一。JDK併發包裏也有許多代碼中有CAS的身影閃爍其中,鑑於CAS算法在併發領域的重要性和普適性,仍是再結合AtomicInteger這個原子類來淺析一下吧。淺析以前,先借用以前自旋鎖測試代碼直接看AtomicInteger的自增測試結果,能夠拿它跟自旋鎖作個比較:html

    @Test
    public void testAtomicInteger()
    {
        // 10個線程使用AtomicInteger自增
        AtomicInteger ai = new AtomicInteger();
        for (int i = 0; i < 10; i++)
        {
            new Thread(new Runnable()
            {
                @Override
                public void run()
                {
                    // 自增1萬次
                    for (int j = 0; j < 10000; j++)
                    {
                        count = ai.incrementAndGet();
                    }

                    // 一個線程執行完了就減1,10個線程執行完了就變成0,執行主線程
                    latch.countDown();
                }
            }).start();
        }

        // 主線程等待
        try
        {
            latch.await();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }

        TestCase.assertEquals(count, 100000);
    }

  運行結果:java

count值:100000, 耗時:10毫秒.

  是否是比自旋鎖要簡單?必須的,由於AtomicInteger自己已經實現了CAS算法,人家自然就用於併發自增的。以前也說到過,CAS的原理很簡單,它包含三個值:當前內存值(V)、預期原來的值(A)以及期待更新的值(B)。若是內存位置V的值與預期原值A相匹配,那麼處理器會自動將該位置值更新爲新值B,返回true。不然處理器不作任何操做,返回false。舉上面的例子,咱們有10個線程分別去作自增操做,很明顯count是共享變量,它將被這10個線程追殺加1。假如線程1將count追加到100時,正準備更新到101這一刻,線程2插一腳搶先一步把count追加到101,那麼線程1該怎麼辦呢?它將獲取最新的count值再去自增。具體怎麼實現的,咱們接下來看。爲了直觀點,咱們能夠換種方式實現上面的例子:算法

package com.wulinfeng.test.testpilling;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerTest {

    // 共享變量
    private static int count;

    // 10個線程就先初始化10
    private static CountDownLatch latch = new CountDownLatch(10);

    public static void main(String[] args) {
        // 10個線程使用AtomicInteger自增
        AtomicInteger ai = new AtomicInteger();
        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // 自增1萬次
                    for (int j = 0; j < 10000; j++) {
                        count = ai.incrementAndGet();
                        System.out.println("線程" + threadNum + ": " + count);
                    }

                    // 一個線程執行完了就減1,10個線程執行完了就變成0,執行主線程
                    latch.countDown();
                }
            }).start();
        }

        // 主線程等待
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  截取運行結果後面幾行:多線程

線程1: 99993
線程1: 99994
線程1: 99995
線程1: 99996
線程1: 99997
線程1: 99998
線程1: 99999
線程1: 100000

  這後面的日誌打印的都是線程1在追加,把控制檯日誌往上拉就能夠看到其餘線程也一直在追加的。那麼這10個線程如何在CAS的咒法護身下沒有互相沖突的呢?看AtomicInteger的源碼便知:併發

package java.util.concurrent.atomic;
import java.util.function.IntUnaryOperator;
import java.util.function.IntBinaryOperator;
import sun.misc.Unsafe;

/**
 * An {@code int} value that may be updated atomically.  See the
 * {@link java.util.concurrent.atomic} package specification for
 * description of the properties of atomic variables. An
 * {@code AtomicInteger} is used in applications such as atomically
 * incremented counters, and cannot be used as a replacement for an
 * {@link java.lang.Integer}. However, this class does extend
 * {@code Number} to allow uniform access by tools and utilities that
 * deal with numerically-based classes.
 *
 * @since 1.5
 * @author Doug Lea
*/
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    /**
     * Creates a new AtomicInteger with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    /**
     * Creates a new AtomicInteger with initial value {@code 0}.
     */
    public AtomicInteger() {
    }

    /**
     * Gets the current value.
     *
     * @return the current value
     */
    public final int get() {
        return value;
    }

    /**
     * Sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(int newValue) {
        value = newValue;
    }

    /**
     * Eventually sets to the given value.
     *
     * @param newValue the new value
     * @since 1.6
     */
    public final void lazySet(int newValue) {
        unsafe.putOrderedInt(this, valueOffset, newValue);
    }

    /**
     * Atomically sets to the given value and returns the old value.
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this, valueOffset, newValue);
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * <p><a href="package-summary.html#weakCompareAndSet">May fail
     * spuriously and does not provide ordering guarantees</a>, so is
     * only rarely an appropriate alternative to {@code compareAndSet}.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful
     */
    public final boolean weakCompareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the previous value
     */
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the previous value
     */
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the updated value
     */
    public final int decrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function, returning the previous value. The
     * function should be side-effect-free, since it may be re-applied
     * when attempted updates fail due to contention among threads.
     *
     * @param updateFunction a side-effect-free function
     * @return the previous value
     * @since 1.8
     */
    public final int getAndUpdate(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function, returning the updated value. The
     * function should be side-effect-free, since it may be re-applied
     * when attempted updates fail due to contention among threads.
     *
     * @param updateFunction a side-effect-free function
     * @return the updated value
     * @since 1.8
     */
    public final int updateAndGet(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function to the current and given values,
     * returning the previous value. The function should be
     * side-effect-free, since it may be re-applied when attempted
     * updates fail due to contention among threads.  The function
     * is applied with the current value as its first argument,
     * and the given update as the second argument.
     *
     * @param x the update value
     * @param accumulatorFunction a side-effect-free function of two arguments
     * @return the previous value
     * @since 1.8
     */
    public final int getAndAccumulate(int x,
                                      IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function to the current and given values,
     * returning the updated value. The function should be
     * side-effect-free, since it may be re-applied when attempted
     * updates fail due to contention among threads.  The function
     * is applied with the current value as its first argument,
     * and the given update as the second argument.
     *
     * @param x the update value
     * @param accumulatorFunction a side-effect-free function of two arguments
     * @return the updated value
     * @since 1.8
     */
    public final int accumulateAndGet(int x,
                                      IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }

    /**
     * Returns the String representation of the current value.
     * @return the String representation of the current value
     */
    public String toString() {
        return Integer.toString(get());
    }

    /**
     * Returns the value of this {@code AtomicInteger} as an {@code int}.
     */
    public int intValue() {
        return get();
    }

    /**
     * Returns the value of this {@code AtomicInteger} as a {@code long}
     * after a widening primitive conversion.
     * @jls 5.1.2 Widening Primitive Conversions
     */
    public long longValue() {
        return (long)get();
    }

    /**
     * Returns the value of this {@code AtomicInteger} as a {@code float}
     * after a widening primitive conversion.
     * @jls 5.1.2 Widening Primitive Conversions
     */
    public float floatValue() {
        return (float)get();
    }

    /**
     * Returns the value of this {@code AtomicInteger} as a {@code double}
     * after a widening primitive conversion.
     * @jls 5.1.2 Widening Primitive Conversions
     */
    public double doubleValue() {
        return (double)get();
    }

}

  標黃了3個成員變量和兩個方法。方法比較簡單,一個是返回自增前的值,一個是返回自增後的值。app

  第一個成員變量是unsafe,這個必不可少,沒了它CAS就是浮雲。CAS算法用到了Unsafe對象的compareAndSwapInt方法,而它是一個本地方法,因此實現源碼到此爲止,無法再跟進去了。其實CAS算法的精華也就在於此,因此很遺憾。但至少咱們知道Unsafe總共有3個CAS方法:ide

    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

  第二個成員變量是valueOffset,它是共享變量value在AtomicInteger對象上的內存偏移量。它做爲compareAndSwapInt的第二個參數,用於修改共享變量value的值。post

  最後就是value了,上面已經介紹了,它就是例子中的count,是共享變量,也就是多線程併發中被追殺的共享資源。它使用volatile修飾,解決了可見性和有序性問題,再由unsafe的CAS保證了原子性,3大問題都解決了,多線程併發問題也就解決了。測試

  回過頭再看那標黃的兩個方法,實現都是unsafe的getAndAddInt,點進去瞧瞧,發現它不是本地方法:this

    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

  代碼看起來是否是有點眼熟?沒錯,仍是自旋鎖的套路,只不過這裏用到的是Unsafe的CAS算法,而咱們的自旋鎖用到的是多套了一層馬甲的AtmoicXXX的CAS算法,因此說到底,咱們用的仍是Unsafe的CAS。經過循環,先獲取當前值var5(怎麼獲取當前值的?getIntVolatile就不用看了,仍是本地方法),再計算更新值var5+var4,而後經過compareAndSwapInt方法設置value變量。若是compareAndSwapInt方法返回失敗,表示value變量的值被別的線程更改了,因此須要循環獲取value變量的最新值,再次經過compareAndSwapInt方法設置value變量,直至設置成功,跳出循環,返回更新前的值。

  從上面看到,CAS底層實現依賴於Unsafe包,咱們只要明白CAS的原理便可:預期值與當前值一致,那麼執行更新,不然死循環嘗試更新,直到成功。

相關文章
相關標籤/搜索