Java線程池源碼分析與總結(上)

線程池概述

  • 池化技術應該是最經常使用的提升程序性能的手段,包括線程池與數據庫鏈接池,常量池等等java

  • 建立與銷燬線程是比較耗費時間的,不利於處理Java程序的高併發,所以引入線程池,也就是維護一組可用的線程,若是有任務,就當即將線程池的空閒線程分配給任務,提高性能,若是線程池內全部的線程都是忙狀態的話,能夠將任務放到任務隊列,或者建立一個新的線程並放入線程池,用於處理新的任務git

  • 使用線程池的好處程序員

    • 下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。github

    • 在《阿里巴巴 Java 開發手冊》「併發處理」這一章節,明確指出線程資源必須經過線程池提供,不容許在應用中自行顯示建立線程。spring

      爲何呢?docker

      使用線程池的好處是減小在建立和銷燬線程上所消耗的時間以及系統資源開銷,解決資源不足的問題。若是不使用線程池,有可能會形成系統建立大量同類線程而致使消耗完內存或者「過分切換」的問題。數據庫

    • 提升響應速度。當任務到達時,任務能夠不須要等待線程建立就能當即執行。編程

    • 提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。(最原始粗放的服務器實現就是請求綁定一個套接字後就新開一個線程去處理,若是請求量巨大的時候,服務器是確定要崩的,由於缺少對線程資源的管理)數組

      • 線程池監控的方法:緩存

        • SpringBoot 中的 Actuator 組件

        • 經過ThreadPoolExecutor的自有接口獲取線程池信息

          image-20210415154301230

線程池在實際項目中的使用場景

  • 線程池通常用於執行多個不相關聯的耗時任務,沒有多線程的狀況下,任務順序執行,使用了線程池的話可以讓多個不相關聯的任務同時執行。

  • 舉個項目中實際使用的例子:

    • dockerhub項目中Caches緩存中使用的定時線程池用來定時更新緩存的數據到數據庫(執行異步任務
    • 其他的例子大都相似
  • 實際使用時要注意的通常規則

    • 使用線程池,而不是建立單個線程

    • 使用ThreadPoolExecutor構造函數而不是Executors工具類,下文有具體的解釋

    • 顯式的定義線程池名字,以業務名字做區分,便於定位問題

      • 可使用自定義的ThreadFactory

        import java.util.concurrent.Executors;
        import java.util.concurrent.ThreadFactory;
        import java.util.concurrent.atomic.AtomicInteger;
        /** * 線程工廠,它設置線程名稱,有利於咱們定位問題。 */
        public final class NamingThreadFactory implements ThreadFactory {
        
            private final AtomicInteger threadNum = new AtomicInteger();
            private final ThreadFactory delegate;
            private final String name;
        
            /** * 建立一個帶名字的線程池生產工廠 */
            public NamingThreadFactory(ThreadFactory delegate, String name) {
                this.delegate = delegate;
                this.name = name; // TODO consider uniquifying this
            }
        
            @Override 
            public Thread newThread(Runnable r) {
                Thread t = delegate.newThread(r);
                t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
                return t;
            }
        
        }
        複製代碼
      • 使用guava的ThreadFactoryBuilder

        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                                .setNameFormat(threadNamePrefix + "-%d")
                                .setDaemon(true).build();
        ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)
        複製代碼
    • 不一樣的業務使用不一樣的線程池

      • 通常建議是不一樣的業務使用不一樣的線程池,配置線程池的時候根據當前業務的狀況對當前線程池進行配置,由於不一樣的業務的併發以及對資源的使用狀況都不一樣,重心優化系統性能瓶頸相關的業務
    • 有依賴關係的任務在使用同一個線程池在稍高的併發情況下可能會出現一種邏輯上的死鎖,大概來講就是父任務A中調用了子任務B,父任務與子任務共用一個線程池,當父任務佔據了所有的核心線程資源,而且子任務仍未執行時,沒法退出對核心線程的佔用,而與此同時子任務只能堆積在任務隊列中,沒法得到線程資源,若是又使用了無界隊列的話,則會一直堆積直到OOM,具體的參考線程池運用不當的一次線上事故

