併發包學習(二)-容器學習記錄

可能有些同窗知道ArrayList,HashSet,,HashMap這些容器都是線程不安全的,若是多個線程併發的訪問這些容器就會致使線程不安全問題,不少時候須要咱們手動對這些容器進行同步處理,形成咱們很大的不便,所以java爲咱們提供了同步容器和併發容器來解決這個問題。java

1、同步容器

首先詳細介紹前,須要強調下同步容器是線程安全的類,可是也可能形成線程不安全的問題,緣由在後面有解釋。數組

同步容器的原理很簡單,就是在原容器的基礎上加了synchronize的鎖,來保證同一時間只有一個線程來訪問。安全

同步容器總的能夠分爲兩類:併發

  • java提供好的線程的類
  1. ArrayList >>Vector,Stack
  2. HashMap>>HashTable
  • Collections.synchronizedXXX提供的靜態工廠方法建立的類
  1. Collections.synchronizedCollection(Collection<T>t)
  2. Collections.synchronizedList(List<T>list)
  3. Collections.synchronizedMap(Map<K, V>map)
  4. Collections.synchronizedSet(Set<T> t)

Vector案例一(線程安全)性能

@Slf4j
@ThreadSafe
public class VectorExample1 {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    private static List<Integer> list = new Vector<>();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", list.size());
    }

    private static void update(int i) {
        list.add(i);
    }
}

Vector案例二(線程不安全)ui

@NotThreadSafe
public class VectorExample2 {

    private static Vector<Integer> vector = new Vector<>();

    public static void main(String[] args) {

        while (true) {

            for (int i = 0; i < 10; i++) {
                vector.add(i);
            }
        
            Thread thread1 = new Thread() {
                public void run() {
                    for (int i = 0; i < vector.size(); i++) {
                        vector.remove(i);
                    }
                }
            };

            Thread thread2 = new Thread() {
                public void run() {
                    for (int i = 0; i < vector.size(); i++) {
                        vector.get(i);
                    }
                }
            };
            thread1.start();
            thread2.start();
        }
    }
}

我在上面的代碼標題上已經提早說明這是個線程不安全的類了,爲何同步容器的Vector也多是線程不安全的呢。你們能夠實際運行下上面的類,應該會報數組越界的錯誤。spa

這裏我解釋下,Vector雖然能保證同一個時刻只有一個線程在訪問它,以上面的代碼爲例,當咱們的線程2運行到get(i)的時候,線程1恰好把這個數據移除,這個時候就會出現問題。因此同步容器由於操做順序的緣由,可能會產生線程不安全的問題。線程

Vector案例二code

public class VectorExample3 {

    // java.util.ConcurrentModificationException
    private static void test1(Vector<Integer> v1) { // foreach
        for(Integer i : v1) {
            if (i.equals(3)) {
                v1.remove(i);
            }
        }
    }

    // java.util.ConcurrentModificationException
    private static void test2(Vector<Integer> v1) { // iterator
        Iterator<Integer> iterator = v1.iterator();
        while (iterator.hasNext()) {
            Integer i = iterator.next();
            if (i.equals(3)) {
                v1.remove(i);
            }
        }
    }

    // success
    private static void test3(Vector<Integer> v1) { // for
        for (int i = 0; i < v1.size(); i++) {
            if (v1.get(i).equals(3)) {
                v1.remove(i);
            }
        }
    }

    public static void main(String[] args) {

        Vector<Integer> vector = new Vector<>();
        vector.add(1);
        vector.add(2);
        vector.add(3);
        test1(vector);
    }
}

結果:前兩種test方法均會拋出溢常,第三種正常,你們在用foreach和iterator的時候不要對容器的數據進行移除操做,由於這兩種方法會對容器的大小和預期的值進行校驗。同理ArrayList等也會產生這樣的問題的。這個東西對於我實在是印象深入,由於不知道這個問題,鬧出來不少毛病。blog

Collections案例一

@Slf4j
@ThreadSafe
public class CollectionsExample1 {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", list.size());
    }

    private static void update(int i) {
        list.add(i);
    }
}

Collections案例二

@Slf4j
@ThreadSafe
public class CollectionsExample2 {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet());

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", set.size());
    }

    private static void update(int i) {
        set.add(i);
    }
}

