使用java多線程分批處理數據工具類

最近因爲業務須要,數據量比較大,須要使用多線程來分批處理,提升處理效率和能力,因而就寫了一個通用的多線程處理工具,只須要實現本身的業務邏輯就能夠正常使用,如今記錄一下java

主要是針對大數據量list,將list劃分多個線程處理json

ResultBean類: 返回結果統一bean多線程

package com.ts.common.model;

import java.io.Serializable;

import com.alibaba.fastjson.JSON;


/**
 * 返回結果統一bean
 * 
 * ResultBean<BR>
 * 建立人:wangbeidou <BR>
 * 時間:2018年4月12日-下午3:49:46 <BR>
 * @version 2.0
 *
 */
public class ResultBean<T> implements Serializable {

    private static final long serialVersionUID = 1L;
    // 成功狀態
    public static final int SUCCESS = 1;
    // 處理中狀態
    public static final int PROCESSING = 0;
    // 失敗狀態
    public static final int FAIL = -1;

    // 描述
    private String msg = "success";
    // 狀態默認成功
    private int code = SUCCESS;
    // 備註
    private String remark;
    // 返回數據
    private T data;

    public ResultBean() {
        super();
    }

    public ResultBean(T data) {
        super();
        this.data = data;
    }

    /**
     * 使用異常建立結果
     */
    public ResultBean(Throwable e) {
        super();
        this.msg = e.toString();
        this.code = FAIL;
    }
    
    /**
     * 
     * 實例化結果默認成功狀態<BR>
     * 方法名:newInstance<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午3:51:26 <BR>
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public static <T> ResultBean<T> newInstance() {
        ResultBean<T> instance = new ResultBean<T>();
        //默認返回信息
        instance.code = SUCCESS;
        instance.msg = "success";
        return instance;
    }
    
    /**
     * 
     * 實例化結果默認成功狀態和數據<BR>
     * 方法名:newInstance<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年5月10日-下午2:13:16 <BR>
     * @param data
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public static <T> ResultBean<T> newInstance(T data) {
        ResultBean<T> instance = new ResultBean<T>();
        //默認返回信息
        instance.code = SUCCESS;
        instance.msg = "success";
        instance.data = data;
        return instance;
    }
    
    /**
     * 
     * 實例化返回結果<BR>
     * 方法名:newInstance<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午4:00:53 <BR>
     * @param code
     * @param msg
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public static <T> ResultBean<T> newInstance(int code, String msg) {
        ResultBean<T> instance = new ResultBean<T>();
        //默認返回信息
        instance.code = code;
        instance.msg = msg;
        return instance;
    }
    
    /**
     * 
     * 實例化返回結果<BR>
     * 方法名:newInstance<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午4:00:35 <BR>
     * @param code
     * @param msg
     * @param data
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public static <T> ResultBean<T> newInstance(int code, String msg, T data) {
        ResultBean<T> instance = new ResultBean<T>();
        //默認返回信息
        instance.code = code;
        instance.msg = msg;
        instance.data = data;
        return instance;
    }
    
    /**
     * 
     * 設置返回數據<BR>
     * 方法名:setData<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午3:52:01 <BR>
     * @param data
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> setData(T data){
        this.data = data;
        return this;
    }
    
    /**
     * 
     * 設置結果描述<BR>
     * 方法名:setMsg<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午3:52:34 <BR>
     * @param msg
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> setMsg(String msg){
        this.msg = msg;
        return this;
    }
    
    /**
     * 
     * 設置狀態<BR>
     * 方法名:setCode<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午4:17:56 <BR>
     * @param code
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> setCode(int code){
        this.code = code;
        return this;
    }
    
    /**
     * 
     * 設置備註)<BR>
     * 方法名:setRemark<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午5:47:29 <BR>
     * @param remark
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> setRemark(String remark){
        this.remark = remark;
        return this;
    }
    
    /**
     * 
     * 設置成功描述和返回數據<BR>
     * 方法名:success<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午3:52:58 <BR>
     * @param msg
     * @param data
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> success(String msg, T data){ 
        this.code = SUCCESS;
        this.data = data;
        this.msg = msg;
        return this;  
    } 
    
    /**
     * 
     * 設置成功返回結果描述<BR>
     * 方法名:success<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午3:53:31 <BR>
     * @param msg
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> success(String msg){ 
        this.code = SUCCESS;
        this.msg = msg;
        return this;  
    }
    
    /**
     * 
     * 設置處理中描述和返回數據<BR>
     * 方法名:success<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午3:52:58 <BR>
     * @param msg
     * @param data
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> processing(String msg, T data){ 
        this.code = PROCESSING;
        this.data = data;
        this.msg = msg;
        return this;  
    } 
    
    /**
     * 
     * 設置處理中返回結果描述<BR>
     * 方法名:success<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午3:53:31 <BR>
     * @param msg
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> processing(String msg){ 
        this.code = PROCESSING;
        this.msg = msg;
        return this;  
    }
    
    /**
     * 
     * 設置失敗返回描述和返回數據<BR>
     * 方法名:fail<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午3:54:04 <BR>
     * @param msg
     * @param data
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> fail(String msg, T data){ 
        this.code = FAIL;
        this.data = data;
        this.msg = msg;
        return this;  
    } 
    
    /**
     * 
     * 設置失敗返回描述<BR>
     * 方法名:fail<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午3:54:32 <BR>
     * @param msg
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> fail(String msg){ 
        this.code = FAIL;
        this.msg = msg;
        return this;  
    }
    
    public T getData() {  
        return data;  
    }  
    public String getMsg() {  
        return msg;  
    }  
    public int getCode() {  
        return code;  
    }  
    public String getRemark() {  
        return remark;  
    }  
    
    /**
     * 
     * 生成json字符串<BR>
     * 方法名:json<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年4月12日-下午4:42:28 <BR>
     * @return String<BR>
     * @exception <BR>
     * @since  2.0
     */
    public String json(){
        return JSON.toJSONString(this);
    }
}
View Code