線程池類的繼承、實現關係

Executor框架

  • Executor 框架是 Java5 以後引進的,在 Java 5 以後,經過 Executor 來啓動線程比使用Threadstart 方法更好,除了更易管理,效率更好(用線程池實現,節約開銷)外,還有關鍵的一點:有助於避免this逃逸問題。

    補充:this 逃逸是指在構造函數返回以前其餘線程就持有該對象的引用. 調用還沒有構造徹底的對象的方法可能引起使人疑惑的錯誤,若是用volatile修飾的話應該就能解決這個問題了,不知道Executor框架的出現是如何有助於解決此問題的呢?---不是很清楚

    Executor 框架不只包括了線程池的管理,還提供了線程工廠、隊列以及拒絕策略等,Executor 框架讓併發編程變得更加簡單

  • 實際上在Executor框架中,還有一個線程池ForkJoinPool可能用的不太多,此類繼承AbstractExecutorService,文章末尾會介紹到

  • 除了說Executor框架,還有一種說法就是JUC框架,也就是java.util.concurrent這個包下的全部的多線程相關類的總稱

Executor的框架結構

任務的提交

向線程池提交任務有兩種方法:

  1. execute方法

    1. 只接受Runnable的任務,不提供返回值,源碼分析見下文(做爲線程池的入口必定是要仔細分析的
  2. submit方法

    public Future<?> submit(Runnable task) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<Void> ftask = newTaskFor(task, null);
      execute(ftask);
      return ftask;
    }
    
    /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
    public <T> Future<T> submit(Runnable task, T result) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<T> ftask = newTaskFor(task, result);
      execute(ftask);
      return ftask;
    }
    
    /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
    public <T> Future<T> submit(Callable<T> task) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<T> ftask = newTaskFor(task);
      execute(ftask);
      return ftask;
    }
    
    
    複製代碼
    • ThreadPoolExecutor沒有實現本身的submit方法,而是沿用的父類AbstractExecutorService的實現

    • 接受RunnableCallable的任務,並提供Future類型返回值

      • submit內部將傳入的任務統一封裝爲RunnableFuture類型,此類型實現了RunnableFuture接口,老縫合怪了~

      • 不一樣之處就在於傳入Runnable的任務獲得的Future可能沒法獲得有效的返回值,而Callable的任務可以獲得返回結果

        • 提交Runnable任務時也能夠指定一個返回結果,做爲Future的返回結果,可是這個結果顯然並非任務執行完成的返回值,而是程序員事先傳入的值,其做用相似因而一個flag值

          public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
          }
          
          public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
              throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
          }
          
          static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
              this.task = task;
              this.result = result;
            }
            public T call() {
              task.run();
              return result;
            }
          }
          複製代碼
          • 能夠清晰的看見,對於傳入的Runnable任務會被轉換爲Callable類型,若是有傳入預期的返回值,call函數中就會原封不動的返回,可是若是沒有傳入,就是返回null了
    • submit內部實際上仍然調用了execute方法

    • 此處補充CallableRunnable的差別:

      • 前者的執行方法內部能夠有返回值,而且若是沒法獲得有效返回值還能夠拋出異常,後者的執行方法中沒有返回值也不能拋出異常
    • 補充Future接口的做用

      • 能夠經過isDone判斷任務是否執行完
      • 經過get方法得到執行結果
        1. 注意此方法是阻塞方法,須要等待任務執行完畢後才能返回

