經過HTTP接口實現調用MQTT Client發送數據,HTTP接口返回值爲MQTT Client發送數據的對應結果。 HTTP接口爲同步阻塞,MQTT Client 爲異步回調方式。
如何實如今HTTP接口中調用MQTT Client發送數據後,可以阻塞等待MQTT返回結果,而後將結果返回?異步
CountDownLatch + Callbale+FutureTaskide
1.CountDownLatch做用this
CountDownLatch實如今MQTT Client 發送數據後 到接收數據後這段時間的阻塞。 HTTP每次請求,新建一個CountDownLatch,而後將CountDownLatch做爲值和deviceId做爲KEY保存到Map中, 調用MQTT Client 發送數據後,countDownLatch.await(),進行同步等待 在MQTT Client接收數據的回調方法中更加deviceId取出CountDwonLatch而後計數減一
2.Callbale+FutureTask做用線程
將調用MQTT Client發送數據的過程,封裝成Callable,投遞發送任務時,經過返回的FutureTask的get()方法, 同步阻塞,直到結果返回。
1.Map保存CountDownObj用於同步阻塞等待MQTT Client返回結果,以及將返回結果傳遞個FutureTaskcode
private final static ConcurrentMap<String, CountDownObj> countDownLatchMap = new ConcurrentHashMap<>(); //線程池 private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), runnable -> { Thread thread = new Thread(runnable, "mqtt thread"); return thread; });
2.HTTP API 調用的發送MQTT 消息數據的接口對象
/** * HTTP API 調用的發送MQTT 消息數據的接口 * 同步阻塞 */ public Integer send(Long packageId, String deviceId) throws Exception { ...... FutureTask<Integer> futureTask = sendTask(publishDto)); return futureTask.get() }
3.投遞發送MQTT指令的task方法接口
/** * 投遞MQTT發送指令任務 * 同步阻塞 */ private FutureTask<Integer> sendTask(PublishDto publishDto) throws Exception { FutureTask<Integer> futureTask = new FutureTask<>(new GetDatapointValueCallable(publishDto)); threadPoolExecutor.execute(futureTask); //阻塞線程 return futureTask; }
4.封裝CountDownLatch 和 Integer的對象,用於CountDownLatch阻塞控制和返回結果rem
/** * 封裝CountDownLatch 和 Integer * 用於CountDownLatch阻塞控制和返回結果 */ private class CountDownObj { private final CountDownLatch countDownLatch; private volatile Integer value; private CountDownObj(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public CountDownLatch getCountDownLatch() { return countDownLatch; } public Integer getValue() { return value; } public void setValue(Integer value) { this.value = value; } }
5.具體發送MQTT數據的Callbale線程Task,會新建CountDownLatch,並經過CountDownLatch.await()方法阻塞,直到MQTT回調接收到數據或者超時。get
/** * 發送MQTT消息的任務Callable */ private class GetDatapointValueCallable implements Callable<Integer> { private final PublishDto publishDto; GetDatapointValueCallable(PublishDto publishDto) { this.publishDto = publishDto; } @Override public Integer call() throws Exception { //mqtt client 發送數據,此處具體代碼省略 ...... CountDownLatch countDownLatch = new CountDownLatch(1); countDownLatchMap.putIfAbsent(publishDto.getDeviceId(), new CountDownObj(countDownLatch)); //阻塞,超時時間3s countDownLatch.await(3, TimeUnit.SECONDS); //返回mqtt指令對應的結果或者null return countDownLatchMap.remove(publishDto.getDeviceId()).getValue(); } }
6.MQTT接收數據回調,這裏經過deviceId從MAP裏面取到CountDownObj,釋放閉鎖(結束callable線程的等待)和設置MQTT返回的結果(即callable中call()返回的結果,也就是FutureTask的get()方法返回的結果)。同步
/** * MQTT 接收數據回調 */ void mqttReceiveCallback(String deviceId, String datapointId, String value) { ...... //接收到數據後,經過閉鎖釋放阻塞的線程,同時設置結果返回給調用者 CountDownObj countDownObj=countDownLatchMap.get(deviceId); if(countDownObj!=null) { countDownObj.setValue(Integer.parseInt(value)); countDownObj.getCountDownLatch().countDown(); } ....... }