基於Disruptor併發框架的分類任務併發

併發的場景

最近在編碼中遇到的場景,個人程序須要處理不一樣類型的任務,場景要求以下:
1.同類任務串行、不一樣類任務併發。
2.高吞吐量。
3.任務類型動態增減。java

思路

思路一:
最直接的想法,每有一個任務種類被新建,就建立對應的處理線程。
這樣的思路問題在於線程數量不可控、建立、銷燬線程開銷大。不可取。數據結構

思路二:
比較常規的想法,全部任務共享線程池每有一個任務種類被建立,就新建一個隊列,以保證同類任務串行。
這樣的思路問題在於數據結構開銷不可控,若是是任務種類繁多,但每種任務數量並很少的狀況,那麼建如此多的隊列顯得好笑。併發

因而我指望可以使用一個線程池、一個隊列搞定這些事。app

設計

這裏寫圖片描述

代碼實現

引入disruptor依賴:框架

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

Task接口ide

public interface Task<T> {
    public T exec();
}

TaskEvent類this

import com.lmax.disruptor.EventFactory;

public class TaskEvent<T> {

    private Task<T> input;
    
    //用於標記一個併發分組
    private int partitionId;
    
    //disruptor的任務事件工場
    public static final EventFactory<TaskEvent> FACTORY = TaskEvent::new;

    public Task<T> getInput() {
        return input;
    }

    public void setInput(Task<T> input) {
        this.input = input;
    }

    public int getPartitionId() {
        return partitionId;
    }

    public void setPartitionId(int partitionId) {
        this.partitionId = partitionId;
    }

}

TaskHandler類編碼

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;

public class EventAdaptor<T> implements EventHandler<TaskEvent<T>>, LifecycleAware {
    
    //決定我處理那些任務
    private final int partitionId;

    public EventAdaptor(int partitionId) {
        super();
        this.partitionId = partitionId;
    }

    public void onEvent(TaskEvent<T> taskEvent, long arg1, boolean arg2) throws Exception {
        if(partitionId == taskEvent.getPartitionId()) {
            taskEvent.getInput().exec();
        }
    }

    @Override
    public void onShutdown() {
        Thread.currentThread().setName("handler-" + partitionId);
    }

    @Override
    public void onStart() {
        Thread.currentThread().setName("handler-" + partitionId);
    }

}

TaskService類線程

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

@Service
@Singleton
public class TaskService<V extends Task<T>, T> {

    private static final int BUFFER_SIZE = 1024;
    
    private static final int DEFAULT_POOL_SIZE = 5;
    
    private ThreadPoolExecutor executor;
    
    private Disruptor<TaskEvent>  disruptor;
    
    private Map<String,Integer> taskClassMapperPartition = new ConcurrentHashMap<>();
    
    private static final int PARALLEL_NUM = 5;
    
    private List<String> taskTypes = new ArrayList<>();
    
    @PostConstruct
    public void init() {
        
        //初始化處理器和線程池
        this.executor = new ThreadPoolExecutor(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        this.executor.prestartAllCoreThreads();
        this.disruptor = new Disruptor<>(TaskEvent.FACTORY, BUFFER_SIZE, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
    
        EventAdaptor[] handlers = new EventAdaptor[PARALLEL_NUM];
        
        for(int i = 0; i < PARALLEL_NUM; i++) {
            handlers[i] = new EventAdaptor(i);
        }
        
        this.disruptor.handleEventsWith(handlers);
        this.disruptor.start();
        
        rePartition();
    }
    
    public void addTaskType(String type) {
        taskClassMapperPartition.put(type, taskTypes.size() % PARALLEL_NUM);
        taskTypes.add(type);
    }
    
    public void deleteTaskType(String type) {
        taskTypes.remove(type);
        taskClassMapperPartition.remove(type);
        rePartition();
    }

    private void rePartition() {
        for(int i = 0, length = taskTypes.size(); i < length; i++) {
            //給各任務處理器均衡發聽任務
            taskClassMapperPartition.put(taskTypes.get(i), i % PARALLEL_NUM);
        }
    }
    
}

這裏給出的代碼是一套物業無得簡單框架,你只要實現Task接口就能夠了。設計

相關文章
相關標籤/搜索