【併發編程】線程安全策略

不可變對象

咱們知道線程安全的問題就是出在多個線程同時修改共享變量,不可變對象的策略徹底規避了對對象的修改,因此在多線程中使用必定是線程安全的。java

不可變對象須要知足的條件:數組

  • 對象建立之後其狀態就不能修改
  • 對象全部域都是final類型
  • 對象是正確建立的(在對象建立期間,this引用沒有逸出)

下面來複習一下final關鍵字的做用安全

修飾類:多線程

  • 不能被繼承,final類中全部成員方法都會被隱式指定爲final方法。

修飾方法:併發

  • 鎖定方法不被繼承類修改
  • 一個類的private方法會被隱式地指定爲final方法。

修飾對象:app

  • 基本數據類型變量:其數值初始化後沒法修改
  • 引用類型變量:初始化後不能指向另外的對象(裏面的值能夠修改)

除了final修飾的方法來使對象不可變,還能夠用Collections類中的unmodifiable爲前綴的方法,包括Collection、List、Set、Map等,只需把對應集合的對象傳入這個方法這個集合就不容許修改了。ide

一樣地,在Guava中也有相似的方法immutableXXX能夠達到相同的效果。高併發

下面來驗證一下測試

@Slf4j
public class ImmutableExample1 {

    private static Map<Integer, Integer> map = Maps.newHashMap();

    static {
        map.put(1, 2);
        map.put(3, 4);
        map.put(5, 6);
        map = Collections.unmodifiableMap(map);
    }

    public static void main(String[] args) {
        map.put(1, 3);
        log.info("{}", map.get(1));
    }
}
複製代碼

運行結果優化

能夠看到程序報了一個不支持操做的異常,說明當map通過Collections.unmodifiableMap方法後就不支持更新操做了。

下面咱們進入Collections.unmodifiableMap看一下它的實現

/** * Returns an unmodifiable view of the specified map. This method * allows modules to provide users with "read-only" access to internal * maps. Query operations on the returned map "read through" * to the specified map, and attempts to modify the returned * map, whether direct or via its collection views, result in an * <tt>UnsupportedOperationException</tt>.<p> * * The returned map will be serializable if the specified map * is serializable. * * @param <K> the class of the map keys * @param <V> the class of the map values * @param m the map for which an unmodifiable view is to be returned. * @return an unmodifiable view of the specified map. */
public static <K,V> Map<K,V> unmodifiableMap(Map<? extends K, ? extends V> m) {
    return new UnmodifiableMap<>(m);
}
複製代碼

能夠看到這個方法返回了一個新的不能被修改的map,咱們來看一下這個map的實現。

/** * @serial include */
private static class UnmodifiableMap<K,V> implements Map<K,V>, Serializable {
    private static final long serialVersionUID = -1034234728574286014L;

    private final Map<? extends K, ? extends V> m;

    UnmodifiableMap(Map<? extends K, ? extends V> m) {
        if (m==null)
            throw new NullPointerException();
        this.m = m;
    }

    public int size() {return m.size();}
    public boolean isEmpty() {return m.isEmpty();}
    public boolean containsKey(Object key) {return m.containsKey(key);}
    public boolean containsValue(Object val) {return m.containsValue(val);}
    public V get(Object key) {return m.get(key);}

    public V put(K key, V value) {
        throw new UnsupportedOperationException();
    }
    public V remove(Object key) {
        throw new UnsupportedOperationException();
    }
    public void putAll(Map<? extends K, ? extends V> m) {
        throw new UnsupportedOperationException();
    }
    public void clear() {
        throw new UnsupportedOperationException();
    }

    private transient Set<K> keySet;
    private transient Set<Map.Entry<K,V>> entrySet;
    private transient Collection<V> values;

    public Set<K> keySet() {
        if (keySet==null)
            keySet = unmodifiableSet(m.keySet());
        return keySet;
    }

    public Set<Map.Entry<K,V>> entrySet() {
        if (entrySet==null)
            entrySet = new UnmodifiableEntrySet<>(m.entrySet());
        return entrySet;
    }

    public Collection<V> values() {
        if (values==null)
            values = unmodifiableCollection(m.values());
        return values;
    }

    public boolean equals(Object o) {return o == this || m.equals(o);}
    public int hashCode() {return m.hashCode();}
    public String toString() {return m.toString();}

    // Override default methods in Map
    @Override
    @SuppressWarnings("unchecked")
    public V getOrDefault(Object k, V defaultValue) {
        // Safe cast as we don't change the value
        return ((Map<K, V>)m).getOrDefault(k, defaultValue);
    }

    @Override
    public void forEach(BiConsumer<? super K, ? super V> action) {
        m.forEach(action);
    }

    @Override
    public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
        throw new UnsupportedOperationException();
    }

    @Override
    public V putIfAbsent(K key, V value) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean remove(Object key, Object value) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean replace(K key, V oldValue, V newValue) {
        throw new UnsupportedOperationException();
    }

    @Override
    public V replace(K key, V value) {
        throw new UnsupportedOperationException();
    }

    @Override
    public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
        throw new UnsupportedOperationException();
    }

    @Override
    public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
        throw new UnsupportedOperationException();
    }

    @Override
    public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
        throw new UnsupportedOperationException();
    }

    @Override
    public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
        throw new UnsupportedOperationException();
    }
