線程池優化之充分利用線程池資源

1、前言

  最近作了電子發票的需求,分省開票接口和發票下載接口都有必定的延遲。爲了完成開票後自動將發票插入用戶微信卡包,目前的解決方案是利用線程池,將開票後插入卡包的任務(輪詢分省發票接口,直到獲取到發票相關信息或者輪詢次數用完,若是獲取到發票信息,執行發票插入微信卡包,結束任務)放入線程池異步執行。仔細想想,這種實現方案存在一個問題,線程池沒有充分的利用。爲何沒有充分的利用?下面詳細的分析。html

2、異步線程池和異步任務包裝

  AsyncConfigurerSupport能夠幫咱們指定異步任務(注有@Async註解)對應的線程池。微信

@Configuration
public class MyAsyncConfigurer extends AsyncConfigurerSupport {
    private static Logger LOGGER = LoggerFactory.getLogger(MyAsyncConfigurer.class);

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(2);
        taskExecutor.setMaxPoolSize(4);
        taskExecutor.setQueueCapacity(10);
        taskExecutor.setRejectedExecutionHandler((runnable, executor) -> LOGGER.error("異步線程池拒絕任務..." + runnable));
        taskExecutor.setThreadFactory(new MyAsyncThreadFactory());
        taskExecutor.initialize();
        return taskExecutor;
    }

    static class MyAsyncThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        MyAsyncThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "myasync-pool-" +
                    poolNumber.getAndIncrement() +
                    "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
}

  異步任務包裝,除了異步,還加入了retry功能,實現指定次數的接口輪詢。app

@Component
public class AsyncWrapped {
    protected static Logger LOGGER = LoggerFactory.getLogger(AsyncWrapped.class);

    @Async
    public void asyncProcess(Runnable runnable, Callback callback, Retry retry) {
        try {
            if (retry == null) {
                retry = new Retry(1);
            }
            retry.execute(ctx -> {
                runnable.run();
                return null;
            }, ctx -> {
                if (callback != null) {
                    callback.call();
                }
                return null;
            });
        } catch (Exception e) {
            LOGGER.error("異步調用異常...", e);
        }
    }
}

  業務代碼大體邏輯以下。異步

asyncWrapped.asyncProcess(() -> {
        //調用分省接口獲取發票信息
        //若是發票信息異常,拋出異常(進入下次重試)
        //不然,插入用戶微信卡包
    }, () -> {
        //輪詢次數用盡,用戶插入卡包失敗
    }
    , new Retry(2, 1000)
);

  這裏說一下爲何線程池沒有充分的利用。異步任務中包含輪詢操做,輪詢有必定的時間間隔,致使在這段時間間隔內,線程一直處於被閒置的狀態。因此爲了能更好的利用線程池資源,咱們得想辦法解決時間間隔的問題。假若有個延遲隊列,隊列裏放着咱們的異步任務(不包含重試機制),而後延遲(輪詢的時間間隔)必定時間以後,將任務放入線程池中執行,任務執行完畢以後根據是否須要再次執行決定是否再次放入到延遲隊列去,這樣每一個線程池中的線程都不會閒着,達到了充分利用的目的。async

3、定時任務線程池和實現輪詢機制

  @EnableScheduling 幫助開啓@Scheduled註解解析。註冊一個名字是ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME的定時任務線程池。ide

@Configuration
@EnableScheduling
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class TaskConfiguration {

    @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledExecutorService scheduledAnnotationProcessor() {
        return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());
    }

    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-schedule-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}

   實現輪詢任務,實現接口SchedulingConfigurer,獲取ScheduledTaskRegistrar 並指定定時任務線程池。post

@Override
public void configureTasks(ScheduledTaskRegistrar registrar) {
    this.registrar = registrar;
    this.registrar.setScheduler(this.applicationContext.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, ScheduledExecutorService.class));
    scheduledTaskRegistrarHelper = new ScheduledTaskRegistrarHelper();
}

  scheduledFutures提交定時任務時返回結果集,periodTasks 定時任務結果集。優化

private static final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, TimingTask> periodTasks = new ConcurrentHashMap<>();

  定時任務包裝類,包含任務的執行次數(重試次數)、重試間隔、具體任務、重試次數用盡以後的回調等,以及自動結束定時任務、重試計數重置功能。ui

private static class TimingTask {
    //重試次數
    private Integer retry;
    //任務標識
    private String taskId;
    //重試間隔
    private Long period;
    //具體任務
    private ScheduledRunnable task;
    //結束回調
    private ScheduledCallback callback;
    //重試計數
    private AtomicInteger count = new AtomicInteger(0);
    //父線程MDC
    private Map<String, String> curContext;

    public TimingTask(Integer retry, String taskId, Long period, ScheduledRunnable task, ScheduledCallback callback) {
        this.retry = retry;
        this.taskId = taskId;
        this.period = period;
        this.task = task;
        this.callback = callback;
        this.curContext = MDC.getCopyOfContextMap();
    }

    public Long getPeriod() {
        return period;
    }

    public void setPeriod(Long period) {
        this.period = period;
    }

    public String getTaskId() {
        return taskId;
    }

    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    public Integer getRetry() {
        return retry;
    }

    public void setRetry(Integer retry) {
        this.retry = retry;
    }

    public AtomicInteger getCount() {
        return count;
    }

