Android併發編程 原子類與併發容器

在Android開發的漫漫長途上的一點感想和記錄,若是能給各位看官帶來一絲啓發或者幫助,那真是極好的。java


前言

上一篇博文中,主要說了些線程以及鎖的東西,咱們大多數的併發開發需求,基本上能夠用synchronized或者volatile解決,雖然synchronized已經被JDK優化了,但有的時候咱們仍是以爲synchronized過重了,node

好比說一個電影院賣票,這個票數是必定的並且共享的,我想盡快的賣票而且知道還有多少餘票。在程序員看來這就是個票數自減以及獲取最新票數的操做。程序員

private static Long sCount = 10000L;
final Object obj = new Object();
//這裏開了1000個線程對sCount併發操做
for (int i = 0; i < 1000; i++) {
    new Thread(new Runnable() {
        @Override
        public void run() {
            synchronized (obj) {
                //這裏加鎖保證同步,使用synchronized總以爲沒有
                //必要,畢竟就是自減操做,若是不使用synchronized又有什麼辦法呢?
                sCount--;
            }

        }
    }).start();
}


Thread.sleep(5000);
System.out.println(sCount);

再有,咱們日常使用的容器類List以及Map,如ArrayList、HashMap這些容器是非線程安全的,那咱們若是須要支持併發的容器,咱們該怎麼辦呢??讀者莫急,這正是本篇分享的內容。算法

原子類

咱們先來解決第一個問題,JDK1.5以後爲咱們提供了一系列的原子操做類,位於java.util.concurrent.atomic包下。編程

原子操做基本類型類

  1. AtomicBoolean:原子更新布爾類型。
  2. AtomicInteger:原子更新整型。
  3. AtomicLong:原子更新長整型。

以上3個類提供的方法幾乎如出一轍,因此本篇僅以AtomicInteger爲例進行講解,
AtomicInteger的經常使用方法以下。數組

  • int addAndGet(int delta):以原子方式將輸入的數值與實例中的值(AtomicInteger裏的

value)相加,並返回結果。安全

  • boolean compareAndSet(int expect,int update):若是當前值(調用該函數的值)等於預期值(expect),則以原子方式將當前值(調用該函數的值)設置爲更新的值(update)。
  • int getAndIncrement():以原子方式將當前值加1,返回舊值。
  • int incrementAndGet()以原子方式將當前值加1,返回新值。
  • int getAndSet(int newValue):以原子方式設置爲newValue的值,並返回舊值。

那按照上面的知識從新對上面的賣票問題編程以下數據結構

private static AtomicLong sAtomicLong = new AtomicLong(10000L);

//這裏開了1000個線程對sCount併發操做
for (int i = 0; i < 1000; i++) {
    new Thread(new Runnable() {
        @Override
        public void run() {
          sAtomicLong.decrementAndGet();
        }
    }).start();
}


Thread.sleep(5000);
System.out.println(sAtomicLong);

上面的是原子更新基本類型,那對於對象呢,JDK也提供了原子更新對象引用的原子類多線程

原子更新引用類型

  • AtomicReference:原子更新引用類型。
  • AtomicReferenceFieldUpdater:原子更新引用類型裏的字段。
  • AtomicMarkableReference:原子更新帶有標記位的引用類型。能夠原子更新一個布爾類

型的標記位和引用類型。構造方法是AtomicMarkableReference(V initialRef,boolean
initialMark)。併發

以上幾個類提供的方法幾乎同樣,因此本節僅以AtomicReference爲例進行講解

boolean compareAndSet(V expect, V update):若是當前對象(調用該函數的對象)等於預期對象(expect),則以原子方式將當前對象(調用該函數的對象)設置爲更新的對象(update)。

V get():獲取找對象

void set(V newValue):設置對象

V getAndSet(V newValue):以原子方式將當前對象(調用該函數的對象)設置爲指定的對象(newValue),並返回原來的對象(設置以前)

那這個東西用在哪裏呢,我在著名的Rxjava源碼中看到了原子更新對象的用法。

