線程池——Executors

一 Executor框架

爲了更好地控制多線程,JDK提供了一套線程框架Executor,幫助開發人員有效的進行線程控制。它們都在java.util.concurrent包中,是JDK併發包的核心。其中有一個比較重要的類:Executors,它扮演着線程工廠的角色,咱們經過Executors能夠建立特定功能的線程池。
Executors建立線程池方法:java

  1. newFixedThreadPool():該方法返回一個固定數量的線程池,該方法的線程數始終不變,當有一個在任務提交時,若線程池中空閒,則當即執行,若沒有,則會被暫緩在一個任務隊列中等待有空閒的線程去執行
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  1. newSingleThreadExecutor():建立數量爲一個線程的線程池,若空閒則執行,若沒有空閒線程則暫緩在任務隊列中
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  1. newCacheThreadPool():返回一個可根據實際狀況調整線程個數的線程池,不限制最大線程數量,如有任務則建立線程去執行,若無任務則不繼續建立線程,而且每個空閒線程會在60秒後自動回收
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  1. newScheduledThreadPool():返回一個ScheduleExecutorService對象,但該線程池能夠指定線程的數量。
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

使用例子:緩存

class Temp extends Thread {
    @Override
    public void run() {
        System.out.println("run...");
    }
}

public class ScheduledJob {

    public static void main(String[] args) {
        Temp command = new Temp();
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleWithFixedDelay(command, 1, 3, TimeUnit.SECONDS);
        //1秒後執行線程,以後每隔3秒輪詢
    }
}

運行結果:
run...
run...
run...
run...
run...
run...
run...
...多線程

1.1 自定義線程池

若Executors工廠類沒法知足咱們的需求,能夠本身去建立自定義的線程池。其實Executors工廠類裏面的建立線程方法其內部實現均是用了ThreadPoolExecutor這個類,這個類能夠自定義線程,構造方法以下:併發

public ThreadPoolExecutor(int corePoolSize,    //表示當前建立的核心線程數
                              int maximumPoolSize,    //表示最大線程數
                              long keepAliveTime,    //線程池空閒時存活時間
                              TimeUnit unit,    //指定時間單位
                              BlockingQueue<Runnable> workQueue,    //緩存隊列
                              ThreadFactory threadFactory,    //
                              RejectedExecutionHandler handler) {    //拒絕執行的方法 
                              
    ... ...
                        
    }

這個構造方法對於隊列是什麼類型的比較關鍵:框架

  1. 使用有界任務隊列時:如有新的任務須要執行,若是線程池實際線程數小於corePoolSize,則優先建立線程;若大於corePoolSize,則會將任務加入隊列,若隊列已滿,則在總線程數不大於maximumPoolSize的前提下,建立新的線程,若線程數大於maximumPoolSize,則執行拒絕策略。或其餘自定義方式
public class UserThreadPoolExecutor {

    public static void main(String[] args) {
        /**
         * 使用有界任務隊列時:如有新的任務須要執行,若是線程池實際線程數小於corePoolSize,則優先建立線程;
         * 若大於corePoolSize,則會將任務加入隊列,
         * 若隊列已滿,則在總線程數不大於maximumPoolSize的前提下,建立新的線程,
         * 若線程數大於maximumPoolSize,則執行拒絕策略。
         */
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,    //coreSize
                2,    //maxSize
                60,    //無效時間
                TimeUnit.SECONDS,    //單位
                new ArrayBlockingQueue<Runnable  >(3)    //有界隊列
                );
        pool.execute(new MyTask(1, "任務1"));
        pool.execute(new MyTask(2, "任務2"));
        pool.execute(new MyTask(3, "任務3"));
        pool.execute(new MyTask(4, "任務4"));
        pool.execute(new MyTask(5, "任務5"));
        pool.execute(new MyTask(6, "任務6"));
        pool.shutdown();
    }

}

