多線程(四)

線程池和Exector框架javascript

什麼是線程池? java

  1. 下降資源的消耗
  2. 提升響應速度,任務:T1建立線程時間,T2任務執行時間,T3線程銷燬時間,線程池沒有或者減小T1和T3
  3. 提升線程的可管理性。

線程池要作些什麼? 程序員

  1. 線程的容器和管理器
  2. 工做線程
  3. 任務接口
  4. 任務的容器
public class MyThreadPool {

    //默認的線程個數
    private int work_num = 5;

    //線程的容器
    private WorkThread[] workThreads;

    //任務隊列
    private List<Runnable> taskQueue = new LinkedList<>();

    public MyThreadPool(int work_num) {
        this.work_num = work_num;
        workThreads = new WorkThread[work_num];
        for(int i=0;i<work_num;i++){
            workThreads[i] = new WorkThread();
            workThreads[i].start();
        }
    }

    //提交任務的接口
    public void execute(Runnable task){
        synchronized (taskQueue){
            taskQueue.add(task);
            taskQueue.notify();
        }
    }

    //銷燬線程池
    public void destroy(){
        System.out.println("ready stop pool....");
        for(int i=0;i<work_num;i++){
            workThreads[i].stopWorker();
            workThreads[i] = null;//加速垃圾回收
        }
        taskQueue.clear();
    }

    //工做線程
    private class WorkThread extends Thread{

        private volatile boolean on = true;

        public void run(){
            Runnable r = null;
            try{
                while(on&&!isInterrupted()){
                    synchronized (taskQueue){
                        //任務隊列中無任務,工做線程等待
                        while(on&&!isInterrupted()&&taskQueue.isEmpty()){
                            taskQueue.wait(1000);
                        }
                        //任務隊列中有任務,拿任務作事
                        if(on&&!isInterrupted()&&!taskQueue.isEmpty()){
                            r = taskQueue.remove(0);
                        }
                    }
                    if (r!=null){
                        System.out.println(getId()+" ready execute....");
                        r.run();
                    }
                    //加速垃圾回收
                    r = null;
                }

            }catch(InterruptedException e){
                System.out.println(Thread.currentThread().getId()+" is Interrupted");
            }
        }

        public void stopWorker(){
            on = false;
            interrupt();
        }

    }

}
public class TestMyThreadPool {
	public static void main(String[] args) throws InterruptedException {
		// 建立3個線程的線程池
		MyThreadPool t = new MyThreadPool(3);
		t.execute(new MyTask("testA"));
		t.execute(new MyTask("testB"));
		t.execute(new MyTask("testC"));
		t.execute(new MyTask("testD"));
		t.execute(new MyTask("testE"));
		System.out.println(t);
		Thread.sleep(3000);
		// t.destroy();// 全部線程都執行完成才destory
		System.out.println(t);
	}

	// 任務類
	static class MyTask implements Runnable {

		private String name;
		private Random r = new Random();

		public MyTask(String name) {
			this.name = name;
		}

		public String getName() {
			return name;
		}

		@Override
		public void run() {// 執行任務
			try {
				Thread.sleep(r.nextInt(1000) + 2000);
			} catch (InterruptedException e) {
				System.out.println(Thread.currentThread().getId()
						+ " sleep InterruptedException:"
						+ Thread.currentThread().isInterrupted());
			}
			System.out.println("任務 " + name + " 完成");
		}
	}
}
運行結果
13 ready execute....
com.dongnaoedu.mypool.MyThreadPool@7852e922
12 ready execute....
11 ready execute....
任務 testC 完成
13 ready execute....
任務 testA 完成
11 ready execute....
任務 testB 完成
com.dongnaoedu.mypool.MyThreadPool@7852e922
任務 testD 完成
任務 testE 完成

線程池的主要處理流程 小程序

1)線程池判斷核心線程池裏的線程是否都在執行任務。若是不是,則建立一個新的工做服務器

線程來執行任務。若是核心線程池裏的線程都在執行任務,則進入下個流程。多線程