複製代碼

從上面的實現中能夠看到UnmodifiableMap 對於不少操做都是直接拋出不支持操做的異常。

Guava 中的immutable 方法也是相似原理。

線程封閉

線程封閉就是把對象封裝到一個線程裏,只有一個線程能夠看到這個對象,這樣就算這個對象不是線程安全也不會有線程安全問題。

實現線程封閉主要有三種方式

  • Ad-hoc線程封閉:程序控制實現,實現較複雜已棄用。
  • 堆棧封閉:能使用局部變量的地方就不使用全局變量,多線程下訪問同一個方法時,方法中的局部變量都會拷貝一份到線程的棧中,也就是說每個線程中都有隻屬於本線程的私有變量,所以局部變量不會被多個線程共享。
  • ThreadLocal線程封閉:使用map實現了線程封閉,map的key是線程id,map的值是封閉的對象。

下面主要來看ThreadLocal線程封閉方法。

ThreadLocal是爲每個線程都提供了一個線程內的局部變量,每一個線程只能訪問到屬於它的副本。

咱們來看一下ThreadLocal的源碼中的get和set方法

/** * Returns the value in the current thread's copy of this * thread-local variable. If the variable has no value for the * current thread, it is first initialized to the value returned * by an invocation of the {@link #initialValue} method. * * @return the current thread's value of this thread-local */
    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }
    
    /** * Sets the current thread's copy of this thread-local variable * to the specified value. Most subclasses will have no need to * override this method, relying solely on the {@link #initialValue} * method to set the values of thread-locals. * * @param value the value to be stored in the current thread's copy of * this thread-local. */
    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }
複製代碼

從源碼中能夠看出:每個線程擁有一個ThreadLocalMap,這個map存儲了該線程擁有的全部局部變量。

set時先經過Thread.currentThread()獲取當前線程,進而獲取到當前線程的ThreadLocalMap,而後以ThreadLocal本身爲key,要存儲的對象爲值,存到當前線程的ThreadLocalMap中。

get時也是先得到當前線程的ThreadLocalMap,以ThreadLocal本身爲key,取出和該線程的局部變量。

一個線程內能夠設置多個ThreadLocal,這樣該線程就擁有了多個局部變量。好比當前線程爲t1,在t1內建立了兩個ThreadLocal分別是tl1tl2,那麼t1ThreadLocalMap就有兩個鍵值對。

t1.threadLocals.set(tl1, obj1) // 等價於在t1線程中調用tl1.set(obj1)
t1.threadLocals.set(tl2, obj2) // 等價於在t1線程中調用tl2.set(obj1)

t1.threadLocals.getEntry(tl1) // 等價於在t1線程中調用tl1.get()得到obj1
t1.threadLocals.getEntry(tl2) // 等價於在t1線程中調用tl2.get()得到obj2
複製代碼

同步容器

因爲不少常見的容器都是線程不安全的,這就要求開發人員在任何訪問到這些容器的地方進行同步處理,致使使用很是不便,所以Java提供了同步容器。

常見的同步容器有如下幾種:

  • ArrayList -> Vector, Stack

  • HashMap -> HashTable(key,value不能爲null)

  • Collections.synchronizedXXX(List, Set, Map)

注意:同步容器不是絕對的線程安全。

Vector

@Slf4j
public class VectorExample1 {

    /** * 請求總數 */
    public static int clientTotal = 5000;
    /** * 同時併發執行線程數 */
    public static int threadTotal = 200;

    private static Vector<Integer> list = new Vector<>();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++){
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e){
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", list.size());
    }

    private static void update(int i){

        list.add(i);
    }
}
複製代碼

運行結果

在這裏Vector是線程安全的。

下面來看一個線程不安全的例子

public class VectorExample2 {

    private static Vector<Integer> vector = new Vector<>();

    public static void main(String[] args) {

        for (int i = 0; i < 10; i++){
            vector.add(i);
        }

        Thread thread1 = new Thread() {
            public void run() {

                for (int i = 0; i < 10; i++){
                    vector.remove(i);
                }
            }
        };

        Thread thread2 = new Thread() {
            public void run() {

                for (int i = 0; i < 10; i++){
                    vector.get(i);
                }
            }
        };

        thread1.start();
        thread2.start();
    }
}
複製代碼

運行結果

能夠看到拋出了數組越界的異常。這是由於thread2 中可能會get到已經被thread1移除的下標。

HashTable

@Slf4j
public class HashTableExample {

    /** * 請求總數 */
    public static int clientTotal = 5000;
    /** * 同時併發執行線程數 */
    public static int threadTotal = 200;

    private static Map<Integer, Integer> map = new Hashtable<>();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++){
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e){
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", map.size());
    }

    private static void update(int i){

        map.put(i, i);
    }
}
複製代碼