CachedThreadScheduler.java

//原子引用AtomicReference
AtomicReference<CachedWorkerPool> pool;
static final CachedWorkerPool NONE;
static {
    NONE = new CachedWorkerPool(0, null);
    NONE.shutdown();
}

public CachedThreadScheduler() {
    this.pool = new AtomicReference<CachedWorkerPool>(NONE);
    start();
}

@Override
public void start() {
    CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
    //調用AtomicReference的compareAndSet方法
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}

在建立線程調度器的時候把初始的工做線程池更新爲新的工做線程池

AtomicReferenceFieldUpdater以原子方式更新一個對象的屬性值

AtomicMarkableReference是帶有標記的原子更新引用的類,能夠有效解決ABA問題,什麼是ABA問題,
咱們就以上面的代碼爲例

假設pool.compareAndSet調用以前,pool內的對象NONE被更新成了update,而後又更新成了NONE,那麼在調用pool.compareAndSet的時候仍是會把pool內的對象更新爲update,也就是說AtomicReference不關心對象的中間歷程,這對於一些以當前對象是否被更改過爲判斷條件的特殊情境,AtomicReference就不適用了。

因此JDK提供了AtomicMarkableReference

那除了上面的原子更新引用類型以外,JDK還爲咱們提供了原子更新數組

原子更新數組

經過原子的方式更新數組裏的某個元素,Atomic包提供瞭如下4個類。

  • AtomicIntegerArray:原子更新整型數組裏的元素。
  • AtomicLongArray:原子更新長整型數組裏的元素。
  • AtomicReferenceArray:原子更新引用類型數組裏的元素。
  • AtomicIntegerArray類主要是提供原子的方式更新數組裏的整型,其經常使用方法以下。
  • int addAndGet(int i,int delta):以原子方式將輸入值與數組中索引i的元素相加。
  • boolean compareAndSet(int i,int expect,int update):若是當前值等於預期值,則以原子

方式將數組位置i的元素設置成update值。

以上幾個類提供的方法幾乎同樣,因此本節僅以AtomicIntegerArray爲例進行講解

public class AtomicIntegerArrayTest {
    static int[] value = new int[]{2, 3};
    static AtomicIntegerArray ai = new AtomicIntegerArray(value);

    public static void main(String[] args) {
           ai.getAndSet(0, 4);
        System.out.println(ai.get(0));
        System.out.println(value[0]);
    }
}

如下是輸出的結果。
4
2

更快的原子操做基本類LongAdder DouleAdder

JDK1.8爲咱們提供了更快的原子操做基本類LongAdder DouleAdder,

LongAdder的doc部分說明以下

This class is usually preferable to {@link AtomicLong} when
multiple threads update a common sum that is used for purposes such
as collecting statistics, not for fine-grained synchronization
control. Under low update contention, the two classes have similar
characteristics. But under high contention, expected throughput of
this class is significantly higher, at the expense of higher space
consumption

上面那段話翻譯過來就是

當咱們的場景是爲了統計計數,而不是爲了更細粒度的同步控制時,而且是在多線程更新的場景時,LongAdder類比AtomicLong更好用。 在小併發的環境下,論更新的效率,二者都差很少。可是高併發的場景下,LongAdder有着明顯更高的吞吐量,可是有着更高的空間複雜度。

從LongAdder的doc文檔上咱們就能夠知道LongAdder更適用於統計求和場景,而不是細粒度的同步控制。

併發容器

咱們在開發中遇到比較簡單的併發操做像自增自減,求和之類的問題,上一節原子類已經能比較好的解決了,但對於本篇文章來講只是開胃小菜,下面正菜來嘍

ConcurrentLinkedQueue(併發的隊列)

ConcurrentLinkedQueue是一個基於鏈表的無界線程安全隊列,它採用先進先出的規則對節點進行排序,咱們添加一個元素的時候,它會添加到隊列的尾部;當咱們獲取一個元素時,它會返回隊列頭部的元素。

