java併發編程--Executor框架

摘要:java

       Eexecutor做爲靈活且強大的異步執行框架,其支持多種不一樣類型的任務執行策略,提供了一種標準的方法將任務的提交過程和執行過程解耦開發,基於生產者-消費者模式,其提交任務的線程至關於生產者,執行任務的線程至關於消費者,並用Runnable來表示任務,Executor的實現還提供了對生命週期的支持,以及統計信息收集,應用程序管理機制和性能監視等機制。
 
1.Exexctor簡介
Executor的UML圖:(經常使用的幾個接口和子類)

Executor:一個接口,其定義了一個接收Runnable對象的方法executor,其方法簽名爲executor(Runnable command),
 
ExecutorService: 是一個比Executor使用更普遍的子類接口,其提供了生命週期管理的方法,以及可跟蹤一個或多個異步任務執行情況返回Future的方法
 
AbstractExecutorService:ExecutorService執行方法的默認實現
 
ScheduledExecutorService:一個可定時調度任務的接口
 
ScheduledThreadPoolExecutor:ScheduledExecutorService的實現,一個可定時調度任務的線程池
 
ThreadPoolExecutor:線程池,能夠經過調用Executors如下靜態工廠方法來建立線程池 並返回一個ExecutorService對象
 
2.ThreadPoolExecutor構造函數的各個參數說明
ThreadPoolExecutor方法簽名:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) //後兩個參數爲可選參數
參數說明:
corePoolSize:核心線程數,若是運行的線程少於corePoolSize,則建立新線程來執行新任務,即便線程池中的其餘線程是空閒的
 
maximumPoolSize:最大線程數,可容許建立的線程數,corePoolSize和maximumPoolSize設置的邊界自動調整池大小:
corePoolSize <運行的線程數<  maximumPoolSize:僅當隊列滿時才建立新線程
corePoolSize=運行的線程數=  maximumPoolSize:建立固定大小的線程池
 
keepAliveTime:若是線程數多於corePoolSize,則這些多餘的線程的空閒時間超過keepAliveTime時將被終止
 
unit:keepAliveTime參數的時間單位
 
workQueue:保存任務的阻塞隊列,與線程池的大小有關:
  當運行的線程數少於corePoolSize時,在有新任務時直接建立新線程來執行任務而無需再進隊列
  當運行的線程數等於或多於corePoolSize,在有新任務添加時則選加入隊列,不直接建立線程
  當隊列滿時,在有新任務時就建立新線程
 
threadFactory:使用ThreadFactory建立新線程,默認使用defaultThreadFactory建立線程
 
handle:定義處理被拒絕任務的策略,默認使用ThreadPoolExecutor.AbortPolicy,任務被拒絕時將拋出RejectExecutorException
 
3.Executors:提供了一系列靜態工廠方法用於建立各類線程池
   newFixedThreadPool:建立可重用且固定線程數的線程池,若是線程池中的全部線程都處於活動狀態,此時再提交任務就在隊列中等待,直到有可用線程;若是線程池中的某個線程因爲異常而結束時,線程池就會再補充一條新線程。
方法簽名:
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  //使用一個基於FIFO排序的阻塞隊列,在全部corePoolSize線程都忙時新任務將在隊列中等待
                                  new LinkedBlockingQueue<Runnable>());
}
   newSingleThreadExecutor:建立一個單線程的Executor,若是該線程由於異常而結束就新建一條線程來繼續執行後續的任務
方法簽名:
public static ExecutorService newSingleThreadExecutor() {
   return new FinalizableDelegatedExecutorService
                     //corePoolSize和maximumPoolSize都等於,表示固定線程池大小爲1
                        (new ThreadPoolExecutor(1, 1,
                                                0L, TimeUnit.MILLISECONDS,
                                                new LinkedBlockingQueue<Runnable>()));
}
   newScheduledThreadPool:建立一個可延遲執行或按期執行的線程池