Collections案例三

@Slf4j
@ThreadSafe
public class CollectionsExample3 {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", map.size());
    }

    private static void update(int i) {
        map.put(i, i);
    }
}

由上面三個案例能夠看出Collections.synchronizedXXX生成的三個同步容器類獲得的值和預期的結果是相同的,因此是安全的。

總結:同步容器保證了同一時刻只有一個線程在訪問,可是由於操做的緣由,仍是會產生線程不安全的問題,這個時候咱們可使用synchronize或者Lock來對相關代碼塊進行加鎖操做,可是這種狀況下又致使性能比較低下,又有什麼好的解決辦法呢。答案就在下面要介紹的併發容器了,實際項目中,同步容器已經不多使用,更多的仍是被併發容器所取代了。

 2、併發容器

ArrayList >>CopyOnWriteArrayList

CopyOnWriteArrayList 有幾個缺點:
一、因爲寫操做的時候,須要拷貝數組,會消耗內存,若是原數組的內容比較多的狀況下,可能致使young gc或者full gc
二、不能用於實時讀的場景,像拷貝數組、新增元素都須要時間,因此調用一個set操做後,讀取到數據可能仍是舊的,雖然CopyOnWriteArrayList 能作到最終一致性,可是仍是無法知足實時性要求;
CopyOnWriteArrayList 合適讀多寫少的場景,不過這類慎用
由於誰也無法保證CopyOnWriteArrayList 到底要放置多少數據,萬一數據稍微有點多,每次add/set都要從新複製數組,這個代價實在過高昂了。在高性能的互聯網應用中,這種操做分分鐘引發故障。

CopyOnWriteArrayList透露的思想
如上面的分析CopyOnWriteArrayList表達的一些思想:
一、讀寫分離,讀和寫分開
二、最終一致性
三、使用另外開闢空間的思路,來解決併發衝突

@Slf4j
@ThreadSafe
public class CopyOnWriteArrayListExample {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    private static List<Integer> list = new CopyOnWriteArrayList<>();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", list.size());
    }

    private static void update(int i) {
        list.add(i);
    }
}

經過結果可知是線程安全的。

HashSet、TreeSet>>CopyOnWriteArraySet、ConcurrentSkipListSet

CopyOnWriteArraySet的底層的實現是CopyOnWriteArrayList,所以它的特色和CopyOnWriteArrayList相似

  • 它最適合於具備如下特徵的應用程序:set 大小一般保持很小,只讀操做遠多於可變操做,須要在遍歷期間防止線程間的衝突。
  • 它是線程安全的, 底層的實現是CopyOnWriteArrayList;
  • 由於一般須要複製整個基礎數組,因此可變操做(add、set 和 remove 等等)的開銷很大。
  • 迭代器不支持可變 remove 操做。
  • 使用迭代器進行遍歷的速度很快,而且不會與其餘線程發生衝突。在構造迭代器時,迭代器依賴於不變的數組快照。
@Slf4j
@ThreadSafe
public class CopyOnWriteArraySetExample {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    private static Set<Integer> set = new CopyOnWriteArraySet<>();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", set.size());
    }

    private static void update(int i) {
        set.add(i);
    }
}

ConcurrentSkipListSet是JDK6新增的類,ConcurrentSkipListSet基於map集合,須要注意在此類的批量操做的方法不保證原子性,可是保證底層每次調用的原子性。因此在批量操做時須要另外完成同步操做。

@Slf4j
@ThreadSafe
public class ConcurrentSkipListSetExample {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    private static Set<Integer> set = new ConcurrentSkipListSet<>();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", set.size());
    }

    private static void update(int i) {
        set.add(i);
    }
}

HashMap、TreeMap>>ConcurrentHashMap、ConcurrentSkipListMap

ConcurrentHashMap

@Slf4j
@ThreadSafe
public class ConcurrentHashMapExample {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    private static Map<Integer, Integer> map = new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", map.size());
    }

    private static void update(int i) {
        map.put(i, i);
    }
}

ConcurrentSkipListMap

@Slf4j
@ThreadSafe
public class ConcurrentSkipListMapExample {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(count);
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size:{}", map.size());
    }

    private static void update(int i) {
        map.put(i, i);
    }
}
相關文章
相關標籤/搜索