可能有些同窗知道ArrayList,HashSet,,HashMap這些容器都是線程不安全的,若是多個線程併發的訪問這些容器就會致使線程不安全問題,不少時候須要咱們手動對這些容器進行同步處理,形成咱們很大的不便,所以java爲咱們提供了同步容器和併發容器來解決這個問題。java
首先詳細介紹前,須要強調下同步容器是線程安全的類,可是也可能形成線程不安全的問題,緣由在後面有解釋。數組
同步容器的原理很簡單,就是在原容器的基礎上加了synchronize的鎖,來保證同一時間只有一個線程來訪問。安全
同步容器總的能夠分爲兩類:併發
Vector案例一(線程安全)性能
@Slf4j @ThreadSafe public class VectorExample1 { // 請求總數 public static int clientTotal = 5000; // 同時併發執行的線程數 public static int threadTotal = 200; private static List<Integer> list = new Vector<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", list.size()); } private static void update(int i) { list.add(i); } }
Vector案例二(線程不安全)ui
@NotThreadSafe public class VectorExample2 { private static Vector<Integer> vector = new Vector<>(); public static void main(String[] args) { while (true) { for (int i = 0; i < 10; i++) { vector.add(i); } Thread thread1 = new Thread() { public void run() { for (int i = 0; i < vector.size(); i++) { vector.remove(i); } } }; Thread thread2 = new Thread() { public void run() { for (int i = 0; i < vector.size(); i++) { vector.get(i); } } }; thread1.start(); thread2.start(); } } }
我在上面的代碼標題上已經提早說明這是個線程不安全的類了,爲何同步容器的Vector也多是線程不安全的呢。你們能夠實際運行下上面的類,應該會報數組越界的錯誤。spa
這裏我解釋下,Vector雖然能保證同一個時刻只有一個線程在訪問它,以上面的代碼爲例,當咱們的線程2運行到get(i)的時候,線程1恰好把這個數據移除,這個時候就會出現問題。因此同步容器由於操做順序的緣由,可能會產生線程不安全的問題。線程
Vector案例二code
public class VectorExample3 { // java.util.ConcurrentModificationException private static void test1(Vector<Integer> v1) { // foreach for(Integer i : v1) { if (i.equals(3)) { v1.remove(i); } } } // java.util.ConcurrentModificationException private static void test2(Vector<Integer> v1) { // iterator Iterator<Integer> iterator = v1.iterator(); while (iterator.hasNext()) { Integer i = iterator.next(); if (i.equals(3)) { v1.remove(i); } } } // success private static void test3(Vector<Integer> v1) { // for for (int i = 0; i < v1.size(); i++) { if (v1.get(i).equals(3)) { v1.remove(i); } } } public static void main(String[] args) { Vector<Integer> vector = new Vector<>(); vector.add(1); vector.add(2); vector.add(3); test1(vector); } }
結果:前兩種test方法均會拋出溢常,第三種正常,你們在用foreach和iterator的時候不要對容器的數據進行移除操做,由於這兩種方法會對容器的大小和預期的值進行校驗。同理ArrayList等也會產生這樣的問題的。這個東西對於我實在是印象深入,由於不知道這個問題,鬧出來不少毛病。blog
Collections案例一
@Slf4j @ThreadSafe public class CollectionsExample1 { // 請求總數 public static int clientTotal = 5000; // 同時併發執行的線程數 public static int threadTotal = 200; private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList()); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", list.size()); } private static void update(int i) { list.add(i); } }
Collections案例二
@Slf4j @ThreadSafe public class CollectionsExample2 { // 請求總數 public static int clientTotal = 5000; // 同時併發執行的線程數 public static int threadTotal = 200; private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet()); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } private static void update(int i) { set.add(i); } }
Collections案例三
@Slf4j @ThreadSafe public class CollectionsExample3 { // 請求總數 public static int clientTotal = 5000; // 同時併發執行的線程數 public static int threadTotal = 200; private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>()); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", map.size()); } private static void update(int i) { map.put(i, i); } }
由上面三個案例能夠看出Collections.synchronizedXXX生成的三個同步容器類獲得的值和預期的結果是相同的,因此是安全的。
總結:同步容器保證了同一時刻只有一個線程在訪問,可是由於操做的緣由,仍是會產生線程不安全的問題,這個時候咱們可使用synchronize或者Lock來對相關代碼塊進行加鎖操做,可是這種狀況下又致使性能比較低下,又有什麼好的解決辦法呢。答案就在下面要介紹的併發容器了,實際項目中,同步容器已經不多使用,更多的仍是被併發容器所取代了。
ArrayList >>CopyOnWriteArrayList
CopyOnWriteArrayList 有幾個缺點:
一、因爲寫操做的時候,須要拷貝數組,會消耗內存,若是原數組的內容比較多的狀況下,可能致使young gc或者full gc
二、不能用於實時讀的場景,像拷貝數組、新增元素都須要時間,因此調用一個set操做後,讀取到數據可能仍是舊的,雖然CopyOnWriteArrayList 能作到最終一致性,可是仍是無法知足實時性要求;
CopyOnWriteArrayList 合適讀多寫少的場景,不過這類慎用
由於誰也無法保證CopyOnWriteArrayList 到底要放置多少數據,萬一數據稍微有點多,每次add/set都要從新複製數組,這個代價實在過高昂了。在高性能的互聯網應用中,這種操做分分鐘引發故障。
CopyOnWriteArrayList透露的思想
如上面的分析CopyOnWriteArrayList表達的一些思想:
一、讀寫分離,讀和寫分開
二、最終一致性
三、使用另外開闢空間的思路,來解決併發衝突
@Slf4j @ThreadSafe public class CopyOnWriteArrayListExample { // 請求總數 public static int clientTotal = 5000; // 同時併發執行的線程數 public static int threadTotal = 200; private static List<Integer> list = new CopyOnWriteArrayList<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", list.size()); } private static void update(int i) { list.add(i); } }
經過結果可知是線程安全的。
HashSet、TreeSet>>CopyOnWriteArraySet、ConcurrentSkipListSet
CopyOnWriteArraySet的底層的實現是CopyOnWriteArrayList,所以它的特色和CopyOnWriteArrayList相似
@Slf4j @ThreadSafe public class CopyOnWriteArraySetExample { // 請求總數 public static int clientTotal = 5000; // 同時併發執行的線程數 public static int threadTotal = 200; private static Set<Integer> set = new CopyOnWriteArraySet<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } private static void update(int i) { set.add(i); } }
ConcurrentSkipListSet是JDK6新增的類,ConcurrentSkipListSet基於map集合,須要注意在此類的批量操做的方法不保證原子性,可是保證底層每次調用的原子性。因此在批量操做時須要另外完成同步操做。
@Slf4j @ThreadSafe public class ConcurrentSkipListSetExample { // 請求總數 public static int clientTotal = 5000; // 同時併發執行的線程數 public static int threadTotal = 200; private static Set<Integer> set = new ConcurrentSkipListSet<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } private static void update(int i) { set.add(i); } }
HashMap、TreeMap>>ConcurrentHashMap、ConcurrentSkipListMap
ConcurrentHashMap
@Slf4j @ThreadSafe public class ConcurrentHashMapExample { // 請求總數 public static int clientTotal = 5000; // 同時併發執行的線程數 public static int threadTotal = 200; private static Map<Integer, Integer> map = new ConcurrentHashMap<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", map.size()); } private static void update(int i) { map.put(i, i); } }
ConcurrentSkipListMap
@Slf4j @ThreadSafe public class ConcurrentSkipListMapExample { // 請求總數 public static int clientTotal = 5000; // 同時併發執行的線程數 public static int threadTotal = 200; private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); update(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", map.size()); } private static void update(int i) { map.put(i, i); } }