本文創意來自一次業務需求,此次須要接入一個第三方外部服務。因爲這個服務只提供異步 API,爲了避免影響現有系統同步處理的方式,接入該外部服務時,應用對外屏蔽這種差別,內部實現異步請求同步。git
全文摘要:github
現有一個系統,總體架構以下所示:apache
這是一個很常見的同步設計方案,上游系統須要等待下游系統接口返回調用結果。編程
如今須要接入另一個第三方服務 B,該服務與服務 A 最大區別在於,這是一個異步 API
。調用以後,僅僅返回受理成功,處理結果後續經過異步通知返回。網絡
接入以後,總體架構以下所示:架構
因爲網絡隔離策略,通知接收程序與通訊服務須要單獨分開部署。若沒此要求,能夠將通訊服務 B 與通知接收程序合併成一個應用。併發
另外圖中全部應用採用雙節點部署。異步
爲了避免影響 OpenAPI
上游系統同步處理邏輯,通訊服務 B 調用第三方服務以後,不能馬上返回,須要等待結果通知,拿到具體返回結果。這就須要通訊服務 B 內部將異步轉爲同步。socket
這就是一個典型的異步轉同步問題,整個過程涉及兩個問題。ide
問題 1 的解決方案參考了 Dubbo 設計思路。
咱們在使用 Dubbo 調用遠程服務時,默認狀況下,這是一種阻塞式調用方式,即 Consumer 端代碼一直阻塞等待,直到 Provider 端返回爲止。
因爲 Dubbo 底層基於 Netty
發送網絡請求,這其是一個異步的過程。爲了讓業務線程能同步等待,這個過程就須要將異步轉爲同步。
Dubbo 發起遠程調用代碼位於 DubboInvoker#doInvoke
:
Dubbo 版本爲:2.6.X 版本。2,7.X 重構
DefaultFuture
,可是本質原理仍是同樣。
默認狀況下,Dubbo 支持同步調用方式,這裏將會建立 DefaultFuture
對象。
這裏有個很是重要邏輯,每一個請求生成一個惟一 ID,而後將 ID
與 DefaultFuture
映射關係,存入 Map
中。
這個請求 ID 在之因此這麼重要,是由於消費者併發調用服務發送請求,同時將會有多個業務線程進入阻塞。當收到響應以後,咱們須要喚醒正確的等待線程,並將處理結果返回。
經過 ID 這個惟一映射的關係,很天然能夠找到其對應 DefaultFuture
,喚醒其對應的業務線程。
來源:Dubbo 官網
業務線程調用 DefaultFuture#get
方法進入阻塞。這段代碼比較簡單,經過調用 Condition#await
阻塞線層。
當消費者接收到服務提供者的返回結果,將會調用 DefaultFuture#received
方法。
經過響應對象中的惟一 ID,找到其對應 DefaultFuture
對象,從而將結果設置 DefaultFuture
對象中,而後喚醒的相應的業務線程。
這裏實際有個優化點,使用 done#signalAll 代替 done#signal。使用 condition 等待通知機制的時候須要注意這一點。
正常狀況下,當消費者接收到響應以後,將會從 FUTURES
這個 Map
移除 DefaultFuture
。
可是在異常狀況下,服務提供者若處理緩慢,不能及時返回響應結果,消費者業務線程將會由於超時甦醒。這種狀況下 FUTURES
積壓了無效 DefaultFuture
對象。若是不及時清理,極端狀況下,將會發生 OOM 。
DefaultFuture
內部將會開啓一個異步線程,定時輪詢 FUTURES
判斷 DefaultFuture
超時時間,及時清理已經無效(超時)的 DefaultFuture
。
根據 Dubbo 解決思路,問題 1 解決辦法就比較簡單了。具體流程以下:
Map
存儲對應關係,並使業務線程阻塞等待這個設計過程須要注意設置合理的超時時間,這個超時時間須要考慮遠程服務調用耗時,能夠參考以下公式:
1業務線程等待時間=通訊服務 B 接口的超時時間 - 調用第三方服務 B 接口消耗時間
這裏就不貼出具體的代碼,詳細代碼參考 Dubbo DefaultFuture
。
接下來重點看下通知服務如何將結果轉發給正確的通訊服務 B 的節點。這裏想到兩種方案:
通訊服務 B 使用 SocketServer 構建一個服務接收程序,當通知接收程序收到第三方服務 B 通知時,經過 Socket
將結果轉發給通訊服務 B。
整個系統架構以下所示:
因爲生產服務雙節點部署,通知接收程序就不能寫死轉發地址。這裏咱們將請求 ID 與通訊服務 B socket
服務地址關係存入 Redis
中,而後通知接收程序經過 ID 找到正確的地址。
這個方案說實話有點複雜。
第一 SocketServer 編碼難度較大,編寫一個高效 SocketServer 就比較難,一不當心可能產生各類 Bug。
第二通訊服務 B 服務地址配置在配置文件中,因爲兩個節點地址不一樣,這就致使同一應用存在不一樣配置。這對於後面維護就很不友好。
第三額外引入 Redis
依賴,系統複雜度變高。
相對 SocketServer
方案,MQ
方案相對簡單,這裏採用 MQ
廣播消費的方式,架構如圖所示:
通知接收程序收到異步通知以後,直接將結果發送到 MQ
。
通訊服務 B 開啓廣播消費模式,拉取 MQ
消息。
通訊服務 B_1 拉取消息,經過請求 ID 映射關係,沒找到內部等待的線程,知道這不是本身的等待消息,因而 B_1 直接丟棄便可。
通訊服務 B_2 拉取消息,經過請求 ID 映射關係,順利找到正在等待的線程,而後能夠喚醒等待線程,返回最後的結果。
對比 SocketServer
方案,MQ
方案總體流程比較簡單,編程難度低,也沒用存在特殊的配置。
不過這個方案十分依賴 MQ
消息實時性,若 MQ
消息投遞延遲很高,這就會致使通訊服務 B 業務線程超時甦醒,業務異常返回。
這裏咱們選擇使用 RocketMQ
,長輪詢 Pull
方式,可保證消息很是實時,
綜上,這裏採用 MQ
的方案。
異步轉同步咱們須要解決同步阻塞,以及如何喚醒的問題。
阻塞/喚醒能夠分別使用 Condition#await/signalAll
。不過這個過程咱們須要生成一個惟一請求 ID,而且保存這個 ID 與業務線程映射關係。後續等到結果返回咱們才能經過惟一 ID 喚醒正確等待線程。
只要瞭解上面幾點,異步轉同步的問題就就能夠迎刃而解。