Java線程池ExecutorService

系統裏面用到了線程池:html

 private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(16, 32,
            5L, TimeUnit.MINUTES,
            new ArrayBlockingQueue<>(1000), new ThreadFactory() {

        private final ThreadGroup threadGroup = new ThreadGroup("fileTemplateMethodThreadGroup");

        private final AtomicInteger threadNumber = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(threadGroup, r, "fileTemplateMethod-thread-pool-" + threadNumber.getAndIncrement());
        }
    }, (r, executor) -> {
        if (!executor.isShutdown()) {
            /* 丟棄隊列最老的數據 */
            if (executor.getQueue().poll() != null) {
                Cat.logMetricForCount(CatConstant.METRIC_DISCARD_FILE_TASK_COUNT);
            }
            executor.execute(r);
        }
    });

我查了一下:一 Java經過Executors提供四種線程池,分別爲: 
newCachedThreadPool建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。 
newFixedThreadPool 建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。 
newScheduledThreadPool 建立一個定長線程池,支持定時及週期性任務執行。 
newSingleThreadExecutor 建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。java

我奇怪咱們的沒有這四個,可是有個:數據庫

 
 
private static final ExecutorService EXECUTOR_SERVICE = 
new ThreadPoolExecutor(16, 32, 5L, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000), new ThreadFactory() {})

其實咱們用的就是:newFixedThreadPool ,剛纔那四個線程池 你在看一下實現,其實就是咱們如今用的。編程

如下是newFixedThreadPool 的實現:緩存

Executors.newFixedThreadPool()
==>
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

 

看到了吧,跟咱們的寫法是同樣的,只是:用了不一樣的 LinkedBlockingQueue 也就是說當建立的線程超過最大線程數的時候,會放到 無界隊列裏面安全

LinkedBlockingQueue 咱們的是有界隊列:ArrayBlockingQueue

開篇前,咱們先來看看不使用線程池的狀況:多線程

new Thread的弊端
併發

執行一個異步任務你還只是以下new Thread嗎?  app

new Thread(new Runnable() {
 
    @Override
    public void run() {
        // TODO Auto-generated method stub
    }
}).start();

 

那你就太out了,new Thread的弊端以下:
a. 每次new Thread新建對象性能差。
b. 線程缺少統一管理,可能無限制新建線程,相互之間競爭,及可能佔用過多系統資源致使死機或oom。
c. 缺少更多功能,如定時執行、按期執行、線程中斷。
相比new Thread,Java提供的四種線程池的好處在於:
a. 重用存在的線程,減小對象建立、消亡的開銷,性能佳。
b. 可有效控制最大併發線程數,提升系統資源的使用率,同時避免過多資源競爭,避免堵塞。
c. 提供定時執行、按期執行、單線程、併發數控制等功能。  dom

一 Java經過Executors提供四種線程池,分別爲: 
newCachedThreadPool建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。 
newFixedThreadPool 建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。 
newScheduledThreadPool 建立一個定長線程池,支持定時及週期性任務執行。 
newSingleThreadExecutor 建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。

2、 ExecutorService 的submit() 與execute()區別 
一、接收的參數不同 submit()能夠接受runnable和callable  有返回值
execute()接受runnable 無返回值

二、submit有返回值,而execute沒有

Method submit extends base method Executor.execute by creating and returning a Future that can be used to cancel execution and/or wait for completion.

用到返回值的例子,好比說我有不少個作validation的task,我但願全部的task執行完,而後每一個task告訴我它的執行結果,是成功仍是失敗,若是是失敗,緣由是什麼。

三、submit方便Exception處理

There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with execute this exception will Go to the uncaught exception handler (when you don’t have provided one explicitly, the default one will just print the stack trace to System.err). If you submitted the task with submit any thrown exception, checked or not, is then part of the task’s return status. For a task that was submitted with submit and that terminates with an exception, the Future.get will rethrow this exception, wrapped in an ExecutionException.

意思就是若是你在你的task裏會拋出checked或者unchecked exception,而你又但願外面的調用者可以感知這些exception並作出及時的處理,那麼就須要用到submit,經過捕獲Future.get拋出的異常。

複製代碼
import java.util.ArrayList;  
import java.util.List;  
import java.util.Random;  
import java.util.concurrent.Callable;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.Future;  

public class ExecutorServiceTest {  
    public static void main(String[] args) {  
        ExecutorService executorService = Executors.newCachedThreadPool();  
        List<Future<String>> resultList = new ArrayList<Future<String>>();  

        // 建立10個任務並執行  
        for (int i = 0; i < 10; i++) {  
            // 使用ExecutorService執行Callable類型的任務,並將結果保存在future變量中  
            Future<String> future = executorService.submit(new TaskWithResult(i));  
            // 將任務執行結果存儲到List中  
            resultList.add(future);  
        }  
        executorService.shutdown();  

        // 遍歷任務的結果  
        for (Future<String> fs : resultList) {  
            try {  
                System.out.println(fs.get()); // 打印各個線程(任務)執行的結果  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            } catch (ExecutionException e) {  
                executorService.shutdownNow();  
                e.printStackTrace();  
                return;  
            }  
        }  
    }  
}  

class TaskWithResult implements Callable<String> {  
    private int id;  

    public TaskWithResult(int id) {  
        this.id = id;  
    }  

