1.建立一個單線程的隊列處理類
1.1實現Thread run阻塞讀隊列並處理
1.2提供submit提交請求,並返回處理future
1.3隊列請求完成後,置future爲complete
1.4req處理函數設爲類私有成員,能夠經過初始化動態設置app
2.建立一個調用類
2.1初始化隊列處理類實例,並賦值req處理函數,並啓動
2.2調用隊列submit,經過future異步接收提交的消息處理完後的反饋消息。
/**調用類,例如中間件接收到消息後,向隊列提交請求*/ public class WalRequestConsumer implements Consumer<Req> { private final SingularUpdateQueue<Req, Res> walWriterQueue; public WalRequestConsumer() { walWriterQueue = new SingularUpdateQueue<Req, Res>((message) -> { return responseMessage(message); }); startHandling(); } private Res responseMessage(Req message) { return new Res(); } private void startHandling() { this.walWriterQueue.start();} @Override public void accept(Req req) { CompletableFuture<Res> future = walWriterQueue.submit(req); future.whenComplete((responseMessage, error) -> { sendResponse(responseMessage); }); } private void sendResponse(Res responseMessage) { } }
/**
 * Receives requests asynchronously, processes them one at a time on a single thread, and
 * completes a future with each result.
 *
 * <p>Requests are buffered in a bounded queue of capacity 100. The {@link #run()} loop polls the
 * queue and applies the handler function to each request, completing the request's future with
 * the handler's result or with the exception it threw.
 *
 * <p>NOTE(review): {@link #run()} sets the running flag to {@code true} on entry, so a
 * {@link #shutdown()} issued before the thread has begun running is overwritten. Requests still
 * queued when the loop exits are not completed by this class.
 *
 * @param <Req> request type
 * @param <Res> response type
 */
public class SingularUpdateQueue<Req, Res> extends Thread {
    private final ArrayBlockingQueue<RequestWrapper<Req, Res>> workQueue =
            new ArrayBlockingQueue<RequestWrapper<Req, Res>>(100);
    private final Function<Req, Res> handler;
    private volatile boolean isRunning = false;

    /**
     * Creates a queue that processes requests with the given function.
     *
     * @param t function applied to every submitted request to produce its response
     */
    public SingularUpdateQueue(Function<Req, Res> t) {
        this.handler = t;
    }

    /**
     * Waits up to 300 ms for the next queued request.
     *
     * @return the next request, or empty if none arrived in time or the wait was interrupted
     */
    private Optional<RequestWrapper<Req, Res>> take() {
        try {
            return Optional.ofNullable(workQueue.poll(300, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            // Restore the flag so the run loop can observe the interrupt and stop.
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    /** Asks the processing loop to stop; it exits after the current poll/handler call returns. */
    public void shutdown() {
        this.isRunning = false;
    }

    /**
     * Enqueues a request, blocking while the queue is full.
     *
     * @param request the request to process
     * @return the future held by the request's wrapper, completed by the processing loop
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *     interrupted while waiting for queue space; the interrupt flag is restored first
     */
    public CompletableFuture<Res> submit(Req request) {
        RequestWrapper<Req, Res> requestWrapper = new RequestWrapper<Req, Res>(request);
        try {
            workQueue.put(requestWrapper);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return requestWrapper.getFuture();
    }

    /**
     * Processing loop: polls the queue and handles one request at a time until
     * {@link #shutdown()} is called or the thread is interrupted.
     */
    @Override
    public void run() {
        isRunning = true;
        // The interrupt check is required: take() restores the interrupt flag, and polling again
        // with the flag set would throw immediately and spin.
        while (isRunning && !Thread.currentThread().isInterrupted()) {
            take().ifPresent(this::process);
        }
    }

    /** Applies the handler to one request and completes its future with the result or failure. */
    private void process(RequestWrapper<Req, Res> requestWrapper) {
        try {
            Res response = handler.apply(requestWrapper.getRequest());
            requestWrapper.complete(response);
        } catch (Exception e) {
            requestWrapper.completeExceptionally(e);
        }
    }
}