運行結果:
run taskId = 1
run taskId = 5
Exception in thread "main"
java.util.concurrent.RejectedExecutionException: Task Thread[Thread-5,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@70dea4e[Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at Executor.UserThreadPoolExecutor.main(UserThreadPoolExecutor.java:28)
run taskId = 2
run taskId = 3
run taskId = 4
分析:
任務數大於coreSize(目前爲1),則有任務加入隊列,又隊列(隊列容量爲3)已滿,則建立一個線程(目前coreSize爲2),因爲maxSize爲2,全部最多隻能再建立一個線程到線程池(目前coreSize+queue=5小於任務數6),沒法再建立線程,執行拒絕策略ide

  1. 使用無界任務隊列時:LinkedBlockingQueue。與有界隊列相比,除非系統資源耗盡,不然無界的任務隊列不存在入隊失敗的狀況。當有新任務到來,系統線程數小於corePoolSize時,則新建線程執行任務。當達到corePoolSize後,就不會繼續增長。若後續仍有新的任務加入,而又沒有空閒的線程資源,則任務直接進入隊列等待。若任務建立和處理的速度差別很大,無界隊列會保持快速增加,知道耗盡系統內存。
public class UserThreadPoolExecutor2 implements Runnable{
    
    /**
     * 當有新任務到來,系統線程數小於corePoolSize時,則新建線程執行任務,
     * 當達到corePoolSize後,就不會繼續增長,
     * 若後續仍有新的任務加入,而又沒有空閒的線程資源,則任務直接進入隊列等待。
     * 若任務建立和處理的速度差別很大,無界隊列會保持快速增加,知道耗盡系統內存
     */
    private static AtomicInteger count = new AtomicInteger(0);
    
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5,
                10,
                120L,    //2分鐘
                TimeUnit.SECONDS,
                queue
                );
        for (int i = 0; i < 20; i++) {
            pool.execute(new UserThreadPoolExecutor2());
        }
        Thread.sleep(1000);
        System.out.println("queue size : " + queue.size());
        Thread.sleep(2000);
        //pool.shutdown();
    }

    @Override
    public void run() {
        try {
            int num = count.incrementAndGet();
            System.out.println("任務" + num);
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

運行結果:
任務2
任務4
任務1
任務3
任務5
queue size : 15
任務6
任務8
任務7
任務9
任務10
任務11
任務12
任務13
任務14
任務15
任務16
任務18
任務19
任務17
任務20線程

JDK拒絕策略:日誌

  1. AbortPolicy:直接拋出異常,系統正常工做
  2. CallerRunsPolicy:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務
  3. DiscardOldestPolicy:丟棄最老的一個請求,嘗試再次提交當前任務
  4. DiscardPolicy:丟棄沒法處理的任務,不給於任何處理

若是須要自定義拒絕策略,能夠實現RejectedExecutionHandle接口:code

public class MyReject implements RejectedExecutionHandler{

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("自定義處理...");
        System.out.println("當前被拒絕任務爲:" + r.toString());
        //記錄日誌,等待其餘時間處理
    }
}

public class UserThreadPoolExecutor1 {

    public static void main(String[] args) throws InterruptedException {
        /**
         * 使用有界任務隊列時:如有新的任務須要執行,若是線程池實際線程數小於corePoolSize,則優先建立線程;
         * 若大於corePoolSize,則會將任務加入隊列,
         * 若隊列已滿,則在總線程數不大於maximumPoolSize的前提下,建立新的線程,
         * 若線程數大於maximumPoolSize,則執行拒絕策略。
         */

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,    //coreSize
                2,    //maxSize
                60,    //無效時間
                TimeUnit.SECONDS,    //單位
                new ArrayBlockingQueue<Runnable>(3),    //有界隊列
                new MyReject()
                );
        pool.execute(new MyTask(1, "任務1"));
        pool.execute(new MyTask(2, "任務2"));
        pool.execute(new MyTask(3, "任務3"));
        pool.execute(new MyTask(4, "任務4"));
        pool.execute(new MyTask(5, "任務5"));
        pool.execute(new MyTask(6, "任務6"));
        
        pool.shutdown();
    }

}

運行結果:
自定義處理...
當前被拒絕任務爲:Thread[Thread-5,5,main]
run taskId = 5
run taskId = 1
run taskId = 2
run taskId = 3
run taskId = 4對象

相關文章
相關標籤/搜索