銷燬(關閉)線程池

  1. shutdown方法 關閉線程池,線程池的狀態變爲 SHUTDOWN線程池再也不接受新任務了,可是隊列裏的任務得執行完畢

    1. 執行shutdown方法後,能夠執行awaitTermination方法,則會等待指定的時間讓線程池關閉,若在指定時間內關閉則返回true,不然false

    2. shutdown源碼分析

      public void shutdown() {
              final ReentrantLock mainLock = this.mainLock;
           // 上鎖
              mainLock.lock();
              try {
                  // 判斷調用者是否有權限shutdown線程池
                  checkShutdownAccess();
                  // CAS 設置線程池狀態爲SHUTDOWN
                  advanceRunState(SHUTDOWN);
                  // 中斷全部空閒線程
                  interruptIdleWorkers();
                  // 鉤子函數
                  onShutdown(); // hook for ScheduledThreadPoolExecutor
              } finally {
                  // 解鎖
                  mainLock.unlock();
              }
              // 嘗試終止線程池
              tryTerminate();
          }
      複製代碼
      • 對於interruptIdleWorkers函數的解析與tryTerminate的解析放在了後邊
  2. shutdownNow方法 閉線程池,線程的狀態變爲 STOP線程池會終止當前正在運行的任務,並中止處理排隊的任務並返回正在等待執行的任務列表

    public List<Runnable> shutdownNow() {
      List<Runnable> tasks;
      final ReentrantLock mainLock = this.mainLock;
      // 上鎖
      mainLock.lock();
      try {
        // 判斷調用者是否有權限shutdown線程池
        checkShutdownAccess();
        // CAS 設置線程池狀態爲STOP
        advanceRunState(STOP);
        // 中斷全部線程
        interruptWorkers();
        // 從隊列中獲取剩餘的未執行的工做列表
        tasks = drainQueue();
      } finally {
        mainLock.unlock();
      }
      // 嘗試終止線程池
      tryTerminate();
      // 返回未執行的任務列表
      return tasks;
    }
    複製代碼
    • interruptWorkers的解析放到了後文中
  3. 使用以下兩個方法來判斷線程池是否徹底關閉

    1. isTerminated() 當調用 shutdown() 方法後,而且全部提交的任務完成後返回爲 true,或者是執行shutdownNow後,線程池內的線程所有被中斷,工做線程數量爲0後返回true
    2. isShutdown() 當調用 shutdown() 方法後返回爲 true。

