目錄html
java.util包下面的容器集主要有兩種,一種是Collection接口下面的List和Set,一種是Map,
大體結構以下:java
同步容器也叫線程安全容器,是經過syncrhoized關鍵字對線程不安全的操做進行加鎖來保證線程安全的
其中同步容器主要包括:
1.Vector、Stack、HashTable
2.Collections 工具類中提供的同步集合類
Collections類是一個工具類,至關於Arrays類對於Array的支持,Collections類中提供了大量對集合或者容器進行排序、查找的方法。它還提供了幾個靜態方法來建立同步容器類:api
java.util.concurrent提供了多種線程安全容器,大多數是使用系統底層技術實現的線程安全,也叫併發容器,相似native。Java8中使用CAS。數組
這裏主要介紹一些常見的同步容器和併發容器,經過案例輸出結果對比進行介紹
我大體分爲了三類Map/Set,List,Queue來進行講解,但一個Map/Set,只介紹了Map,由於在java的設計中,Set就是Map,說白了就是隻有Key沒有Value的Map,好了,如今開始進入正題安全
代碼中new了三個Map,HashTable,ConcurrentHashMap,ConcurrentSkipListMap比較每一個map的運行效率,起100個線程向map中存放10000條隨機數,並經過門閂CountDownLatch控制運行狀態,輸出運行時間多線程
/** * 併發容器 - ConcurrentMap */ package com.bernardlowe.concurrent.t06; import java.util.HashMap; import java.util.Hashtable; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; public class Test_01_ConcurrentMap { public static void main(String[] args) { final Map<String, String> map = new Hashtable<>(); // final Map<String, String> map = new ConcurrentHashMap<>(); // final Map<String, String> map = new ConcurrentSkipListMap<>(); final Random r = new Random(); Thread[] array = new Thread[100]; final CountDownLatch latch = new CountDownLatch(array.length); long begin = System.currentTimeMillis(); for(int i = 0; i < array.length; i++){ array[i] = new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < 10000; j++){ map.put("key"+r.nextInt(100000000), "value"+r.nextInt(100000)); } latch.countDown(); } }); } for(Thread t : array){ t.start(); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("執行時間爲 : " + (end-begin) + "毫秒!"); } }
Hashtable結果:併發
ConcurrentHashMap結果:dom
ConcurrentSkipListMap結果:ide
ConcurrentHashMap的底層是哈希實現的同步Map(Set)
ConcurrentSkipListMap內部是SkipList(跳錶)結構實現的非阻塞讀/寫/刪除 的 Map,它的value是有序存儲的, 並且其內部是由縱橫鏈表組成,在JDK1.8中,ConcurrentHashMap的性能和存儲空間要優於ConcurrentSkipListMap工具
爲了讓測試數據結果對比更加直觀,我這裏故意將生成的隨機數調的比較大。這裏須要注意一下,在測試的時候,若是機器性能比較好,可能結果會出現偏差,由於System.currentTimeMillis(),這個方法調用了個native方法,獲取的時間精度會依賴於操做系統的實現機制,具體爲何,能夠看看這篇文章http://blog.sina.com.cn/s/blog_6b8bd9d80101fe8t.html。但我按照文檔的辦法將System.currentTimeMillis()改成System.nanoTime(),發現並無解決這個問題,多是由於並無達到納秒級別吧。
下面代碼與4.1的代碼相似,也是new了三個List,ArrayList,Vector,CopyOnWriteArrayList,起100個線程向map中存放1000條隨機數,並經過門閂CountDownLatch控制運行狀態,輸出運行時間和最後list的的長度。因爲ArrayList是線程不安全,在多線程執行的時候,須要try{}catch{},不然會由於數組越界而報錯,由於ArrayList底層是一個長度動態擴展的數組
/** * 併發容器 - CopyOnWriteList */ package com.bernardlowe.concurrent.t06; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Vector; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; public class Test_02_CopyOnWriteList { public static void main(String[] args) { final List<String> list = new ArrayList<String>(); // 線程不安全 // final List<String> list = new Vector<>(); // 線程安全 // final List<String> list = new CopyOnWriteArrayList<>(); // 線程安全 final Random r = new Random(); Thread[] array = new Thread[100]; final CountDownLatch latch = new CountDownLatch(array.length); long begin = System.currentTimeMillis(); for(int i = 0; i < array.length; i++){ array[i] = new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < 1000; j++){ try { list.add("value" + r.nextInt(100000)); } catch (Exception e) { } } latch.countDown(); } }); } for(Thread t : array){ t.start(); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("執行時間爲 : " + (end-begin) + "毫秒!"); System.out.println("List.size() : " + list.size()); } }
ArrayList結果:由於ArrayList是線程不安全的,因此在多線程環境中,可能會丟失數據
Vector結果:
CopyOnWriteArrayList結果:
CopyOnWriteArrayList是讀寫分離的,寫時複製出一個新的數組,完成插入、修改或者移除操做後將新數組賦值給array,讀取時直接讀取最新的數組,因此在寫操做時,效率很是低(雖然寫比較慢,但它在刪除數組頭和尾仍是很快的)
從上面三個結果能夠看出,CopyOnWriteArrayList雖然保證了線程安全,但它的寫操做效率過低了,但相比Vector,併發安全且性能比Vector好,Vector是增刪改查方法都加了synchronized,保證同步,可是每一個方法執行的時候都要去得到鎖,性能就會大大降低,而CopyOnWriteArrayList 只是在增刪改上加鎖,可是讀不加鎖,在讀方面的性能就好於Vector,CopyOnWriteArrayList支持讀多寫少的併發狀況,因此CopyOnWriteArrayList是不會存在髒讀問題的
這一節主要介紹一些併發隊列的經常使用api
基礎鏈表同步隊列
peek() -> 查看queue中的首數據
poll() -> 獲取queue中的首數據
/** * 併發容器 - ConcurrentLinkedQueue * 隊列 - 鏈表實現的。 */ package com.bernardlowe.concurrent.t06; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class Test_03_ConcurrentLinkedQueue { public static void main(String[] args) { Queue<String> queue = new ConcurrentLinkedQueue<>(); //向隊列中增長10個數據 for(int i = 0; i < 10; i++){ queue.offer("value" + i); } System.out.println(queue); System.out.println(queue.size()); // peek() -> 查看queue中的首數據, System.out.println("首數據 " + queue.peek()); System.out.println("隊列長度 "+ queue.size()); System.out.println("==================="); // poll() -> 獲取queue中的首數據 System.out.println("首數據 " + queue.peek()); System.out.println("隊列長度 "+ queue.size()); } }
結果:
阻塞隊列,隊列容量不足自動阻塞,隊列容量爲0自動阻塞。
put & take - 自動阻塞
put自動阻塞, 隊列容量滿後,自動阻塞
take自動阻塞方法, 隊列容量爲0後,自動阻塞
/** * 併發容器 - LinkedBlockingQueue * 阻塞容器。 */ package com.bernardlowe.concurrent.t06; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class Test_04_LinkedBlockingQueue { final BlockingQueue<String> queue = new LinkedBlockingQueue<>(); final Random r = new Random(); public static void main(String[] args) { final Test_04_LinkedBlockingQueue t = new Test_04_LinkedBlockingQueue(); new Thread(new Runnable() { @Override public void run() { while(true){ try { t.queue.put("value"+t.r.nextInt(1000)); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "producer").start(); for(int i = 0; i < 3; i++){ new Thread(new Runnable() { @Override public void run() { while(true){ try { System.out.println(Thread.currentThread().getName() + " - " + t.queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "consumer"+i).start(); } } }
結果:
結果就是一個簡單的生產者消費者
底層數組實現的有界隊列,當容量不足的時候,有阻塞能力,根據調用API(add/put/offer)不一樣,有不一樣特性
這裏主要介紹三個api方法add,put,offer
offer方法
單參數offer方法,不阻塞。容量不足的時候,返回false。當前新增數據操做放棄。
三參數offer方法(offer(value,times,timeunit)),容量不足的時候,阻塞times時長(單位爲timeunit),若是在阻塞時長內,有容量空閒,新增數據返回true。若是阻塞時長範圍內,無容量空閒,放棄新增數據,返回false。
/** * 併發容器 - ArrayBlockingQueue * 有界容器。 */ package com.bernardlowe.concurrent.t06; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class Test_05_ArrayBlockingQueue { final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); public static void main(String[] args) { final Test_05_ArrayBlockingQueue t = new Test_05_ArrayBlockingQueue(); for(int i = 0; i < 5; i++){ // 1.add method System.out.println("add method : " + t.queue.add("value"+i)); // 2.put method // try { // t.queue.put("put"+i); // } catch (InterruptedException e) { // e.printStackTrace(); // } // System.out.println("put method : " + i); // 3.offer method // System.out.println("offer method : " + t.queue.offer("value"+i)); // try { // System.out.println("offer method : " + // t.queue.offer("value"+i, 1, TimeUnit.SECONDS)); // } catch (InterruptedException e) { // e.printStackTrace(); // } } System.out.println(t.queue); } }
add方法結果:容量不足的時候,拋出異常
put方法結果:容量不足的時候,阻塞等待
單/多參數offer方法結果:
單參數offer:容量不足,直接返回結果,不阻塞
多參數offer:容量不足,阻塞
延時隊列。根據比較機制,實現自定義處理順序的隊列。經常使用於定時任務。
如:定時關機。
具體示例代碼以下
/** * 併發容器 - DelayQueue */ package com.bernardlowe.concurrent.t06; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class Test_06_DelayQueue { static BlockingQueue<MyTask_06> queue = new DelayQueue<>(); public static void main(String[] args) throws InterruptedException { long value = System.currentTimeMillis(); MyTask_06 task1 = new MyTask_06(value + 2000); MyTask_06 task2 = new MyTask_06(value + 1000); MyTask_06 task3 = new MyTask_06(value + 3000); MyTask_06 task4 = new MyTask_06(value + 2500); MyTask_06 task5 = new MyTask_06(value + 1500); queue.put(task1); queue.put(task2); queue.put(task3); queue.put(task4); queue.put(task5); System.out.println(queue); System.out.println(value); for(int i = 0; i < 5; i++){ System.out.println(queue.take()); } } } class MyTask_06 implements Delayed { private long compareValue; public MyTask_06(long compareValue){ this.compareValue = compareValue; } /** * 比較大小。自動實現升序 * 建議和getDelay方法配合完成。 * 若是在DelayQueue是須要按時間完成的計劃任務,必須配合getDelay方法完成。 */ @Override public int compareTo(Delayed o) { return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } /** * 獲取計劃時長的方法。 * 根據參數TimeUnit來決定,如何返回結果值。 */ @Override public long getDelay(TimeUnit unit) { return unit.convert(compareValue - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public String toString(){ return "Task compare value is : " + this.compareValue; } }
結果:
這裏主要是兩個方法的區別,add和transfer
/** * 併發容器 - LinkedTransferQueue * 轉移隊列 */ package com.bernardlowe.concurrent.t06; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TransferQueue; public class Test_07_TransferQueue { TransferQueue<String> queue = new LinkedTransferQueue<>(); public static void main(String[] args) { final Test_07_TransferQueue t = new Test_07_TransferQueue(); /*new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " thread begin " ); System.out.println(Thread.currentThread().getName() + " - " + t.queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }, "output thread").start(); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } try { t.queue.transfer("test string"); } catch (InterruptedException e) { e.printStackTrace(); }*/ new Thread(new Runnable() { @Override public void run() { try { t.queue.transfer("test string"); // t.queue.add("test string"); System.out.println("add ok"); } catch (Exception e) { e.printStackTrace(); } } }).start(); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " thread begin " ); System.out.println(Thread.currentThread().getName() + " - " + t.queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }, "output thread").start(); } }
這裏的transfer()和take()都是阻塞方法,take先請求接收數據或者transfer先發送數據,都會進行阻塞等待。
舉個例子,transfer()就至關與手機打電話,當A給B打電話,B必須接收到電話信號接聽才能進行通話,不然A會一直等待
add()就至關於A給B發短信,短信已經存到了運營商那邊,等待B接收,無論發短信時B是否在線
該隊列一個容量爲0的隊列,是一個特殊的TransferQueue,它和TransferQueue很像,但這個隊列必需要有消費線程才行
又兩個方法add,put
add方法,無阻塞。若沒有消費線程阻塞等待數據,則拋出異常。
put方法,有阻塞。若沒有消費線程阻塞等待數據,則阻塞。
/** * 併發容器 - SynchronousQueue */ package com.bernardlowe.concurrent.t06; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; public class Test_08_SynchronusQueue { BlockingQueue<String> queue = new SynchronousQueue<>(); public static void main(String[] args) { final Test_08_SynchronusQueue t = new Test_08_SynchronusQueue(); new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " thread begin " ); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - " + t.queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }, "output thread").start(); /*try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }*/ // t.queue.add("test add"); try { t.queue.put("test put"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " queue size : " + t.queue.size()); } }
將t.queue.add("test add");
的註釋打開,t.queue.put("test put");
加上註釋
add方法異常結果: 由於它是一個容量爲0的隊列