Java 多線程開發之 Callable 與線程池

前言

咱們常見的建立線程的方式有 2 種:繼承 Thread 和 實現 Runnable 接口。java

其實,在 JDK 中還提供了另外 2 種 API 讓開發者使用。後端

2、簡單介紹

2.1 Callable

Java 5.0 在 java.util.concurrent 提供了一個新的建立執行線程的方式: 實現 Callable 接口。數組

Callable 接口相似於 Runnable,可是 Runnable 不會返回結果,而且沒法拋出通過檢查的異常,而 Callable 依賴 FutureTask 類獲取返回結果。緩存

代碼演示:服務器

public class CallableTest { 多線程

  1. public static void main(String[] args) throws Exception { MyThread mt = new MyThread(); FutureTask<Integer> result = new FutureTask<Integer>(mt); new Thread(result).start(); // 獲取運算結果是同步過程,即 call 方法執行完成,才能獲取結果
    Integer sum = result.get(); System.out.println(sum); } } class MyThread implements Callable<Integer> { @Override public Integer call() throws Exception { int sum = 0; for (int i = 1; i <= 100; i++) { sum += i; } return sum; } }

     

當某個請求須要在後端完成 N 次統計結果時,咱們就可使用該方式建立 N 個線程進行(並行)統計,而不須要同步等待其餘統計操做完成後才統計另外一個結果。併發

2.2 線程池

第四種獲取線程的方法:線程池。ide

經過重用現有的線程而不是建立新的線程能夠在處理多個請求時分攤在線程建立和銷燬過程當中產生的巨大開銷,同時當請求到達時,工做線程已經存在,所以不會因爲等待建立線程而延遲任何的執行,從而提升系統的響應性。spa

2.2.1 線程池體系結構

線程池體系結構:線程

2.2.2 ThreadPoolExecutor API

ThreadPoolExecutor 用於建立線程池,它有 4 個重載構造器,咱們以最多參數的構造器講解:

  1. ThreadPoolExecutor(int corePoolSize, # 線程池核心線程個數,默認線程池線程個數爲 0,只有接到任務才新建線程 int maximumPoolSize, # 線程池最大線程數量 long keepAliveTime, # 線程池空閒時,線程存活的時間,當線程池中的線程數大於 corePoolSize 時纔會起做用 TimeUnit unit, # 時間單位 BlockingQueue<Runnable> workQueue, # 阻塞隊列,當達到線程數達到 corePoolSize 時,將任務放入隊列等待線程處理 ThreadFactory threadFactory, # 線程工廠 RejectedExecutionHandler handler) # 線程拒絕策略,當隊列滿了而且線程個數達到 maximumPoolSize 後採起的策略

     

阻塞隊列有如下 4 種:

  1. ArrayBlockingQueue:基於數組、有界,按 FIFO(先進先出)原則對元素進行排序;
    LinkedBlockingQueue:基於鏈表,按FIFO (先進先出) 排序元素,吞吐量一般要高於 ArrayBlockingQueue;
    SynchronousQueue:每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於 LinkedBlockingQueue;
    PriorityBlockingQueue:具備優先級的、無限阻塞隊列。

     

線程拒絕策略有如下 4 種:

  1. CallerRunsPolicy:若是發現線程池還在運行,就直接運行這個線程;
    DiscardOldestPolicy:在線程池的等待隊列中,將頭取出一個拋棄,而後將當前線程放進去;
    DiscardPolicy:默默丟棄,不拋出異常;
    AbortPolicy:java默認,拋出一個異常(RejectedExecutionException)。

     

實現原則:

  1. 若是當前池大小 poolSize 小於 corePoolSize ,則建立新線程執行任務;
    若是當前池大小 poolSize 大於 corePoolSize ,且等待隊列未滿,則進入等待隊列;
    若是當前池大小 poolSize 大於 corePoolSize 且小於 maximumPoolSize ,且等待隊列已滿,則建立新線程執行任務;
    若是當前池大小 poolSize 大於 corePoolSize 且大於 maximumPoolSize ,且等待隊列已滿,則調用拒絕策略來處理該任務;
    線程池裏的每一個線程執行完任務後不會馬上退出,而是會去檢查下等待隊列裏是否還有線程任務須要執行,若是在 keepAliveTime 裏等不到新的任務了,那麼線程就會退出。

     

2.2.3 內置線程池

在 java.util.concurrent 包中已經提供爲大多數使用場景的內置線程池:

  1. Executors.newSingleThreadExecutor() # 單條線程 Executors.newFixedThreadPool(int n) # 固定數目線程的線程池 Executors.newCachedThreadPool() # 建立一個可緩存的線程池,調用execute 將重用之前構造的線程(若是線程可用)。若是現有線程沒有可用的,則建立一個新線程並添加到池中。終止並從緩存中移除那些已有 60 秒鐘未被使用的線程。 Executors.newScheduledThreadPool(int n) # 支持定時及週期性的任務執行的線程池,多數狀況下可用來替代 Timer 類。

     

上述 4 種線程池底層都是經過建立 ThreadPoolExecutor 獲取線程池。

1) newSingleThreadExecutor

  1. public static ExecutorService newSingleThreadExecutor() { return new Executors.FinalizableDelegatedExecutorService( new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()) ); }

     

主要用於串行(順序執行)操做場景。

2) newFixedThreadPool

  1. public static ExecutorService newFixedThreadPool(int var0) { return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }

     

主要用於負載比較重的場景,爲了資源的合理利用,須要限制當前線程數量。

3) newCachedThreadPool

  1. public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }

     

主要用於併發執行大量短時間的小任務,或者是負載較輕的服務器。

4) newScheduledThreadPool

  1. public static ScheduledExecutorService newScheduledThreadPool(int var0) { return new ScheduledThreadPoolExecutor(var0); }

     

其中,ScheduledExecutorService 繼承 ThreadPoolExecutor。

2.2.4 提交任務

獲取 ExecutorService 對象後,咱們須要提交任務來讓線程池中的線程執行,提交任務的方法有 2 種:

  1. void execute():提交不須要返回值的任務 Future<T> submit(): 提交須要返回值的任務

     

代碼演示:

  1. // 建立 1 個線程
    ExecutorService service = Executors.newSingleThreadExecutor(); service.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + ":hello world"); } }); // 關閉線程池
    service.shutdown(); // 建立 5 個線程
    ExecutorService service = Executors.newFixedThreadPool(5); List<Future<String>> list = new ArrayList<>(5); for (int i = 0; i < 5; i++) { Future<String> future = service.submit(new Callable<String>() { @Override public String call() throws Exception { return Thread.currentThread().getName() + ":hello world"; } }); list.add(future); } // 打印結果
    for (Future<String> future : list) { System.out.println(future.get()); } // 關閉線程池
    service.shutdown();

     

  1. // 建立 1 個線程
    ScheduledExecutorService service = Executors.newScheduledThreadPool(1); service.schedule(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + ":hello world"); } }, 2, TimeUnit.SECONDS);

     

其餘線程池使用方式相似,此處再也不列舉代碼。

相關文章
相關標籤/搜索