2)線程池判斷工做隊列是否已經滿。若是工做隊列沒有滿,則將新提交的任務存儲在這併發

個工做隊列裏。若是工做隊列滿了,則進入下個流程。框架

3)線程池判斷線程池的線程是否都處於工做狀態。若是沒有,則建立一個新的工做線程dom

來執行任務。若是已經滿了,則交給飽和策略來處理這個任務。異步

 

ThreadPoolExecutor執行execute()方法的示意

1)若是當前運行的線程少於corePoolSize,則建立新線程來執行任務(注意,執行這一步驟

須要獲取全局鎖)。

2)若是運行的線程等於或多於corePoolSize,則將任務加入BlockingQueue。

3)若是沒法將任務加入BlockingQueue(隊列已滿),則建立新的線程來處理任務(注意,執行這一步驟須要獲取全局鎖)。

4)若是建立新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExecution()方法。

線程池的建立各個參數含義

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

corePoolSize

線程池中的核心線程數,當提交一個任務時,線程池建立一個新線程執行任務,直到當前線程數等於corePoolSize;

若是當前線程數爲corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行;

若是執行了線程池的prestartAllCoreThreads()方法,線程池會提早建立並啓動全部核心線程。

maximumPoolSize

線程池中容許的最大線程數。若是當前阻塞隊列滿了,且繼續提交任務,則建立新的線程執行任務,前提是當前線程數小於maximumPoolSize

keepAliveTime

線程空閒時的存活時間,即當線程沒有任務執行時,繼續存活的時間。默認狀況下,該參數只在線程數大於corePoolSize時纔有用

TimeUnit

keepAliveTime的時間單位

workQueue

workQueue必須是BlockingQueue阻塞隊列。當線程池中的線程數超過它的corePoolSize的時候,線程會進入阻塞隊列進行阻塞等待。經過workQueue,線程池實現了阻塞功能

threadFactory

建立線程的工廠,經過自定義的線程工廠能夠給每一個新建的線程設置一個具備識別度的線程名

Executors靜態工廠裏默認的threadFactory,線程的命名規則是「pool-數字-thread-數字」

RejectedExecutionHandler(飽和策略)

線程池的飽和策略,當阻塞隊列滿了,且沒有空閒的工做線程,若是繼續提交任務,必須採起一種策略處理該任務,線程池提供了4種策略:

(1)AbortPolicy:直接拋出異常,默認策略;

(2)CallerRunsPolicy:用調用者所在的線程來執行任務;

(3)DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;

(4)DiscardPolicy:直接丟棄任務;

固然也能夠根據應用場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務。

線程池使用示例

public class UseThreadPool {

	static class MyTask implements Runnable {

		private String name;

		public MyTask(String name) {
			this.name = name;
		}

		public String getName() {
			return name;
		}

		@Override
		public void run() {// 執行任務
			try {
				Random r = new Random();
				Thread.sleep(r.nextInt(1000) + 2000);
			} catch (InterruptedException e) {
				System.out.println(Thread.currentThread().getId()
						+ " sleep InterruptedException:"
						+ Thread.currentThread().isInterrupted());
			}
			System.out.println("任務 " + name + " 完成");
		}
	}

	public static void main(String[] args) {
		// 建立線程池 2 核心線程數 4最大線程數 60存活時間 TimeUnit時間單位 任務隊列大小爲10
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
		for (int i = 0; i <= 5; i++) {
			MyTask task = new MyTask("Task_" + i);
			System.out.println("A new task will add:" + task.getName());
			// 提交到線程池
			threadPoolExecutor.execute(task);
		}
		threadPoolExecutor.shutdown();
	}

}

提交任務

Execute提交不須要用返回值的任務

Submit 提交須要返回值的任務,返回值是個Future類型的對象,調用futrure的get方法(阻塞方法)來獲取返回值

關閉線程池

ShutDown():interrupt方法來終止線程

shutDownNow() 嘗試中止全部正在執行的線程

合理地配置線程池

線程數配置:

任務:計算密集型,IO密集型,混合型

