Spring Cloud Alibaba-ThreadPool(十)

線程池做用

  • 避免建立和銷燬線程的資源浪費
    • 類加載和GC銷燬須要消耗資源
  • 提升響應速度
    • 省去建立時間
  • 重複利用

重要參數(ThreadPoolExecutor)

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
複製代碼
屬性 類型 含義
corePoolSize int 核心線程池大小
maximumPoolSize int 最大線程池大小
keepAliveTime long 線程池中非核心線程空閒的存活時間大小
unit TimeUnit 線程空閒存活時間單位
workQueue BlockingQueue 線程等待隊列
threadFactory ThreadFactory 線程建立工廠
handler RejectedExecutionHandler 拒絕策略
  • handler(拒絕策略)
    private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    複製代碼
    • AbortPolicy(默認):拋出一個異常
    • DiscardPolicy:直接丟棄任務
    • DiscardOldestPolicy:丟棄隊列裏最老的任務,將當前這個任務繼續提交給線程池
    • CallerRunsPolicy:交給線程池調用所在的線程進行處理

執行邏輯

  • 核心線程池==>>隊列==>>非核心線程==>>拒絕策略
  • 提交一個任務,線程池裏存活的核心線程數小於線程數corePoolSize時,線程池會建立一個核心線程去處理提交的任務
  • 若是線程池核心線程數已滿,即線程數已經等於corePoolSize,一個新提交的任務,會被放進任務隊列workQueue排隊等待執行
  • 當線程池裏面存活的線程數已經等於corePoolSize了,而且任務隊列workQueue也滿,判斷線程數是否達到maximumPoolSize,即最大線程數是否已滿,若是沒到達,建立一個非核心線程執行提交的任務
  • 若是當前的線程數達到了maximumPoolSize,還有新的任務過來的話,直接採用拒絕策略處理

異常處理

  • try-catch捕獲異常
  • submit執行,Future.get接受異常:執行過程保留了異常信息
  • 重寫ThreadPoolExecutor.afterExecute方法,處理傳遞的異常引用:ExtendedExecutor extends ThreadPoolExecutor
  • 實例化傳入本身的ThreadFactory,設置Thread.uncaughtExceptionHandler處理未檢測的異常:setUncaughtExceptionHandler

隊列類型

  • ArrayBlockingQueue(有界隊列):是一個用數組實現的有界阻塞隊列,按FIFO排序量
  • LinkedBlockingQueue(可設置容量隊列):基於鏈表結構的阻塞隊列,按FIFO排序任務,容量能夠選擇進行設置,不設置的話,將是一個無邊界的阻塞隊列,最大長度爲Integer.MAX_VALUE,吞吐量一般要高於ArrayBlockingQuene;newFixedThreadPool線程池使用了這個隊列
  • DelayQueue(延遲隊列):是一個任務定時週期的延遲執行的隊列。根據指定的執行時間從小到大排序,不然根據插入到隊列的前後排序。newScheduledThreadPool線程池使用了這個隊列
  • PriorityBlockingQueue(優先級隊列):具備優先級的無界阻塞隊列
  • SynchronousQueue(同步隊列):一個不存儲元素的阻塞隊列,每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQuene,newCachedThreadPool線程池使用了這個隊列

