最近在編碼中遇到的場景,個人程序須要處理不一樣類型的任務,場景要求以下:
1.同類任務串行、不一樣類任務併發。
2.高吞吐量。
3.任務類型動態增減。java
思路一:
最直接的想法,每有一個任務種類被新建,就建立對應的處理線程。
這樣的思路問題在於線程數量不可控、建立、銷燬線程開銷大。不可取。數據結構
思路二:
比較常規的想法,全部任務共享線程池每有一個任務種類被建立,就新建一個隊列,以保證同類任務串行。
這樣的思路問題在於數據結構開銷不可控,若是是任務種類繁多,但每種任務數量並很少的狀況,那麼建如此多的隊列顯得好笑。併發
因而我指望可以使用一個線程池、一個隊列搞定這些事。app
引入disruptor依賴:框架
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version> </dependency>
Task接口ide
public interface Task<T> { public T exec(); }
TaskEvent類this
import com.lmax.disruptor.EventFactory; public class TaskEvent<T> { private Task<T> input; //用於標記一個併發分組 private int partitionId; //disruptor的任務事件工場 public static final EventFactory<TaskEvent> FACTORY = TaskEvent::new; public Task<T> getInput() { return input; } public void setInput(Task<T> input) { this.input = input; } public int getPartitionId() { return partitionId; } public void setPartitionId(int partitionId) { this.partitionId = partitionId; } }
TaskHandler類編碼
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.LifecycleAware; public class EventAdaptor<T> implements EventHandler<TaskEvent<T>>, LifecycleAware { //決定我處理那些任務 private final int partitionId; public EventAdaptor(int partitionId) { super(); this.partitionId = partitionId; } public void onEvent(TaskEvent<T> taskEvent, long arg1, boolean arg2) throws Exception { if(partitionId == taskEvent.getPartitionId()) { taskEvent.getInput().exec(); } } @Override public void onShutdown() { Thread.currentThread().setName("handler-" + partitionId); } @Override public void onStart() { Thread.currentThread().setName("handler-" + partitionId); } }
TaskService類線程
import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; @Service @Singleton public class TaskService<V extends Task<T>, T> { private static final int BUFFER_SIZE = 1024; private static final int DEFAULT_POOL_SIZE = 5; private ThreadPoolExecutor executor; private Disruptor<TaskEvent> disruptor; private Map<String,Integer> taskClassMapperPartition = new ConcurrentHashMap<>(); private static final int PARALLEL_NUM = 5; private List<String> taskTypes = new ArrayList<>(); @PostConstruct public void init() { //初始化處理器和線程池 this.executor = new ThreadPoolExecutor(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); this.executor.prestartAllCoreThreads(); this.disruptor = new Disruptor<>(TaskEvent.FACTORY, BUFFER_SIZE, executor, ProducerType.SINGLE, new BlockingWaitStrategy()); EventAdaptor[] handlers = new EventAdaptor[PARALLEL_NUM]; for(int i = 0; i < PARALLEL_NUM; i++) { handlers[i] = new EventAdaptor(i); } this.disruptor.handleEventsWith(handlers); this.disruptor.start(); rePartition(); } public void addTaskType(String type) { taskClassMapperPartition.put(type, taskTypes.size() % PARALLEL_NUM); taskTypes.add(type); } public void deleteTaskType(String type) { taskTypes.remove(type); taskClassMapperPartition.remove(type); rePartition(); } private void rePartition() { for(int i = 0, length = taskTypes.size(); i < length; i++) { //給各任務處理器均衡發聽任務 taskClassMapperPartition.put(taskTypes.get(i), i % PARALLEL_NUM); } } }
這裏給出的代碼是一套物業無得簡單框架,你只要實現Task接口就能夠了。設計