計算密集型=計算機的cpu數或計算機的cpu數+1(應付頁缺失)

IO密集型=計算機的cpu數*2

混合型,拆分紅計算密集型,IO密集型

Runtime.getRuntime().availableProcessors();當前機器中的cpu核心個數

儘可能用有界隊列,不要使用無界隊列

 

Executor框架調度模型

在HotSpot VM的線程模型中,Java線程(java.lang.Thread)被一對一映射爲本地操做系統線程。Java線程啓動時會建立一個本地操做系統線程;當該Java線程終止時,這個操做系統線程也會被回收。操做系統會調度全部線程並將它們分配給可用的CPU。

在上層,Java多線程程序一般把應用分解爲若干個任務,而後使用用戶級的調度器(Executor框架)將這些任務映射爲固定數量的線程;在底層,操做系統內核將這些線程映射到硬件處理器上。

從圖中能夠看出,應用程序經過Executor框架控制上層的調度;而下層的調度由操做系統內核控制,下層的調度不受應用程序的控制。

三大組成部分:任務,任務的執行,異步計算的結果

任務

包括被執行任務須要實現的接口:Runnable接口或Callable接口。

任務的執行。

包括任務執行機制的核心接口Executor,以及繼承自Executor的ExecutorService接口。Executor框架有兩個關鍵類實現了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。

異步計算的結果。

包括接口Future和實現Future接口的FutureTask類。

成員結構圖

Executor是一個接口,它是Executor框架的基礎,它將任務的提交與任務的執行分離開來。

ExecutorService接口繼承了Executor,在其上作了一些shutdown()、submit()的擴展,能夠說是真正的線程池接口;

AbstractExecutorService抽象類實現了ExecutorService接口中的大部分方法;

ThreadPoolExecutor是線程池的核心實現類,用來執行被提交的任務。

ScheduledExecutorService接口繼承了ExecutorService接口,提供了帶"週期執行"功能ExecutorService;

ScheduledThreadPoolExecutor是一個實現類,能夠在給定的延遲後運行命令,或者按期執行命令。ScheduledThreadPoolExecutor比Timer更靈活,功能更強大。

Future接口和實現Future接口的FutureTask類,表明異步計算的結果。

Runnable接口和Callable接口的實現類,均可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor執行。

Executor框架基本使用流程

主線程首先要建立實現Runnable或者Callable接口的任務對象。

工具類Executors能夠把一個Runnable對象封裝爲一個Callable對象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object resule))。而後能夠把Runnable對象直接交給ExecutorService執行(ExecutorService.execute(Runnablecommand));或者也能夠把Runnable對象或Callable對象提交給ExecutorService執行(Executor-Service.submit(Runnable task)或ExecutorService.submit(Callable<T>task))。

若是執行ExecutorService.submit(…),ExecutorService將返回一個實現Future接口的對象(到目前爲止的JDK中,返回的是FutureTask對象)。因爲FutureTask實現了Runnable,程序員也能夠建立FutureTask,而後直接交給ExecutorService執行。

最後,主線程能夠執行FutureTask.get()方法來等待任務執行完成。主線程也能夠執行FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務的執行。

 

ThreadPoolExecutor詳解

一般使用工廠類Executors來建立。Executors能夠建立3種類型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。

ExecutorService threadPool2 = Executors.newFixedThreadPool(2);
ExecutorService threadPool3 = Executors.newSingleThreadExecutor();
ExecutorService threadPool4 = Executors.newCachedThreadPool();
ExecutorService threadPool6 = Executors.newWorkStealingPool();

FixedThreadPool詳解

建立使用固定線程數的FixedThreadPool的API。適用於爲了知足資源管理的需求,而須要限制當前線程數量的應用場景,適用於負載比較重的服務器。FixedThreadPool的corePoolSize和maximumPoolSize都被設置爲建立FixedThreadPool時指定的參數nThreads。

當線程池中的線程數大於corePoolSize時,keepAliveTime爲多餘的空閒線程等待新任務的最長時間,超過這個時間後多餘的線程將被終止。這裏把keepAliveTime設置爲0L,意味着多餘的空閒線程會被當即終止。