Executor框架使用圖

  1. 主線程首先要建立實現 Runnable 或者 Callable 接口的任務對象。
  2. 把建立完成的實現 Runnable/Callable接口的 對象直接交給 execute 執行: ExecutorService.execute(Runnable command))或者也能夠把 Runnable 對象或Callable 對象提交給 submit 執行(ExecutorService.submit(Runnable task)ExecutorService.submit(Callable <T> task))。
  3. 若是執行 ExecutorService.submit(…)ExecutorService 將返回一個實現Future接口的對象(剛剛也提到過了執行 execute()方法和 submit()方法的區別,submit()會返回一個 FutureTask 對象
  4. 最後,主線程能夠執行 FutureTask.get()方法來等待任務執行完成。主線程也能夠執行 FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務的執行。

線程池的使用

使用Executors工具類建立線程池

  • 建立線程池的最方便的作法是使用Executors工具類,能夠建立普通的線程池與能夠執行定時任務的線程池,可是簡單的建立方法意味着封裝的程度高,就會致使自由度低,甚至有一些風險

普通的線程池

  • 固定線程數量的線程池

    • 該線程池中的線程數量始終不變。當有一個新的任務提交時,線程池中如有空閒線程,則當即執行。若沒有,則新的任務會被暫存在一個任務隊列中,待有線程空閒時,便處理在任務隊列中的任務。

      public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
      }
      // 默認任務隊列的長度是Integer.MAX_VALUE
      public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
      }
      複製代碼
      • 核心線程數量與最大線程數量一致,也就是核心線程就是所有可用的線程了
      • 使用的是無界的隊列,不會拒絕任務,可是也所以也帶來了隱患。
      • 因爲使用無界隊列, maximumPoolSize 將是事實上的無效參數,由於不可能存在任務隊列滿的狀況(能夠將任務隊列視做系統內最大,因此不用設置最大線程數,由於再多的任務也徹底能夠緩存在隊列中)。因此,經過建立 FixedThreadPool的源碼能夠看出建立的 FixedThreadPoolcorePoolSizemaximumPoolSize 被設置爲同一個值。
        • 由於一樣的理由,使用無界隊列時 keepAliveTime 將是一個無效參數(由於不會有核心線程以外的其他線程)(固然,若是空閒核心線程被容許超時回收的話,就是有用的了,便是,若是空閒就會當即展開回收)
      • 當線程池中的線程數達到 corePoolSize 後,新任務將在無界隊列中等待,所以線程池中的線程數不會超過 corePoolSize;因此一旦corePoolSize設置不對的話,將會有大量任務乾等着,而且性能也沒有徹底發揮
      • 容許建立的線程個數雖然有限制,可是容許請求的隊列長度爲 Integer.MAX_VALUE ,可能堆積大量的請求,從而致使 OOM
  • 僅有一個線程的線程池

    • 能夠視爲是固定線程數量線程池的特值狀況,即nThreads爲1的狀況

      public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        // 使用包裝類包裝過的,用來保證:
        return new FinalizableDelegatedExecutorService
          (new ThreadPoolExecutor(1, 1,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory));
      }
      複製代碼
  • 動態分配線程數量的線程池

    • 該方法返回一個可根據實際狀況調整線程數量的線程池。線程池的線程數量不肯定,但如有空閒線程能夠複用,則會優先使用可複用的線程。若全部線程均在工做,又有新的任務提交,則會建立新的線程處理任務。全部線程在當前任務執行完畢後,將返回線程池進行復用。

      public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
              return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                            60L, TimeUnit.SECONDS,
                                            new SynchronousQueue<Runnable>(),
                                            threadFactory);
      }
      複製代碼
      • 核心線程數量設置爲0,不養閒人,沒有任務時,線程池也不會白白佔用資源
      • 由於核心線程樹爲0,線程池中的全部線程都不是核心線程,所以都會在60秒內,接不到活時被回收
      • 動態分配與按需建立的功能的實現應歸功於SynchronousQueue類型的任務隊列,這個隊列不會緩存任務,而是若是有空閒線程就必定會交給空閒線程執行,沒有空閒線程就直接建立新線程:
        • 在execute方法中首先執行 SynchronousQueue.offer(Runnable task) 提交任務到任務隊列。若是當前線程池中有閒線程正在執行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那麼主線程執行 offer 操做與空閒線程執行的 poll 操做配對成功,主線程把任務交給空閒線程執行,execute()方法執行完成。
        • 當初始條件下線程池內的線程數量爲0時,或者線程池中沒有空閒線程時,將沒有線程執行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)這種狀況下,offer方法將返回false,此時 CachedThreadPool 會建立新線程執行任務,execute 方法執行完成
      • 雖然隊列使用的是有界隊列,可是最大線程數量是Integer.MAX_VALUE,這意味着線程池能夠不受控的一直接受任務,直到棧空間OOM

執行定時任務的線程池

定時任務線程池的建立
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) {
  return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}