線程池類型

  • newFixedThreadPool(固定數目線程的線程池)
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    複製代碼
    • 特色
      • 核心線程數和最大線程數大小同樣
      • 沒有所謂的非空閒時間,即keepAliveTime爲0
      • 阻塞隊列爲無界隊列LinkedBlockingQueue
    • 工做機制
      • 提交任務
      • 若是線程數少於核心線程,建立核心線程執行任務
      • 若是線程數等於核心線程,把任務添加到LinkedBlockingQueue阻塞隊列(OOM)
      • 若是線程執行完任務,去阻塞隊列取任務,繼續執行
    • 使用場景 CPU密集型的任務,確保CPU在長期被工做線程使用的狀況下,儘量的少的分配線程,即適用執行長期的任務
  • newCachedThreadPool(可緩存線程的線程池)
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    複製代碼
    • 特色
      • 核心線程數爲0
      • 最大線程數爲Integer.MAX_VALUE
      • 阻塞隊列是SynchronousQueue
      • 非核心線程空閒存活時間爲60秒
    • 工做機制
      • 提交任務
      • 由於沒有核心線程,因此任務直接加到SynchronousQueue隊列
      • 判斷是否有空閒線程,若是有,就去取出任務執行
      • 若是沒有空閒線程,就新建一個線程執行
      • 執行完任務的線程,還能夠存活60秒,若是在這期間,接到任務,能夠繼續活下去;不然,被銷燬
    • 使用場景
      • 併發執行大量短時間的小任務
  • newSingleThreadExecutor(單線程的線程池)
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    複製代碼
    • 特色
      • 核心線程數爲1
      • 最大線程數也爲1
      • keepAliveTime爲0
      • 阻塞隊列是LinkedBlockingQueue
    • 工做機制
      • 提交任務
      • 線程池是否有一條線程在,若是沒有,新建線程執行任務
      • 若是有,任務加到阻塞隊列
      • 當前的惟一線程,從隊列取任務,執行完一個,再繼續取,一我的(一條線程)夜以繼日地幹活
    • 使用場景
  • newScheduledThreadPool(定時及週期執行的線程池)
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
    複製代碼
    • 特色
      • 最大線程數爲Integer.MAX_VALUE
      • 阻塞隊列是DelayedWorkQueue
      • keepAliveTime爲0
      • scheduleAtFixedRate() :按某種速率週期執行
      • scheduleWithFixedDelay():在某個延遲後執行
    • 工做機制
      • 添加一個任務
      • 線程池中的線程從 DelayQueue 中取任務
      • 線程從 DelayQueue 中獲取 time 大於等於當前時間的task
      • 執行完後修改這個 task 的 time 爲下次被執行的時間
      • 這個 task 放回DelayQueue隊列中
    • 使用場景
      • 週期性執行任務的場景,須要限制線程數量的場景

自定義ThreadPool

  • copy一份AbortPolicy改個名字,加些本身想加的內容(例如加個輸出)。
public static class MyPolicy implements RejectedExecutionHandler {
        public MyPolicy() { }
        public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor e) {
            // 新增
            System.out.println("拒絕");
            throw new RejectedExecutionException("Task " + r.toString() +
                    " rejected from " +
                    e.toString());
        }
    }
複製代碼
  • copy一份DefaultThreadFactory改個名字,加些本身想加的內容(例如加個輸出)。
static class MyThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        MyThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            // 新增
            System.out.println(t.getName() + " has been created");
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
複製代碼
  • 測試
public static void main(String agrs[]) throws Exception{
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 10;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ThreadFactory threadFactory = new MyThreadFactory();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue, threadFactory,new MyPolicy());
        executor.prestartAllCoreThreads(); // 預啓動全部核心線程
        for (int i = 1; i <= 10; i++) {
            executor.execute(()-> System.out.println(Thread.currentThread().getId()));
        }

        System.in.read(); //阻塞主線程
    }
複製代碼
  • 結果
    • 核心線程2個,非核心線程2個,隊列緩存2個,等前四個運行完了,繼續運行緩存中的2個,符合預期

線程池的狀態

  • RUNNING = -1
    • 該狀態的線程池會接收新任務,並處理阻塞隊列中的任務
    • 調用線程池的shutdown()方法,能夠切換到SHUTDOWN狀態
    • 調用線程池的shutdownNow()方法,能夠切換到STOP狀態
  • SHUTDOWN = 0
    • 該狀態的線程池不會接收新任務,但會處理阻塞隊列中的任務
    • 隊列爲空,而且線程池中執行的任務也爲空,進入TIDYING狀態
  • STOP = 1
    • 該狀態的線程不會接收新任務,也不會處理阻塞隊列中的任務,並且會中斷正在運行的任務
    • 線程池中執行的任務爲空,進入TIDYING狀態
  • TIDYING = 2
    • 該狀態代表全部的任務已經運行終止,記錄的任務數量爲0
    • terminated()執行完畢,進入TERMINATED狀態
  • TERMINATED = 3
    • 該狀態表示線程池完全終止
相關文章
相關標籤/搜索