前幾天有其餘部門人員反應派發紅包很慢,常常出現504或者無響應,因而由我這邊進行一個優化後,發放速度由原來的超時或者1分鐘變爲幾秒。java
活動後臺導入一個xls表格,大概2W左右條,通過後臺的篩選處理等邏輯後會循環調用插入數據庫的代碼。數據庫
1.這套系統是N年前的系統,舊的功能無人維護優化,裏面大量使用了jdbcTemplate。api
2.發放的時候循環發放,致使要循環2W次,並且每次插入都是須要新建一個對象,而後往裏面set數據,而後調用 jdbcTemplate 入庫。多線程
jdbcTemplate 有個batchUpdate的api,能夠經過這個api完成批處理 ide
public interface ITask<T, E> { * @param e 傳入對象
* @param params 其餘輔助參數
* @return T<BR> 返回值類型
* @exception <BR>
* @since 2.0 T execute(List e, Map<String, Object> params); }
public class HandleCallable<E> implements Callable<ResultBean> { private static Logger logger = LoggerFactory.getLogger(HandleCallable.class); // 線程名稱 private String threadName = ""; // 須要處理的數據 private List<E> data; // 輔助參數 private Map<String, Object> params; // 具體執行任務 private ITask<ResultBean<String>, E> task; public HandleCallable(String threadName, List<E> data, Map<String, Object> params, ITask<ResultBean<String>, E> task) { this.threadName = threadName; this.data = data; this.params = params; this.task = task; } @Override public ResultBean<List<ResultBean<String>>> call() throws Exception { // 該線程中全部數據處理返回結果 ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance(); if (data != null && data.size() > 0) { logger.info("線程:{},共處理:{}個數據,開始處理......", threadName, data.size()); // 返回結果集 List<ResultBean<String>> resultList = new ArrayList<>(); resultList.add(task.execute(data, params)); /*resultList.add(task.execute(data, params));*/ // 循環處理每一個數據 /* for (int i = 0; i < data.size(); i++) { // 須要執行的數據 E e = data.get(i); // 將數據執行結果加入到結果集中 resultList.add(task.execute(e, params)); logger.info("線程:{},第{}個數據,處理完成", threadName, (i + 1)); }*/ logger.info("線程:{},共處理:{}個數據,處理完成......", threadName, data.size()); resultBean.setData(resultList); resultBean.setCode(data.size()); } return resultBean; } }
package com.i2p.util; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.i2p.util.thread.ITask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * * MultiThreadUtils<BR> * 建立人:dd <BR> * 時間:2019年1月11日 14:09:38 <BR> * @version 2.0 * */ public class MultiThreadUtils<T> { private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class); // 線程個數,如不賦值,默認爲5 private int threadCount = 20; // 具體業務任務 // private ITask<ResultBean<String>, T> task; // 線程池管理器 private CompletionService<ResultBean> pool = null; /** * * 初始化線程池和線程個數<BR> * 方法名:newInstance<BR> * 建立人:dd <BR> * 時間:2018年8月8日-下午8:22:00 <BR> * @param threadCount * @return MultiThreadUtils<BR> * @exception <BR> * @since 2.0 */ public static MultiThreadUtils newInstance(int threadCount) { MultiThreadUtils instance = new MultiThreadUtils(); threadCount = threadCount; instance.setThreadCount(threadCount); return instance; } /** * * 多線程分批執行list中的任務<BR> * 方法名:execute<BR> * 建立人:dd <BR> * 時間:2019年1月10日 09:00:24 <BR> * @param data 線程處理的大數據量list * @param params 處理數據是輔助參數傳遞 * @param task 具體執行業務的任務接口 * @return ResultBean<BR> * @exception <BR> * @since 2.0 */ @SuppressWarnings("rawtypes") public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) { // 建立線程池 int num = 0; ExecutorService threadpool = Executors.newFixedThreadPool(threadCount); // 根據線程池初始化線程池管理器 pool = new ExecutorCompletionService<ResultBean>(threadpool); // 開始時間(ms) long l = System.currentTimeMillis(); // 數據量大小 int length = data.size(); // 每一個線程處理的數據個數 int taskCount = length / threadCount; // 劃分每一個線程調用的數據 for (int i = 0; i < threadCount; i++) { // 每一個線程任務數據list List<T> subData = null; if (i == (threadCount - 1)) { subData = data.subList(i * taskCount, length); } else { subData = data.subList(i * taskCount, (i + 1) * taskCount); } // 將數據分配給各個線程 HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task); // 將線程加入到線程池 pool.submit(execute); } // 總的返回結果集 List<ResultBean<String>> result = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { // 每一個線程處理結果集 ResultBean<List<ResultBean<String>>> threadResult; try { threadResult = pool.take().get(); if(threadResult!=null && threadResult.getData()!=null){ System.out.println("======線程" + i + "執行完畢,返回結果數據:" + threadResult.getCode()); List<ResultBean<String>> list = threadResult.getData(); num+=threadResult.getCode(); } System.out.println("每一個線程處理結果集"+threadResult.getData()); result.addAll(threadResult.getData()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 關閉線程池 threadpool.shutdownNow(); // 執行結束時間 long end_l = System.currentTimeMillis(); logger.info("總耗時:{}ms", (end_l - l)); logger.info("總數量:{}num:", num); return ResultBean.newInstance().setData(num); } public int getThreadCount() { return threadCount; } public void setThreadCount(int threadCount) { this.threadCount = threadCount; } }