public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
        new DelayedWorkQueue(), threadFactory, handler);
}
複製代碼
  • 雖然隊列使用的是有界隊列,可是最大線程數量是Integer.MAX_VALUE,這意味着線程池能夠不受控的一直接受任務,直到棧空間OOM

  • 須要注意的是,儘管ScheduledExecutorService是內部調用了父類ThreadPoolExecutord的構造方法,可是其內部實現的核心入口方法再也不是ThreadPoolExecutor的execute方法,而是ScheduledThreadPoolExecutor中的delayExecute方法

  • 定時任務的實現依賴於延遲隊列DelayedWorkQueue

  • 能夠發現執行定時任務可使用springboot中的@Scheduled註解,也可使用底層的定時任務線程池實際上本線程池基本不會用,由於實現定時任務有其餘的方案,好比springboot的註解與quartz等等

    備註: Quartz 是一個由 java 編寫的任務調度庫,由 OpenSymphony 組織開源出來。在實際項目開發中使用 Quartz 的仍是居多,比較推薦使用 Quartz。由於 Quartz 理論上可以同時對上萬個任務進行調度,擁有豐富的功能特性,包括任務調度、任務持久化、可集羣化、插件等等

執行不一樣種類的定時任務
  • 一次性的延遲任務 schedule方法

    public ScheduledFuture<?> schedule(Runnable command,
                                           long delay,
                                           TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> t = decorateTask(command,
                new ScheduledFutureTask<Void>(command, null,
                                              triggerTime(delay, unit)));
            delayedExecute(t);
            return t;
        }
    複製代碼
    • 這裏值得補充的是,ScheduledThreadPoolExecutor重寫了executesubmit方法,兩個方法內部實際上都是簡單地調用schedule方法來實現的
  • 以上一次任務開始爲基準固定間隔循環執行任務 scheduleAtFixedRate方法

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (period <= 0)
                throw new IllegalArgumentException();
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(period));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            delayedExecute(t);
            return t;
        }
    複製代碼
  • 以上一次任務結束爲基準固定間隔循環執行任務 scheduleWithFixedDelay方法

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (delay <= 0)
                throw new IllegalArgumentException();
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(-delay));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            delayedExecute(t);
            return t;
        }
    複製代碼
  • 後二者的區別見圖

    • 引出兩個問題:

      • 若是在fixed-rate模式下,任務的執行時間大於間隔時間,那麼任務是怎樣安排執行的呢? 答案是:後續的任務會在上一個任務執行完畢後再開始執行,而無論執行間隔了,也就是延遲執行,而不是併發執行
      • 若是作定時間隔任務時,前邊的任務出現異常,後續的任務會繼續執行嗎? 答:一旦出現異常,當前的任務與後續的任務都不會再執行,而是卡住,而且能經過自定義afterExecute方法來處理異常,保證拋出異常的任務取消,而其餘任務繼續執行
