多線程的設計方法確實能夠最大限度的發揮多核處理器的計算能力,提升吞吐量和性能。可是若是不加控制隨意使用線程,對系統的性能反而會產生不利。java
和進程相比,線程雖然是一種輕量級的,可是建立和關閉依然須要花費時間,若是每個小的任務都建立一個線程,則會頗有可能出現建立和銷燬線程佔用的時間大於該線程任務所消耗的時間。其次線程自己也是須要佔用內存空間的,大量的線程會搶佔寶貴的內存資源。編程
所以線程的使用須要掌握一個度,再有限的範圍內增長線程的數量能夠提升系統的性能,一旦超過這個範圍,大量的線程只會拖垮整個系統。bash
爲了不繫統頻繁的建立和銷燬線程,咱們可讓建立的線程複用。咱們可使用一個線程池維護一些線程,當你須要使用線程的時候,能夠從池子中隨便拿一個空閒線程,當完成工做時,並不急着關閉線程,而是將這些線程退回到線程池中,方便下次使用。多線程
簡而言之,再使用線程池後,建立線程編程了從線程池中得到空閒線程,關閉線程變爲想線程池歸還線程。併發
線程池的成員都在java.util.concurrent包中,是JDK併發包的核心。其中ThreadPoolExecutor表示一個線程池。Executors類則是一個線程工廠的角色,經過Executors能夠取得一個擁有特定功能的線程池,經過Executors能夠取得一個特定功能的線程池。ide
該方法返回一個固定線程數量的線程池。該線程池中的線程數量始終不變。當有一個新的任務提交時,線程池中如有空閒線程,則當即執行。若沒有則新的任務會暫存在一個任務隊列中,待有線程空閒時,便處理任務隊列中的隊列。函數
該方法返回一個只有一個線程的線程池。如有多餘的任務被提交到線程池,任務會被保存在一個任務隊列中,待線程空閒,按先入先出的順序執行隊列中的任務。性能
該方法返回一個可根據實際狀況調整線程數量的線程池。線程池的線程數量不肯定,但如有空閒線程能夠複用,則會優先使用可複用的線程。若全部的線程都在工做,又有新的任務提交,則會建立新的線程處理任務。全部線程在當前任務執行完畢後,將返回線程池進行復用。ui
該方法返回一個ScheduledExecutorService對象,線程池大小爲1。ScheduledExecutorService接口在ExecutorService接口上擴展了在給定時間執行某任務的功能,如在某個固定的延時後執行,或者週期性執行某個任務。this
該方法會返回一個ScheduledExecutorService對象,但該線程池能夠執行線程數量。
建立固定大小的線程池
public class ThreadPoolThread {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID: " + Thread.currentThread().getId());
try{
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
MyTask myTask = new MyTask();
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0;i< 10 ;i++){
executorService.submit(myTask);
}
}
}
}
複製代碼
1562554721820:Thread ID: 12
1562554721820:Thread ID: 15
1562554721820:Thread ID: 16
1562554721820:Thread ID: 13
1562554721820:Thread ID: 14
1562554722821:Thread ID: 15
1562554722821:Thread ID: 16
1562554722821:Thread ID: 12
1562554722821:Thread ID: 13
1562554722821:Thread ID: 14
複製代碼
計劃執行任務
newScheduledThreadPool()方法返回一個ScheduledExecutorService對象,能夠根據時間須要對線程進行調度。主要方法以下
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
複製代碼
和其餘線程不一樣,ScheduledExecutorService不必定會當即安排執行任務。他實際上是起到了計劃任務的做用,會在指定的時間對任務進行調度。
schedule()會在給定時間對任務進行一次調度。scheduleAtFixedRate()和scheduleWithFixedDelay()方法會對任務進行週期性調度,可是兩者仍是有區別的。scheduleAtFixedRate()方法的任務調度頻率是必定的,它是以上一個任務開始執行的時間爲起點,再在規定的時間調度下一次任務。而scheduleWithFixedDelay()方法是以上一個任務的結束後再通過規定時間進行任務調度。
public class ScheduleExecutorServiceDemo {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},0,2, TimeUnit.SECONDS);
}
}
複製代碼
1562555518798
1562555520798
1562555522798
1562555524799
1562555526800
複製代碼
能夠看出任務每兩秒被調度一次。
若是任務的執行時間大於調度時間,則任務就會在上一個任務結束後當即被調用。
將代碼修改成8秒
Thread.sleep(8000);
複製代碼
1562555680333
1562555688333
1562555696333
1562555704333
複製代碼
調度程序實際上並不保證任務會無限期的持續調用,若是任務自己拋出異常,那麼後續全部執行都會中斷。
對於幾個核心的線程池,雖然看着建立的線程池有着不一樣的功能特色,可是其內部都是使用了ThreadPoolExecutor類。
看幾個線程池的建立源碼:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
複製代碼
能夠看出他們都是ThreadPoolExecutor類的封裝,來看一下ThreadPoolExecutor類的構造:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
參數workQueue指被提交但未執行的任務隊列,他是一個BlockingQueue接口的對象,僅用於存放Runnable對象。在ThreadPoolExecutor構造中可使用一下幾種BlockingQueue接口:
拒絕策略是當任務數量超過系統實際承載能力時執行的策略。拒絕策略能夠說時系統超負荷運行時的補救措施。
JDK內置了四種拒絕策略:
上面的策略都實現了RejectedExecutionHandler接口,若是以上策略沒法知足實際開發,能夠本身擴展。
RejectedExecutionHandler接口構造:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
複製代碼
自定義拒絕策略:
//拒絕策略demo
public class RejectThreadPoolDemo {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ": Thread ID : " + Thread.currentThread().getId());
try{
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyTask myTask = new MyTask();
ThreadPoolExecutor es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.privilegedThreadFactory(), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + "被拒絕");
}
});
for (int i = 0;i<Integer.MAX_VALUE;i++){
es.submit(myTask);
Thread.sleep(10);
}
}
}
複製代碼
1562575292467: Thread ID : 14
1562575292478: Thread ID : 15
1562575292489: Thread ID : 16
java.util.concurrent.FutureTask@b4c966a被拒絕
java.util.concurrent.FutureTask@2f4d3709被拒絕
java.util.concurrent.FutureTask@4e50df2e被拒絕
複製代碼
ThreadFactory是一個接口,他只有一個用來建立線程的方法。
Thread newThread(Runnable r);
複製代碼
經過自定義線程建立咱們能夠跟蹤線程池在什麼時候建立了多少線程,自定義線程名等。
public class ThreadFactoryDemo {
static volatile int i = 0;
public static class TestTask implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
public static void main(String[] args) {
TestTask testTask = new TestTask();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,"test--" + i);
i++;
return thread;
}
});
for (int i = 0;i<5;i++){
threadPoolExecutor.submit(testTask);
}
}
}
複製代碼
test--0
test--1
test--4
test--2
test--3
複製代碼
雖然JDK已經幫咱們實現了穩定的線程池,但若是咱們想要對線程池進行一些擴展,好比監控任務執行的開始和結束時間怎麼辦呢。
ThreadPoolExecutor是一個能夠擴展的線程池,它提供了beforExecutor(),afterExecutor()和terminated()三個接口來對其進行擴展。
public class ThreadFactoryDemo {
static volatile int i = 0;
public static class TestTask implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
public static void main(String[] args) {
TestTask testTask = new TestTask();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,"test--" + i);
i++;
return thread;
}
}){
@Override
protected void beforeExecute(Thread t,Runnable r){
System.out.println("task-----準備執行");
}
};
for (int i = 0;i<5;i++){
threadPoolExecutor.submit(testTask);
}
}
}
複製代碼
task-----準備執行
task-----準備執行
test--2
task-----準備執行
test--1
task-----準備執行
test--4
task-----準備執行
test--3
test--0
複製代碼
execute提交的方式只能提交一個Runnable的對象,且該方法的返回值是void,也便是提交後若是線程運行後,和主線程就脫離了關係了,固然能夠設置一些變量來獲取到線程的運行結果。而且當線程的執行過程當中拋出了異常一般來講主線程也沒法獲取到異常的信息的,只有經過ThreadFactory主動設置線程的異常處理類才能感知到提交的線程中的異常。
sumbit()方法有三種形式:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
複製代碼
sumbit方法會返回一個Future對象,這個Future對象表明這線程的執行結果,當主線程調用Future的get方法的時候會獲取到從線程中返回的結果數據。若是在線程的執行過程當中發生了異常,get會獲取到異常的信息。