方法簽名:
例1:(使用newScheduledThreadPool來模擬心跳機制)
 1 public class HeartBeat {
 2     public static void main(String[] args) {
 3         ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
 4         Runnable task = new Runnable() {
 5             public void run() {
 6                 System.out.println("HeartBeat.........................");
 7             }
 8         };
 9         executor.scheduleAtFixedRate(task,5,3, TimeUnit.SECONDS);   //5秒後第一次執行,以後每隔3秒執行一次
10     }
11 }
輸出:
HeartBeat....................... //5秒後第一次輸出
HeartBeat....................... //每隔3秒輸出一個
   newCachedThreadPool:建立可緩存的線程池,若是線程池中的線程在60秒未被使用就將被移除,在執行新的任務時,當線程池中有以前建立的可用線程就重      用可用線程,不然就新建一條線程
方法簽名:
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  //使用同步隊列,將任務直接提交給線程
                                  new SynchronousQueue<Runnable>());
}
 
例2:
public class ThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
     ExecutorService threadPool = Executors.newCachedThreadPool();//線程池裏面的線程數會動態變化,並可在線程線被移除前重用
        for (int i = 1; i <= 3; i ++) {
            final  int task = i;   //10個任務
            //TimeUnit.SECONDS.sleep(1);
            threadPool.execute(new Runnable() {    //接受一個Runnable實例
                public void run() {
                        System.out.println("線程名字: " + Thread.currentThread().getName() +  "  任務名爲: "+task);
                }
            });
        }
    }
}
輸出:(爲每一個任務新建一條線程,共建立了3條線程)
線程名字: pool-1-thread-1 任務名爲: 1
線程名字: pool-1-thread-2 任務名爲: 2
線程名字: pool-1-thread-3 任務名爲: 3
去掉第6行的註釋其輸出以下:(始終重複利用一條線程,由於newCachedThreadPool能重用可用線程)
線程名字: pool-1-thread-1 任務名爲: 1
線程名字: pool-1-thread-1 任務名爲: 2
線程名字: pool-1-thread-1 任務名爲: 3
經過使用Executor能夠很輕易的實現各類調優  管理  監視  記錄日誌和錯誤報告等待。
 
4.Executor的生命週期
ExecutorService提供了管理Eecutor生命週期的方法,ExecutorService的生命週期包括了:運行  關閉和終止三種狀態。
 
ExecutorService在初始化建立時處於運行狀態。
shutdown方法等待提交的任務執行完成並再也不接受新任務,在完成所有提交的任務後關閉
shutdownNow方法將強制終止全部運行中的任務並再也不容許提交新任務
 
能夠將一個Runnable(如例2)或Callable(如例3)提交給ExecutorService的submit方法執行,最終返回一上Futire用來得到任務的執行結果或取消任務
例3:(任務執行完成後並返回執行結果)
public class CallableAndFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new Callable<String>() {   //接受一上callable實例
            public String call() throws Exception {
                return "MOBIN";
            }
        });
        System.out.println("任務的執行結果:"+future.get());
    }
}
輸出:
任務的執行結果:MOBIN
 
ExecutorCompletionService:實現了CompletionService,將執行完成的任務放到阻塞隊列中,經過take或poll方法來得到執行結果
例4:(啓動10條線程,誰先執行完成就返回誰)
public class CompletionServiceTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(10);        //建立含10.條線程的線程池
        CompletionService completionService = new ExecutorCompletionService(executor);
        for (int i =1; i <=10; i ++) {
            final  int result = i;
            completionService.submit(new Callable() {
                public Object call() throws Exception {
                    Thread.sleep(new Random().nextInt(5000));   //讓當前線程隨機休眠一段時間
                    return result;
                }
            });
        }
        System.out.println(completionService.take().get());   //獲取執行結果
    }
}
輸出結果可能每次都不一樣(在1到10之間)
3
 
經過Executor來設計應用程序能夠簡化開發過程,提升開發效率,並有助於實現併發,在開發中若是須要建立線程可優先考慮使用Executor
相關文章
相關標籤/搜索