定時任務線程池的大體工做原理的理解
  • 上邊說過了,定時任務線程池的核心入口就是上邊三種類型的任務方法中都有的一個方法--就是delayedExecute,可是在說這個關鍵的入口方法以前,不得說下,調用方法前對於提交的任務的包裝,包裝這一塊設計到的類比較多,先用一張類圖大體把握

    image-20210425151258617

  • 首先包裝爲ScheduledFutureTask

    // 用於包裝schedule(Runnable)提交的任務
    // result爲null,ns是納秒爲單位的,要觸發執行任務的系統時間
    ScheduledFutureTask(Runnable r, V result, long ns) {
      super(r, result);
      this.time = ns;
      this.period = 0;
      this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    // 包裝scheduleWithFixedDelay和scheduleAtFixedRate提交的任務
    // result 爲null
    // ns是納秒爲單位的,下一次要觸發執行任務的系統時間
    // period是以納秒爲單位的任務循環週期
    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
      super(r, result);
      this.time = ns;
      this.period = period;
      this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    // 包裝schedule(Callable)提交的任務
    // ns是納秒爲單位的,要觸發執行任務的系統時間
    ScheduledFutureTask(Callable<V> callable, long ns) {
      super(callable);
      this.time = ns;
      this.period = 0;
      this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    
    // 關鍵的run方法
    public void run() {
      // 首先判斷是否是週期性執行的任務
      boolean periodic = isPeriodic();
      // 判斷當前的線程池可否執行定時任務,若是不能則取消任務
      if (!canRunInCurrentRunState(periodic))
        cancel(false);
      else if (!periodic)
        // 若是不是週期性任務,也就是一次性的定時任務的話,直接執行提交的任務
        ScheduledFutureTask.super.run();
      // 若是是週期性執行的任務,首先執行提交的任務,並將任務的狀態重置爲初始化狀態,以備下一次執行
      else if (ScheduledFutureTask.super.runAndReset()) {
        // 執行完畢後計算下一次執行的時間
        setNextRunTime();
        // 從新提交當前的任務到延時隊列中,用於下一個週期的執行
        reExecutePeriodic(outerTask);
      }
    }
    
    // 計算下一次要執行任務的時間
    // time表示下一次執行任務的時間,period是用來計算time的週期時間
    private void setNextRunTime() {
      long p = period;
      if (p > 0)
        // scheduleAtFixedRate
        // 在第一次執行完任務後,下一次要執行的時間就是徹底按照週期來執行,無論到底何時執行完的(也就是now),以後的每次執行都是如此
        time += p;
      else
        // scheduleWithFixedDelay
        // 第一次執行完任務後,下一次要執行的時間是以當前時間爲基準計算的,也就是上一次完成任務的時間爲基準計算的,以後的每次執行都是如此
        time = triggerTime(-p);
    }
    
    // 用於在延遲隊列中按照下一次觸發的順序進行排序
    public int compareTo(Delayed other) {
      if (other == this) // compare zero if same object
        return 0;
      if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
          return -1;
        else if (diff > 0)
          return 1;
        // 觸發時間一致的,按照提交的順序來
        else if (sequenceNumber < x.sequenceNumber)
          return -1;
        else
          return 1;
      }
      long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
      return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }
    
    // 計算從當前時刻到下次執行任務還有多長時間
    public long getDelay(TimeUnit unit) {
      return unit.convert(time - now(), NANOSECONDS);
    }
    複製代碼
    • scheduleWithFixedDelayscheduleAtFixedRate在實現時的區別就在於這次包裝過程當中,前者傳入的週期是unit.toNanos(-delay)然後者是unit.toNanos(perioid)
      • 其原理在於setNextRunTime方法中,詳見方法註釋
    • 在這次包裝過程當中,定時循環任務與一次行的定時任務在實現上除了period以外還有一個區別就是outerTask
      • 定時循環任務會持有此屬性,以便可以在本輪任務執行完畢後,將當前的任務從新提交到延遲隊列中,以備下一輪週期的執行,參考reExecutePeriodic方法
    • getDelay方法最主要的應用就是在延時隊列的take poll這兩個獲取任務的方法中,起到了控制獲取任務的時間的做用
      • 應該能夠這樣說,從大致上看,延遲隊列能夠按照延遲時間進行排序+延遲隊列中可使用getDelay方法來控制獲取任務的時延--這兩個特性是直觀上的延遲任務線程池起做用的關鍵
  • 其次包裝爲RunnableScheduleFuture

    protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> task) {
      return task;
    }
    protected <V> RunnableScheduledFuture<V> decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task) {
      return task;
    }
    複製代碼
    • 其實是直接返回RunnableScheduledFuture,可是沒有看懂爲何要用這樣的一個方法類型提高
  • 定時任務線程池的入口方法delayedExecute

    private void delayedExecute(RunnableScheduledFuture<?> task) {
      // 1. 判斷線程池是否是shutdown狀態,若是是執行拒絕策略
      if (isShutdown())
        reject(task);
      else {
        // 2. 首先就是向DelayedWorkQueue中添加任務
        super.getQueue().add(task);
        // 3. 無論是通常的線程池仍是執行定時任務的線程池,都會在向隊列中添加完任務後執行re-check
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
          task.cancel(false);
        else
          // 4. 若是經過了recheck,執行此方法
          // 確保線程池內有線程運行
          ensurePrestart();
      }
    }
    
    void ensurePrestart() {
      int wc = workerCountOf(ctl.get());
      // 對於Executors建立的線程池來講,核心線程數量爲0,因此會保證有非核心線程執行
      if (wc < corePoolSize)
        addWorker(null, true);
      else if (wc == 0)
        addWorker(null, false);
    }
    複製代碼
    • 若是線程池狀態不是SHUTDOWN的話,直接向隊列中添加任務,而沒有直接讓線程去執行任務的場景
  • addWorker開始,後續的就是標準的線程池的線程管理與任務獲取的流程了,也就是說定時任務線程池與通常線程池的主要區別在於任務調度部分,而鏈接任務管理與線程管理的通道--延時隊列也須要大體瞭解下

    static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
     
     // 任務調度時提交任務的方法就是add方法
     public boolean add(Runnable e) {
       return offer(e);
     }
      
      public boolean offer(Runnable x) {
        if (x == null)
          throw new NullPointerException();
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          int i = size;
          if (i >= queue.length)
            grow();
          size = i + 1;
          if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
          } else {
            // 按照排序規則,選擇合適的位置插入到隊列中
            siftUp(i, e);
          }
          if (queue[0] == e) {
            leader = null;
            available.signal();
          }
        } finally {
          lock.unlock();
        }
        return true;
      }
      // 按照排序規則,選擇合適的位置插入到隊列中
      private void siftUp(int k, RunnableScheduledFuture<?> key) {
        while (k > 0) {
          int parent = (k - 1) >>> 1;
          RunnableScheduledFuture<?> e = queue[parent];
          // 按照RunnableScheduledFuture的time屬性進行排序
          if (key.compareTo(e) >= 0)
            break;
          queue[k] = e;
          setIndex(e, k);
          k = parent;
        }
        queue[k] = key;
        setIndex(key, k);
      }
      
      // getTask中,核心線程取任務(無超時時間)
      // 若是當前不能獲取,就阻塞等待
      public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
          for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
              available.await();
            else {
              // 調用getDelay方法獲得須要延時等待的時間
              long delay = first.getDelay(NANOSECONDS);
              if (delay <= 0)
                return finishPoll(first);
              first = null; // don't retain ref while waiting
              if (leader != null)
                available.await();
              else {
                Thread thisThread = Thread.currentThread();
                leader = thisThread;
                try {
                  available.awaitNanos(delay);
                } finally {
                  if (leader == thisThread)
                    leader = null;
                }
              }
            }
          }
        } finally {
          if (leader == null && queue[0] != null)
            available.signal();
          lock.unlock();
        }
      }
      
      // getTask中,非核心線程取任務或則核心線程獲取任務(容許超時回收)
      public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
          for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
              if (nanos <= 0)
                return null;
              else
                nanos = available.awaitNanos(nanos);
            } else {
              long delay = first.getDelay(NANOSECONDS);
              if (delay <= 0)
                return finishPoll(first);
              if (nanos <= 0)
                return null;
              first = null; // don't retain ref while waiting
              if (nanos < delay || leader != null)
                nanos = available.awaitNanos(nanos);
              else {
                Thread thisThread = Thread.currentThread();
                leader = thisThread;
                try {
                  long timeLeft = available.awaitNanos(delay);
                  nanos -= delay - timeLeft;
                } finally {
                  if (leader == thisThread)
                    leader = null;
                }
              }
            }
          }
        } finally {
          if (leader == null && queue[0] != null)
            available.signal();
          lock.unlock();
        }
      }
      
    }
    複製代碼
    • DelayedWorkQueue的內部存儲是RunnableScheduledFuture類型的數組
    • 提交任務與獲取任務用的是同一把鎖

Java線程池源碼分析與總結(下)

參考

  1. Java線程池實現原理及其在美團業務中的實踐--美團技術團隊
  2. Java線程池學習總結
  3. Java線程池
相關文章
相關標籤/搜索