本文翻譯自:https://github.com/nathanmarz/storm/wiki/Distributed-RPC,做爲學習Storm DRPC的資料,轉載必須以超連接形式標明文章原始出處及本文翻譯連接。html
分佈式RPC(distributed RPC,DRPC)用於對Storm上大量的函數調用進行並行計算過程。對於每一次函數調用,Storm集羣上運行的拓撲接收調用函數的參數信息做爲輸入流,並將計算結果做爲輸出流發射出去。java
DRPC自己算不上Storm的特性,它是經過Storm的基本元素:streams,spouts,bolts,topologies而衍生的一個模式。DRPC能夠單獨做爲一個獨立於Storm的庫發佈,但因爲其重要性仍是和Storm捆綁在了一塊兒。git
DRPC經過DRPC Server來實現,DRPC Server的總體工做過程以下:github
接收到一個RPC調用請求;web
發送請求到Storm上的拓撲;數據庫
從Storm上接收計算結果;app
將計算結果返回給客戶端。jvm
以上過程,在client客戶端看來,一個DRPC調用看起來和通常的RPC調用沒什麼區別。下面代碼是client經過DRPC調用「reach」函數,參數爲「http://twitter.com」:分佈式
DRPCClient client = new DRPCClient("drpc-host", 3772); String result = client.execute("reach", "http://twitter.com");
DRPC內部工做流程以下:ide
Client向DRPC Server發送被調用執行的DRPC函數名稱及參數。
Storm上的topology經過DRPCSpout實現這一函數,從DPRC Server接收到函數調用流;
DRPC Server會爲每次函數調用生成惟一的id;
Storm上運行的topology開始計算結果,最後經過一個ReturnResults的Bolt鏈接到DRPC Server,發送指定id的計算結果;
DRPC Server經過使用以前爲每一個函數調用生成的id,將結果關聯到對應的發起調用的client,將計算結果返回給client。
Storm提供了一個topology builder——LinearDRPCTopologyBuilder,它能夠自動完成幾乎全部的DRPC步驟。包括:
構建spout;
向DRPC Server返回結果;
爲Bolt提供函數用於對tuples進行彙集。
下面是一個簡單的例子,這個DRPC拓撲只是簡單的在輸入參數後追加「!」後返回:
public static class ExclaimBolt extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(1); collector.emit(new Values(tuple.getValue(0), input + "!")); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "result")); } } public static void main(String[] args) throws Exception { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation"); builder.addBolt(new ExclaimBolt(), 3); // ... }
由上述例子可見,咱們只需不多的工做便可完成拓撲。當建立LinearDRPCTopologyBuilder
的時候,須要指定拓撲中
DRPC
函數的名稱
「
exclamation」
。一個
DRPC Server
能夠協調多個函數,每一個函數有不一樣的函數名稱。拓撲中的第一個
bolt
的輸入是
兩
個字段:第一個是請求的
id
號;第二個是請求的參數。
LinearDRPCTopologyBuilder
同時須要最後一個
bolt
發射一個包含兩個字段的輸出流:第一個字段是請求
id
;第二個字段是計算結果。所以,全部的中間
tuples
必須包含請求
id
做爲第一個字段。
例子中,
ExclaimBolt
在輸入
tuple
的第二個字段後面追加
「!」
,
LinearDRPCTopologyBuilder
負責處理其他的協調工做:與
DRPC Server
創建鏈接,發送結果給
DRPC Server
。
DRPC能夠以本地模式運行,下面的代碼是如何在本地模式運行上面的例子:
LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello")); cluster.shutdown(); drpc.shutdown();
首先建立一個LocalDRPC
對象,該對象在本地模擬一個
DRPC Server
,正如
LocalCluster
在本地模擬一個
Storm
集羣同樣。而後建立一個
LocalCluster
對象在本地模式下運行拓撲。
LinearDRPCTopologyBuilder
含有單獨的方法用於建立本地拓撲和遠程拓撲。
本地模式下,
LocalDRPC
並不綁定任何端口,所以
Storm
的拓撲須要瞭解要通信的對象
——
這就是爲何
createLocalTopology
方法須要以
LocalDRPC
對象做爲輸入。
加載完拓撲以後,經過對
LocalDRPC
調用
execute
方法,就能夠執行
DRPC
函數調用了。
在實際的Storm集羣上運行DRPC也同樣很簡單。只需完成如下步驟:
啓動DRPC Server(s);
配置DRPC Server(s)地址;
向Storm集羣提交DRPC拓撲。
首先,經過storm腳本啓動DRPC Server:
bin/storm drpc
而後,在Storm集羣中配置DRPC Server地址,這就是DRPCSpout
讀取函數調用請求的地方。這一步的配置能夠經過
storm.yaml
文件或者拓撲的配置來完成。經過
storm.yaml
文件的配置方式以下:
drpc.servers: - "drpc1.foo.com" - "drpc2.foo.com"
最後,經過StormSubmitter
啓動
DRPC
拓撲。爲了以遠程模式運行
上面的例子,代碼以下:
StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
createRemoteTopology
被用於爲
Storm
集羣建立合適的拓撲。
上面的exclamation只是一個簡單的DRPC例子。下面經過一個複雜的例子介紹如何在Storm集羣內進行DRPC——計算Twitter上每一個URL的到達度(reach),也就是每一個URL暴露給的不一樣人的個數。
爲了完成這一計算,須要完成如下步驟:
獲取全部點選了(tweet)該URL的人;
獲取步驟1中全部人的關注者(followers,粉絲);
對全部關注者followers進行去重;
對步驟3中的關注者人數進行求和。
一個簡單的URL到達度計算可能涉及成千上萬次數據庫調用以及數以百萬的followers記錄,計算量很是大。有了Storm,將很容易實現這一計算過程。單機上可能須要運行幾分鐘才能完成,在Storm集羣上,即便是最難計算的URL也只須要幾秒鐘。
這個例子的代碼在storm-starter:點擊這裏。這裏是如何建立拓撲的代碼:
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach"); builder.addBolt(new GetTweeters(), 3); builder.addBolt(new GetFollowers(), 12) .shuffleGrouping(); builder.addBolt(new PartialUniquer(), 6) .fieldsGrouping(new Fields("id", "follower")); builder.addBolt(new CountAggregator(), 2) .fieldsGrouping(new Fields("id"));
拓撲的執行分爲如下四步:
GetTweeters:獲取全部tweet了指定URL的用戶列表,這個Bolt將輸入流[id, url]轉換成輸出流[id, tweeter],每一個url元組被映射爲多個tweeter元組。
GetFollowers:獲取步驟1中全部用戶列表的followers,這個Bolt將輸入流[id, twetter]轉換成輸出流[id, follower],當某我的同時是多我的的關注者follower,並且這些人都tweet了指定的URL,那麼將產生重複的follower元組。
PartialUniquer:將全部followers按照follower id分組,使得同一個follower在同一個task中被處理。這個Bolt接收follower並進行去重計數。
CountAggregator:從各個PartialUniquer中接收各部分的計數結果,累加後完成到達度計算。
下面是PartialUniquer
這個
Bolt
的代碼實現:
public class PartialUniquer extends BaseBatchBolt { BatchOutputCollector _collector; Object _id; Set<String> _followers = new HashSet<String>(); @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; _id = id; } @Override public void execute(Tuple tuple) { _followers.add(tuple.getString(1)); } @Override public void finishBatch() { _collector.emit(new Values(_id, _followers.size())); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "partial-count")); } }
PartialUniquer
經過繼承
BaseBatchBolt
實現了
IBatchBolt
接口,
batch bolt
提供了
API
用於將一批
tuples
做爲總體來處理。每一個請求
id
會建立一個新的
batch bolt
實例,同時
Storm
負責這些實例的清理工做。
當PartialUniquer
接收到一個
follower
元組時執行
execute
方法,將
follower
添加到請求
id
對應的
HashSet
集合中。
Batch bolt同時提供了finishBatch
方法用於當這個
task
已經處理完全部的元組時調用。
PartialUniquer
發射一個包含當前
task
所處理的
follower ids
子集去重後個數的元組。
在內部實現上,
CoordinatedBolt
用於檢測指定的
bolt
是否已經收到指定請求
id
的全部
tuples
元組。
CoordinatedBolt
使用
direct streams管理實現這一協做過程。
拓撲的其餘部分易於理解。到達度的每一步的計算過程都是並行進行的,經過DRPC實現也是很是容易的。
LinearDRPCTopologyBuilder
只能處理
「
線性的
」DRPC
拓撲
——
正如到達度這樣能夠經過一系列步驟序列來完成的計算。不難想象,
DRPC
調用中包含有更復雜的帶有分支和合並
Bolt
的拓撲。目前,必須本身直接使用
CoordinatedBolt
來完成這種非線性拓撲的計算。
DRPCSpout發射[args, return-info],其中return-info包含DRPC Server的主機和端口號,以及DRPC Server爲該次請求生成的惟一id號;
構造一個Storm拓撲包含如下部分:
DRPCSpout
PrepareRequest(生成一個請求id,爲return info建立一個流,爲args建立一個流)
CoordinatedBolt wrappers以及direct groupings
JoinResult(將結果與return info拼接起來)
ReturnResult(鏈接到DRPC Server,返回結果)
LinearDRPCTopologyBuilder是創建在Storm基本元素之上的高層抽象。
KeyedFairBolt用於組織同一時刻多請求的處理過程;
如何直接使用CoordinatedBolt
。