Storm DRPC實現機制分析剖析

        DRPC是創建在Storm基本概念(Topology、Spout、Bolt、Stream等)之上的高層抽象,我的理解它的目標是在Storm 集羣之上提供一種分佈式的RPC框架,以便可以利用Storm快速的實現RPC請求的分佈式計算過程,即發起一次RPC請求,多個worker計算節點參 與計算,最後彙總後將計算結果返回給客戶端。java

 

Storm中使用Thrift做爲其RPC框架,一樣地,DRPC的實現也是構建在Thrift協議之上,相關的源碼文件以下:git

1. storm-core/src/storm.thrift,定義了Storm中實現的Thrift協議,其中有兩個service是與DRPC相關的:DistributedRPC和DistributedRPCInvocations,它們的接口定義以下:github

DistributedRPC.Iface:定義了execute方法,用於客戶端發起RPC請求;apache

DistributedRPCInvocations.Iface:定義了fetchRequest、failRequest、result方法,分別用於獲取RPC請求、將RPC請求標記爲失敗、返回RPC請求的處理結果。架構

 

2. storm-core/src/clj/backtype/storm/daemon/drpc.clj,實現了DRPC的Thrift服務端(即DRPC Server),使用Clojure語言實現。框架

 

3. storm-core/src/jvm/backtype/storm/generated/DistributedRPC.java 和storm-core/src/jvm/backtype/storm/utils/DRPCClient.java,做爲RPC客戶端,實現了 DistributedRPC.Iface接口,用於客戶端向DRPC Server發起RPC請求。jvm

 

4. storm-core/src/jvm/backtype/storm/generated /DistributedRPCInvocations.java和storm-core/src/jvm/backtype/storm/drpc /DRPCInvocationsClient.java,做爲RPC客戶端,實現了DistributedRPCInvocations.Iface接 口,用於DRPC Topology觸發執行DRPC Request並返回結果給DRPC Server。分佈式

從中能夠看出,對於DRPC Server來講,DRPC Client和DRPC Topology都是Thrift的客戶端,只是分別調用了不一樣的Thrift服務而已。fetch

 

Storm DRPC實現架構ui

1. 首先,前提是集羣上已經運行了DRPC Topology,每一個DRPC服務註冊了一個RPC方法,包含方法名稱和參數形式(上圖中假設Topology已經啓動運行);

 

2. 接下來是處理流程,客戶端經過DRPCClient調用execute方法,發起一次RPC調用給DRPC Server,目前受限的是隻支持一個String類型的DRPC方法調用參數,社區中正在討論對此進行擴展;

 

3. 而後,DRPC Server中有一個handler-server pool,用於接收RPC請求,併爲每一個請求生成惟一的request id,生成一條DRPC Request記錄,並放到request queue中等待被消費(計算);

 

4. 最後,DRPC Topology中的相關模塊(DRPC Spout、ReturnResults Bolt,後面會介紹)經過invoke-server pool從request queue中取出該方法的RPC請求,並將處理結果(成功/失敗)返回給DRPC Server,直到最終返回給阻塞着的DRPC Client。

 

Storm DRPC拓撲數據流

其 中,DRPC Topology由1個DRPCSpout、1個Prepare-Request Bolt、若干個User Bolts(即用戶經過LinearDRPCTopologyBuilder添加的Bolts)、1個JoinResult Bolt和1個ReturnResults Bolt組成。除了User Bolts之外,其餘的都是由LinearDRPCTopologyBuilder內置添加到Topology中的。接下來,咱們從數據流的流動關係來 看,這些Spout和Bolts是如何工做的:

 

1. DRPCSpout中維護了若干個DRPCInvocationsClient,經過fetchRequest方法從DRPC Server讀取須要提交到Topology中計算的RPC請求,而後發射一條數據流給Prepare-Request Bolt:<」args」, ‘」return-info」>,其中args表示RPC請求的參數,而return-info中則包含了發起此次RPC請求的RPC Server信息(host、port、request id),用於後續在ReturnResults Bolt中返回計算結果時使用。

 

2. Prepare-Request Bolt接收到數據流後,會新生成三條數據流:

<」request」, 」args」>:發給用戶定義的User Bolts,提取args後進行DRPC的實際計算過程;

<」request」, 」return-info」>:發給JoinResult Bolt,用於和User Bolts的計算結果作join之後將結果返回給客戶端;

<」request」>:在用戶自定義Bolts實現了FinishedCallback接口的狀況下,做爲ID流發給用戶定義的最後一級Bolt,用於判斷batch是否處理完成。

 

3. User Bolts按照用戶定義的計算邏輯,以及RPC調用的參數args,進行業務計算,並最終輸出一條數據流給JoinResult Bolt:<」request」, 」result」>。

 

4. JoinResult Bolt將上游發來的<」request」, 」return-info」>和<」request」, 」result」>兩條數據流作join,而後輸出一條新的數據流給ReturnResults Bolt: <」result」, 」return-info」>。

 

5. ReturnResults Bolt接收到數據流後,從return-info中提取出host、port、request id,根據host和port生成DRPCInvocationsClient對象,並調用result方法將request id及result返回給DRPC Server,若是result方法調用成功,則對tuple進行ack,不然對tuple進行fail,並最終在DRPCSpout中檢測到tuple 失敗後,調用failRequest方法通知DRPC Server該RPC請求執行失敗。

 

參考資料連接

1. https://github.com/apache/incubator-storm

2. https://github.com/nathanmarz/storm-starter

轉載地址:http://blog.yoodb.com/yoodb/article/detail/1112

相關文章
相關標籤/搜索