FixedThreadPool使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列(隊列的容量爲Integer.MAX_VALUE)。使用無界隊列做爲工做隊列會對線程池帶來以下影響。

1)當線程池中的線程數達到corePoolSize後,新任務將在無界隊列中等待,所以線程池中的線程數不會超過corePoolSize。

2)因爲1,使用無界隊列時maximumPoolSize將是一個無效參數。

3)因爲1和2,使用無界隊列時keepAliveTime將是一個無效參數。

4)因爲使用無界隊列,運行中的FixedThreadPool(未執行方法shutdown()或

shutdownNow())不會拒絕任務(不會調用RejectedExecutionHandler.rejectedExecution方法)。

SingleThreadExecutor詳解

建立使用單個線程的SingleThread-Executor的API,適用於須要保證順序地執行各個任務;而且在任意時間點,不會有多個線程是活動的應用場景。

corePoolSize和maximumPoolSize被設置爲1。其餘參數與FixedThreadPool相同。SingleThreadExecutor使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列(隊列的容量爲Integer.MAX_VALUE)。

CachedThreadPool詳解

建立一個會根據須要建立新線程的CachedThreadPool的API。大小無界的線程池,適用於執行不少的短時間異步任務的小程序,或者是負載較輕的服務器。

corePoolSize被設置爲0,即corePool爲空;maximumPoolSize被設置爲Integer.MAX_VALUE,即maximumPool是無界的。這裏把keepAliveTime設置爲60L,意味着CachedThreadPool中的空閒線程等待新任務的最長時間爲60秒,空閒線程超過60秒後將會被終止。

FixedThreadPool和SingleThreadExecutor使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列。CachedThreadPool使用沒有容量的SynchronousQueue做爲線程池的工做隊列,但CachedThreadPool的maximumPool是無界的。這意味着,若是主線程提交任務的速度高於maximumPool中線程處理任務的速度時,CachedThreadPool會不斷建立新線程。極端狀況下,CachedThreadPool會由於建立過多線程而耗盡CPU和內存資源。

WorkStealingPoolJDK7之後

利用全部運行的處理器數目來建立一個工做竊取的線程池,使用forkjoin實現。

 

ScheduledThreadPoolExecutor詳解

使用工廠類Executors來建立。Executors能夠建立2種類

型的ScheduledThreadPoolExecutor,以下。

·ScheduledThreadPoolExecutor。包含若干個線程的ScheduledThreadPoolExecutor。

·SingleThreadScheduledExecutor。只包含一個線程的ScheduledThreadPoolExecutor。

ScheduledThreadPoolExecutor適用於須要多個後臺線程執行週期任務,同時爲了知足資源管理的需求而須要限制後臺線程的數量的應用場景。

SingleThreadScheduledExecutor適用於須要單個後臺線程執行週期任務,同時須要保證順序地執行各個任務的應用場景。

對這4個步驟的說明。

1)線程1從DelayQueue中獲取已到期的ScheduledFutureTask(DelayQueue.take())。到期任務是指ScheduledFutureTask的time大於等於當前時間。

2)線程1執行這個ScheduledFutureTask。

3)線程1修改ScheduledFutureTask的time變量爲下次將要被執行的時間。

4)線程1把這個修改time以後的ScheduledFutureTask放回DelayQueue中(Delay-Queue.add())。

有關提交定時任務的四個方法:

//向定時任務線程池提交一個延時Runnable任務(僅執行一次)
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
//向定時任務線程池提交一個延時的Callable任務(僅執行一次)
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
//向定時任務線程池提交一個固定時間間隔執行的任務
public ScheduledFuture<?> scheduleAtFixedRate(
                       Runnablecommand,long initialDelay,long period,TimeUnit unit)
//向定時任務線程池提交一個固定延時間隔執行的任務
public ScheduledFuture<?> scheduleWithFixedDelay(
             Runnable command, long initialDelay,long delay, TimeUnit unit);

