多線程插入等待響應-單線程順序消費-非阻塞模式

1.建立一個單線程的隊列處理類
1.1實現Thread run阻塞讀隊列並處理
1.2提供submit提交請求,並返回處理future
1.3隊列請求完成後,置fulture爲complete
1.4req處理函數設爲類私有成員,能夠經過初始化動態設置app

2.建立一個調用類
2.1初始化隊列處理類實例,並賦值req處理函數,並啓動
2.2調用隊列submit,經過future接收提交的消息處理完後的反饋消息。異步

/**調用類,例如中間件接收到消息後,向隊列提交請求*/
public class WalRequestConsumer implements Consumer<Req> {
    private final SingularUpdateQueue<Req, Res> walWriterQueue;

    public WalRequestConsumer() {
        walWriterQueue = new SingularUpdateQueue<Req, Res>((message) -> {
           return  responseMessage(message);
        });
        startHandling();
    }
    private Res responseMessage(Req message) {
        return new Res();
    }
    private void startHandling() { this.walWriterQueue.start();}
    @Override
    public void accept(Req req) {
        CompletableFuture<Res> future = walWriterQueue.submit(req);
        future.whenComplete((responseMessage, error) -> {
            sendResponse(responseMessage);
        });
    }
    private void sendResponse(Res responseMessage) {
    }
}
/**異步接收,單一處理,完成響應**/
public class SingularUpdateQueue<Req, Res> extends Thread {
    private ArrayBlockingQueue<RequestWrapper<Req, Res>> workQueue
            = new ArrayBlockingQueue<RequestWrapper<Req, Res>>(100);
    private Function<Req, Res> handler;
    private volatile boolean isRunning = false;

    public SingularUpdateQueue (Function<Req,Res> t){
        this.handler = t;
    }

    private Optional<RequestWrapper<Req, Res>> take() {
        try {
            return Optional.ofNullable(workQueue.poll(300, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            return Optional.empty();
        }
    }
    
    public void shutdown() {
        this.isRunning = false;
    }
    
    public CompletableFuture<Res> submit(Req request) {
        try {
            RequestWrapper requestWrapper = new RequestWrapper<Req, Res>(request);
            workQueue.put(requestWrapper);
            return requestWrapper.getFuture();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    @SneakyThrows
    @Override
    public void run() {
        isRunning = true;
        while(isRunning) {
            Optional<RequestWrapper<Req, Res>> item = take();
            item.ifPresent(requestWrapper -> {
                try {
                    Res response = handler.apply(requestWrapper.getRequest());
                    requestWrapper.complete(response);

                } catch (Exception e) {
                    requestWrapper.completeExceptionally(e);
                }
            });
        }
    }
}
相關文章
相關標籤/搜索