併發包學習(一)-Atomic包小記

此篇是J.U.C學習的第一篇Atomic包相關的內容,但願此篇總結能對本身的基礎有所提高。本文總結來源自《Java併發編程的藝術》第七章並配以本身的實踐理解。若有錯誤還請指正。java

1、案例分析

首先看兩段代碼:算法

代碼①:編程

/**
 * @author laoyeye
 * @Description: 5000個線程,200個併發
 * @date 2018/8/16 21:58
 */
public class IntTest {
    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    public static int count = 0;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println(count);
    }

    private static void add() {
        count++;
    }

5000個線程200個併發的狀況下,對一個共享變量進行++操做。數組

結果:4997安全

代碼②:多線程

public class AtomicIntegerTest {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println(count);
    }

    private static void add() {
        count.incrementAndGet();
    }
}

5000個線程200個併發的狀況下,一樣進行每次加一操做。併發

結果:5000。和預期的結果同樣app

那麼爲何AtomicInteger能夠獲得預期的結果,而使用基本數據類型Int的值卻不對呢?函數

主要是原子性的問題,Int的操做,在多線程的狀況下並不保證原子性,而AtomicInteger則是一個JDK提供的一個原子操做類,具體AtomicInteger怎麼實現的原子性能夠看下文。高併發

2、Atomic相關概念

java從JDK1.5開始提供java.util.concurrent.atomic包,即本文所述的Atomic包。這個包的原子操做類提供了一個簡單,高效,線程安全地更新一個變量的方式。

由於變量的類型不少,Atomic包基本上分爲四種類型的更新方式,分別是原子更新基本類型,原子更新數組,原子更新引用和原子更新屬性(字段)。Atomic包的類基本上都是使用Unsafe實現的包裝類。 Unsafe 類提供了硬件級別的原子操做,能夠安全的直接操做內存變量,其在 JUC 源碼中被普遍的使用。

3、原子更新基本類型

一、AtomicBoolen:原子更新布爾類型。

二、AtomicInteger:原子更新整型。

三、AtomicLong:原子更新整型。

AtomicInteger詳解

一樣以一種的代碼②爲例,爲何AtomicInteger的incrementAndGet()方法保證了原子性的操做呢,咱們來看一下源碼的實現:

源碼①:

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

首先咱們經過unsafe調用了它的objectFieldOffset(Field field)方法,這個方法返回指定的變量在所屬類的內存偏移地址,偏移地址僅僅在該Unsafe函數中訪問指定字段時使用。

源碼②:

unsafe.getAndAddInt(this, valueOffset, 1) + 1;
   public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

getIntVolatile獲取對象obj中偏移量offset的變量對應的volative內存語義的值,即預期的值var5。

compareAndSwapInt方法中,var1爲須要改變的對象,var2爲偏移量(即以前求出來的valueOffset的值),var5爲expect的值,第四個爲update後的值。

當value的值與expect這個值相等,那麼則將value修改成update這個值,並返回true,不然返回false。

此操做極爲常說的CAS原子操做,這裏使用while循環是考慮到多個線程同時調用的狀況CAS失敗後須要自旋重試。

AtomicBoolen詳解

代碼③

public class AtomicBooleanTest {

        // 請求總數
        public static int clientTotal = 5000;

        // 同時併發執行的線程數
        public static int threadTotal = 200;

        public static AtomicBoolean isHappened = new AtomicBoolean(false);

        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        test();
                        semaphore.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            System.out.println(isHappened.get());
        }

        private static void test() {
            if (isHappened.compareAndSet(false, true)) {
                System.out.println("execute");
            }
        }
    }

執行結果:

execute
true

經過結果可知System.out.println("execute");的代碼只執行過一次,200的併發,爲何只執行了一次呢,咱們再來看下源碼的解決辦法。

源碼③

    public final boolean compareAndSet(boolean expect, boolean update) {
        int e = expect ? 1 : 0;
        int u = update ? 1 : 0;
        return unsafe.compareAndSwapInt(this, valueOffset, e, u);
    }

咱們看到當調用compareAndSet方法時,先把Boolean型轉換爲整型,在使用compareAndSwapInt進行CAS。因此即便在200併發的狀況下,AtomicBoolen依舊可以保持原子性。

經過上面兩個類的講解咱們看到都是使用的compareAndSwapInt的方法,unsafe類還提供了compareAndSwapLong,用於AtomicLong,以及compareAndSwapObject方法。而像char,float,double等數據類型沒有對應的原子操做類,這時候咱們能夠參考AtomicBoolen的思路作相似處理。

 4、原子更新數組

一、AtomicIntegerArray:原子更新整型數組裏的元素

二、AtomicLongArray:原子更新長整型數組裏的元素

三、AtomicReferenceArray:原子更新引用類型數組裏的元素

這裏咱們只介紹下AtomicIntegerArray,基本操做相似。

代碼④

public class AtomicIntegerArrayTest {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    static int[] value = new int[]{1,2};

    public static AtomicIntegerArray ai = new AtomicIntegerArray(value);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println(ai.get(0));
        System.out.println(value[0]);
    }

    private static void test() {
        ai.getAndSet(0,3);
    }
}

結果:3,1

爲何是3和1呢,一樣的咱們從源碼中找答案。

源碼④:

    public final int getAndSet(int i, int newValue) {
        return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
    }
    public final int getAndSetInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var4));

        return var5;
    }

一樣的原理,當前位置的數組value的值和預期的值相等,而後將對應的元素更新爲新的值。可是須要注意的是,AtomicIntegerArray會將當前數組複製一份,因此當AtomicIntegerArray對內部的數組元素進行修改後,不會影響到原先的數組。