咱們先來看一下ConcurrentLinkedQueue的類圖

ConcurrentLinkedQueue由head節點和tail節點組成,每一個節點(Node)由節點元素(item)和指向下一個節點(next)的引用組成,節點與節點之間就是經過這個next關聯起來,從而組成一張鏈表結構的隊列。默認狀況下head節點存儲的元素爲空,tail節點等於head節點

如下源碼來自JDK1.8

public ConcurrentLinkedQueue() {
    //默認狀況下head節點存儲的元素爲空,tail節點等於head節點,哨兵節點
    head = tail = new Node<E>(null);
}

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

  
    Node(E item) {
        //設置item值
        //這種的設置方式相似於C++的指針,直接操做內存地址,
        //例如此行代碼,就是以CAS的方式把值(item)賦值給當前對象即Node地址偏移itemOffset後的地址
        //下面出現的casItem以及casNext也是同理
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }


    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

看完了初始化,咱們來看一下這個線程安全隊列的進隊和出隊方法

offer(E e) 以及 poll() 方法

offer(E e)

public boolean offer(E e) {  
        checkNotNull(e);// 檢查,爲空直接異常  
        // 建立新節點,並將e 做爲節點的item  
        final Node<E> newNode = new Node<E>(e);  
        // 這裏操做比較多,將尾節點tail 賦給變量 t,p  
        for (Node<E> t = tail, p = t;;) {  
            // 並獲取q 也就是 tail 的下一個節點  
            Node<E> q = p.next;  
            // 若是下一個節點是null,說明tail 是處於尾節點上  
            if (q == null) {  
                // 而後用cas 將下一個節點設置成爲新節點  
                // 這裏用cas 操做,若是多線程的狀況,總會有一個先執行成功,失敗的線程繼續執行循環。  
               
                // <1>   
                if (p.casNext(null, newNode)) {  
                    // 若是p.casNext有個線程成功了,p=newNode   
                    // 比較 t (tail) 是否是 最後一個節點  
                    if (p != t)   
                        // 若是不等,就利用cas將,尾節點移到最後  
                        // 若是失敗了,那麼說明有其餘線程已經把tail移動過,也是OK的  
                        casTail(t, newNode);    
                    return true;  
                }  
                // 若是<1>失敗了,說明確定有個線程成功了,  
                // 這時候失敗的線程,又會執行for 循環,再次設值,直到成功。  
            }  
            else if (p == q)   
                // 有可能恰好插入一個,而後P 就被刪除了,那麼 p==q  
                // 這時候在頭結點須要重新定位。  
                p = (t != (t = tail)) ? t : head;  
            else  
                // 這裏是爲了當P不是尾節點的時候,將P 移到尾節點,方便下一次插入  
                // 也就是一直保持向前推動  
                p = (p != t && t != (t = tail)) ? t : q;  
        }  
    }  


 private boolean casTail(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

從上述代碼可知入隊列過程可概括爲3步

  1. 定位出尾節點;
  2. 使用CAS算法將入隊節點設置成尾節點的next節點,如不成功則重試
  3. 更新尾節點
1.定位尾節點

tail節點並不老是尾節點,因此每次入隊都必須先經過tail節點來找到尾節點。
尾節點多是tail節點,也多是tail節點的next節點。

2.設置入隊節點爲尾節點

p.casNext(null, newNode)方法用於將入隊節點設置爲當前隊列尾節點的next節點,若是p是null,
表示p是當前隊列的尾節點,若是不爲null,表示有其餘線程更新了尾節點,則須要從新獲取當前隊列的尾節點

3.更新尾節點
casTail(t, newNode);

將尾節點移到最後(即把tail指向新節點)
若是失敗了,那麼說明有其餘線程已經把tail移動過,此時新節點newNode爲尾節點,tail爲其前驅結點

poll()

public E poll() {  
        // 設置起始點  
        restartFromHead:  
        for (;;) {  
            for (Node<E> h = head, p = h, q;;) {  
                E item = p.item;  
                // 利用cas 將第一個節點設置爲null  
                if (item != null && p.casItem(item, null)) {  
                    // 和上面相似,p的next被刪了,  
                    // 而後而後判斷一下,目的爲了保證head的next不爲空  
                    if (p != h) // hop two nodes at a time  
                        updateHead(h, ((q = p.next) != null) ? q : p);  
                    return item;  
                }  
                else if ((q = p.next) == null) {  
                    // 有可能已經被另外線程先刪除了下一個節點  
                    // 那麼須要先設定head 的位置,並返回null  
                    updateHead(h, p);  
                    return null;  
                }  
                else if (p == q)  
                    
                    continue restartFromHead;  
                else  
                    // 和offer 相似,保證下一個節點有值,才能刪除  
                    p = q;  
            }  
        }  
    }

ConcurrentHashMap(併發的HashMap)

JDK1.7與JDK1.8 ConcurrentHashMap的實現仍是有不小的區別的

JDK1.7

在JDK1.7版本中,ConcurrentHashMap的數據結構是由一個Segment數組和多個HashEntry組成。

Segment數組的意義就是將一個大的table分割成多個小的table來進行加鎖,也就是上面的提到的鎖分離技術,而每個Segment元素存儲的是HashEntry數組+鏈表,這個和HashMap的數據存儲結構同樣.

JDK1.8

1.8中放棄了Segment臃腫的設計,取而代之的是採用Node + CAS + Synchronized來保證併發安全進行實現,結構以下:

put實現

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            //① 只有在執行第一次put方法時纔會調用initTable()初始化Node數組
            tab = initTable();
        //② 若是相應位置的Node還未初始化,則經過CAS插入相應的數據;
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //③ 若是相應位置的Node不爲空,且當前該節點處於移動狀態 幫助轉移數據
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        //④ 若是相應位置的Node不爲空,且當前該節點不處於移動狀態,則對該節點加synchronized鎖,
        
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //⑤ 若是該節點的hash不小於0,則遍歷鏈表更新節點或插入新節點;
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //⑥ 若是該節點是TreeBin類型的節點,說明是紅黑樹結構,則經過putTreeVal方法往紅黑樹中插入節點;
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            /**
            *若是binCount不爲0,說明put操做對數據產生了影響,若是當前鏈表的個數達到8個,
            *經過treeifyBin方法轉化爲紅黑樹,
            *若是oldVal不爲空,說明是一次更新操做,沒有對元素個數產生影響,則直接返回舊值;
            */
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    
    addCount(1L, binCount);
    return null;
}

