CountDownLatch + Callbale+FutureTask 實現異步變同步調用

背景

經過HTTP接口實現調用MQTT Client發送數據,HTTP接口返回值爲MQTT Client發送數據的對應結果。 HTTP接口爲同步阻塞,MQTT Client 爲異步回調方式。
如何實如今HTTP接口中調用MQTT Client發送數據後,可以阻塞等待MQTT返回結果,而後將結果返回?異步

解決方法

CountDownLatch + Callbale+FutureTaskide

1.CountDownLatch做用this

CountDownLatch實如今MQTT Client 發送數據後 到接收數據後這段時間的阻塞。
HTTP每次請求,新建一個CountDownLatch,而後將CountDownLatch做爲值和deviceId做爲KEY保存到Map中,
調用MQTT Client 發送數據後,countDownLatch.await(),進行同步等待
在MQTT Client接收數據的回調方法中更加deviceId取出CountDwonLatch而後計數減一

2.Callbale+FutureTask做用線程

將調用MQTT Client發送數據的過程,封裝成Callable,投遞發送任務時,經過返回的FutureTask的get()方法,
同步阻塞,直到結果返回。

關鍵代碼

1.Map保存CountDownObj用於同步阻塞等待MQTT Client返回結果,以及將返回結果傳遞個FutureTaskcode

private final static ConcurrentMap<String, CountDownObj> countDownLatchMap = new ConcurrentHashMap<>();
    //線程池
    private final ThreadPoolExecutor threadPoolExecutor = 
            new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), runnable -> {
        Thread thread = new Thread(runnable, "mqtt thread");
        return thread;
    });

2.HTTP API 調用的發送MQTT 消息數據的接口對象

/**
     * HTTP API 調用的發送MQTT 消息數據的接口
     * 同步阻塞
     */
    public Integer send(Long packageId, String deviceId) throws Exception {
        ......
       FutureTask<Integer> futureTask = sendTask(publishDto));
       return futureTask.get()
    }

3.投遞發送MQTT指令的task方法接口

/**
     * 投遞MQTT發送指令任務
     * 同步阻塞
     */ 
   private FutureTask<Integer> sendTask(PublishDto publishDto) throws Exception {
        FutureTask<Integer> futureTask = new FutureTask<>(new GetDatapointValueCallable(publishDto));
        threadPoolExecutor.execute(futureTask);
        //阻塞線程
        return futureTask;
    }

4.封裝CountDownLatch 和 Integer的對象,用於CountDownLatch阻塞控制和返回結果rem

/**
     * 封裝CountDownLatch 和 Integer
     * 用於CountDownLatch阻塞控制和返回結果
     */
    private class CountDownObj {
        private final CountDownLatch countDownLatch;
        private volatile Integer value;

        private CountDownObj(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        public CountDownLatch getCountDownLatch() {
            return countDownLatch;
        }

        public Integer getValue() {
            return value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }
    }

5.具體發送MQTT數據的Callbale線程Task,會新建CountDownLatch,並經過CountDownLatch.await()方法阻塞,直到MQTT回調接收到數據或者超時。get

/**
     * 發送MQTT消息的任務Callable
     */
    private class GetDatapointValueCallable implements Callable<Integer> {
        private final PublishDto publishDto;

        GetDatapointValueCallable(PublishDto publishDto) {
            this.publishDto = publishDto;
        }

        @Override
        public Integer call() throws Exception {
            //mqtt client 發送數據,此處具體代碼省略
            ......
            
            CountDownLatch countDownLatch = new CountDownLatch(1);
            countDownLatchMap.putIfAbsent(publishDto.getDeviceId(), new CountDownObj(countDownLatch));
            //阻塞,超時時間3s
            countDownLatch.await(3, TimeUnit.SECONDS);
            //返回mqtt指令對應的結果或者null
            return countDownLatchMap.remove(publishDto.getDeviceId()).getValue();
        }

    }

6.MQTT接收數據回調,這裏經過deviceId從MAP裏面取到CountDownObj,釋放閉鎖(結束callable線程的等待)和設置MQTT返回的結果(即callable中call()返回的結果,也就是FutureTask的get()方法返回的結果)。同步

/**
     * MQTT 接收數據回調
     */
    void mqttReceiveCallback(String deviceId, String datapointId, String value) {
        ......
        
        //接收到數據後,經過閉鎖釋放阻塞的線程,同時設置結果返回給調用者
        CountDownObj countDownObj=countDownLatchMap.get(deviceId);
        if(countDownObj!=null) {
            countDownObj.setValue(Integer.parseInt(value));
            countDownObj.getCountDownLatch().countDown();
        }
        
        .......
    }
相關文章
相關標籤/搜索