JDK內部提供了大量實用的API和框架。本章主要介紹這些JDK內部功能,主要分爲3大部分:java
首先,介紹有關同步控制的工具,以前介紹的synchronized就是一種同步控制手段,將介紹更加豐富的多線程控制方法。node
其次,將詳細介紹JDK對線程池的支持,使用線程池,將很大程度提升線程調度的性能。程序員
第三,介紹JDK的一些併發容器。這些容器專爲並行訪問所設計,絕對是高效、安全、穩定的實用工具。算法
以前提到的synchronized是最簡單的同步控制的方法。本節中,首先介紹synchronized、Object.wait()、Object.notify()方法的替代品(或者說加強版)——重入鎖數據庫
jdk6.0以前,重入鎖的性能遠遠好於synchronized,可是以後,二者差距並不大。數組
下面是一段重入鎖使用案例:安全
public class ReenterLock implements Runnable{ public static ReentrantLock lock=new ReentrantLock(); public static int i=0; @Override public void run() { for(int j=0;j<10000000;j++){ lock.lock(); //1 try{ i++; }finally{ lock.unlock(); //2 } } } public static void main(String[] args) throws InterruptedException { ReenterLock tl=new ReenterLock(); Thread t1=new Thread(tl); Thread t2=new Thread(tl); t1.start();t2.start(); t1.join();t2.join(); System.out.println(i); } }
1處加鎖,2處釋放鎖。數據結構
重入鎖是可讓線程反覆進入的,這裏的反覆僅僅侷限於一個線程。能夠寫成下面的形式:多線程
lock.lock(); lock.lock(); try{ i++; }finally{ lock.unlock(); lock.unlock(); }
這種狀況下,一個線程連續兩次得到同一把鎖,這是容許的!同時,釋放也必須釋放兩次,釋放次數多了,拋出異常,次數少了,至關於線程還持有當前鎖,其餘線程沒法進入臨界區。併發
重入鎖除了靈活,還提供了中斷處理的能力:
對於synchronized來講,若是一個線程在等待鎖,那麼結果只有兩種狀況,要麼它得到這把鎖繼續執行,要麼它保持等待。而重入鎖提供了另外一種可能,那就是線程能夠被中斷。也就是在等待鎖的過程當中,程序能夠根據須要取消對鎖的請求。有些時候,這麼作頗有必要。若是一個線程正在等待鎖,那麼它依然能夠收到一個通知,被告知無需再等待,能夠中止工做了。這種狀況對於處理死鎖是有必定幫助的。
下面代碼產生了一個死鎖,但得益於鎖中斷,咱們能夠輕鬆解決這個死鎖:
public class IntLock implements Runnable { //重入鎖ReentrantLock public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public IntLock(int lock) { this.lock = lock; } @Override public void run() { // TODO Auto-generated method stub try { if (lock == 1) { lock1.lockInterruptibly(); //1 Thread.sleep(500); lock2.lockInterruptibly(); System.out.println("lock1 is working...."); } else { lock2.lockInterruptibly(); Thread.sleep(500); lock1.lockInterruptibly(); System.out.println("lock2 is working...."); } } catch (Exception e) { e.printStackTrace(); } finally { if (lock1.isHeldByCurrentThread()) { lock1.unlock(); //釋放鎖 } if (lock2.isHeldByCurrentThread()) { lock2.unlock(); } System.out.println(Thread.currentThread().getId() + ":線程退出"); } } public static void main(String[] args) throws InterruptedException { IntLock r1 = new IntLock(1); IntLock r2 = new IntLock(2); Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); t2.start(); Thread.sleep(1000); t2.interrupt(); //2 } }
線程t1和t2啓動後,t1先佔用lock1,再佔用lock2;t2先佔用lock2,再請求lock1。這很容易照成t一、t2互相等待,造成死鎖。這裏,統一使用1處的lockInterruptibly()方法,這是一個能夠對中斷進行相應的鎖申請動做,即在等待鎖的過程當中,能夠響應中斷。
在2處,t2線程被中斷,放棄對lock1的鎖申請,同時釋放已得到的lock2。這時t1就能順利執行完剩餘程序
除了外部通知以外,避免死鎖還有另一種方法,就是限時等待。咱們可使用tryLock()方法進行一次限時的等待。
public class TimeLock implements Runnable { public static ReentrantLock lock = new ReentrantLock(); @Override public void run() { try { if (lock.tryLock(5, TimeUnit.SECONDS)) { //2 Thread.sleep(6000); //1 } else { System.out.println("get lock failed"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { if(lock.isHeldByCurrentThread()) lock.unlock(); } } public static void main(String[] args) { TimeLock tl = new TimeLock(); Thread t1 = new Thread(tl); Thread t2 = new Thread(tl); t1.start(); t2.start(); } }
在這裏2處,tryLock()方法接收兩個參數,一個表示等待時長,另一個表示計時單位。沒個進入臨界區的線程須要佔用6秒的鎖(1處),而t2因爲等待5秒沒有等到想要的鎖(2處),便返回false。若等待時間改成比5秒大,將返回true,並得到鎖。
在大多數狀況下,鎖的申請是非公平的。系統知識隨機挑選一個,不保證其公平性。公平的鎖,會按照時間的前後順序,保證先到者先得,後到者後得。公平鎖的一大特色是:不會產生飢餓現象。咱們使用synchronized關鍵字獲得的就是非公平鎖,而重入鎖能夠對公平性設置。它有一個構造函數:
public ReentrantLock(boolean fair) //爲true時是公平鎖
實現公平鎖要維護一個有序隊列,所以實現公平鎖的成本較高,性能相對低下,所以,默認狀況下,鎖時非公平的。
public class FairLock implements Runnable{ //建立公平鎖 private static ReentrantLock lock=new ReentrantLock(true); //1 public void run() { while(true){ lock.lock(); try{ System.out.println(Thread.currentThread().getName()+"得到鎖"); }finally{ lock.unlock(); } } } public static void main(String[] args) { FairLock lft=new FairLock(); Thread th1=new Thread(lft); Thread th2=new Thread(lft); th1.start(); th2.start(); } }/** Thread-0得到鎖 Thread-1得到鎖 Thread-0得到鎖 Thread-1得到鎖 Thread-0得到鎖 Thread-1得到鎖 */
你運行上面的程序,會看到結果頗有規律。
若是不使用公平鎖,根據系統的調度,一個線程會傾向於再次獲取已經持有的鎖,這種分配方式是高效的。可是無公平性可言,將上面1中的true改爲false便可。
對ReentrantLock的幾個重要方法整理以下:
就重入鎖的實現來看,它主要集中在java層面。主要包含三個要素:
Condition的做用和wait()和notify()方法的做用是大體相同的。不一樣的是wait()和notify()方法是和synchronized關鍵字合做使用的,而Condition是與重入鎖合做的。經過Lock接口(重入鎖實現了該接口)的newCondition()方法能夠生成一個與當前重入鎖綁定的Condition實例。
Condition接口提供的基本方法:
await()
:使當前線程等待,同時釋放當前鎖,當其餘線程使用signal()或者signalAll()方法時,線程會從新得到鎖並繼續執行。當線程中斷時,也能跳出等待,和Object.wait()很是類似。awaitUninterruptibly()
:與await()基本相同,可是不會響應等待過程當中的中斷。signal()
:喚醒一個等待中的線程,signalAll()
會喚醒全部等待中的線程。下面是Condition的演示:
public class ReenterLockCondition implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition(); @Override public void run() { try { lock.lock(); System.out.println("Thread is start..."); condition.await(); System.out.println("Thread is going on"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { ReenterLockCondition tl = new ReenterLockCondition(); Thread t1 = new Thread(tl); t1.start(); Thread.sleep(2000); //通知線程t1繼續執行 lock.lock(); //1 condition.signal(); lock.unlock(); } }
和Object.wait()和notify()方法同樣,當線程使用Condition.await()時,要求線程持有相關的重入鎖,在Condition.await()調用後,這個線程會釋放這把鎖。同理,在Condition.signal()方法調用時,也要求線程先得到相關的鎖。在siganl()方法調用後,系統會從當前Condition對象的等待隊列中,喚醒一個線程,一旦線程被喚醒,它會從新嘗試得到與之綁定的重入鎖,一旦成功得到,就能夠繼續執行了。所以,通常調用完condition.signal()後,都須要釋放重入鎖。
廣義上講,信號量是對鎖的擴展。不管是內部鎖synchronized仍是重入鎖ReentrantLock,一次都只容許一個線程訪問一個資源,而信號量能夠指定多個線程,同時訪問某個資源。
主要提供了兩個構造函數:
public Semaphore(int permits) public Semaphore(int permits, boolean fair) //第二個參數能夠指定是否公平
在構造信號量對象時,必需要指定信號量的准入數,即同時能申請多少個許可。信號量的主要邏輯方法有:
public void acquire() public void acquireUninterruptibly() public boolean tryAcquire() public boolean tryAcquire(long, TimeUnit unit) public void release()
public class SemapDemo implements Runnable{ final Semaphore semp = new Semaphore(5); //3 @Override public void run() { try { semp.acquire(); //1 //模擬耗時操做 Thread.sleep(2000); System.out.println(Thread.currentThread().getId()+":done!"); //2 semp.release(); //4 } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { ExecutorService exec = Executors.newFixedThreadPool(20); final SemapDemo demo=new SemapDemo(); for(int i=0;i<20;i++){ exec.submit(demo); } } }
上述代碼中,1處到2處爲臨界區管理代碼,程序會限制這段代碼的線程數。在第3處,申明瞭一個包含5個許可的信號量。這意味着1~2處只能同時有5個線程進入。線程在使用完acquire(),在離開時,務必使用release()釋放信號量。這和釋放鎖是一個道理。
ReadWriteLock是JDK5中提供的讀寫分離鎖。讀寫分離鎖能夠有效地減小鎖競爭,以提高系統性能。用鎖分離的機制來提高性能很容易理解,若是使用重入鎖或內部鎖,理論上全部讀—讀、讀—寫、寫—寫都是串行操做。而讀寫鎖,容許多個線程同時讀。
好比A一、A二、A3進行寫操做,B一、B二、B3進行讀操做。讀寫鎖容許B一、B二、B3之間並行。可是,考慮數據完整性,寫寫操做和讀寫操做間依然是須要相互等待和持有鎖的。總結以下:
讀-讀不互斥:可並行;
讀-寫互斥;
寫-寫互斥;
public class ReadWriteLockDemo { private static Lock lock = new ReentrantLock(); private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private static Lock readLock = readWriteLock.readLock(); private static Lock writeLock = readWriteLock.writeLock(); private int value; public Object handleRead(Lock lock) throws InterruptedException { try { lock.lock(); // 模擬讀操做 Thread.sleep(1000); // 讀操做的耗時越多,讀寫鎖的優點越明顯 System.out.println(Thread.currentThread().getName()+" read end!"); return value; } finally { lock.unlock(); } } public void handleWrite(Lock lock, int index) throws InterruptedException { try { lock.lock(); // 模擬寫操做 Thread.sleep(1000); System.out.println(Thread.currentThread().getName()+" wrait end!"); value = index; } finally { lock.unlock(); System.out.println(value); } } public static void main(String[] args) { // TODO Auto-generated method stub final ReadWriteLockDemo demo = new ReadWriteLockDemo(); Runnable readRunnable = () -> { // TODO Auto-generated method stub try { demo.handleRead(readLock); // demo.handleRead(lock); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }; Runnable writeRunnable = () -> { // TODO Auto-generated method stub try { demo.handleWrite(writeLock, new Random().nextInt()); // demo.handleWrite(lock, new Random().nextInt()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }; for (int i = 0; i < 18; i++) { new Thread(readRunnable).start(); //1 } for (int i = 18; i < 20; i++) { new Thread(writeRunnable).start(); //2 } } }
上面代碼中,讀和寫的線程使用耗時的操做來模擬,在1處開啓同時讀的線程,能夠從結果看出讀的速度能夠是並行的,而2處則不行。
這個工具稱爲倒計數器:一般用來控制線程等待,它可讓某一個線程等待直到倒計時結束,再開始執行。
使用場景:好比火箭就很適合使用CountDownLatch。火箭發射前,每每要進行各項設備、儀器檢查,只有檢查完畢後,引擎才能點火。
CountDownLatch的構造函數接收一個整數,即當前這個計數器的計數個數:
public CountDownLatch(int count)
演示:
public class CountDownLatchDemo implements Runnable { static final CountDownLatch end = new CountDownLatch(10); //1 static final CountDownLatchDemo demo = new CountDownLatchDemo(); @Override public void run() { try { //模擬檢查任務 Thread.sleep(new Random().nextInt(10) * 1000); System.out.println("check complete"); end.countDown(); //2 } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { exec.submit(demo); } //等待檢查 end.await(); //3 //發射火箭 System.out.println("Fire!"); exec.shutdown(); } }
在1處,生成一個CountDownLatch,計數數量爲10,表示須要10個線程完成任務,等待在CountDownLatch上的線程才能繼續執行。2處表示一個線程已完成,計數器減一。在3處,要求主線程等待10個線程所有完成任務後,主線程才繼續執行。
主線程在CountDownLatch上等待,當全部檢查任務所有完成後,主線程方能繼續執行。
循環柵欄(CyclicBarrier)和倒計時鎖(CountDownLatch)很是相似:只是循環柵欄的計數器能夠反覆使用。好比假設咱們將計數器設置爲10,那麼湊齊第一批10個線程後,計數器就會歸零,而後接着湊齊下一批10個線程,這就是循環柵欄的內在含義。
使用場景:好比司令下達命令,要10個士兵去完成一項任務,士兵要先集合報道完,接着去執行任務。當10個士兵把手頭任務都執行完成了,司令才能對象宣佈,任務完成!
這裏有兩步:1,士兵集合報道;2,士兵把任務完成。當這兩步前後完成,司令才認爲任務完成。
構造函數:比CountDownLatch稍微強大一些。CyclicBarrier能夠接收一個參數做爲barrierAction(系統當計數器一次計數完成後,系統會執行的動做):
public class CyclicBarrierDemo { public static class Soldier implements Runnable { private String soldier; private final CyclicBarrier cyclic; Soldier(CyclicBarrier cyclic, String soldierName) { this.cyclic = cyclic; this.soldier = soldierName; } public void run() { try { //士兵報道 System.out.println(soldier + " 報道"); //等待全部士兵到齊 cyclic.await(); //2 doWork(); //等待全部士兵完成任務 cyclic.await(); //3 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } void doWork() { try { Thread.sleep(Math.abs(new Random().nextInt() % 10000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(soldier + ":任務完成"); } } public static class BarrierRun implements Runnable { boolean flag; int N; public BarrierRun(boolean flag, int N) { this.flag = flag; this.N = N; } public void run() { if (flag) { System.out.println("司令:[士兵" + N + "個,任務完成!]"); } else { System.out.println("司令:[士兵" + N + "集合完畢]"); flag = true; } } } public static void main(String args[]) { final int N = 10; Thread[] allSoldier = new Thread[N]; boolean flag = false; CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N)); //1 //設置屏障點,主要是爲了執行這個方法 System.out.println("隊伍集合"); for (int i = 0; i < N; ++i) { allSoldier[i] = new Thread(new Soldier(cyclic, "士兵 " + i)); allSoldier[i].start(); //4 } } }
在1處,建立了CyclicBarrier實例,並將計數器設置爲10,並要求在計數器達到指標時,執行BarrierRun。在2處,每個士兵線程都會等待,知道全部士兵集合完畢,集合完畢後,意味着CyclicBarrier的一次計數完成,當再一次調用CyclicBarrier()時,會進行下一次計數。在3處,會等待全部士兵完成任務。還能夠第三次第四次調用 cyclic.await();
整個工做過程圖示:
CyclicBarrier.await可能會拋出兩個異常,第一是中斷異常,能夠響應外部緊急事件。大部分迫使線程等待的方法均可能拋出這個異常。第二是它特有的BrokenBarrierException,這個異常說明當前的CyclicBarrier已經破損了,可能沒有辦法等待全部線程到齊了。若是繼續等待,就白等了。
能夠在4處上方插入:
if (i == 5) allSoldier[0].interrupt();
這樣作,咱們能夠獲得1箇中斷異常和9個BrokenBarrierException,1個士兵處於中斷,其餘9個須要等待這個線程,拋出BrokenBarrierException能夠避免其餘9個線程進行永久的,無謂的等待。
LockSupport是一個很實用的線程阻塞工具,能夠在線程的任何位置讓線程阻塞。
和Thread.suspend()相比,它尼補了因爲resume()在前生成的致使線程沒法繼續執行的問題。和Object.wait()相比,它不須要先得到某個對象鎖,也不會拋出InterruptedException。
public class LockSupportDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread { public ChangeObjectThread(String name){ super.setName(name); } @Override public void run() { synchronized (u) { System.out.println("in "+getName()); LockSupport.park(this); //1 } } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(1000); t2.start(); LockSupport.unpark(t1); //2 LockSupport.unpark(t2); //3 t1.join(); t2.join(); } }
咱們將原來的suspend和resume方法用park()和unpark()代替,在1處,咱們掛起了當前線程,在2處,咱們分別繼續執行t1和t2,從結果能夠看出,它不會由於unpark在park執行前而致使線程永久掛起。
爲何LockSupport不會致使線程永久掛起?
由於LockSupport使用了相似信號量的機制(不一樣的是不能累加),它爲每一個線程準備了一個許可。
這個特色使得:即便unpark()操做發生在park()以前,它也可使下一次park()操做當即返回。這就是不會致使線程永久掛起的緣由。
同時,處於park()掛起狀態的線程不會像suspend()那樣給出使人費解的Runnable狀態,它會很是明確的給出一個WAITING狀態,甚至會標註是park()引發的。這讓問題很容易分析。
LockSupport除了阻塞功能外,還支持中斷響應。可是和其餘接收中斷的函數不同,它不拋出中斷異常,而是默默返回,但能夠從Thread.interrupted()等方法得到中斷標記。
public class LockSupportIntDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread { public ChangeObjectThread(String name) { super.setName(name); } public void run() { synchronized (u) { System.out.println("in " + getName()); LockSupport.park(); if (Thread.interrupted()) { System.out.println(getName() + " 被中斷了!"); } } System.out.println(getName() + " 執行結束"); } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); t1.interrupt(); LockSupport.unpark(t2); } }
多線程在多核的處理下有助於性能,但若是不加控制的使用線程,反而會對系統性能產生不利影響。
爲何會形成不利影響?
在實際生產環境中,線程的數量必須獲得控制。
聯想一下數據庫鏈接池,就知道線程池是啥了。
在線程池中,總有那麼幾個活躍的線程。當須要時,就從池中取出空閒線程,當完成工做後,再還回去,方便其餘人使用。
JDK提供了一套Executor框架,用來對線程池的支持。它的核心成員以下:
上面是jdk併發包的核心類。其中ThreadPoolExecutor表示一個線程池。Excecutor扮演線程池工廠的角色,經過它能夠取得一個擁有特定功能的線程池。
Executors提供了各類類型的線程池:
線程池類型 | 做用 |
---|---|
newFixedThreadPool() | 返回固定線程數量的線程池。線程池中線程數量保持不變。有新任務時,有空閒線程,則執行。若沒有則暫存一個任務隊列,等到有空閒線程,再執行 |
newSingleThreadExecutor() | 返回只有一個線程的線程池。如有多餘任務提交線程池,則存入任務隊列,待線程空閒,按先入先出的順序執行任務隊列中的任務。 |
newCachedThreadPool() | 返回一個可根據實際狀況調整線程數量的線程池。如有空閒線程可用,則用空閒線程。若沒有,建立新的線程處理任務。全部線程完成任務後,將返回線程池進行復用。 |
newSingleThreadScheduledExecutor() | 返回一個ScheduleExecutorService對象,線程池大小爲1。它會在給定時間執行某任務。如在固定延時以後,或週期性執行某個任務 |
newScheduledThreadPool() | 返回ScheduleExecutorService對象,但能夠指定線程池數量 |
一、固定大小的線程池
以newFixedThreadPool()爲例,簡單展現線程池的使用:
public class ThreadPoolDemo { public static class MyTask implements Runnable { @Override public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { MyTask task = new MyTask(); ExecutorService es = Executors.newFixedThreadPool(5); //1 for (int i = 0; i < 10; i++) { es.submit(task); //2 } } }
在1處,建立了一個固定大小的線程池,內有5個線程。在2處,依次向線程池提交了10個任務。
上面程序,前5個任務和後5個任務的執行時間正好相差1秒。
二、計劃任務
newScheduledThreadPool()。它返回一個ScheduledExecutorService對象,能夠根據時間須要進行調度,它其實起到了計劃任務的做用。它的一些主要方法以下:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
scheduleAtFixedRate()和scheduleWithFixedDelay()都會對任務進行週期性調度,不過它們有小小區別:
下面是scheduleAtFixedRate()的例子,任務執行1秒,調用週期2秒。即每2秒,任務會被執行一次。
public class ScheduledExecutorServiceDemo { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); ses.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println(System.currentTimeMillis()/1000); } catch (InterruptedException e) { e.printStackTrace(); } } }, 0, 2, TimeUnit.SECONDS); } }
上面的執行結果是每次打印時間間隔爲2秒。
那若是任務的執行時間超過調度時間,會發生什麼呢?會出現堆疊的狀況嗎,不會,若出現這種狀況,任務的週期將變成8秒,即任務完成那一刻纔開始下一次任務的調度。
若是採用scheduleWithFixedDelay(),任務的實際間隔將是10秒。
對於核心的幾個線程池,其內部都使用了ThreadPoolExecutor實現。這裏就不給出它們的實現方式了。
下面是ThreadPoolExecutor最重要的構造函數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
參數含義:
參數workQueue:被提交但未被執行的任務隊列,它是一個BlockingQueue接口的對象,僅用於存放Runnable對象。ThreadPoolExecutor的構造函數中可使用下面幾種BlockingQueue:
直接提交的隊列:該功能由SynchronousQueue
提供。SynchronousQueue沒有容量,若使用它,提交的任務不會被真實的保存,而老是將任務提交給線程執行。若沒有空線程,則建立;若線程數量達到頂峯,則執行拒絕策略。一般,使用SynchronousQueue須要很大的maximumPoolSize值,不然很容易執行拒絕策略。
有界的任務隊列:可使用ArrayBlockingQueue
實現。它的構造函數必須帶一個容量參數,表示最大容量。若實際線程數小於corePoolSize,則優先建立新線程,若大於corePoolSize,則將新任務加入等待隊列。若等待隊列已滿,在總線程數<=maximumPoolSize的前提下,建立新的進行執行任務。若>maximumPoolSize,執行拒絕策略。
可見,有界隊列僅在任務隊列裝滿時,才肯呢過將線程數提高至corePoolSize以上。換句話說,除非系統很是繁忙,不然核心線程數維持在corePoolSize。
無界的任務隊列:可經過LinkedBlockingQueue
實現。與有界隊列相比,除非系統資源耗盡,不然無界隊列不存在任務入隊失敗的狀況。
當系統線程數<corePoolSize,建立新線程;
若>=corePoolSize,不增長新線程,加入等待隊列,若任務建立和處理速度差別太大,無界隊列會保持快速增加,知道耗盡系統內存。
優先任務隊列(帶有執行優先級的隊列):經過PriorityBlockingQueue實現,能夠控制任務的執行先後順序。高優先級的任務線執行。
回顧一下:
newFixedThreadPool()方法:它的corePoolSize和maximumPoolSize大小同樣,由於固定大小的線程池不存在線程數量的動態變化。同時,它使用無界隊列存聽任務列表,從而在任務提交頻繁的狀況下有可能耗盡系統資源。
newSingleThreadExecutor()返回單線程線程池,是newFixedThreadPool()的退化,只是簡單將線程數設爲1。
newCachedThreadPool()方法返回corePoolSize爲0,maximumPoolSize無窮大的線程池。剛開始該線程池無線程,它會將提交的線程加入SynchronousQueue,這是一種當即提交的隊列,它會迫使線程池增長新的線程執行任務。當任務執行完畢,在60秒內將線程池不用的線程回收(不留任何空閒線程)。所以,當同時有大量任務提交時,任務執行又不快,那麼系統便會開啓燈亮線程處理,很快就會耗盡系統資源。
JDK內置的拒絕策略:
能夠本身擴展RejectedExecutionHandle接口實現本身的拒絕策略,下面代碼簡單演示了自定義線程池和拒絕策略的使用:
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { @Override public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new RejectedExecutionHandler(){ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString()+" is discard"); } }); //1 for (int i = 0; i < Integer.MAX_VALUE; i++) { es.submit(task); Thread.sleep(10); } } }
上述1處自定義了一個線程池。該線程池有5個常駐線程,而且最大線程數量也是5個。這和固定大小的線程池是同樣的。可是它卻擁有一個只有10個容器的等待隊列。在這裏,咱們自定義了拒絕策略,只是比DiscardPolicy高級一點點,把拒絕的信息打印出來,在實際應用中,咱們能夠將其記錄到日誌上。用來分析系統的負載和任務丟失狀況。
線程池的主要做用是爲了線程複用,也就是避免了線程的頻繁建立。可是,線程池最開始的線程從何而來呢?答案就是ThreadFactory。
ThreadFactory是一個接口,它只有一個方法,用來建立線程:
Thread newThread(Runnable r);
當線程池須要新建線程時,就會調用這個方法。
咱們使用自定義線程能夠更自由地設置池子中全部線程的狀態,甚至能夠設置爲守護線程:
public class TFThreadPoolDemo { public static class MyTask implements Runnable { @Override public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory(){ @Override public Thread newThread(Runnable r) { Thread t= new Thread(r); t.setDaemon(true); System.out.println("create "+t); return t; } } ); for (int i = 0; i < 5; i++) { es.submit(task); } Thread.sleep(2000); } }
雖然JDK已經幫咱們實現了這個穩定的高性能線程池。但若是咱們須要對線程池進行一些擴展。好比,想監控每一個任務執行的開始和結束時間,或者其餘一些自定義加強功能,怎麼辦呢?
ThreadPoolExecutor:它也是一個能夠擴展的線程池。它提供了beforeExecute()、afterExecute()和terminated()三個接口對線程池進行控制。
在默認的ThreadPoolExecutor實現中,提供了空的beforeExecute()、afterExecute()實現。在實際引用中,能夠對其擴展實現對線程池運行狀態的跟蹤,輸出一些有用的調試信息,用以幫助系統故障診斷。下面演示對線程池的擴展,在這個擴展中,將記錄每個任務的執行日誌:
public class ExtThreadPool { public static class MyTask implements Runnable { public String name; public MyTask(String name) { this.name = name; } @Override public void run() { System.out.println("正在執行" + ":Thread ID:" + Thread.currentThread().getId() + ",Task Name=" + name); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("準備執行:" + ((MyTask) r).name); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("執行完成:" + ((MyTask) r).name); } @Override protected void terminated() { System.out.println("線程池退出"); } }; for (int i = 0; i < 5; i++) { MyTask task = new MyTask("TASK-GEYM-" + i); es.execute(task); Thread.sleep(10); } es.shutdown(); } }/** ...... 正在執行:Thread ID:13,Task Name=TASK-GEYM-2 準備執行:TASK-GEYM-3 正在執行:Thread ID:14,Task Name=TASK-GEYM-3 準備執行:TASK-GEYM-4 正在執行:Thread ID:15,Task Name=TASK-GEYM-4 執行完成:TASK-GEYM-0 執行完成:TASK-GEYM-1 執行完成:TASK-GEYM-2 執行完成:TASK-GEYM-3 執行完成:TASK-GEYM-4 線程池退出 */
能夠看到,全部任務執行先後都捕獲到了。這對於應用的調試和診斷是很是有幫助的。
線程池的大小對系統的性能有必定的影響。過大或太小的線程數量都沒法發揮最優的系統性能。可是也不用作得很是精確,只要避免極大和極小兩種狀況便可。
先說明一下要解決的問題!
public class DivTask implements Runnable { int a, b; public DivTask(int a, int b) { this.a = a; this.b = b; } @Override public void run() { double re = a / b; //1 System.out.println(re); } public static void main(String[] args) { ThreadPoolExecutor pools = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); for(int i=0; i<5; i++) pools.submit(new DivTask(100, i)); // pools.execute(new DivTask(100, i)); } }/** 100.0 25.0 50.0 33.0 */
在上面程序中,只有四個輸出結果,少了一個,然而沒有報錯信息。使用submit會出現這樣的狀況(execute會拋出異常,具體緣由後面再看吧)。
再說下解決方案!
對於程序員來講,沒有異常堆棧是最頭疼的事。咱們能夠經過兩種方法來討回異常堆棧:
1 是放棄submit()改用execute(),如註釋所示;
pools.execute(new DivTask(100, i));
2 是改造submit():
Future<?> submit = pools.submit(new DivTask(100, i)); submit.get();
以上兩種均可以獲得部分堆棧信息:
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at geym.ch3.DivTask.main(DivTask.java:21) Caused by: java.lang.ArithmeticException: / by zero at geym.ch3.DivTask.run(DivTask.java:13) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
注意,上面說的是部分!咱們只知道異常是在哪裏拋出的,也就是代碼的1處,可是不肯定線程是在哪裏提交的,任務的具體提交的位置被淹沒了!
三、本身動手,擴展ThreadPoolExecutor線程池(完全解決的辦法)
爲了少加班!咱們仍是本身動手,把堆棧的信息完全挖出來吧!擴展咱們的ThreadPoolExecutor線程池,讓它在調度任務以前,先保存一下提交任務線程的堆棧信息。
public class TraceThreadPoolExecutor extends ThreadPoolExecutor { public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue); } @Override public void execute(Runnable task) { // TODO Auto-generated method stub super.execute(wrap(task, clientTrace(), Thread.currentThread().getName())); //包裝器 } @Override public Future<?> submit(Runnable task) { // TODO Auto-generated method stub return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName())); } private Exception clientTrace() { return new Exception("Client stack trace"); } private Runnable wrap(final Runnable task,final Exception clientStack, String clientThreadName) { //1 return new Runnable() { @Override public void run() { try { task.run(); } catch (Exception e) { clientStack.printStackTrace(); try { throw e; } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } } }; } public static class DivTask implements Runnable { int a,b; public DivTask(int a,int b) { this.a = a; this.b = b; } @Override public void run() { double re = a/b; System.out.println(re); } } public static void main(String[] args) { ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); for(int i=0; i<5; i++) pools.execute(new DivTask(100, i)); } }
在wrap()(1處)方法的第2個參數爲一個異常,裏面保存着提交任務的線程的堆棧信息。該方法將咱們傳入的Runnable任務進行一層包裝,使之能處理異常信息。當任務發生異常時,這個異常會被打印。
fork()用來開啓分支線程來處理任務,通常會提交給ForkJoinPool線程池進行處理,以節省系統資源。
Join()用來等待fork()的執行分支執行結束。
使用Fork/Join進行數據處理時的整體結構如圖所示:
因爲線程池的優化,提交的任務和線程數量不是一對一的關係。一般是一個線程處理多個任務,每一個線程都有一個任務隊列。當線程A把任務完成,而線程B還在有一堆任務處理時,線程A會幫助B。B從任務隊列頂部拿數據,而A則是從任務隊列的底部拿數據,這樣有利於避免數據競爭。
ForkJoinPool的一個重要的接口,能夠提交一個ForkJoinTask,ForkJoinTask支持fork()分解以及join()等待的任務,它有兩個重要子類:RecursiveAction(無返回值)和RecursiveTask(返回v類型)。
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task);
使用:
public class CountTask extends RecursiveTask<Long> { private static final int THRESHOLD = 10000; private long start; private long end; public CountTask(long start, long end) { this.start = start; this.end = end; } public Long compute() { long sum = 0; boolean canCompute = (end - start) < THRESHOLD; if (canCompute) { //求和總數小於THRESHOLD,直接求和 for (long i = start; i <= end; i++) { sum += i; } } else { //分紅100個小任務 // 好比start=0,end=100,則每一小步計算2個數 //i=0,lastOne=0+2=2, pos=2+1=3 //i=1,lastOne=2+2=4, pos=4+1=5 //... //i=100 long step = (start + end) / 100; // long pos = start; for (int i = 0; i < 100; i++) { long lastOne = pos + step; if (lastOne > end) lastOne = end; CountTask subTask = new CountTask(pos, lastOne); pos = lastOne + 1; subTask.fork(); sum += subTask.join(); } } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(0, 200000L); ForkJoinTask<Long> result = forkJoinPool.submit(task); //1 try { long res = result.get(); System.out.println("sum=" + res); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
在1處,使用forkJoinPool提交了CountTask,CountTask構造一個計算1到200000求和的任務。在compute()方法中,遵循了下面的邏輯:
if (canCompute) { //求和總數夠小,直接求和 } else { //分紅若干個小任務 }
在使用ForkJoinPool時須要注意,若是任務的劃分層次很深,一直沒有返回,可能出現兩種狀況:
JDK提供了好用的併發容器類,使用也很方便,這裏主要講講這些工具的具體實現。
先簡單認識一下併發集合:
ConcurrentHashMap
:高效的併發HashMap。即線程安全的HashMapCopyOnWriteArrayList
:屬於List,和ArrayList是一族的。在讀多少寫的場合性能很是好,遠遠好於Vector。ConcurrentLinkedQueue
:線程安全的LinkedList。若是得到一個線程安全的HashMap?
第一種方法是:使用Collections.synchronizedMap()方法來包裝HashMap
static Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());
Collections.synchronizedMap()會生成一個名爲SynchronizedMap的Map。它使用委託,將本身全部Map相關的功能交給傳入的HashMap實現,本身則主要負責保證線程安全。
第二種方法是使用ConcurrentHashMap代替HashMap,這種方式更專業,更適合併發場合。
Vector是線程安全的List,也可使用Collections.synchronizedList()方法來包裝任意List。
ConcurrentLinkedQueue算是高併發中性能最好的隊列了。
具體實現:
一、節點
做爲一個鏈表,天然須要定義一個節點:
private static class Node<E>{ volatile E item; volatile Node<E> next;
item用來表示目標元素,好比:放入String,item就是String元素。next表示Node的下一個元素。這樣Node就環環相扣,串在一塊兒了。
二、CAS操做
首先,說明一下CAS操做的原理:CAS操做包含三個操做數—— 內存位置的值(V)、預期原值(A)和新值(B)。若是內存位置的值與預期原值相匹配,那麼處理器會自動將該位置更新爲新值。不然,處理器不作任何操做。不管哪一種狀況,它都會在CAS指令以前返回該位置的值。CAS有效地說明了「我認爲位置V應該包含值A;若是包含該值,則將B放到這個位置;不然,不要更改該位置,只告訴我這個位置如今的值便可。」
CAS是一種樂觀鎖。樂觀鎖在少寫的狀況下適用,如果多寫的狀況,會致使CAS算法不斷的進行retry,反而下降了系統性能,多寫的狀況適合適用悲觀鎖。
casItem()表示設置當前Node的item值。cmp爲指望值,第二個參數val爲目標值。噹噹前值等於cmp指望值時,就會將目標值設置爲val。第二個方法相似。只是它用來設置next字段。
ConcurrentLinkedQueue內部有兩個重要的字段,head和tail,分別表示頭部和尾部。tail的更新不是及時的,而是有延遲,每次更新會跳躍兩個元素。以下圖:
原書中的源碼分析我沒怎麼看懂,有看懂的童鞋歡迎在評論中分享心得
在不少應用場景中,讀操做每每會遠遠大於寫操做。因此這種狀況下,咱們但願讀的性能好些,而寫的性能差些也無所謂。
咱們知道:在讀寫鎖ReadWriteLock中,讀讀不互斥,而讀寫,寫寫是互斥的。
而如今,JDK還提供了另一個讀寫工具類,將讀取性能發揮到極致CopyOnWriteArrayList
,它的讀讀不阻塞,讀寫也不會互相阻塞,只有寫寫須要同步等待。
它是怎麼作到讀寫不阻塞的?
CopyOnWrite在寫入操做時,對原有的數據進行復製成一個副本(而不修改原來的數據),將修改的內容寫入複製後的副本中。寫完後,再用副本替換原來的數據,這樣就不會影響讀了。
讀取的實現:
讀取沒有任何同步和鎖的操做,理由是內部數組array不會發生修改,只會被另一個array替換。
寫人的實現:
public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); //1 newElements[len] = e;//2 setArray(newElements);//3 return true; } finally { lock.unlock(); } }
首先,寫入操做使用鎖,這個鎖僅限於控制寫-寫的狀況。重點在1處,進行了內部元素的複製,而生成一個新數組newElements。2處,將新元素增長到數組末尾,而後,將新數組替換老數組,修改就完成了。整個過程不會影響讀取,而且讀取線程會實時查看到這個修改(array變量是volatile的)
如何實現多個線程間的數據共享呢?好比,線程A但願給線程B發一個消息,用什麼方式好呢?
咱們但願線程A可以通知到線程B,又但願線程A不知道線程B的存在。這樣對於之後線程B的升級或維護,而不用再修改線程A有幫助。爲了實現這一點,咱們可使用一箇中間件BlockingQueue來實現。它就至關於一個意見箱,用來做爲發表意見者與接收意見者溝通的橋樑。
BlockingQueue和以前提到的ConcurrentLinkedQueue和CopyOnWriteArrayList不一樣,它是一個接口,而不是具體的實現。它的主要實現以下圖:
ArrayBlockingQueue:基於數組實現。更適合作有界隊列,擴展比較不方便
LinkedBlockingQueue:基於鏈表。更適合作無界隊列,由於其內部元素可動態增長。
BlockingQueue爲何適合做爲數據共享的通道呢?緣由在於Blocking(阻塞)。
當服務線程(指獲取隊列中消息並進行處理的線程)處理完隊列中全部的消息後,服務線程是如何知道下一條消息的到來的?BlockingQueue會讓服務線程在隊列爲空時,進行等待,當有新的消息進入隊列後,自動將線程喚醒。
它是如何工做的?以ArrayBlockingQueue爲例說明:
寫入數據:
它有一個items,items就是用來存放數據的隊列。offer()
在列隊滿時,返回false。咱們關注的是put()
方法,put()
也是將元素壓入隊列隊尾,但隊列滿了,它會一直等待,直到隊列中有空閒位置。
讀取數據:
poll()、take()兩個方法都能從隊列中的頭部彈出一個元素。不一樣的是:若是隊列爲空poll()方法直接返回null。而take()方法會等待,直到隊列內有可用元素。
從上面能夠看出,put()
、take()
方法纔是Blocking的關鍵。爲了作好通知和等待兩件事,ArrayBlockingQueue定義了三個字段:
take()操做:
當隊列爲空時,讓當前線程等待在notEmpty,新元素入隊時,則進行一次notEmpty上的通知。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); //1 return dequeue(); } finally { lock.unlock(); } }
在1處,要求線程在notEmpty對象中等待。下面是元素入隊的一段代碼:
/** * 在當前put位置插入元素、進給和信號。 * 只有在持有鎖時才調用。 */ private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); //1 }
在1處,當新元素入列後,須要通知等待在notEmpty上的線程,讓它們繼續工做。
put()操做:
當隊列滿時,須要讓 壓入線程 等待:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); //1 enqueue(e); } finally { lock.unlock(); } }
在1處,隊列滿時,在notFull對象中等待。
固然,當元素從隊列中挪走時,隊列中有空位時,天然也要通知等待入隊的線程:
private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); //1 return x; }
咱們還會在「5.3 生產者消費者」一節中,看到他們的身影。在那裏,咱們能夠更清楚地看到如何使用BlockingQueue解耦生產者和消費者。
介紹跳錶
除了經常使用的哈希表外,還有一種有趣的數據結構:跳錶。跳錶的本質是同時維護了多個鏈表,而且鏈表是分層的。跳錶的查詢性能要比哈希表好。以下圖
最低層的鏈表維護了跳錶中全部的元素,每上面一層都是下面一層的子集,一個元素插入哪一層徹底隨機,運氣很差可能獲得性能最差的結構。可是實際工做中,它仍是表現得很好的。
跳錶內全部元素都是排序的。查找時,從頂級鏈表開始找,一旦發現被查找的元素大於當前鏈表中的取值,就會轉入下一層鏈表繼續查找。好比要查找上面跳錶結構中的7。查找過程以下圖所示:
跳錶顯然是一種空間換時間的算法。
使用跳錶實現的Map和使用哈希算法實現的Map的另外一個不一樣之處是:跳錶實現的Map是會排序的,而哈希實現的Map不排序。若須要一個有序的Map,那就選擇跳錶。
使用:ConcurrentSkipListMap
實現這一數據結構的類是ConcurrentSkipListMap
。簡單使用:
Map<Integer, Integer> map = new ConcurrentSkipListMap<>(); for (int i = 0; i<30; i++) map.put(i,i); for (Map.Entry<Integer, Integer> entry: map.entrySet() ) { System.out.println(entry.getKey()); }
跳錶有三個關鍵的數據結構組成:
Node<K,V>
:(節點,含有key、value、next元素,對Node的全部操做,都使用CAS方法)Index<K,V>
:(表示索引,它的內部包裝了node,同時增長了向下和向右的引用),整個跳錶就是根據Index進行全網的組織的。下面是三種數據結構的代碼: