ThreadUtil 多線程處理List,回調處理具體的任務

每次想多線程處理一個大的結果集的時候 都須要寫一大堆代碼,本身寫了個工具類 方便使用java

package com.guige.fss.common.util;


import com.guige.fss.common.exception.BusinessException;
import io.swagger.models.auth.In;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Created by admin on 2018/6/5.
 * @author 宋安偉
 */
public class ThreadUtil {
    //建立定長線程池,初始化線程
    private static Logger log = LoggerFactory.getLogger(ThreadUtil.class);

    /**
     * 對List進行多線程處理(限制 對List只讀 若是想修改List 能夠處理完畢後把要修改或刪除的List返回 多線程執行完後再修改或刪除)
     * @param list 要處理的List
     * @param threadSize 用幾個線程處理
     * @param threadLoadback 處理的回調(具體業務員)
     * @param <T> 每一個回調的返回結果
     * @param <V> List<V>的泛型
     * @return
     */
    public static <T,V>List<T> executorsTasks(final List<V> list,final  int threadSize,final  ThreadLoadback<T,V> threadLoadback){
        // 開始時間
        long start = System.currentTimeMillis();
        // 總數據條數
        int dataSize = list.size();
        // 線程數
        int threadNum = dataSize / threadSize + 1;
        // 定義標記,過濾threadNum爲整數
        boolean special = dataSize % threadSize == 0;
        // 建立一個線程池
        ExecutorService exec = Executors.newFixedThreadPool(threadNum);
        // 定義一個任務集合
        List<Callable<T>> tasks = new ArrayList<Callable<T>>();
        Callable<T> task = null;
        List cutList = null;

        for (int i = 0; i < threadNum; i++) {
            if (i == threadNum - 1) {
                if (special) {
                    break;
                }
                cutList = list.subList(threadSize * i, dataSize);
            } else {
                cutList = list.subList(threadSize * i, threadSize * (i + 1));
            }
            // System.out.println("第" + (i + 1) + "組:" + cutList.toString());
            final List listStr = cutList;
            task = new Callable<T>() {
                @Override
                public T  call() throws Exception {
                    // System.out.println(Thread.currentThread().getName() + "線程:" + listStr);
                return (T) threadLoadback.load(listStr);
                      //  return


                }
            };
            // 這裏提交的任務容器列表和返回的Future列表存在順序對應的關係
            tasks.add(task);
        }
        List<Future<T>> resultsFuture = null;
        try {
            log.debug("線程任務執行開始:任務數"+tasks.size());
            resultsFuture = exec.invokeAll(tasks);
            List<T> results = new ArrayList<>();
            for (Future<T> future : resultsFuture) {
                T result=future.get();
                if(result!=null) {
                    results.add(result);
                }
            }
            return results;

        } catch (Exception e) {
            e.printStackTrace();
            throw new BusinessException(e.getMessage());
        }finally {
            // 關閉線程池
            exec.shutdown();
            log.debug("線程任務執行結束");
            log.debug("執行任務消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
        }

    }

    interface ThreadLoadback<T,V> {
        T load(List<V> list) throws Exception;
    }


    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        for(int i=0;i<1000;i++){
            list.add("i="+i);
        }
     List<List<Integer>> resultList=   ThreadUtil.executorsTasks(list, 10, new ThreadLoadback<List<Integer>, String>() {
         @Override
         public List<Integer> load(List<String> list) throws Exception {
                List<Integer> result= new ArrayList<>();
                for(String str:list){
                    str= str.replaceAll("i=","");
                    result.add(Integer.parseInt(str));
                    System.out.println(Thread.currentThread().getName()+"休息1秒");
                    Thread.sleep(1000L);
                }
             return result;
         }
     });
      if(!CollectionUtils.isEmpty(resultList)){
          List<Integer> integers = new ArrayList<>();
          resultList.stream().forEach(items -> {
                      if (!CollectionUtils.isEmpty(resultList)) {
                          items.stream().forEach(item -> {
                              integers.add(item);

                          });
                      }
                  }
          );
          integers.stream().forEach(item->System.out.println(item));

      }
    }


}
相關文章
相關標籤/搜索