    /** 
     * 任務的具體過程,一旦任務傳給ExecutorService的submit方法,則該方法自動在一個線程上執行。 
     *  
     * @return 
     * @throws Exception 
     */  
    public String call() throws Exception {  
        System.out.println("call()方法被自動調用,幹活!!!             " + Thread.currentThread().getName());  
        if (new Random().nextBoolean())  
            throw new TaskException("Meet error in task." + Thread.currentThread().getName());  
        // 一個模擬耗時的操做  
        for (int i = 999999999; i > 0; i--)  
            ;  
        return "call()方法被自動調用,任務的結果是:" + id + "    " + Thread.currentThread().getName();  
    }  
}  

class TaskException extends Exception {  
    public TaskException(String message) {  
        super(message);  
    }  
}  
複製代碼

執行的結果相似於:

call()方法被自動調用,幹活!!!             pool-1-thread-1 
call()方法被自動調用,幹活!!!             pool-1-thread-2 
call()方法被自動調用,幹活!!!             pool-1-thread-3 
call()方法被自動調用,幹活!!!             pool-1-thread-5 
call()方法被自動調用,幹活!!!             pool-1-thread-7 
call()方法被自動調用,幹活!!!             pool-1-thread-4 
call()方法被自動調用,幹活!!!             pool-1-thread-6 
call()方法被自動調用,幹活!!!             pool-1-thread-7 
call()方法被自動調用,幹活!!!             pool-1-thread-5 
call()方法被自動調用,幹活!!!             pool-1-thread-8 
call()方法被自動調用,任務的結果是:0    pool-1-thread-1 
call()方法被自動調用,任務的結果是:1    pool-1-thread-2 
java.util.concurrent.ExecutionException: com.cicc.pts.TaskException: Meet error in task.pool-1-thread-3 
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222) 
    at java.util.concurrent.FutureTask.get(FutureTask.java:83) 
    at com.cicc.pts.ExecutorServiceTest.main(ExecutorServiceTest.java:29) 
Caused by: com.cicc.pts.TaskException: Meet error in task.pool-1-thread-3 
    at com.cicc.pts.TaskWithResult.call(ExecutorServiceTest.java:57) 
    at com.cicc.pts.TaskWithResult.call(ExecutorServiceTest.java:1) 
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:138) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:619) 

 

 

能夠看見一旦某個task出錯,其它的task就中止執行。

3、shotdown() showdownNow()區別

能夠關閉 ExecutorService,這將致使其拒絕新任務。提供兩個方法來關閉 ExecutorService。 
shutdown() 方法在終止前容許執行之前提交的任務, 
shutdownNow() 方法阻止等待任務啓動並試圖中止當前正在執行的任務。在終止時執行程序沒有任務在執行,也沒有任務在等待執行,而且沒法提交新任務。關閉未使用的 ExecutorService 以容許回收其資源。 
通常分兩個階段關閉 ExecutorService。第一階段調用 shutdown 拒絕傳入任務,而後調用 shutdownNow(若有必要)取消全部遺留的任務

1
2
// 啓動一次順序關閉,執行之前提交的任務,但不接受新任務。
     threadPool.shutdown();

 

4、Runnable()與Callable()區別

若是是一個多線程協做程序,好比菲波拉切數列,1,1,2,3,5,8…使用多線程來計算。 
但後者須要前者的結果,就須要用callable接口了。 
callable用法和runnable同樣,只不過調用的是call方法,該方法有一個泛型返回值類型,你能夠任意指定。

runnable接口實現的沒有返回值的併發編程。 

 
這裏寫圖片描述

callable實現的存在返回值的併發編程。(call的返回值String受泛型的影響) 使用Future獲取返回值。 
這裏寫圖片描述

(1). newCachedThreadPool
建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。示例代碼以下:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    final int index = i;
    try {
        Thread.sleep(index * 1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
 
    cachedThreadPool.execute(new Runnable() {
 
        @Override
        public void run() {
            System.out.println(index);
        }
    });
}

 

線程池爲無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的線程,而不用每次新建線程。

 
(2). newFixedThreadPool
建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。示例代碼以下:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
    final int index = i;
    fixedThreadPool.execute(new Runnable() {
 
 
        @Override
        public void run() {
            try {
                System.out.println(index);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });
}

 

由於線程池大小爲3,每一個任務輸出index後sleep 2秒,因此每兩秒打印3個數字。

定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()。可參考PreloadDataCache。
 
(3) newScheduledThreadPool
建立一個定長線程池,支持定時及週期性任務執行。延遲執行示例代碼以下:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
 
    @Override
    public void run() {
        System.out.println("delay 3 seconds");
    }
}, 3, TimeUnit.SECONDS);

 

 

表示延遲3秒執行。
 
按期執行示例代碼以下:

scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
 
    @Override
    public void run() {
        System.out.println("delay 1 seconds, and excute every 3 seconds");
    }
}, 1, 3, TimeUnit.SECONDS);

 

 

表示延遲1秒後每3秒執行一次。
ScheduledExecutorService比Timer更安全,功能更強大,後面會有一篇單獨進行對比。
 
(4)、newSingleThreadExecutor
建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。示例代碼以下:

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
    final int index = i;
    singleThreadExecutor.execute(new Runnable() {
 
        @Override
        public void run() {
            try {
                System.out.println(index);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });
}

 

 

結果依次輸出,至關於順序執行各個任務。
現行大多數GUI程序都是單線程的。Android中單線程可用於數據庫操做,文件操做,應用批量安裝,應用批量刪除等不適合併發但可能IO阻塞性及影響UI線程響應的操做。  

  總結:

    (1)使用ExecutorService的submit函數因爲execute函數

    (2)異常如何處理,異常後其餘task中止

參考:Java ExecutorService四種線程池的例子與說明 

參考:線程池 ExecutorService 詳細介紹以及注意點區別  

參考:Java線程池ExecutorService

相關文章
相關標籤/搜索