CopyOnWriteArrayList(線程安全的ArrayList)

JDK1.8中關於CopyOnWriteArrayList的官方介紹以下

A thread-safe variant of {@link java.util.ArrayList} in which all mutative
operations ({@code add}, {@code set}, and so on)
are implemented bymaking a fresh copy of the underlying array.

中文翻譯大體是

CopyOnWriteArrayList是一個線程安全的java.util.ArrayList的變體,
add,set等改變CopyOnWriteArrayList的操做是經過製做當前數據的副本實現的

其實意思很簡單,假設有一個數組以下所示

併發讀取

多個線程併發讀取是沒有任何問題的

更新數組

咱們來看add 源碼

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

有了前面的積澱,這段代碼能夠說沒有任何難度

  1. 獲取重入鎖(線程互斥鎖)
  2. 創一個新的數組(在原有數據長度的基礎上加1)並把原數組的數據拷貝到新數組
  3. 把新數組的引用設置爲老數組

注 寫入過程當中,如有其餘線程讀取數據,那麼讀取的依然是老數組的數據

使用場景

由上面的結構以及源碼分析就知道CopyOnWriteArrayList用在讀多寫少的多線程環境中。


本篇總結

本篇分享了一些原子操做類以及併發容器,這些在多線程開發中都頗有做用。但願幫到你。


下篇預告

Android 併發工具類與線程池


參考博文


此致,敬禮

相關文章
相關標籤/搜索