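Java's thread pools are already fairly full-featured, but they still fall short in some business scenarios. Suppose you have a task-processing system in which every task goes through a thread pool, and the tasks involve heavy CPU computation and network I/O. What should happen when a task that normally finishes within 1 minute actually takes 5 minutes or more? This can be caused by code defects, RPC call timeouts, bad data, and so on. In this scenario the thread pools that ship with Java do not seem sufficient. We need a thread pool with a configurable timeout that automatically interrupts a timed-out task and reruns it.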
The following class implements these requirements:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
// Our own thread pool that interrupts timed-out tasks and reruns them automatically
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
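// Monitor (watchdog) executor, shared by all pool instances
private static final ScheduledExecutorService monitorExecutor = Executors.newScheduledThreadPool(1);
// Maps each task ID (printed as "thread" ID in the logs) to the time the task was submitted
private final ConcurrentMap<Integer, Long> aliveThreadRefreshTimeMap = new ConcurrentHashMap<>();
// Maps each task ID to its Future
private final ConcurrentMap<Integer, Future<?>> aliveThreadFutureMap = new ConcurrentHashMap<>();
// Maps each task ID to its Runnable
private final ConcurrentMap<Integer, Runnable> aliveThreadTaskMap = new ConcurrentHashMap<>();
// Task ID counter
private final AtomicInteger aliveThreadNum = new AtomicInteger(0);
// Timeout in milliseconds after which a task is cancelled and rerun
private final long retryTime;
// Whether the monitor has been started; volatile because it is read and written by different threads
private volatile boolean isMonitoring = false;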
// The retry timeout (in milliseconds) is passed in at construction time
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long retryTime) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.retryTime = retryTime;
}
// Override: record the task's bookkeeping information after it has been submitted to the pool
@Override
public Future<?> submit(Runnable task) {
if (task == null) {
throw new NullPointerException();
}
Future<?> future = super.submit(task);
afterSubmit(future, task);
return future;
}
// After each submission, register the task in the tracking maps; the first submission also starts the monitor
private void afterSubmit(Future<?> future, Runnable task) {
// Reserve the ID with a single atomic call so that concurrent submits cannot end up sharing an ID
int id = aliveThreadNum.getAndIncrement();
aliveThreadFutureMap.put(id, future);
aliveThreadTaskMap.put(id, task);
aliveThreadRefreshTimeMap.put(id, System.currentTimeMillis());
initializeMonitorThread();
}
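// Start the watchdog (monitor) task
private synchronized void initializeMonitorThread() {
// The watchdog only needs to be started once; the flag is set before scheduling so that a second submit cannot start another one
if (isMonitoring) {
return;
}
isMonitoring = true;
monitorExecutor.scheduleAtFixedRate(() -> {
System.out.printf("monitor thread start..., aliveThreadRefreshTimeMap:%s\n", aliveThreadRefreshTimeMap);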
List<Integer> removeIdList = new ArrayList<>();
// Iterate over the map that stores the task start times
for (int threadId : aliveThreadRefreshTimeMap.keySet()) {
long currentTime = System.currentTimeMillis();
long refreshTimes = currentTime - aliveThreadRefreshTimeMap.get(threadId);
System.out.printf("thread %d, refreshTimes is %d\n", threadId, refreshTimes);
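// Compare the elapsed time with the timeout. Exceeding it does not necessarily mean a timeout: the time is recorded at submission and the entry is not updated when the task finishes, so a task that completed quickly also ends up here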
if (refreshTimes > retryTime) {
System.out.printf("alive thread %d: is %dms to refresh, will restart\n", threadId,
currentTime - aliveThreadRefreshTimeMap.get(threadId));
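Future<?> future = aliveThreadFutureMap.get(threadId);
future.cancel(true);
// isCancelled() returning true means the timed-out task was cancelled and its worker thread interrupted; false means the task had already finished and did not time out
// We check isCancelled() rather than the return value of cancel(): if an earlier resubmit failed, the Future is already cancelled and cancel() would return false, yet we must still be able to retry the resubmission on the next round
if (!future.isCancelled()) {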
System.out.println("canceled failed with thread id : "+threadId+" the thread may already stopped");
removeIdList.add(threadId);
continue;
}
// Resubmit the timed-out task
Future<?> resubmitFuture;
try {
resubmitFuture = super.submit(aliveThreadTaskMap.get(threadId));
} catch (Exception e) {
// The resubmission can fail, for example when the pool is full and the rejection policy throws. We simply skip the task here, so it is retried on the monitor's next run
System.out.println("error when resubmit , the threadPool may be full will retry with error info: " + e.getMessage());
continue;
}
// Resubmission succeeded: register the new task under a new ID
// Note: the pre-increment skips one ID and leaves the counter equal to newId, so a later external submit() would be assigned this same ID
int newId = aliveThreadNum.incrementAndGet();
aliveThreadRefreshTimeMap.put(newId, System.currentTimeMillis());
aliveThreadFutureMap.put(newId, resubmitFuture);
aliveThreadTaskMap.put(newId, aliveThreadTaskMap.get(threadId));
removeIdList.add(threadId);
System.out.printf("restart success, thread id is:%d\n", newId);
}
}
for (int id : removeIdList) {
// Remove the entries of tasks that have finished or have been replaced by a rerun
aliveThreadFutureMap.remove(id);
aliveThreadTaskMap.remove(id);
aliveThreadRefreshTimeMap.remove(id);
}
// Run the monitor once every second
}, 0, 1, TimeUnit.SECONDS);
}
}
We now have our own customized thread pool. How do we use it?
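import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
// JUnit 4 is assumed here; with JUnit 5 the annotation is org.junit.jupiter.api.Test
import org.junit.Test;
// The test class name is only a placeholder
public class MyThreadPoolExecutorTest {
@Test
public void test1(){
// Create the pool; core and maximum pool size are the same
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
BasicThreadFactory.Builder builder = new BasicThreadFactory.Builder().namingPattern("processor-" + "-thread-%s");
MyThreadPoolExecutor executorPool = new MyThreadPoolExecutor(
10,
10, 30L, TimeUnit.SECONDS, workQueue, builder.build(),2500L);
// First task: finishes after 1 ms
executorPool.submit(()->{
try {
System.out.println(Thread.currentThread().getName()+" start sleep 1 ms....");
Thread.sleep(1);
}catch (InterruptedException e){
System.out.println(Thread.currentThread().getName()+e.getMessage());
}
});
// Second task: sleeps for 30 s, far longer than the 2500 ms timeout
executorPool.submit(()->{
try {
System.out.println(Thread.currentThread().getName()+" start sleep 30 s....");
Thread.sleep(30000);
}catch (InterruptedException e){
System.out.println(Thread.currentThread().getName()+e.getMessage());
}
});
// Keep the test thread alive so the monitor output can be observed (busy-waits; stop the test manually)
while (true);
}
}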
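Since everything is already wrapped up, we only need to build a thread factory with Apache's builder, construct the pool with a timeout of 2500 ms, and start two tasks: one finishes in 1 ms, the other runs for 30 s. We expect the first task to run once and complete, while the second is interrupted on timeout and rerun every time: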
processor--thread-1 start sleep 1 ms....
monitor thread start..., aliveThreadRefreshTimeMap:{0=1546314267046}
processor--thread-2 start sleep 30 s....
thread 0, refreshTimes is 2
thread 1, refreshTimes is 0
monitor thread start..., aliveThreadRefreshTimeMap:{0=1546314267046, 1=1546314267048}
thread 0, refreshTimes is 1005
thread 1, refreshTimes is 1004
monitor thread start..., aliveThreadRefreshTimeMap:{0=1546314267046, 1=1546314267048}
thread 0, refreshTimes is 2004
thread 1, refreshTimes is 2002
monitor thread start..., aliveThreadRefreshTimeMap:{0=1546314267046, 1=1546314267048}
thread 0, refreshTimes is 3006
alive thread 0: is 3006ms to refresh, will restart
canceled failed with thread id : 0 the thread may already stopped
thread 1, refreshTimes is 3004
alive thread 1: is 3004ms to refresh, will restart
processor--thread-2sleep interrupted
restart success, thread id is:3
processor--thread-3 start sleep 30 s....
monitor thread start..., aliveThreadRefreshTimeMap:{3=1546314270053}
thread 3, refreshTimes is 999
monitor thread start..., aliveThreadRefreshTimeMap:{3=1546314270053}
thread 3, refreshTimes is 1996
monitor thread start..., aliveThreadRefreshTimeMap:{3=1546314270053}
thread 3, refreshTimes is 2998
alive thread 3: is 2998ms to refresh, will restart
processor--thread-3sleep interrupted
restart success, thread id is:4
processor--thread-4 start sleep 30 s....
monitor thread start..., aliveThreadRefreshTimeMap:{4=1546314273052}
thread 4, refreshTimes is 1000
monitor thread start..., aliveThreadRefreshTimeMap:{4=1546314273052}
thread 4, refreshTimes is 2000
monitor thread start..., aliveThreadRefreshTimeMap:{4=1546314273052}
thread 4, refreshTimes is 2999
alive thread 4: is 2999ms to refresh, will restart
processor--thread-4sleep interrupted
restart success, thread id is:5
processor--thread-5 start sleep 30 s....
monitor thread start..., aliveThreadRefreshTimeMap:{5=1546314276051}
thread 5, refreshTimes is 999
monitor thread start..., aliveThreadRefreshTimeMap:{5=1546314276051}
thread 5, refreshTimes is 2000
monitor thread start..., aliveThreadRefreshTimeMap:{5=1546314276051}
thread 5, refreshTimes is 3000
alive thread 5: is 3000ms to refresh, will restart
processor--thread-5sleep interrupted
restart success, thread id is:6
processor--thread-6 start sleep 30 s....
monitor thread start..., aliveThreadRefreshTimeMap:{6=1546314279052}
thread 6, refreshTimes is 999
monitor thread start..., aliveThreadRefreshTimeMap:{6=1546314279052}
thread 6, refreshTimes is 1999
monitor thread start..., aliveThreadRefreshTimeMap:{6=1546314279052}
thread 6, refreshTimes is 3000
alive thread 6: is 3000ms to refresh, will restart
processor--thread-6sleep interrupted
restart success, thread id is:7
processor--thread-7 start sleep 30 s....
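This matches our expectations. The line "canceled failed with thread id : 0 the thread may already stopped" means that the first task had already completed. After that, the second task is interrupted and automatically rerun every 3 s: the timeout is 2500 ms, but the monitor checks only once per second, so the first check past the threshold falls at about 3000 ms. A few points deserve attention in the design: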
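To make the cancellation behaviour behind that log line concrete, here is a minimal, self-contained sketch. It does not use the custom pool; the class name CancelSemanticsDemo and its run() helper are made up for illustration. It only shows how Future.cancel(true) and isCancelled() behave for a task that has already finished versus one that is still blocked in sleep().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CancelSemanticsDemo {

    /** Returns { isCancelled of the finished task, isCancelled of the running task }. */
    static boolean[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // A task that finishes almost immediately.
            Future<?> quick = pool.submit(() -> { });
            // A task that blocks in sleep(), which responds to interruption.
            Future<?> slow = pool.submit(() -> {
                try {
                    Thread.sleep(30_000);
                } catch (InterruptedException e) {
                    // Restore the flag and let the task end.
                    Thread.currentThread().interrupt();
                }
            });
            // Make sure the quick task has really completed before cancelling it.
            try {
                quick.get();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
            quick.cancel(true); // no effect: the task already completed
            slow.cancel(true);  // cancels the task and interrupts its worker thread
            return new boolean[] { quick.isCancelled(), slow.isCancelled() };
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("finished task cancelled: " + result[0]);
        System.out.println("running task cancelled:  " + result[1]);
    }
}
```

One consequence worth keeping in mind: cancel(true) only delivers an interrupt. A task that never blocks and never checks its interrupted flag (a pure CPU loop, for example) keeps running even though isCancelled() returns true, so a rerun of such a task would execute alongside the old one.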
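For real business systems, a customized thread pool is only one way of solving the problem at the program level, and it has drawbacks: it consumes extra system resources, and an implementation built on scheduleAtFixedRate has potential performance problems. In practice there are many other solutions driven by the business itself, and we should choose the one that fits our own business.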
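As one possible way to avoid scanning every tracked task once per second, a single cancel action can be scheduled per task at submission time. The sketch below is an assumption about how that could look, not the approach used by the pool above: PerTaskTimeoutSketch, submitWithTimeout, and demo are hypothetical names, and the automatic rerun is left out to keep the example short.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PerTaskTimeoutSketch {

    /** Hypothetical helper: submits the task and schedules exactly one cancel for it. */
    static Future<?> submitWithTimeout(ExecutorService pool, ScheduledExecutorService timer,
                                       Runnable task, long timeoutMillis) {
        Future<?> future = pool.submit(task);
        // If the task has already finished when the timer fires, cancel(true) does nothing.
        timer.schedule(() -> { future.cancel(true); }, timeoutMillis, TimeUnit.MILLISECONDS);
        return future;
    }

    /** Runs a 30 s sleep with a 200 ms timeout and reports whether it was cancelled. */
    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            Future<?> future = submitWithTimeout(pool, timer, () -> {
                try {
                    Thread.sleep(30_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 200);
            try {
                future.get(5, TimeUnit.SECONDS);
                return false; // the task finished normally
            } catch (CancellationException e) {
                return true;  // the timer cancelled the task
            } catch (Exception e) {
                return false; // interrupted, failed, or still running after 5 s
            }
        } finally {
            pool.shutdownNow();
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancelled by timeout: " + demo());
    }
}
```

The trade-off is one scheduled entry per submitted task instead of one periodic scan; which is cheaper depends on how many tasks are in flight and how long they usually run.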
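Take an order-processing system as an example: after each order is received, we need to fetch data from various other services to assemble a valid order, and we may also need to run some complex computations on that data ourselves. If we do not use this kind of thread pool, we can rely on other middleware instead. For example, when an RPC call times out, we can put the order information into Redis together with a configured next execution time, and start another thread that pulls orders from Redis and performs compensation. Introducing Redis adds complexity to the system, but compared with information held in memory, Redis keeps the order data more reliably. In other words, if your machine goes down the in-memory data is gone, whereas with Redis, combined with business-level control, you can achieve a better result.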
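The compensation flow described above can be sketched roughly as follows. Everything here is an assumption for illustration: RetryStore is a hypothetical abstraction, and InMemoryRetryStore is an in-memory stand-in so that the example runs on its own. A real system would back the store with Redis, which is exactly what gives the durability the in-memory version lacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OrderCompensationSketch {

    /** Hypothetical store abstraction; a real system would implement it on top of Redis. */
    interface RetryStore {
        void save(String orderId, long nextRunAtMillis);
        List<String> pollDue(long nowMillis);
    }

    /** In-memory stand-in, used only to keep the example self-contained. */
    static class InMemoryRetryStore implements RetryStore {
        private final Map<String, Long> dueTimes = new ConcurrentHashMap<>();

        @Override
        public void save(String orderId, long nextRunAtMillis) {
            dueTimes.put(orderId, nextRunAtMillis);
        }

        @Override
        public List<String> pollDue(long nowMillis) {
            List<String> due = new ArrayList<>();
            for (Map.Entry<String, Long> entry : dueTimes.entrySet()) {
                // remove(key, value) ensures each due order is claimed only once.
                if (entry.getValue() <= nowMillis
                        && dueTimes.remove(entry.getKey(), entry.getValue())) {
                    due.add(entry.getKey());
                }
            }
            return due;
        }
    }

    /** Called when the RPC for an order times out: record when the order should be retried. */
    static void onRpcTimeout(RetryStore store, String orderId, long nowMillis, long delayMillis) {
        store.save(orderId, nowMillis + delayMillis);
    }

    public static void main(String[] args) {
        RetryStore store = new InMemoryRetryStore();
        onRpcTimeout(store, "order-1", 0L, 5_000L);
        // A compensation thread would call pollDue periodically with the current time.
        System.out.println("due at t=1000: " + store.pollDue(1_000L));
        System.out.println("due at t=5000: " + store.pollDue(5_000L));
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a compensation thread would pass System.currentTimeMillis() and reprocess each returned order.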