固定時間間隔的任務不論每次任務花費多少時間,下次任務開始執行時間是肯定的,固然執行任務的時間不能超過執行週期。

固定延時間隔的任務是指每次執行完任務之後都延時一個固定的時間。因爲操做系統調度以及每次任務執行的語句可能不一樣,因此每次任務執行所花費的時間是不肯定的,也就致使了每次任務的執行週期存在必定的波動。

注意:定時或延時任務中所涉及到時間、週期不能保證明時性及準確性,實際運行中會有必定的偏差。

ScheduleThreadPoolExecutor與Timer相比的優點。

(1)Timer是基於絕對時間的延時執行或週期執行,當系統時間改變,則任務的執行會受到的影響。而ScheduleThreadPoolExecutore中,任務時基於相對時間進行週期或延時操做。

(2)Timer也能夠提交多個TimeTask任務,但只有一個線程來執行全部的TimeTask,這樣併發性受到影響。而ScheduleThreadPoolExecutore能夠設定池中線程的數量。

(3)Timer不會捕獲TimerTask的異常,只是簡單地中止,這樣勢必會影響其餘TimeTask的執行。而ScheduleThreadPoolExecutore中,若是一個線程因某些緣由中止,線程池能夠自動建立新的線程來維護池中線程的數量。

public class ScheduleTask implements Runnable {

    public static enum OperType{
        None,OnlyThrowException,CatheException
    }

    public static SimpleDateFormat formater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    private OperType operType;

    public ScheduleTask(OperType operType) {
        this.operType = operType;
    }

    @Override
    public void run() {

        switch (operType){
            case OnlyThrowException:
                System.out.println("Exception not catch:"+formater.format(new Date()));
                throw new RuntimeException("OnlyThrowException");
            case CatheException:
                try {
                    throw new RuntimeException("CatheException");
                } catch (RuntimeException e) {
                    System.out.println("Exception be catched:"+formater.format(new Date()));
                }
                break;
            case None:
                System.out.println("None :"+formater.format(new Date()));
        }
    }
}
public class TestSchedule {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);

        /**
         * 每隔一段時間打印系統時間,互不影響的建立並執行一個在給定初始延遲後首次啓用的按期操做,
         * 後續操做具備給定的週期;
         * 也就是將在 initialDelay 後開始執行,週期爲period。
         */
        exec.scheduleAtFixedRate(new ScheduleTask(ScheduleTask.OperType.None),
                1000,5000, TimeUnit.MILLISECONDS);

        // 開始執行後就觸發異常,next週期將不會運行
        exec.scheduleAtFixedRate(new ScheduleTask(ScheduleTask.OperType.OnlyThrowException),
                1000,5000, TimeUnit.MILLISECONDS);

        // 雖然拋出了運行異常,當被攔截了,next週期繼續運行
        exec.scheduleAtFixedRate(new ScheduleTask(ScheduleTask.OperType.CatheException),
                1000,5000, TimeUnit.MILLISECONDS);

        /**
         * 建立並執行一個在給定初始延遲後首次啓用的按期操做,
         * 隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。
         */
        exec.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                System.out.println("scheduleWithFixedDelay:begin"
                        +ScheduleTask.formater.format(new Date()));
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("scheduleWithFixedDelay:end"
                        +ScheduleTask.formater.format(new Date()));
            }
        },1000,5000,TimeUnit.MILLISECONDS);

        /**
         * 建立並執行在給定延遲後啓用的一次性操做。
         */
        exec.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("schedule running.....");
            }
        },5000,TimeUnit.MILLISECONDS);
    }
}
運行結果
None :2018-06-28 20:01:12  (5秒一次)
Exception not catch:2018-06-28 20:01:12   (執行後拋異常沒抓取 只執行了一次)
Exception be catched:2018-06-28 20:01:12    (5秒一次)
scheduleWithFixedDelay:begin2018-06-28 20:01:12  (5秒一次 停2秒)
scheduleWithFixedDelay:end2018-06-28 20:01:14     
schedule running.....   (只執行了一次)
None :2018-06-28 20:01:17
Exception be catched:2018-06-28 20:01:17
scheduleWithFixedDelay:begin2018-06-28 20:01:19
scheduleWithFixedDelay:end2018-06-28 20:01:21
None :2018-06-28 20:01:22
Exception be catched:2018-06-28 20:01:22
scheduleWithFixedDelay:begin2018-06-28 20:01:26
scheduleWithFixedDelay:end2018-06-28 20:01:28
.........
.........

