Things You May Not Know About Thread Pools: Building Your Own, More Powerful Thread Pool

Preface

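Java's built-in thread pools are already fairly full-featured, but they fall short in some business scenarios. Suppose you run a task-processing system in which every task is executed on a thread pool, and the work involves heavy CPU computation and network I/O. A task that normally finishes in 1 minute sometimes takes 5 minutes or longer. What should happen then? The cause might be a bug in the code, an RPC timeout, bad data, and so on. For this kind of scenario the thread pool that ships with Java is not enough. We need a thread pool with a configurable timeout that automatically interrupts and reruns any task exceeding it.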

How to do it

Our requirements:

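  • Detect tasks that have timed out and interrupt them
  • After interrupting a timed-out task, submit it again

The first problem is how to interrupt a thread. Readers with a solid foundation may know that Future has a cancel method, and cancel(true) interrupts the thread that is running the task. The basic idea resembles a watchdog. Every submitted task gets an ID, and for each ID we store the submission time, the task's Future, and its Runnable. We also start a monitor thread asynchronously. It runs once per second and checks whether each ID's submission time is older than the timeout. If it is, the monitor calls future.cancel to cancel the task and submits it to the thread pool again.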
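The watchdog relies on specific `Future.cancel` / `Future.isCancelled` behaviour. The standalone sketch below checks that behaviour on a plain single-thread executor and is not part of the pool itself. `CancelDemo` and its `Outcome` holder are illustrative names. The sketch shows three things. `cancel(true)` on a running task returns true and interrupts the worker thread. A second `cancel` returns false while `isCancelled()` stays true. A task that has already completed cannot be cancelled at all.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelDemo {

  // Snapshot of the Future states observed during the run
  public static final class Outcome {
    public boolean firstCancel, cancelledAfterFirst, taskSawInterrupt;
    public boolean secondCancel, cancelledAfterSecond;
    public boolean cancelOnFinished, finishedIsCancelled;
  }

  public static Outcome run() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Outcome o = new Outcome();
    try {
      CountDownLatch started = new CountDownLatch(1);
      CountDownLatch interrupted = new CountDownLatch(1);
      // A "stuck" task that would sleep far longer than any sensible timeout
      Future<?> slow = pool.submit(() -> {
        started.countDown();
        try {
          Thread.sleep(30_000);
        } catch (InterruptedException e) {
          interrupted.countDown();            // cancel(true) delivered an interrupt
          Thread.currentThread().interrupt(); // restore the flag, as a well-behaved task should
        }
      });
      started.await();                             // cancel only once the task is really running
      o.firstCancel = slow.cancel(true);           // true: cancelled, worker thread interrupted
      o.cancelledAfterFirst = slow.isCancelled();
      o.taskSawInterrupt = interrupted.await(5, TimeUnit.SECONDS);
      o.secondCancel = slow.cancel(true);          // false: it is already cancelled...
      o.cancelledAfterSecond = slow.isCancelled(); // ...but isCancelled() stays true

      // A task that has already completed cannot be cancelled
      Future<?> quick = pool.submit(() -> { });
      quick.get();
      o.cancelOnFinished = quick.cancel(true);     // false
      o.finishedIsCancelled = quick.isCancelled(); // false
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();
    }
    return o;
  }

  public static void main(String[] args) {
    Outcome o = run();
    System.out.println("first cancel=" + o.firstCancel + ", second cancel=" + o.secondCancel
        + ", isCancelled=" + o.cancelledAfterSecond + ", cancel on finished=" + o.cancelOnFinished);
  }
}
```

The second and third cases are exactly what the watchdog uses. `isCancelled()` stays true across repeated passes, which tells the watchdog a stuck task still needs to be resubmitted. A false result after `cancel` means the task had simply finished.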

Implementation

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Our own thread pool that interrupts timed-out tasks and reruns them automatically
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
  // Monitor (watchdog) executor, shared by all instances of this class
  private static final ScheduledExecutorService monitorExecutor = Executors.newScheduledThreadPool(1);
  // Task ID -> submission time (ms); timing starts at submit, so time spent waiting in the queue also counts
  private final ConcurrentMap<Integer, Long> aliveThreadRefreshTimeMap = new ConcurrentHashMap<>();
  // Task ID -> its Future
  private final ConcurrentMap<Integer, Future<?>> aliveThreadFutureMap = new ConcurrentHashMap<>();
  // Task ID -> its Runnable, kept so the task can be resubmitted
  private final ConcurrentMap<Integer, Runnable> aliveThreadTaskMap = new ConcurrentHashMap<>();
  // Source of task IDs
  private final AtomicInteger aliveThreadNum = new AtomicInteger(0);
  // Timeout (ms) after which a task is interrupted and rerun
  private final long retryTime;
  // Ensures the monitor is scheduled only once, even under concurrent submits
  private final AtomicBoolean isMonitoring = new AtomicBoolean(false);

  // The timeout is passed in at construction time
  public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
      BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long retryTime) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    this.retryTime = retryTime;
  }

  // Override: record the task's information after submitting it to the pool
  @Override
  public Future<?> submit(Runnable task) {
    if (task == null) {
      throw new NullPointerException();
    }
    Future<?> future = super.submit(task);
    afterSubmit(future, task);
    return future;
  }

  // After each submit, register the task in the maps; the first submit also starts the monitor
  private void afterSubmit(Future<?> future, Runnable task) {
    // getAndIncrement reserves a unique ID atomically, even when several threads submit at once
    int id = aliveThreadNum.getAndIncrement();
    aliveThreadFutureMap.put(id, future);
    aliveThreadTaskMap.put(id, task);
    // Write the time last: the monitor iterates this map, so the other two entries already exist
    aliveThreadRefreshTimeMap.put(id, System.currentTimeMillis());
    initializeMonitorThread();
  }

  // Start the watchdog
  private void initializeMonitorThread() {
    // The watchdog only needs to be started once; compareAndSet keeps this safe under concurrent submits
    if (!isMonitoring.compareAndSet(false, true)) {
      return;
    }
    monitorExecutor.scheduleAtFixedRate(() -> {
      // If one run throws, scheduleAtFixedRate suppresses all later runs, so catch everything here
      try {
        checkTimeouts();
      } catch (RuntimeException e) {
        System.out.println("monitor thread error, will run again next second: " + e.getMessage());
      }
    // Run the check once per second
    }, 0, 1, TimeUnit.SECONDS);
  }

  private void checkTimeouts() {
    System.out.printf("monitor thread start..., aliveThreadRefreshTimeMap:%s%n", aliveThreadRefreshTimeMap);
    List<Integer> removeIdList = new ArrayList<>();
    // Walk the map of submission times
    for (int threadId : aliveThreadRefreshTimeMap.keySet()) {
      long currentTime = System.currentTimeMillis();
      long refreshTimes = currentTime - aliveThreadRefreshTimeMap.get(threadId);
      System.out.printf("thread %d, refreshTimes is %d%n", threadId, refreshTimes);
      // Exceeding retryTime does not by itself mean a timeout: the time is recorded at submit
      // and never updated when the task finishes, so the task may have completed long ago
      if (refreshTimes > retryTime) {
        System.out.printf("alive thread %d: is %dms to refresh, will restart%n", threadId, refreshTimes);
        Future<?> future = aliveThreadFutureMap.get(threadId);
        future.cancel(true);
        // isCancelled() true: the stuck task was interrupted (now or on an earlier pass);
        // false: the task had already finished, so it never timed out.
        // We check isCancelled() rather than cancel()'s return value because, after a failed
        // resubmit, cancel() returns false for the already-cancelled task but we still want to retry
        if (!future.isCancelled()) {
          System.out.println("canceled failed with thread id : " + threadId + " the thread may already stopped");
          removeIdList.add(threadId);
          continue;
        }
        // Resubmit the timed-out task
        Runnable task = aliveThreadTaskMap.get(threadId);
        Future<?> resubmitFuture;
        try {
          resubmitFuture = super.submit(task);
        } catch (RejectedExecutionException e) {
          // The pool may be full and the rejection policy kicked in; keep the entry so the
          // next monitor run retries the submit
          System.out.println("error when resubmit , the threadPool may be full will retry with error info: " + e.getMessage());
          continue;
        }
        // Resubmit succeeded: register the new Future under a fresh ID and drop the old entry
        int newId = aliveThreadNum.getAndIncrement();
        aliveThreadFutureMap.put(newId, resubmitFuture);
        aliveThreadTaskMap.put(newId, task);
        aliveThreadRefreshTimeMap.put(newId, System.currentTimeMillis());
        removeIdList.add(threadId);
        System.out.printf("restart success, thread id is:%d%n", newId);
      }
    }
    for (int id : removeIdList) {
      // Clean up entries for tasks that finished or were replaced by a resubmission
      aliveThreadFutureMap.remove(id);
      aliveThreadTaskMap.remove(id);
      aliveThreadRefreshTimeMap.remove(id);
    }
  }

}
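The monitor evaluates `refreshTimes > retryTime` only once per second, so a stuck task is not caught exactly at `retryTime`. It is caught at the first check after the timeout has passed, somewhere in (retryTime, retryTime + period] after submission. Below is a small arithmetic sketch of the idealized case where a task is submitted exactly on a monitor tick, ignoring scheduling jitter. Resubmitted tasks roughly fit this case because the monitor itself resubmits them. `DetectionDelay` and `detectionDelayMs` are illustrative names, not part of the pool.

```java
public class DetectionDelay {

  // Idealized delay (ms) until the watchdog notices a stuck task, assuming the task was
  // submitted exactly on a monitor tick and ticks fire precisely every periodMs.
  // Because the check is strict (elapsed > retryTime), the first qualifying tick is the
  // smallest multiple of periodMs that is strictly greater than retryTimeMs.
  public static long detectionDelayMs(long retryTimeMs, long periodMs) {
    return (retryTimeMs / periodMs + 1) * periodMs;
  }

  public static void main(String[] args) {
    System.out.println(detectionDelayMs(2500, 1000)); // 3000
    System.out.println(detectionDelayMs(3000, 1000)); // 4000
  }
}
```

With a 2500 ms timeout and a 1 s period, the stuck task is restarted roughly every 3 s. A shorter monitor period tightens this bound at the cost of scanning the maps more often.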


We now have our own customized thread pool. How do we use it?

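import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.junit.Test;

public class MyThreadPoolExecutorTest {

  @Test
  public void test1() throws InterruptedException {
    // Create the pool; core and max sizes are the same
    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
    BasicThreadFactory.Builder builder = new BasicThreadFactory.Builder().namingPattern("processor--thread-%s");
    // Tasks running longer than 2500 ms are interrupted and rerun
    MyThreadPoolExecutor executorPool = new MyThreadPoolExecutor(
        10, 10, 30L, TimeUnit.SECONDS, workQueue, builder.build(), 2500L);

    // Task 1: finishes after 1 ms
    executorPool.submit(() -> {
      try {
        System.out.println(Thread.currentThread().getName() + " start sleep 1 ms....");
        Thread.sleep(1);
      } catch (InterruptedException e) {
        System.out.println(Thread.currentThread().getName() + e.getMessage());
        Thread.currentThread().interrupt();
      }
    });
    // Task 2: runs for 30 s, so it keeps exceeding the timeout
    executorPool.submit(() -> {
      try {
        System.out.println(Thread.currentThread().getName() + " start sleep 30 s....");
        Thread.sleep(30000);
      } catch (InterruptedException e) {
        System.out.println(Thread.currentThread().getName() + e.getMessage());
        Thread.currentThread().interrupt();
      }
    });

    // Keep the test alive long enough to watch several restarts (blocks instead of busy-spinning)
    TimeUnit.SECONDS.sleep(20);
  }
}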

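Because all the logic is wrapped inside the pool, we only need to build a thread factory with Apache Commons Lang's BasicThreadFactory.Builder and pass a timeout of 2500 ms. We then start two tasks: one finishes after 1 ms, and the other runs for 30 s. We expect the first task to complete in a single run, and the second task to be interrupted and rerun every time it times out: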

processor--thread-1 start sleep 1 ms....
monitor thread start..., aliveThreadRefreshTimeMap:{0=1546314267046}
processor--thread-2 start sleep 30 s....
thread 0, refreshTimes is 2
thread 1, refreshTimes is 0
monitor thread start..., aliveThreadRefreshTimeMap:{0=1546314267046, 1=1546314267048}
thread 0, refreshTimes is 1005
thread 1, refreshTimes is 1004
monitor thread start..., aliveThreadRefreshTimeMap:{0=1546314267046, 1=1546314267048}
thread 0, refreshTimes is 2004
thread 1, refreshTimes is 2002
monitor thread start..., aliveThreadRefreshTimeMap:{0=1546314267046, 1=1546314267048}
thread 0, refreshTimes is 3006
alive thread 0: is 3006ms to refresh, will restart
canceled failed with thread id : 0 the thread may already stopped
thread 1, refreshTimes is 3004
alive thread 1: is 3004ms to refresh, will restart
processor--thread-2sleep interrupted
restart success, thread id is:3
processor--thread-3 start sleep 30 s....
monitor thread start..., aliveThreadRefreshTimeMap:{3=1546314270053}
thread 3, refreshTimes is 999
monitor thread start..., aliveThreadRefreshTimeMap:{3=1546314270053}
thread 3, refreshTimes is 1996
monitor thread start..., aliveThreadRefreshTimeMap:{3=1546314270053}
thread 3, refreshTimes is 2998
alive thread 3: is 2998ms to refresh, will restart
processor--thread-3sleep interrupted
restart success, thread id is:4
processor--thread-4 start sleep 30 s....
monitor thread start..., aliveThreadRefreshTimeMap:{4=1546314273052}
thread 4, refreshTimes is 1000
monitor thread start..., aliveThreadRefreshTimeMap:{4=1546314273052}
thread 4, refreshTimes is 2000
monitor thread start..., aliveThreadRefreshTimeMap:{4=1546314273052}
thread 4, refreshTimes is 2999
alive thread 4: is 2999ms to refresh, will restart
processor--thread-4sleep interrupted
restart success, thread id is:5
processor--thread-5 start sleep 30 s....
monitor thread start..., aliveThreadRefreshTimeMap:{5=1546314276051}
thread 5, refreshTimes is 999
monitor thread start..., aliveThreadRefreshTimeMap:{5=1546314276051}
thread 5, refreshTimes is 2000
monitor thread start..., aliveThreadRefreshTimeMap:{5=1546314276051}
thread 5, refreshTimes is 3000
alive thread 5: is 3000ms to refresh, will restart
processor--thread-5sleep interrupted
restart success, thread id is:6
processor--thread-6 start sleep 30 s....
monitor thread start..., aliveThreadRefreshTimeMap:{6=1546314279052}
thread 6, refreshTimes is 999
monitor thread start..., aliveThreadRefreshTimeMap:{6=1546314279052}
thread 6, refreshTimes is 1999
monitor thread start..., aliveThreadRefreshTimeMap:{6=1546314279052}
thread 6, refreshTimes is 3000
alive thread 6: is 3000ms to refresh, will restart
processor--thread-6sleep interrupted
restart success, thread id is:7
processor--thread-7 start sleep 30 s....

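This matches our expectations. The line "canceled failed with thread id : 0 the thread may already stopped" means the first task had already finished. After that, the second task is interrupted and rerun automatically about every 3 s. The timeout is 2500 ms, but the monitor only checks once per second, so a stuck task is noticed at the first check after 2500 ms have elapsed, at about 3000 ms. A few points deserve attention in the design: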

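  • Concurrency: several maps are modified both by submit and by the monitor thread, so they must be ConcurrentHashMaps. For the same reason, the ID counter must be an AtomicInteger.
  • If a task finishes quickly without timing out, ideally its data would be cleaned up as soon as it finishes. However, our logic has to stay encapsulated inside the pool and cannot intrude on callers' business code. So instead we check at the moment we try to cancel: a task that has already completed cannot be cancelled, and its entry can simply be dropped.
  • When resubmitting, the pool may be full, in which case the rejection policy kicks in. When we catch the resulting exception, we keep the task's data and simply skip it, so the next monitor run one second later sees it again and retries. At that point cancel() returns false because the task was already cancelled once, but isCancelled() returns true. That is why the code checks isCancelled() rather than the return value of cancel(). Ideally the handling would be tailored to each rejection policy. With CallerRunsPolicy, for example, no exception is thrown and the task would run on the monitor thread itself, blocking all further checks. This is left for readers to implement.
  • scheduleAtFixedRate has a potential pitfall: if any execution of the task throws an exception, all subsequent executions are suppressed. So the code should catch such exceptions itself and handle them.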
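The scheduleAtFixedRate pitfall in the last point is easy to reproduce. The sketch below is illustrative and independent of the pool; `FixedRateGuardDemo` and `countRuns` are hypothetical names. It schedules a body that throws on its second invocation, once bare and once wrapped in try/catch, and counts how many times the body was entered in about half a second.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedRateGuardDemo {

  // Schedules a task every 20 ms for ~500 ms; its body throws on the 2nd invocation.
  // Returns how many times the body was entered.
  public static int countRuns(boolean guarded) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger runs = new AtomicInteger();
    Runnable body = () -> {
      if (runs.incrementAndGet() == 2) {
        throw new IllegalStateException("simulated failure inside the monitor");
      }
    };
    Runnable task;
    if (guarded) {
      // Same idea as wrapping the watchdog's check in try/catch: log and carry on
      task = () -> {
        try {
          body.run();
        } catch (RuntimeException e) {
          System.out.println("monitor error, will run again: " + e.getMessage());
        }
      };
    } else {
      task = body;
    }
    scheduler.scheduleAtFixedRate(task, 0, 20, TimeUnit.MILLISECONDS);
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      scheduler.shutdownNow();
    }
    return runs.get();
  }

  public static void main(String[] args) {
    System.out.println("unguarded runs: " + countRuns(false)); // stops after the failing run
    System.out.println("guarded runs:   " + countRuns(true));  // keeps running
  }
}
```

Without the guard, the schedule silently stops after the failing run. The exception is only visible through the returned ScheduledFuture, which nobody is checking here.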

Conclusion

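In real-world business, a customized thread pool is only one way to solve the problem at the program level, and it has drawbacks. It consumes extra system resources, and an implementation built on scheduleAtFixedRate has potential performance issues, since the monitor scans every tracked task once per second. There are many other business-level solutions, and we should choose the one that fits our own business.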

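For example, suppose we have an order-processing system. Each order, once received, must fetch data from various other services to be assembled into a valid order, and we may also need to run complex computations on that data. Instead of this kind of thread pool, we could rely entirely on other middleware. When we detect an RPC timeout, we put the order information into Redis, including a next-execution time that we configure ourselves. A separate thread then pulls orders from Redis and compensates for them by processing them again. Introducing Redis adds complexity to the system, but compared with information held in memory, Redis keeps the order data more reliably. In other words, if your machine crashes, the in-memory data is gone, whereas Redis combined with business-level control can achieve a better result.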
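The compensation scheme above can be sketched as a schedule of pending orders keyed by their next execution time. The sketch below is a minimal in-memory stand-in, assuming a fixed retry delay. In the design described above, this data would live in Redis so that it survives a crash, and the article does not specify which Redis structure to use. `OrderRetrySchedule`, `PendingOrder`, `deferAfterTimeout` and `pollDue` are all hypothetical names. Time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class OrderRetrySchedule {

  // An order whose RPC call timed out, plus when it should be attempted next
  public static final class PendingOrder {
    public final String orderId;
    public final long nextRunAtMillis;
    public final int attempts;

    PendingOrder(String orderId, long nextRunAtMillis, int attempts) {
      this.orderId = orderId;
      this.nextRunAtMillis = nextRunAtMillis;
      this.attempts = attempts;
    }

    @Override
    public String toString() {
      return orderId + "@" + nextRunAtMillis + "(attempt " + attempts + ")";
    }
  }

  // Earliest next-run time first; a Redis-backed version would persist these entries instead
  private final PriorityQueue<PendingOrder> queue =
      new PriorityQueue<>(Comparator.comparingLong((PendingOrder p) -> p.nextRunAtMillis));
  private final long retryDelayMillis;

  public OrderRetrySchedule(long retryDelayMillis) {
    this.retryDelayMillis = retryDelayMillis;
  }

  // Called when an RPC times out: park the order until nowMillis + retryDelayMillis
  public synchronized void deferAfterTimeout(String orderId, int attemptsSoFar, long nowMillis) {
    queue.add(new PendingOrder(orderId, nowMillis + retryDelayMillis, attemptsSoFar + 1));
  }

  // Called periodically by the compensation thread: remove and return every order that is due
  public synchronized List<PendingOrder> pollDue(long nowMillis) {
    List<PendingOrder> due = new ArrayList<>();
    while (!queue.isEmpty() && queue.peek().nextRunAtMillis <= nowMillis) {
      due.add(queue.poll());
    }
    return due;
  }

  public static void main(String[] args) {
    OrderRetrySchedule schedule = new OrderRetrySchedule(5_000);
    schedule.deferAfterTimeout("order-1", 0, 1_000); // due at 6000
    schedule.deferAfterTimeout("order-2", 0, 3_000); // due at 8000
    System.out.println(schedule.pollDue(5_999)); // []
    System.out.println(schedule.pollDue(6_000)); // [order-1@6000(attempt 1)]
    System.out.println(schedule.pollDue(9_000)); // [order-2@8000(attempt 1)]
  }
}
```

Unlike the watchdog pool, this schedule keeps no reference to running threads. The compensation thread simply reprocesses whatever is due, which is why moving the store out of process is straightforward.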
