【原】經過多線程分批處理派發任務

 前言:

   前幾天有其餘部門人員反應派發紅包很慢,常常出現504或者無響應,因而由我這邊進行一個優化後,發放速度由原來的超時或者1分鐘變爲幾秒。java

  發放流程:

         活動後臺導入一個xls表格,大概2W左右條,通過後臺的篩選處理等邏輯後會循環調用插入數據庫的代碼。數據庫

   優化過程:

         分析慢的緣由:

                                  1.這套系統是N年前的系統,舊的功能無人維護優化,裏面大量使用了jdbcTemplate。api

                                  2.發放的時候循環發放,致使要循環2W次,並且每次插入都是須要新建一個對象,而後往裏面set數據,而後調用 jdbcTemplate 入庫。多線程

          優化思路 :

      1. jdbcTemplate 有個batchUpdate的api,能夠經過這個api完成批處理                 ide

      2. 測試後發現batchUpdate仍是會有瓶頸,在數據大的時候仍是有點慢,是否考慮經過多線程拆分大的單元,相似於 jdk的 forkJoin,而後每一個線程處理一批,最後的結果經過回調統一歸併。

   代碼片斷:

  •  #抽取出一個公共的接口,用於調用具體的處理方法。

    

public interface ITask<T, E> {
    * @param e 傳入對象
* @param params 其餘輔助參數
* @return T<BR> 返回值類型
* @exception <BR>
* @since 2.0
T execute(List e, Map<String, Object> params); }
  •     #因爲發放紅包須要實時展現給發放人員看,因此須要有回調處理函數,能夠將不一樣結果的線程收集起來統一給主線程返回,因此新建一個類實現 Callable。
public class HandleCallable<E> implements Callable<ResultBean> {
    private static Logger logger = LoggerFactory.getLogger(HandleCallable.class);
    // 線程名稱 
    private String threadName = "";
    // 須要處理的數據
    private List<E> data;
    // 輔助參數
    private Map<String, Object> params;
    // 具體執行任務
    private ITask<ResultBean<String>, E> task;

    public HandleCallable(String threadName, List<E> data, Map<String, Object> params,
            ITask<ResultBean<String>, E> task) {
        this.threadName = threadName;
        this.data = data;
        this.params = params;
        this.task = task;
    }

    @Override
    public ResultBean<List<ResultBean<String>>> call() throws Exception {
        // 該線程中全部數據處理返回結果
        ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance();
        if (data != null && data.size() > 0) {
            logger.info("線程:{},共處理:{}個數據,開始處理......", threadName, data.size());
            // 返回結果集
            List<ResultBean<String>> resultList = new ArrayList<>();
            resultList.add(task.execute(data, params));
            /*resultList.add(task.execute(data, params));*/
            // 循環處理每一個數據
          /*  for (int i = 0; i < data.size(); i++) {
                // 須要執行的數據
                E e = data.get(i);
                // 將數據執行結果加入到結果集中
                resultList.add(task.execute(e, params));
                logger.info("線程:{},第{}個數據,處理完成", threadName, (i + 1));
            }*/
            logger.info("線程:{},共處理:{}個數據,處理完成......", threadName, data.size());
            resultBean.setData(resultList);
            resultBean.setCode(data.size());
        }
        return resultBean;
    }

}
 
  •        #具體的處理線程的類有了,那麼還須要考慮誰來切分任務,因此新建一個 線程工具類,主要業務就是切分任務,建立具體的線程個數,統一收集結果。
package com.i2p.util; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.i2p.util.thread.ITask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * * MultiThreadUtils<BR> * 建立人:dd <BR> * 時間:2019年1月11日 14:09:38 <BR> * @version 2.0 * */ public class MultiThreadUtils<T> { private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class); // 線程個數,如不賦值,默認爲5 private int threadCount = 20; // 具體業務任務 // private ITask<ResultBean<String>, T> task; // 線程池管理器 private CompletionService<ResultBean> pool = null; /** * * 初始化線程池和線程個數<BR> * 方法名:newInstance<BR> * 建立人:dd <BR> * 時間:2018年8月8日-下午8:22:00 <BR> * @param threadCount * @return MultiThreadUtils<BR> * @exception <BR> * @since 2.0 */ public static MultiThreadUtils newInstance(int threadCount) { MultiThreadUtils instance = new MultiThreadUtils(); threadCount = threadCount; instance.setThreadCount(threadCount); return instance; } /** * * 多線程分批執行list中的任務<BR> * 方法名:execute<BR> * 建立人:dd <BR> * 時間:2019年1月10日 09:00:24 <BR> * @param data 線程處理的大數據量list * @param params 處理數據是輔助參數傳遞 * @param task 具體執行業務的任務接口 * @return ResultBean<BR> * @exception <BR> * @since 2.0 */ @SuppressWarnings("rawtypes") public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) { // 建立線程池 int num = 0; ExecutorService threadpool = Executors.newFixedThreadPool(threadCount); // 根據線程池初始化線程池管理器 pool = new ExecutorCompletionService<ResultBean>(threadpool); // 開始時間(ms) long l = System.currentTimeMillis(); // 數據量大小 int length = data.size(); // 每一個線程處理的數據個數 int taskCount = length / threadCount; // 劃分每一個線程調用的數據 for (int i = 0; i < threadCount; i++) { // 每一個線程任務數據list List<T> subData = null; if (i == (threadCount - 1)) { subData = data.subList(i * taskCount, length); } else { subData = data.subList(i * taskCount, (i + 1) * taskCount); } // 將數據分配給各個線程 HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task); // 將線程加入到線程池  pool.submit(execute); } // 總的返回結果集 List<ResultBean<String>> result = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { // 每一個線程處理結果集 ResultBean<List<ResultBean<String>>> threadResult; try { threadResult = pool.take().get(); if(threadResult!=null && threadResult.getData()!=null){ System.out.println("======線程" + i + "執行完畢,返回結果數據:" + threadResult.getCode()); List<ResultBean<String>> list = threadResult.getData(); num+=threadResult.getCode(); } System.out.println("每一個線程處理結果集"+threadResult.getData()); result.addAll(threadResult.getData()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 關閉線程池  threadpool.shutdownNow(); // 執行結束時間 long end_l = System.currentTimeMillis(); logger.info("總耗時:{}ms", (end_l - l)); logger.info("總數量:{}num:", num); return ResultBean.newInstance().setData(num); } public int getThreadCount() { return threadCount; } public void setThreadCount(int threadCount) { this.threadCount = threadCount; } }
相關文章
相關標籤/搜索