java.util.concurrent從jdk1.5開始新加入的一個包,致力於解決併發編程的線程安全問題,使用戶可以更爲快捷方便的編寫多線程狀況下的併發程序。java
同步容器只有包括Vector和HashTable,相比其餘容器類只是多用了Synchronize的技術數據庫
1.ArrayList是最經常使用的List實現類,內部是經過數組實現的,它容許對元素進行快速隨機訪問。數組的缺點是每一個元素之間不能有間隔,當數組大小不知足時須要增長存儲能力,就要講已經有數組的數據複製到新的存儲空間中。當從ArrayList的中間位置插入或者刪除元素時,須要對數組進行復制、移動、代價比較高。所以,它適合隨機查找和遍歷,不適合插入和刪除。編程
2.Vector與ArrayList同樣,也是經過數組實現的,不一樣的是它支持線程的同步,即某一時刻只有一個線程可以寫Vector,避免多線程同時寫而引發的不一致性,但實現同步須要很高的花費,所以,訪問它比訪問ArrayList慢數組
注意: Vector線程安全、ArrayList線程不安全緩存
1.HashMap不是線程安全的 安全
HastMap是一個接口 是map接口的子接口,是將鍵映射到值的對象,其中鍵和值都是對象,而且不能包含重複鍵,但能夠包含重複值。HashMap容許null key和null value,而hashtable不容許。多線程
2.HashTable是線程安全的一個Collection。併發
3.HashMap是Hashtable的輕量級實現(非線程安全的實現),他們都完成了Map接口,主要區別在於HashMap容許空(null)鍵值(key),因爲非線程安全,效率上可能高於Hashtable。app
HashMap容許將null做爲一個entry的key或者value,而Hashtable不容許。dom
HashMap把Hashtable的contains方法去掉了,改爲containsvalue和containsKey。
注意: HashTable線程安全,HashMap線程不安全。
Collections.synchronized*(m) 將線程不安全集合變爲線程安全集合
Map m = Collections.synchronizedMap(new HashMap()); List l = Collections.synchronizedList(new ArrayList());
ConcurrentHashMap內部使用段(Segment)來表示這些不一樣的部分,每一個段其實就是一個小的HashTable,它們有本身的鎖。只要多個修改操做發生在不一樣的段上,它們就能夠併發進行。把一個總體分紅了16個段(Segment)也就是最高支持16個線程的併發修改操做。
這也是在重線程場景時減少鎖的粒度從而下降鎖競爭的一種方案。而且代碼中大多共享變量使用volatile關鍵字聲明,目的是第一時間獲取修改的內容,性能很是好。
CountDownLatch是JAVA提供在java.util.concurrent包下的一個輔助類,能夠把它當作是一個計數器,其內部維護着一個count計數,只不過對這個計數器的操做都是原子操做,同時只能有一個線程去操做這個計數器,CountDownLatch經過構造函數傳入一個初始計數值,調用者能夠經過調用CounDownLatch對象的cutDown()方法,來使計數減1;若是調用對象上的await()方法,那麼調用者就會一直阻塞在這裏,直到別人經過cutDown方法,將計數減到0,才能夠繼續執行。
public class Test002 { public static void main(String[] args) throws InterruptedException { System.out.println("等待子線程執行完畢..."); CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { System.out.println("子線程," +Thread.currentThread().getName() + "開始執行..."); countDownLatch.countDown();// 每次減去1 System.out.println("子線程," + Thread.currentThread().getName() + "結束執行..."); } }).start(); new Thread(new Runnable() { @Override public void run() { System.out.println("子線程," + Thread.currentThread().getName() + "開始執行..."); countDownLatch.countDown(); System.out.println("子線程," + Thread.currentThread().getName() + "結束執行..."); } }).start(); countDownLatch.await();// 調用當前方法主線程阻塞 countDown結果爲0, 阻塞變爲運行狀態 System.out.println("兩個子線程執行完畢...."); System.out.println("繼續主線程執行.."); }}
一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。由於該 barrier 在釋放等待線程後能夠重用,因此稱它爲循環 的 barrier。
使用場景
須要全部的子任務都完成時,才執行主任務,這個時候就能夠選擇使用CyclicBarrier。
public class CyclicBarrierTest { public static void main(String[] args) throws IOException, InterruptedException { //若是將參數改成4,可是下面只加入了3個選手,這永遠等待下去 //Waits until all parties have invoked await on this barrier. CyclicBarrier barrier = new CyclicBarrier(3); ExecutorService executor = Executors.newFixedThreadPool(3); executor.submit(new Thread(new Runner(barrier, "1號選手"))); executor.submit(new Thread(new Runner(barrier, "2號選手"))); executor.submit(new Thread(new Runner(barrier, "3號選手"))); executor.shutdown(); }}class Runner implements Runnable { // 一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point) private CyclicBarrier barrier; private String name; public Runner(CyclicBarrier barrier, String name) { super(); this.barrier = barrier; this.name = name; } @Override public void run() { try { Thread.sleep(1000 * (new Random()).nextInt(8)); System.out.println(name + " 準備好了..."); // barrier的await方法,在全部參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。 barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(name + " 起跑!"); }}
Semaphore是計數信號量。Semaphore管理一系列許可證。每一個acquire方法阻塞,直到有一個許可證能夠得到而後拿走一個許可證;每一個release方法增長一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實並無實際的許可證這個對象,Semaphore只是維持了一個可得到許可證的數量。
Semaphore常常用於限制獲取某種資源的線程數量
需求: 一個廁所只有3個坑位,可是有10我的來上廁所,那怎麼辦?假設10我的的編號分別爲1-10,而且1號先到廁所,10號最後到廁所。那麼1-3號來的時候必然有可用坑位,順利如廁,4號來的時候須要看看前面3人是否有人出來了,若是有人出來,進去,不然等待。一樣的道理,4-10號也須要等待正在上廁所的人出來後才能進去,而且誰先進去這得看等待的人是否有素質,是否能遵照先來先上的規則。
class Parent implements Runnable { private String name; private Semaphore wc; public Parent(String name,Semaphore wc){ this.name=name; this.wc=wc; } @Override public void run() { try { // 剩下的資源(剩下的茅坑) int availablePermits = wc.availablePermits(); if (availablePermits > 0) { System.out.println(name+"天助我也,終於有茅坑了..."); } else { System.out.println(name+"怎麼沒有茅坑了..."); } //申請茅坑 若是資源達到3次,就等待 wc.acquire(); System.out.println(name+"終於輪我上廁所了..爽啊"); Thread.sleep(new Random().nextInt(1000)); // 模擬上廁所時間。 System.out.println(name+"廁所上完了..."); wc.release(); } catch (Exception e) { } }}public class TestSemaphore02 { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 1; i <=10; i++) { Parent parent = new Parent("第"+i+"我的,",semaphore); new Thread(parent).start(); } }}
ConcurrentLinkedQueue : 是一個適用於高併發場景下的隊列,經過無鎖的方式,實現了高併發狀態下的高性能,一般ConcurrentLinkedQueue性能好於BlockingQueue.它是一個基於連接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最早加入的,尾是最近加入的,該隊列不容許null元素。
add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中這倆個方法沒有任何區別) poll() 和peek() 都是取頭元素節點,區別在於前者會刪除元素,後者不會。
public class ConcurrentLinkedQueueTest {
private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
private static int count = 2; // 線程個數
//CountDownLatch,一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。
private static CountDownLatch latch = new CountDownLatch(count); public static void main(String[] args) throws InterruptedException { long timeStart = System.currentTimeMillis(); ExecutorService es = Executors.newFixedThreadPool(4); ConcurrentLinkedQueueTest.offer(); for (int i = 0; i < count; i++) { es.submit(new Poll()); } latch.await(); //使得主線程(main)阻塞直到latch.countDown()爲零才繼續執行 System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms"); es.shutdown(); } /** * 生產 */ public static void offer() { for (int i = 0; i < 100000; i++) { queue.offer(i); } } static class Poll implements Runnable { public void run() { // while (queue.size()>0) { while (!queue.isEmpty()) { System.out.println(queue.poll()); } latch.countDown(); } }}
阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:
在隊列爲空時,獲取元素的線程會等待隊列變爲非空。
當隊列滿時,存儲元素的線程會等待隊列可用。
阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素
ArrayBlockingQueue
ArrayBlockingQueue是一個有邊界的阻塞隊列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,咱們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。
ArrayBlockingQueue是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。
LinkedBlockingQueue
LinkedBlockingQueue阻塞隊列大小的配置是可選的,若是咱們初始化時指定一個大小,它就是有邊界的,若是不指定,它就是無邊界的。說是無邊界,實際上是採用了默認大小爲Integer.MAX_VALUE的容量 。它的內部實現是一個鏈表
SynchronousQueue
SynchronousQueue隊列內部僅容許容納一個元素。當一個線程插入一個元素後會被阻塞,除非這個元素被另外一個線程消費
public class BlockingQueueTest2 { /** * * 定義裝蘋果的籃子 * */ public class Basket { // 籃子,可以容納3個蘋果 BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3); // 生產蘋果,放入籃子 public void produce() throws InterruptedException { // put方法放入一個蘋果,若basket滿了,等到basket有位置 basket.put("An apple"); } // 消費蘋果,從籃子中取走 public String consume() throws InterruptedException { // take方法取出一個蘋果,若basket爲空,等到basket有蘋果爲止(獲取並移除此隊列的頭部) return basket.take(); } } // 定義蘋果生產者 class Producer implements Runnable { private String instance; private Basket basket; public Producer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { while (true) { // 生產蘋果 System.out.println("生產者準備生產蘋果:" + instance); basket.produce(); System.out.println("!生產者生產蘋果完畢:" + instance); // 休眠300ms Thread.sleep(300); } } catch (InterruptedException ex) { System.out.println("Producer Interrupted"); } } } // 定義蘋果消費者 class Consumer implements Runnable { private String instance; private Basket basket; public Consumer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { while (true) { // 消費蘋果 System.out.println("消費者準備消費蘋果:" + instance); System.out.println(basket.consume()); System.out.println("!消費者消費蘋果完畢:" + instance); // 休眠1000ms Thread.sleep(1000); } } catch (InterruptedException ex) { System.out.println("Consumer Interrupted"); } } } public static void main(String[] args) { BlockingQueueTest2 test = new BlockingQueueTest2(); // 創建一個裝蘋果的籃子 Basket basket = test.new Basket(); ExecutorService service = Executors.newCachedThreadPool(); Producer producer = test.new Producer("生產者001", basket); Producer producer2 = test.new Producer("生產者002", basket); Consumer consumer = test.new Consumer("消費者001", basket); service.submit(producer); service.submit(producer2); service.submit(consumer); // 程序運行5s後,全部任務中止// try {// Thread.sleep(1000 * 5);// } catch (InterruptedException e) {// e.printStackTrace();// }// service.shutdownNow(); }}
優先級阻塞隊列,該實現類須要本身實現一個繼承了 Comparator 接口的類, 在插入資源時會按照自定義的排序規則來對資源數組進行排序。 其中值大的排在數組後面 ,取值時從數組頭開始取
public class TestQueue{ static Logger logger = LogManager.getLogger(); static Random random = new Random(47); public static void main(String args[]) throws InterruptedException { PriorityBlockingQueue<PriorityEntity> queue = new PriorityBlockingQueue<PriorityEntity>(); ExecutorService executor = Executors.newCachedThreadPool(); executor.execute(new Runnable() { public void run() { int i = 0; while (true) { queue.put(new PriorityEntity(random.nextInt(10), i++)); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(1000)); } catch (InterruptedException e) { logger.error(e); } } } }); executor.execute(new Runnable() { public void run() { while (true) { try { System.out.println("take-- " + queue.take() + " left:-- [" + queue.toString() + "]"); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(3000)); } catch (InterruptedException e) { logger.error(e); } } catch (InterruptedException e) { logger.error(e); } } } }); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { logger.error(e); } } static class PriorityEntity implements Comparable<PriorityEntity> { private static int count = 0; private int id = count++; private int priority; private int index = 0; public PriorityEntity(int _priority, int _index) { System.out.println("_priority : " + _priority); this.priority = _priority; this.index = _index; } public String toString() { return id + "# [index=" + index + " priority=" + priority + "]"; } //數字小,優先級高 public int compareTo(PriorityEntity o) { return this.priority > o.priority ? 1 : this.priority < o.priority ? -1 : 0; } }}
開發過程當中,合理地使用線程池能夠帶來3個好處:
下降資源消耗:經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
提升響應速度:當任務到達時,任務能夠不須要等到線程建立就能當即執行。
提升線程的可管理性:線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控。
線程池做用就是限制系統中執行線程的數量。
根據系統的環境狀況,能夠自動或手動設置線程數量,達到運行的最佳效果;少了浪費了系統資源,多了形成系統擁擠效率不高。用線程池控制線程數量,其餘線程排隊等候。一個任務執行完畢,再從隊列的中取最前面的任務開始執行。若隊列中沒有等待進程,線程池的這一資源處於等待。當一個新任務須要運行時,若是線程池中有等待的工做線程,就能夠開始運行了;不然進入等待隊列。
建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }public class ThreadPoolExecutorTest { public static void main(String[] args) { ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; try { Thread.sleep(index * 1000); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(new Runnable() { public void run() { System.out.println(index); } }); } } }
線程池爲無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的線程,而不用每次新建線程
建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}public class ThreadPoolExecutorTest { public static void main(String[] args) { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { final int index = i; fixedThreadPool.execute(new Runnable() { public void run() { try { System.out.println(index); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }
由於線程池大小爲3,每一個任務輸出index後sleep 2秒,因此每兩秒打印3個數字。定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()
建立一個定長線程池,支持定時及週期性任務執行。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }public class ThreadPoolExecutorTest { public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.schedule(new Runnable() { public void run() { System.out.println("delay 3 seconds"); } }, 3, TimeUnit.SECONDS); } }
表示延遲3秒執行
public class ThreadPoolExecutorTest { public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.scheduleAtFixedRate(new Runnable() { public void run() { System.out.println("delay 1 seconds, and excute every 3 seconds"); } }, 1, 3, TimeUnit.SECONDS); } }
表示延遲1秒後每3秒執行一次
newSingleThreadExecutor
建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
阿里發佈的 Java開發手冊中強制線程池不容許使用 Executors 去建立,而是經過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險
ThreadPoolExecutor public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize - 線程池核心池的大小。
maximumPoolSize - 線程池的最大線程數。
keepAliveTime - 當線程數大於核心時,此爲終止前多餘的空閒線程等待新任務的最長時間。
unit - keepAliveTime 的時間單位。
workQueue - 用來儲存等待執行任務的隊列。
threadFactory - 線程工廠。
handler - 拒絕策略。
線程優先級:
corePoolSize > workQueue > maximumPoolSize>handler(拒絕)
拒絕策略:
要想合理的配置線程池,就必須首先分析任務特性,能夠從如下幾個角度來進行分析:
CPU密集型時,任務能夠少配置線程數,大概和機器的cpu核數至關,這樣可使得每一個線程都在執行任務 IO密集型時,大部分線程都阻塞,故須要多配置線程數,2*cpu核數 操做系統之名稱解釋: 某些進程花費了絕大多數時間在計算上,而其餘則在等待I/O上花費了大可能是時間,前者稱爲計算密集型(CPU密集型)computer-bound,後者稱爲I/O密集型,I/O-bound。
寫在最後:歡迎留言討論,加關注,持續更新!!!