scheduleAtFixedRate定時任務超時問題

若任務處理時長超出設置的定時頻率時長,本次任務執行完纔開始下次任務,下次任務已經處於超時狀態,會立刻開始執行.

若任務處理時長小於定時頻率時長,任務執行完後,定時器等待,下次任務會在定時器等待頻率時長後執行

以下例子:

設置定時任務每60s執行一次

若第一次任務時長80s,第二次任務時長20ms,第三次任務時長50ms

第一次任務第0s開始,第80s結束;

第二次任務第80s開始,第110s結束;(上次任務已超時,本次不會再等待60s,會立刻開始),

第三次任務第150s開始,第200s結束.

第四次任務第210s開始.....

Callable、Future和FutureTask詳解

Future接口和實現Future接口的FutureTask類用來表示異步計算的結果。

當咱們把Runnable接口或Callable接口的實現類提交(submit)給ThreadPoolExecutor或ScheduledThreadPoolExecutor時,ThreadPoolExecutor或ScheduledThreadPoolExecutor會向咱們返回一個FutureTask對象。

Runnable接口和Callable接口的實現類,均可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor執行。它們之間的區別是Runnable不會返回結果,而Callable能夠返回結果。

除了能夠本身建立實現Callable接口的對象外,還可使用工廠類Executors來把一個Runnable包裝成一個Callable。

Executors提供的,把一個Runnable包裝成一個Callable的API。

public static Callable<Object> callable(Runnable task)  // 假設返回對象Callable1

Executors提供的,把一個Runnable和一個待返回的結果包裝成一個Callable的API。

public static <T> Callable<T> callable(Runnable task, T result)  // 假設返回對象Callable2

當任務成功完成後FutureTask.get()將返回該任務的結果。例如,若是提交的是對象Callable1,FutureTask.get()方法將返回null;若是提交的是對象Callable2,FutureTask.get()方法將返回result對象。

FutureTask除了實現Future接口外,還實現了Runnable接口。所以,FutureTask能夠交給Executor執行,也能夠由調用線程直接執行(FutureTask.run())。

當FutureTask處於未啓動或已啓動狀態時,執行FutureTask.get()方法將致使調用線程阻塞;當FutureTask處於已完成狀態時,執行FutureTask.get()方法將致使調用線程當即返回結果或拋出異常。

當FutureTask處於未啓動狀態時,執行FutureTask.cancel()方法將致使此任務永遠不會被執行;當FutureTask處於已啓動狀態時,執行FutureTask.cancel(true)方法將以中斷執行此任務線程的方式來試圖中止任務;當FutureTask處於已啓動狀態時,執行FutureTask.cancel(false)方法將不會對正在執行此任務的線程產生影響(讓正在執行的任務運行完成);當FutureTask處於已完成狀態時,執行FutureTask.cancel(…)方法將返回false。

public class ComputeTask implements Callable<Integer> {

    private Integer result =0;
    private String taskName ="";

    public ComputeTask(Integer result, String taskName) {
        this.result = result;
        this.taskName = taskName;
        System.out.println(taskName+"子任務已經建立");
    }

    @Override
    public Integer call() throws Exception {
        for(int i=0;i<100;i++){
            result = result+i;
        }
        Thread.sleep(2000);
        System.out.println(taskName+"子任務已經完成");
        return  result;
    }
}
public class FutureSample {
	public static void main(String[] args) {
		FutureSample futureSample = new FutureSample();
		futureSample.futureTask();
	}