運行結果

Collections.sync

將以前例子中的容器類改爲

private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());
複製代碼

運行結果始終是5000,線程安全。

將容器換成SetMap也是同樣。

同步容器的錯誤用法

public class VectorExample3 {

    private static void test1(Vector<Integer> v1) {
        for (Integer i : v1) {
            if (i.equals(3)){
                v1.remove(i);
            }
        }
    }

    private static void test2(Vector<Integer> v1) {
        Iterator<Integer> iterator = v1.iterator();
        while (iterator.hasNext()) {
            Integer i = iterator.next();
            if (i.equals(3)){
                v1.remove(i);
            }
        }
    }

    private static void test3(Vector<Integer> v1) {
        for (int i = 0; i < v1.size(); i++){
            if (v1.get(i).equals(3)) {
                v1.remove(i);
            }
        }
    }

    public static void main(String[] args) {
        Vector<Integer> vector = new Vector<>();
        vector.add(1);
        vector.add(2);
        vector.add(3);
        test1(vector);
    }

}
複製代碼

這裏定義了3種對Vector遍歷後刪除指定值的方法,依次對每一個方法進行測試。

測試結果:

test1test2都拋出java.util.ConcurrentModificationException異常

test3運行正常

下面來看一下異常產生的緣由

從第一個報錯處點進去能夠看到

final void checkForComodification() {
    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
}
複製代碼

咱們在對一個集合進行遍歷操做的同時對它進行了增刪的操做,致使了modCount != expectedModCount 從而拋出異常。

所以當咱們用for-each迭代器遍歷集合時不要對集合進行更新操做。若是須要對集合進行增刪操做,推薦的作法是在遍歷過程當中標記好要增刪的位置,遍歷結束後再進行相關的操做。

併發容器

CopyOnWriteArrayList

核心思想:

  • 讀寫分離
  • 最終一致性
  • 經過另外開闢空間解決併發衝突

相比於ArrayListCopyOnWriteArrayList是線程安全的。

當有新元素添加到CopyOnWriteArrayList時,它先從原有的數組中拷貝一份出來,而後在新數組上作寫操做,寫完後再將原有的數組指向到新的數組。CopyOnWriteArrayList的整個add操做都是在鎖的保護下進行的。

缺點:

  • 拷貝數組會消耗內存,元素多時可能會致使GC問題。
  • 不能用於實時讀的場景。拷貝數組,新增元素都須要時間,因此調用get操做後讀取到的數據可能仍是舊的,CopyOnWriteArrayList只能保證最終的一致性,不能知足實時性的要求。

CopyOnWriteArrayList的讀操做都是在原數組上讀的,不須要加鎖。

下面來coding測試一下

public class CopyOnWriteArrayListExample {

    /** * 請求總數 */
    public static int clientTotal = 5000;
    /** * 同時併發執行線程數 */
    public static int threadTotal = 200;

    private static List<Integer> list = new CopyOnWriteArrayList<>();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++){
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e){
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", list.size());
    }

    private static void update(int i){

        list.add(i);
    }
}
複製代碼

運行結果始終是5000,線程安全。

下面咱們進入CopyOnWriteArrayListadd方法看一下

/** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return {@code true} (as specified by {@link Collection#add}) */
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}
複製代碼

能夠看到整個方法是加了鎖的,添加新元素時是把整個數組複製到一個新的數組中。

CopyOnWriteArraySet

HashSet對應的線程安全類。

底層實現是基於CopyOnWriteArrayList ,所以它符合CopyOnWriteArrayList的特色和適用場景。

迭代器不支持可變的remove操做。

ConcurrentSkipListSet

TreeSet對應的線程安全類。

基於Map集合,在多線程環境下addremove等操做都是線程安全的,可是批量操做如addAllremoveAll等並不能保證以原子方式執行。緣由是它們的底層調用的仍是addremove等方法,須要手動作同步操做。

不能存儲null值。

ConcurrentHashMap

HashMap的線程安全類。

不能存儲null值。

對讀操做作了大量優化,後面會詳細介紹。

ConcurrentSkipListMap

TreeMap的線程安全類。

內部使用SkipList來實現。

key有序,相比於ConcurrentHashMap支持更高併發,存取數與線程沒有關係,也就是說在相同條件下併發線程越多ConcurrentSkipListMap優點越大。

安全共享對象策略總結

  • 1.線程限制:一個被線程限制的對象,由線程獨佔,而且只能被佔有它的線程修改。
  • 2.共享只讀:一個共享只讀的對象,在沒有額外同步的狀況下,能夠被多個線程訪問,但任何線程都不能修改它。
  • 3.線程安全對象:一個線程安全對象或者容器,在內部經過同步機制來保證線程安全,因此其餘線程無需額外的同步就能夠經過公共接口隨意訪問它。
  • 4.被守護對象:被守護對象只能經過獲取特定的鎖來訪問。

Written by Autu

2019.7.19

相關文章
相關標籤/搜索