5、原子更新引用類型

一、AtomicReference:原子更新引用類型

二、AtomicStampedReference:更新帶有版本號的引用類型,可解決CAS的ABA問題

三、AtomicMarkableReference:原子更新帶有標記位的引用類型

 原子更新基本類型每次只能更新一個變量,若是要原子更新更多變量,這時候就須要引用類型了。

代碼⑤

public class AtomicReferenceTest {
    public static void main(String[] args) {

        User user1 = new User("張三",12);
        User user2 = new User("lisi",20);

        AtomicReference<User> ar = new AtomicReference<User>();
        ar.set(user1);
        ar.compareAndSet(user1, user2);

        System.out.println("user " + ar.get().getName());
    }


static class User {
    private String name;
    private int old;

    public String getName() {
        return name;
    }

    public int getOld() {
        return old;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setOld(int old) {
        this.old = old;
    }

    public User(String name, int old) {
        this.name = name;
        this.old = old;
    }
}
}

結果:user lisi

能夠看到結果已經原子更新爲lisi了,年齡也同步更新。

代碼⑥

public class AtomicMarkableReferenceTest {
    public static void main(String[] args) {


        User user1 = new User("張三",12);
        User user2 = new User("lisi",20);

        AtomicStampedReference ar = new AtomicStampedReference(user1,0);

        final  Integer stamp = ar.getStamp();

        ar.compareAndSet(user1, user2,stamp,stamp+10);

        System.out.println("user " + ((User)ar.getReference()).getName());
        System.out.println("user " + ar.getStamp());

        System.out.println( ar.compareAndSet(user1, user2, stamp,stamp+10));
    }


static class User {
    private String name;
    private int old;

    public String getName() {
        return name;
    }

    public int getOld() {
        return old;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setOld(int old) {
        this.old = old;
    }

    public User(String name, int old) {
        this.name = name;
        this.old = old;
    }
}
}

結果:

user lisi
user 10
false

能夠看到咱們在作了原子更新後,版本號也作了改變,這時候若是還用原來的版本號去更新,就會出現更新失敗的狀況。

AtomicMarkableReference跟AtomicStampedReference相似 
AtomicStampedReference是使用pair的int stamp做爲計數器使用,AtomicMarkableReference的pair使用的是boolean mark。 
就像一杯水,AtomicStampedReference可能關心的是動過幾回,AtomicMarkableReference關心的是有沒有被人動過,方法都比較簡單,不在演示了。

6、原子更新字段類

一、AtomicIntegerFieldUpdater:更新整型字段

二、AtomicLongFieldUpdater:更新長整型字段

三、AtomicReferenceFieldUpdater:原子更新引用類型裏的字段

public class AtomicIntegerFieldUpdaterTest {

    private static AtomicIntegerFieldUpdater<AtomicIntegerFieldUpdaterTest> updater =
            AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdaterTest.class, "count");

    public volatile int count = 100;

    public static void main(String[] args) {

        AtomicIntegerFieldUpdaterTest ai = new AtomicIntegerFieldUpdaterTest();

        if (updater.compareAndSet(ai, 100, 120)) {
            System.out.println("方法1,"+ai.getCount());
        }

        if (updater.compareAndSet(ai, 100, 120)) {
            System.out.println("方法2,"+ai.getCount());
        } else {
            System.out.println("方法3,"+ai.getCount());
        }
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}

結果:

方法1,120
方法3,120

 原子更新字段類須要兩部,①必須使用靜態方法newupdate()建立一個更新器,而且設置想要更新的類和屬性。第二步,更新類的字段屬性必須使用public volatile修飾

 7、1.8新增的LongAdder相關類

這個類是1.8新增的一個類,爲何在已經有AtomicLong的狀況下,仍是增長了這個類呢?

這主要是因爲AtomicLong CAS算法的缺陷形成的,衆所周知,CAS是比較當前值與預期的值是否相等,相等則更新爲新的值,不然從新自旋取值。這就形成了CAS在高併發狀況性下大量失敗,性能較低的狀況。

既然AtomicLong性能問題是因爲過多線程同時去競爭同一個變量的更新而下降的,那麼若是把一個變量分解爲多個變量,讓一樣多的線程去競爭多個資源,那麼性能問題不就迎刃而解了嗎?

沒錯,所以,JDK8 提供的LongAdder就是這個思路。這個類我目前只在網上了解到原理,還未應用也不瞭解源碼實現,等之後再更新吧。

下文來自簡書:https://www.jianshu.com/p/22d38d5c8c2a

總結分析下LongAdder減小衝突的方法以及在求和場景下比AtomicLong更高效的緣由

  • 首先和AtomicLong同樣,都會先採用cas方式更新值
  • 在初次cas方式失敗的狀況下(一般證實多個線程同時想更新這個值),嘗試將這個值分隔成多個cell(sum的時候求和就好),讓這些競爭的線程只管更新本身所屬的cell(由於在rehash以前,每一個線程中存儲的hashcode不會變,因此每次都應該會找到同一個cell),這樣就將競爭壓力分散了

AtomicLong能否能夠被LongAdder替代

有了傳說中更高效的LongAdder,那AtomicLong能否不使用了呢?固然不是!

答案就在LongAdder的java doc中,從咱們翻譯的那段能夠看出,LongAdder適合的場景是統計求和計數的場景,並且LongAdder基本只提供了add方法,而AtomicLong還具備cas方法(要使用cas,在不直接使用unsafe以外只能藉助AtomicXXX了),,例如getAndIncrement、getAndDecrement等,使用起來很是的靈活,而LongAdder只有add和sum,使用起來比較受限。

相關文章
相關標籤/搜索