	// 使用FutureTask
	private void futureTask() {
		// 建立任務集合
		List<FutureTask<Integer>> taskList = new ArrayList<>();
		ExecutorService exec = Executors.newFixedThreadPool(5);
		for (int i = 0; i < 10; i++) {
			// 傳入Callable對象建立FutureTask對象
			FutureTask<Integer> ft = new FutureTask<Integer>(new ComputeTask(i,"task_" + i));
			taskList.add(ft);
			exec.submit(ft);
		}
		System.out.println("主線程已經提交任務,作本身的事!");

		// 開始統計各計算線程計算結果
		int totalResult = 0;
		for (FutureTask<Integer> ft : taskList) {
			try {
				// FutureTask的get方法會自動阻塞,直到獲取計算結果爲止
				totalResult = totalResult + ft.get();
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		System.out.println("total = " + totalResult);
		exec.shutdown();
	}

	// 使用Future
	private void future() {
		// 建立任務集合
		List<Future<Integer>> futureList = new ArrayList<>();
		ExecutorService exec = Executors.newFixedThreadPool(5);
		for (int i = 0; i < 10; i++) {
			// 提交Callable對象建立Future對象
			Future<Integer> result = exec.submit(new ComputeTask(i, "task_" + i));
			futureList.add(result);
		}
		System.out.println("主線程已經提交任務,作本身的事!");

		// 開始統計各計算線程計算結果
		int totalResult = 0;
		for (Future<Integer> ft : futureList) {
			try {
				// FutureTask的get方法會自動阻塞,直到獲取計算結果爲止
				totalResult = totalResult + ft.get();
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		System.out.println("total = " + totalResult);
		exec.shutdown();
	}
}
運行結果
task_0子任務已經建立
task_1子任務已經建立
task_2子任務已經建立
task_3子任務已經建立
task_4子任務已經建立
task_5子任務已經建立
task_6子任務已經建立
task_7子任務已經建立
task_8子任務已經建立
task_9子任務已經建立
主線程已經提交任務,作本身的事!
task_4子任務已經完成
task_1子任務已經完成
task_0子任務已經完成
task_3子任務已經完成
task_2子任務已經完成
task_5子任務已經完成
task_9子任務已經完成
task_6子任務已經完成
task_7子任務已經完成
task_8子任務已經完成
total = 49545

CompletionService詳解

CompletionService實際上能夠看作是Executor和BlockingQueue的結合體。CompletionService在接收到要執行的任務時,經過相似BlockingQueue的put和take得到任務執行的結果。

CompletionService的一個實現是ExecutorCompletionService

ExecutorCompletionService把具體的計算任務交給Executor完成。在實現上,ExecutorCompletionService在構造函數中會建立一個BlockingQueue(使用的基於鏈表的無界隊列LinkedBlockingQueue),該BlockingQueue的做用是保存Executor執行的結果。當計算完成時,調用FutureTask的done方法。當提交一個任務到ExecutorCompletionService時,首先將任務包裝成QueueingFuture,它是FutureTask的一個子類,而後改寫FutureTask的done方法,以後把Executor執行的計算結果放入BlockingQueue中。

與ExecutorService最主要的區別在於submit的task不必定是按照加入時的順序完成的。CompletionService對ExecutorService進行了包裝,內部維護一個保存Future對象的BlockingQueue。只有當這個Future對象狀態是結束的時候,纔會加入到這個Queue中,take()方法其實就是Producer-Consumer中的Consumer。它會從Queue中取出Future對象,若是Queue是空的,就會阻塞在那裏,直到有完成的Future對象加入到Queue中。因此,先完成的一定先被取出。這樣就減小了沒必要要的等待時間。

public class WorkTask implements Callable<String> {

    private String name;

    public WorkTask(String name) {
        this.name = name;
    }

    @Override
    public String call() throws Exception {
        //休眠隨機時間,觀察獲取結果的行爲。
        int sleepTime = new Random().nextInt(1000);
        Thread.sleep(sleepTime);
        String str = name+" sleept time:"+sleepTime;
        System.out.println(str+" finished....");
        return str;
    }
}
public class CompletionTest {
	private final int POOL_SIZE = 5;
	private final int TOTAL_TASK = 10;

	// 方法一,本身寫集合來實現獲取線程池中任務的返回結果
	public void testByQueue() throws InterruptedException, ExecutionException {
		ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
		BlockingQueue<Future<String>> queue = new LinkedBlockingDeque<>();
		// 向裏面扔任務
		for (int i = 0; i < TOTAL_TASK; i++) {
			Future<String> future = pool.submit(new WorkTask("ExecTask" + i));
			queue.add(future);
		}

		// 檢查線程池任務執行結果
		for (int i = 0; i < TOTAL_TASK; i++) {
			System.out.println("ExecTask:" + queue.take().get());
		}

		pool.shutdown();

	}

	// 方法二,經過CompletionService來實現獲取線程池中任務的返回結果
	public void testByCompletion() throws InterruptedException,
			ExecutionException {
		ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
		CompletionService<String> service = new ExecutorCompletionService<String>(pool);

		// 向裏面扔任務
		for (int i = 0; i < TOTAL_TASK; i++) {
			service.submit(new WorkTask("ExecTask" + i));
		}

		// 檢查線程池任務執行結果
		for (int i = 0; i < TOTAL_TASK; i++) {
			Future<String> future = service.take();
			System.out.println("CompletionService:" + future.get());
		}

	}

	public static void main(String[] args) throws ExecutionException,
			InterruptedException {
		CompletionTest completionTest = new CompletionTest();
		// completionTest.testByQueue();
		completionTest.testByCompletion();
	}

}
運行結果
completionTest.testByQueue()

ExecTask4 sleept time:12 finished....
ExecTask1 sleept time:148 finished....
ExecTask3 sleept time:652 finished....
ExecTask6 sleept time:522 finished....
ExecTask5 sleept time:666 finished....
ExecTask0 sleept time:714 finished....
ExecTask:ExecTask0 sleept time:714
ExecTask:ExecTask1 sleept time:148
ExecTask8 sleept time:136 finished....
ExecTask2 sleept time:829 finished....
ExecTask:ExecTask2 sleept time:829
ExecTask:ExecTask3 sleept time:652
ExecTask:ExecTask4 sleept time:12
ExecTask:ExecTask5 sleept time:666
ExecTask:ExecTask6 sleept time:522
ExecTask9 sleept time:230 finished....
ExecTask7 sleept time:796 finished....
ExecTask:ExecTask7 sleept time:796
ExecTask:ExecTask8 sleept time:136
ExecTask:ExecTask9 sleept time:230


===============================
completionTest.testByCompletion();

ExecTask2 sleept time:89 finished....
CompletionService:ExecTask2 sleept time:89
ExecTask4 sleept time:137 finished....
CompletionService:ExecTask4 sleept time:137
ExecTask6 sleept time:495 finished....
CompletionService:ExecTask6 sleept time:495
ExecTask3 sleept time:738 finished....
CompletionService:ExecTask3 sleept time:738
ExecTask0 sleept time:812 finished....
CompletionService:ExecTask0 sleept time:812
ExecTask1 sleept time:972 finished....
CompletionService:ExecTask1 sleept time:972
ExecTask5 sleept time:921 finished....
CompletionService:ExecTask5 sleept time:921
ExecTask9 sleept time:473 finished....
CompletionService:ExecTask9 sleept time:473
ExecTask7 sleept time:711 finished....
CompletionService:ExecTask7 sleept time:711
ExecTask8 sleept time:880 finished....
CompletionService:ExecTask8 sleept time:880

總結:

使用方法一,本身建立一個集合來保存Future存根並循環調用其返回結果的時候,主線程並不能保證首先得到的是最早完成任務的線程返回值。它只是按加入線程池的順序返回。由於take方法是阻塞方法,後面的任務完成了,前面的任務卻沒有完成,主程序就那樣等待在那兒,只到前面的完成了,它才知道原來後面的也完成了。

使用方法二,使用CompletionService來維護處理線程不的返回結果時,主線程老是可以拿到最早完成的任務的返回值,而無論它們加入線程池的順序。

相關文章
相關標籤/搜索