ITask接口: 實現本身的業務ide

package com.ts.common.multi.execute;

import java.util.Map;

/**
 * 任務處理接口
 * 具體業務邏輯可實現該接口
 *  T 返回值類型
 *  E 傳入值類型
 * ITask<BR>
 * 建立人:wangbeidou <BR>
 * 時間:2018年8月4日-下午6:12:32 <BR>
 * @version 2.0
 *
 */
public interface ITask<T, E> {
    
    /**
     * 
     * 任務執行方法接口<BR>
     * 方法名:execute<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年8月4日-下午6:13:44 <BR>
     * @param e 傳入對象
     * @param params 其餘輔助參數
     * @return T<BR> 返回值類型
     * @exception <BR>
     * @since  2.0
     */
    T execute(E e, Map<String, Object> params);
}
View Code

HandleCallable類: 實現Callable接口,來處理任務工具

package com.ts.common.multi.execute;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ts.common.model.ResultBean;

/**
 * 
 * 
 * HandleCallable<BR>
 * 建立人:wangbeidou <BR>
 * 時間:2018年8月4日-上午11:55:41 <BR>
 * 
 * @version 2.0
 *
 */
@SuppressWarnings("rawtypes")
public class HandleCallable<E> implements Callable<ResultBean> {
    private static Logger logger = LoggerFactory.getLogger(HandleCallable.class);
    // 線程名稱 
    private String threadName = "";
    // 須要處理的數據
    private List<E> data;
    // 輔助參數
    private Map<String, Object> params;
    // 具體執行任務
    private ITask<ResultBean<String>, E> task;

    public HandleCallable(String threadName, List<E> data, Map<String, Object> params,
            ITask<ResultBean<String>, E> task) {
        this.threadName = threadName;
        this.data = data;
        this.params = params;
        this.task = task;
    }

    @Override
    public ResultBean<List<ResultBean<String>>> call() throws Exception {
        // 該線程中全部數據處理返回結果
        ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance();
        if (data != null && data.size() > 0) {
            logger.info("線程:{},共處理:{}個數據,開始處理......", threadName, data.size());
            // 返回結果集
            List<ResultBean<String>> resultList = new ArrayList<>();
            // 循環處理每一個數據
            for (int i = 0; i < data.size(); i++) {
                // 須要執行的數據
                E e = data.get(i);
                // 將數據執行結果加入到結果集中
                resultList.add(task.execute(e, params));
                logger.info("線程:{},第{}個數據,處理完成", threadName, (i + 1));
            }
            logger.info("線程:{},共處理:{}個數據,處理完成......", threadName, data.size());
            resultBean.setData(resultList);
        }
        return resultBean;
    }

}
View Code

MultiThreadUtils類: 多線程工具類測試

package com.ts.common.multi.execute;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ts.common.model.ResultBean;

/**
 * 
 * 
 * MultiThreadUtils<BR>
 * 建立人:wangbeidou <BR>
 * 時間:2018年8月8日-下午8:20:42 <BR>
 * @version 2.0
 *
 */