    public boolean reset() {
        for (int cnt = this.count.intValue(); cnt < this.retry; cnt = this.count.intValue()) {
            if (this.count.compareAndSet(cnt, 0)) {
                return true;
            }
        }
        return false;
    }

    public void process() {
        Map<String, String> preContext = MDC.getCopyOfContextMap();
        try {
            if (this.curContext == null) {
                MDC.clear();
            } else {
                // 將父線程的MDC內容傳給子線程
                MDC.setContextMap(this.curContext);
            }
            this.task.run();
            exitTask(false);
        } catch (Exception e) {
            LOGGER.error("定時任務異常..." + this, e);
            if (count.incrementAndGet() >= this.retry) {
                exitTask(true);
            }
        } finally {
            if (preContext == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(preContext);
            }
        }
    }

    //定時任務退出
    private void exitTask(boolean execCallback) {
        scheduledFutures.get(this.taskId).cancel(false);
        scheduledFutures.remove(this.getTaskId());
        periodTasks.remove(this.getTaskId());
        LOGGER.info("結束定時任務: " + this);
        if (execCallback && callback != null) {
            callback.call();
        }
    }

    @Override
    public String toString() {
        return ReflectionToStringBuilder.toString(this
                , ToStringStyle.JSON_STYLE
                , false
                , false
                , TimingTask.class);
    }
}

  注意上面定時任務是如何退出的,是在某一次任務執行成功以後(沒有異常拋出)或者定時任務執行次數用盡才退出的。直接調用ScheduledFuture的cancel方法能夠退出定時任務。還有就是定時任務中的日誌須要父線程中的日誌變量,因此須要對MDC進行一下處理。this

@Scope("prototype")
@Bean
public AspectTimingTask aspectTimingTask() {
    return new AspectTimingTask();
}

@Aspect
@Component
public static class ScheduledAspect {
    @Around("target(AspectTimingTask)")
    public Object executeScheduledWrapped(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        if (proceedingJoinPoint instanceof MethodInvocationProceedingJoinPoint) {
            MethodInvocationProceedingJoinPoint methodJoinPoint = (MethodInvocationProceedingJoinPoint) proceedingJoinPoint;
            Method method = ((MethodSignature) methodJoinPoint.getSignature()).getMethod();
            if (AnnotatedElementUtils.isAnnotated(method, ScheduledTask.class)) {
                LOGGER.info("電子發票定時任務日誌同步...");
                //其餘處理
            }
        }
        return proceedingJoinPoint.proceed();
    }
}

public static class AspectTimingTask implements Runnable {
    private TimingTask timingTask;

    @Override
    @ScheduledTask
    public void run() {
        timingTask.process();
    }

    public void setTimingTask(TimingTask timingTask) {
        this.timingTask = timingTask;
    }
}

  AspectTimingTask 是對TimingTask 的包裝類,實現了Runnable接口。主要是爲了對run接口作一層切面,獲取ProceedingJoinPoint 實例(公司中的日誌調用鏈系統須要這個參數)。AspectTimingTask 的bean實例的scope是prototype,這個注意下。

public static void register(Integer retry
        , Long period
        , String taskId
        , ScheduledRunnable task
        , ScheduledCallback callback) {
    scheduledTaskRegistrarHelper.register(retry, taskId, period, task, callback);
}

private class ScheduledTaskRegistrarHelper {
    public void register(Integer retry
            , String taskId
            , Long period
            , ScheduledRunnable task
            , ScheduledCallback callback) {
        //是否能夠重置定時任務
        TimingTask preTask = periodTasks.get(taskId);
        if (null != preTask
                && preTask.reset()
                && existTask(taskId)) {
            return;
        }

        TimingTask curTask = new TimingTask(retry, taskId, period, task, callback);
        AspectTimingTask aspectTimingTask = applicationContext.getBean(AspectTimingTask.class);
        aspectTimingTask.setTimingTask(curTask);
        ScheduledFuture<?> scheduledFuture = registrar.getScheduler().scheduleAtFixedRate(aspectTimingTask, period);
        scheduledFutures.put(taskId, scheduledFuture);
        periodTasks.put(taskId, curTask);
        LOGGER.info("註冊定時任務: " + curTask);
    }

    private boolean existTask(String taskId) {
        return scheduledFutures.containsKey(taskId) && periodTasks.containsKey(taskId);
    }
}

  若是taskId的定時任務已經存在則重置定時任務,不然註冊新的定時任務。AspectTimingTask 實例經過ApplicationContext獲取,每次獲取都是一個新的實例。

  由 異步輪詢任務 優化成 定時任務,充分利用了線程池。修改以後的業務代碼以下。

ScheduledTaskRegistrarHelper.register(10
    , 5*1000L
    , "taskId"
    , () -> {
        //調用分省接口獲取發票信息
        //若是發票信息異常,拋出異常(進入下次重試)
        //不然,插入用戶微信卡包
    }
    () -> {
        //輪詢次數用盡,用戶插入卡包失敗
    }
);

  針對電子發票插入微信卡包定時任務,重試執行次數10次,每隔5秒執行一次。任務完成以後結束定時任務,執行次數用盡以後觸發插入卡包失敗動做。

4、參考  

     Spring異步調用原理及SpringAop攔截器鏈原理

     Springboot定時任務原理及如何動態建立定時任務

相關文章
相關標籤/搜索