場景 java
=====編程
在咱們實際開發過程當中,每每會遇到執行接口邏輯以及批任務處理的的執行效率問題,在這些場景中,均可以經過使用多線程的方式,把佔據長時間的程序中的任務放到後臺去處理,更好的發揮計算機的多核cpu的優點。緩存
這篇文章只介紹開發過程當中實用的多線程代碼的三種編寫方法和實踐過程,參數說明、線程安全、多線程之間的調度策略和狀態同步這裏就很少介紹了,會在後面的文章中加以詳細說明。安全
CompletableFuture是java8新增長的類,提供了很是強大的Future的擴展功能,能夠幫助咱們簡化異步編程的複雜性,提供了函數式編程的能力。多線程
下面是簡單使用CompletableFuture進行異步任務的執行。app
@Configuration public class AsyncConfiguration { /\*\*異步執行的線程池 \*/ @Bean public TaskExecutor dataAsyncTaskExecutor(DataImportProperties importProperties){ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(importProperties.getThreadNumber()); taskExecutor.setMaxPoolSize(50); taskExecutor.setThreadGroupName("data-async-importer"); taskExecutor.setThreadNamePrefix("data-async"); taskExecutor.initialize(); return taskExecutor; } }
@Resource(name = "dataAsyncTaskExecutor") private TaskExecutor taskExecutor; //異步任務 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->1,taskExecutor); //阻塞異步任務獲取結果 future.get();
若是使用CompletableFuture過程當中不傳入本身封裝的線程池,CompletableFuture會使用ForkJoinPool.commonPool(),它是一個會被不少任務 共享 的線程池,好比同一 JVM 上的全部 CompletableFuture、並行 Stream 都將共享 commonPool,除此以外,應用代碼也能使用它。框架
這種是你們比較經常使用的異步執行任務的作法。代碼比較直觀,更容易調試。下面展現一個多線程異步批次消費隊列的實踐代碼。異步
@Getter @Setter public class QueueData { /\*\*隊列數據 \*/ private Map<String,Object> data; /\*\*數據的數據標識 \*/ private String dbTableCode; }
import java.util.Map; import java.util.Queue; /\*\* \* 採用內存隊列做爲消息處理服務緩存 \*/ public class MemoryQueueService { public Queue<QueueData> queue; public MemoryQueueService(Queue<QueueData> queue){ this.queue = queue; } //推入隊列 public Integer publish(Map<String, Object> eventData) { QueueData queueData = new QueueData(); queueData.setData(eventData); queue.offer(queueData); return queue.size(); } }
import lombok.extern.slf4j.Slf4j; import java.util.\*; import java.util.concurrent.TimeUnit; /\*\* \* 內存隊列的消費端 \*/ @Slf4j public class MemoryQueueDataHandler implements Runnable { private int batchSize; private List<QueueData> eventCache; private Queue<QueueData> queue; /\*\* \* 是否運行 \*/ private boolean running; public MemoryQueueDataHandler(Queue<QueueData> queue, int batchSize) { this.queue = queue; this.batchSize = batchSize; eventCache = new ArrayList<>(batchSize); running = true; } @Override public void run() { log.info("內存隊列數據監聽handler啓動."); QueueData eventData=null; while (running || eventData!=null) { //消費 eventData = queue.poll(); if (eventData != null) { //事件消息不爲空 eventCache.add(eventData); //批量寫入 if (eventCache.size() >= batchSize) { flushCacheToDb(); } } else if(!eventCache.isEmpty()){ //緩存不爲空 flushCacheToDb(); } else { //若是隊列爲空,緩存也爲空則等待 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } } } //刷新緩存 flushCacheToDb(); } private void flushCacheToDb(){ Map<String,List<Map<String,Object>>> wData = new HashMap<>(batchSize); //構造批次 for (QueueData queueData : eventCache) { List<Map<String, Object>> cacheQueue = wData.computeIfAbsent(queueData.getDbTableCode(), k -> new ArrayList<>(batchSize)); cacheQueue.add(queueData.getData()); wData.put(queueData.getDbTableCode(),cacheQueue); } //批量寫入 for (Map.Entry<String, List<Map<String, Object>>> entry : wData.entrySet()) { //TODO 寫入邏輯 } eventCache.clear(); } public void setRunning(boolean running) { this.running = running; } }
private ExecutorService executorService; private MemoryQueueService queueService; ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("event-consume-%d").build(); //後面把異步任務委託給ExecutorService executorService = new ThreadPoolExecutor( 3, //核心線程 5, //最大線程 300, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory ); //存放數據的隊列 ConcurrentLinkedQueue<QueueData> queue = new ConcurrentLinkedQueue<>(); //生產端 queueService = new MemoryQueueService(queue); //啓用5個線程進行消費 for (int i = 0; i < 5; i++) { //消費端 executorService.submit(new MemoryQueueDataHandler(dataEngine,queue,dataProperties.getBatchSize())); } //往隊列中緩存數據 HashMap<String,Object> map = new HashMap(); queueService.publish(map)
支和並框架的目的是以遞歸的方式將能夠並行的任務拆分紅更小的任務,而後將每一個子任務的結果合併起來生成總體的結果,它是ExecutorService的一個實現,它把子任務分配給線程池(ForkJoinPool)中的工做線程。async
1. 基於ForkJoin封裝拆分任務,執行邏輯的抽象類ide
import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.RecursiveTask; import java.util.function.Function; @Slf4j public class ListForkJoinExecution<V, R\> extends RecursiveTask<R\> { /\*\* \* 待處理數據 \*/ private transient List<V> values; /\*\* \* 單元邏輯執行函數 \*/ private transient Function<V, R> function; /\*\* \* 結果隊列 \*/ private transient ConcurrentLinkedQueue<ListForkJoinExecution<V, R>> resultQueue; public ListForkJoinExecution(List<V> values, Function<V, R> function){ this.values = values; this.function = function; } public void setResult(ConcurrentLinkedQueue<ListForkJoinExecution<V, R>> resultQueue) { this.resultQueue = resultQueue; } @Override protected R compute() { int len = values.size(); try { if(len >= 3){ int min = len / 2; // 拆分前一半 List<V> headValues = values.subList(0 , min); ListForkJoinExecution<V,R> a = new ListForkJoinExecution(headValues, function); a.setResult(resultQueue); a.fork(); resultQueue.offer(a); // 拆分後一半 List<V> endValues = values.subList(min + 1 , len); ListForkJoinExecution<V,R> b = new ListForkJoinExecution(endValues, function); b.setResult(resultQueue); b.fork(); resultQueue.offer(b); // 本次任務處理一個 R r = function.apply(values.get(min)); if (r != null) { return r; } } else if (len == 2){ List<V> headValues = values.subList(0 , 1); ListForkJoinExecution<V,R> a = new ListForkJoinExecution(headValues, function); a.setResult(resultQueue); a.fork(); resultQueue.offer(a); // 拆分後一半 List<V> endValues = values.subList(1 , 2); ListForkJoinExecution<V,R> b = new ListForkJoinExecution(endValues, function); b.setResult(resultQueue); b.fork(); resultQueue.offer(b); } else if(len == 1){ R r = function.apply(values.get(0)); if (r != null) { return r; } } }catch (Exception e){ log.error(e.getMessage(), e); } return null; } }
2. 執行forkjoin任務
import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ForkJoinPool; @Slf4j public class ForkJoinPoolRun { /\*\* \* 並行處理列表方法 \* @param task 任務 \* @param <V> 參數對象類型 \* @param <R> 返回對象類型 \* @return \*/ public static <V, R> List<R> run(ListForkJoinExecution<V, R> task){ return run(8, task); } public static <V, R> List<R> run(int poolSize, ListForkJoinExecution<V, R> task){ ForkJoinPool pool = new ForkJoinPool(poolSize); List<R> result = Lists.newArrayList(); ConcurrentLinkedQueue<ListForkJoinExecution<V, R>> resultQueue = new ConcurrentLinkedQueue<>(); try { task.setResult(resultQueue); // 執行 R r = pool.submit(task).get(); // 沒有結算結果的不追加到結果集中 if (r != null) { result.add(r); } while (resultQueue.iterator().hasNext()) { ListForkJoinExecution<V, R> poll = resultQueue.poll(); if (poll != null) { R join = poll.join(); // 沒有結算結果的不追加到結果集中 if (join != null) { result.add(join); } } } pool.shutdown(); return result; } catch (Exception e) { log.error("遍歷處理任務異常!", e); } return result; } /\*\* \* 並執行無返回方法 \* @param task 任務 \* @param <R> 返回對象類型 \* @return \*/ public static <R> void run(VoidForkJoinExecution<R> task){ run(8, task); } public static <R> void run(int poolSize, VoidForkJoinExecution<R> task){ ForkJoinPool pool = new ForkJoinPool(poolSize); try { // 執行 pool.submit(task); while (!pool.isTerminated()){ pool.shutdown(); } } catch (Exception e) { log.error("遍歷處理任務異常!", e); } } }