public class MultiThreadUtils<T> {
    private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class);

    // 線程個數,如不賦值,默認爲5
    private int threadCount = 5;
    // 具體業務任務
    private ITask<ResultBean<String>, T> task;
    // 線程池管理器
    private CompletionService<ResultBean> pool = null;

    /**
     * 
     * 初始化線程池和線程個數<BR>
     * 方法名:newInstance<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年8月8日-下午8:22:00 <BR>
     * @param threadCount
     * @return MultiThreadUtils<BR>
     * @exception <BR>
     * @since  2.0
     */
    public static MultiThreadUtils newInstance(int threadCount) {
        MultiThreadUtils instance = new MultiThreadUtils();
        threadCount = threadCount;
        instance.setThreadCount(threadCount);

        return instance;
    }

    /**
     * 
     * 多線程分批執行list中的任務<BR>
     * 方法名:execute<BR>
     * 建立人:wangbeidou <BR>
     * 時間:2018年8月8日-下午8:22:31 <BR>
     * @param data  線程處理的大數據量list
     * @param params    處理數據是輔助參數傳遞
     * @param task        具體執行業務的任務接口
     * @return ResultBean<BR>    
     * @exception <BR>
     * @since  2.0
     */
    @SuppressWarnings("rawtypes")
    public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) {
        // 建立線程池
        ExecutorService threadpool = Executors.newFixedThreadPool(threadCount);
        // 根據線程池初始化線程池管理器
        pool = new ExecutorCompletionService<ResultBean>(threadpool);
        // 開始時間(ms)
        long l = System.currentTimeMillis();
        // 數據量大小
        int length = data.size();
        // 每一個線程處理的數據個數
        int taskCount = length / threadCount;
        // 劃分每一個線程調用的數據
        for (int i = 0; i < threadCount; i++) {
            // 每一個線程任務數據list
            List<T> subData = null;
            if (i == (threadCount - 1)) {
                subData = data.subList(i * taskCount, length);
            } else {
                subData = data.subList(i * taskCount, (i + 1) * taskCount);
            }
            // 將數據分配給各個線程
            HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task);
            // 將線程加入到線程池
            pool.submit(execute);
        }
        
        // 總的返回結果集
        List<ResultBean<String>> result = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            // 每一個線程處理結果集
            ResultBean<List<ResultBean<String>>> threadResult;
            try {
                threadResult = pool.take().get();
                result.addAll(threadResult.getData());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

        }
        // 關閉線程池
        threadpool.shutdownNow();
        // 執行結束時間
        long end_l = System.currentTimeMillis();
        logger.info("總耗時:{}ms", (end_l - l));
        return ResultBean.newInstance().setData(result);
    }

    public int getThreadCount() {
        return threadCount;
    }

    public void setThreadCount(int threadCount) {
        this.threadCount = threadCount;
    }

}
View Code

測試類TestTask大數據

package com.ts.common.multi.execute;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.ts.common.model.ResultBean;

/**
 * 
 * 具體執行業務任務 須要 實現ITask接口  在execute中重寫業務邏輯
 * TestTask<BR>
 * 建立人:wangbeidou <BR>
 * 時間:2018年8月8日-下午8:40:32 <BR>
 * @version 2.0
 *
 */
public class TestTask implements ITask<ResultBean<String>, Integer> {

    @Override
    public ResultBean execute(Integer e, Map<String, Object> params) {
        /**
         * 具體業務邏輯:將list中的元素加上輔助參數中的數據返回
         */
        int addNum = Integer.valueOf(String.valueOf(params.get("addNum")));
        e = e + addNum;
        ResultBean<String> resultBean = ResultBean.newInstance();
        resultBean.setData(e.toString());
        return resultBean;
    }
    
    public static void main(String[] args) {
        // 須要多線程處理的大量數據list
        List<Integer> data = new ArrayList<>(10000);
        for(int i = 0; i < 10000; i ++){
            data.add(i + 1);
        }
        
        // 建立多線程處理任務
        MultiThreadUtils<Integer> threadUtils = MultiThreadUtils.newInstance(5);
        ITask<ResultBean<String>, Integer> task = new TestTask();
        // 輔助參數  加數
        Map<String, Object> params = new HashMap<>();
        params.put("addNum", 4);
        // 執行多線程處理,並返回處理結果
        ResultBean<List<ResultBean<String>>> resultBean = threadUtils.execute(data, params, task);
    }


}
相關文章
相關標籤/搜索