最近因爲業務須要,數據量比較大,須要使用多線程來分批處理,提升處理效率和能力,因而就寫了一個通用的多線程處理工具,只須要實現本身的業務邏輯就能夠正常使用,如今記錄一下java
主要是針對大數據量list,將list劃分多個線程處理json
ResultBean類: 返回結果統一bean多線程
package com.ts.common.model; import java.io.Serializable; import com.alibaba.fastjson.JSON; /** * 返回結果統一bean * * ResultBean<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午3:49:46 <BR> * @version 2.0 * */ public class ResultBean<T> implements Serializable { private static final long serialVersionUID = 1L; // 成功狀態 public static final int SUCCESS = 1; // 處理中狀態 public static final int PROCESSING = 0; // 失敗狀態 public static final int FAIL = -1; // 描述 private String msg = "success"; // 狀態默認成功 private int code = SUCCESS; // 備註 private String remark; // 返回數據 private T data; public ResultBean() { super(); } public ResultBean(T data) { super(); this.data = data; } /** * 使用異常建立結果 */ public ResultBean(Throwable e) { super(); this.msg = e.toString(); this.code = FAIL; } /** * * 實例化結果默認成功狀態<BR> * 方法名:newInstance<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午3:51:26 <BR> * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public static <T> ResultBean<T> newInstance() { ResultBean<T> instance = new ResultBean<T>(); //默認返回信息 instance.code = SUCCESS; instance.msg = "success"; return instance; } /** * * 實例化結果默認成功狀態和數據<BR> * 方法名:newInstance<BR> * 建立人:wangbeidou <BR> * 時間:2018年5月10日-下午2:13:16 <BR> * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public static <T> ResultBean<T> newInstance(T data) { ResultBean<T> instance = new ResultBean<T>(); //默認返回信息 instance.code = SUCCESS; instance.msg = "success"; instance.data = data; return instance; } /** * * 實例化返回結果<BR> * 方法名:newInstance<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午4:00:53 <BR> * @param code * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public static <T> ResultBean<T> newInstance(int code, String msg) { ResultBean<T> instance = new ResultBean<T>(); //默認返回信息 instance.code = code; instance.msg = msg; return instance; } /** * * 實例化返回結果<BR> * 方法名:newInstance<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午4:00:35 <BR> * @param code * @param msg * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public static <T> ResultBean<T> newInstance(int code, String msg, T data) { ResultBean<T> instance = new ResultBean<T>(); //默認返回信息 instance.code = code; instance.msg = msg; instance.data = data; return instance; } /** * * 設置返回數據<BR> * 方法名:setData<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午3:52:01 <BR> * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> setData(T data){ this.data = data; return this; } /** * * 設置結果描述<BR> * 方法名:setMsg<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午3:52:34 <BR> * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> setMsg(String msg){ this.msg = msg; return this; } /** * * 設置狀態<BR> * 方法名:setCode<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午4:17:56 <BR> * @param code * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> setCode(int code){ this.code = code; return this; } /** * * 設置備註)<BR> * 方法名:setRemark<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午5:47:29 <BR> * @param remark * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> setRemark(String remark){ this.remark = remark; return this; } /** * * 設置成功描述和返回數據<BR> * 方法名:success<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午3:52:58 <BR> * @param msg * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> success(String msg, T data){ this.code = SUCCESS; this.data = data; this.msg = msg; return this; } /** * * 設置成功返回結果描述<BR> * 方法名:success<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午3:53:31 <BR> * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> success(String msg){ this.code = SUCCESS; this.msg = msg; return this; } /** * * 設置處理中描述和返回數據<BR> * 方法名:success<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午3:52:58 <BR> * @param msg * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> processing(String msg, T data){ this.code = PROCESSING; this.data = data; this.msg = msg; return this; } /** * * 設置處理中返回結果描述<BR> * 方法名:success<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午3:53:31 <BR> * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> processing(String msg){ this.code = PROCESSING; this.msg = msg; return this; } /** * * 設置失敗返回描述和返回數據<BR> * 方法名:fail<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午3:54:04 <BR> * @param msg * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> fail(String msg, T data){ this.code = FAIL; this.data = data; this.msg = msg; return this; } /** * * 設置失敗返回描述<BR> * 方法名:fail<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午3:54:32 <BR> * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> fail(String msg){ this.code = FAIL; this.msg = msg; return this; } public T getData() { return data; } public String getMsg() { return msg; } public int getCode() { return code; } public String getRemark() { return remark; } /** * * 生成json字符串<BR> * 方法名:json<BR> * 建立人:wangbeidou <BR> * 時間:2018年4月12日-下午4:42:28 <BR> * @return String<BR> * @exception <BR> * @since 2.0 */ public String json(){ return JSON.toJSONString(this); } }
ITask接口: 實現本身的業務ide
package com.ts.common.multi.execute; import java.util.Map; /** * 任務處理接口 * 具體業務邏輯可實現該接口 * T 返回值類型 * E 傳入值類型 * ITask<BR> * 建立人:wangbeidou <BR> * 時間:2018年8月4日-下午6:12:32 <BR> * @version 2.0 * */ public interface ITask<T, E> { /** * * 任務執行方法接口<BR> * 方法名:execute<BR> * 建立人:wangbeidou <BR> * 時間:2018年8月4日-下午6:13:44 <BR> * @param e 傳入對象 * @param params 其餘輔助參數 * @return T<BR> 返回值類型 * @exception <BR> * @since 2.0 */ T execute(E e, Map<String, Object> params); }
HandleCallable類: 實現Callable接口,來處理任務工具
package com.ts.common.multi.execute; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.ts.common.model.ResultBean; /** * * * HandleCallable<BR> * 建立人:wangbeidou <BR> * 時間:2018年8月4日-上午11:55:41 <BR> * * @version 2.0 * */ @SuppressWarnings("rawtypes") public class HandleCallable<E> implements Callable<ResultBean> { private static Logger logger = LoggerFactory.getLogger(HandleCallable.class); // 線程名稱 private String threadName = ""; // 須要處理的數據 private List<E> data; // 輔助參數 private Map<String, Object> params; // 具體執行任務 private ITask<ResultBean<String>, E> task; public HandleCallable(String threadName, List<E> data, Map<String, Object> params, ITask<ResultBean<String>, E> task) { this.threadName = threadName; this.data = data; this.params = params; this.task = task; } @Override public ResultBean<List<ResultBean<String>>> call() throws Exception { // 該線程中全部數據處理返回結果 ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance(); if (data != null && data.size() > 0) { logger.info("線程:{},共處理:{}個數據,開始處理......", threadName, data.size()); // 返回結果集 List<ResultBean<String>> resultList = new ArrayList<>(); // 循環處理每一個數據 for (int i = 0; i < data.size(); i++) { // 須要執行的數據 E e = data.get(i); // 將數據執行結果加入到結果集中 resultList.add(task.execute(e, params)); logger.info("線程:{},第{}個數據,處理完成", threadName, (i + 1)); } logger.info("線程:{},共處理:{}個數據,處理完成......", threadName, data.size()); resultBean.setData(resultList); } return resultBean; } }
MultiThreadUtils類: 多線程工具類測試
package com.ts.common.multi.execute; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.ts.common.model.ResultBean; /** * * * MultiThreadUtils<BR> * 建立人:wangbeidou <BR> * 時間:2018年8月8日-下午8:20:42 <BR> * @version 2.0 * */ public class MultiThreadUtils<T> { private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class); // 線程個數,如不賦值,默認爲5 private int threadCount = 5; // 具體業務任務 private ITask<ResultBean<String>, T> task; // 線程池管理器 private CompletionService<ResultBean> pool = null; /** * * 初始化線程池和線程個數<BR> * 方法名:newInstance<BR> * 建立人:wangbeidou <BR> * 時間:2018年8月8日-下午8:22:00 <BR> * @param threadCount * @return MultiThreadUtils<BR> * @exception <BR> * @since 2.0 */ public static MultiThreadUtils newInstance(int threadCount) { MultiThreadUtils instance = new MultiThreadUtils(); threadCount = threadCount; instance.setThreadCount(threadCount); return instance; } /** * * 多線程分批執行list中的任務<BR> * 方法名:execute<BR> * 建立人:wangbeidou <BR> * 時間:2018年8月8日-下午8:22:31 <BR> * @param data 線程處理的大數據量list * @param params 處理數據是輔助參數傳遞 * @param task 具體執行業務的任務接口 * @return ResultBean<BR> * @exception <BR> * @since 2.0 */ @SuppressWarnings("rawtypes") public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) { // 建立線程池 ExecutorService threadpool = Executors.newFixedThreadPool(threadCount); // 根據線程池初始化線程池管理器 pool = new ExecutorCompletionService<ResultBean>(threadpool); // 開始時間(ms) long l = System.currentTimeMillis(); // 數據量大小 int length = data.size(); // 每一個線程處理的數據個數 int taskCount = length / threadCount; // 劃分每一個線程調用的數據 for (int i = 0; i < threadCount; i++) { // 每一個線程任務數據list List<T> subData = null; if (i == (threadCount - 1)) { subData = data.subList(i * taskCount, length); } else { subData = data.subList(i * taskCount, (i + 1) * taskCount); } // 將數據分配給各個線程 HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task); // 將線程加入到線程池 pool.submit(execute); } // 總的返回結果集 List<ResultBean<String>> result = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { // 每一個線程處理結果集 ResultBean<List<ResultBean<String>>> threadResult; try { threadResult = pool.take().get(); result.addAll(threadResult.getData()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 關閉線程池 threadpool.shutdownNow(); // 執行結束時間 long end_l = System.currentTimeMillis(); logger.info("總耗時:{}ms", (end_l - l)); return ResultBean.newInstance().setData(result); } public int getThreadCount() { return threadCount; } public void setThreadCount(int threadCount) { this.threadCount = threadCount; } }
測試類TestTask大數據
package com.ts.common.multi.execute; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import com.ts.common.model.ResultBean; /** * * 具體執行業務任務 須要 實現ITask接口 在execute中重寫業務邏輯 * TestTask<BR> * 建立人:wangbeidou <BR> * 時間:2018年8月8日-下午8:40:32 <BR> * @version 2.0 * */ public class TestTask implements ITask<ResultBean<String>, Integer> { @Override public ResultBean execute(Integer e, Map<String, Object> params) { /** * 具體業務邏輯:將list中的元素加上輔助參數中的數據返回 */ int addNum = Integer.valueOf(String.valueOf(params.get("addNum"))); e = e + addNum; ResultBean<String> resultBean = ResultBean.newInstance(); resultBean.setData(e.toString()); return resultBean; } public static void main(String[] args) { // 須要多線程處理的大量數據list List<Integer> data = new ArrayList<>(10000); for(int i = 0; i < 10000; i ++){ data.add(i + 1); } // 建立多線程處理任務 MultiThreadUtils<Integer> threadUtils = MultiThreadUtils.newInstance(5); ITask<ResultBean<String>, Integer> task = new TestTask(); // 輔助參數 加數 Map<String, Object> params = new HashMap<>(); params.put("addNum", 4); // 執行多線程處理,並返回處理結果 ResultBean<List<ResultBean<String>>> resultBean = threadUtils.execute(data, params, task); } }