原文地址:http://storm.apache.org/documentation/Distributed-RPC.htmlhtml
分佈式RPC的目的是在storm進行大量的實時計算時,可以並行的調用storm上的函數。Storm topology能夠將函數參數做爲輸入Stream,而且將被調用方法產生的結果做爲返回發送出去。java
與其說DRPC是storm的一個特色,不如說它只是storm基本概念如steams,spouts,bolts和topologies的一種表達方式。DRPC能夠獨立於storm做爲一個庫發佈,但在和storm捆綁在一塊兒時將會很是有用。git
分佈式RPC被「DRPC server」實現(storm包中已經有了對應的實例)。DRPC server協調接收一個RPC請求並將這個請求發送給storm topology,而後接收storm topology算出的結果,再將結果發送給等待中的客戶端。從客戶端的視角來看,分佈式RPC調用過程跟普通的RPC調用過程同樣。舉例:這裏有一個客戶端調用「reach」方法,輸入參數是http://twitter.com,而後獲得計算結果的例子。github
DRPCClient client = new DRPCClient("drpc-host", 3772);數據庫
String result = client.execute("reach", "http://twitter.com");apache
這個DRPC的工做流能夠描述爲:api
一個客戶端向DRPC Server發送了想要調用的方法名稱和方法參數。實現了這個方法的topology用一個DRPCSpout從DRPC Server接收了函數調用Stream。每一個遠程方法在DRPC Server上都有一個惟一的ID。Topology計算出結果以後,使用一個ReturnResults的bolt鏈接DPRC Server後,將結果交給它。DRPC Server根據方法ID匹配出結果,而後喚醒等待的客戶端,將結果發送給客戶端app
Storm中有一個LinearDRPCTopologyBuilder 的topology 生成,已經自動實現了DPRC調用過程當中的絕大部分。包括:jvm
1:配置spout分佈式
2:將結果返回給DPRC Server
3:爲bolt提供了簡單的tuple之間的聚合操做
讓咱們看一個簡單的例子,這裏有一個DPRC topology的實現,他能夠給輸入的參數後面添加一個「!」。
public static class ExclaimBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + "!"));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}
}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
// ...
}
就像咱們看到的,代碼很是簡單。當咱們建立一個LinearDRPCTopologyBuilder時,你告訴它實現了DRPC功能的topology的名字。一個DRPC server能夠配置不少topology名稱,這些名稱不能重複。第一個bolt(不在代碼中,應該是LinearDRPCTopologyBuilder內部的—譯者注)會接受2個tuples,第一個屬性是請求id,第二個是方法的參數。LinearDRPCTopologyBuilder中最後一個bolt返回了2個tuples的輸出Stream,Stream的格式爲[id, result],固然了,過程當中產生的全部tuple的第一個屬性都是請求id。
在這個例子中,ExclaimBolt只是簡單的在tuple的第二種屬性上添加了一個「!」。LinearDRPCTopologyBuilder完成了鏈接DRPC server並返回結果的其餘過程。
DRPC能夠在本地運行,例子:
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
cluster.shutdown();
drpc.shutdown();
第一步,建立一個LocalDRPC對象。這個隊形模擬DPRC server的運行,就像LocalCluster模擬集羣運行同樣。而後你建立一個LocalCluster在本地運行topology。LinearDRPCTopologyBuilder有獨立的方法分佈建立本地topology和遠程topology。本地運行的LocalDRPC不須要綁定端口,由於在本地topology不須要端口來傳遞對象。這就是爲何createLocalTopology將LocalDRPC做爲輸入。
啓動以後,你就能夠看到DRPC調用過程在本地執行。
在實際的集羣上使用DRPC也很是簡單,只須要三步:
1:啓動DRPC server
2:配置DRPC server地址
3:將DRPC topology提交到storm集羣
啓動DRPC server就行啓動Nimbus或UI同樣簡單
bin/storm drpc
接下來,你須要爲storm集羣配置DRPC的地址,才能DRPCSpout讓知道在哪裏讀取方法調用。能夠在storm.yaml中配置或者經過topology配置。在storm.yaml中配置以下
drpc.servers:
- "drpc1.foo.com"
- "drpc2.foo.com"
最後,你要經過StormSubmitter啓動DPRC topology,就想啓動任何topology同樣。遠程模式運行上面的例子你能夠:
StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
createRemoteTopology用來爲storm集羣建立topology。
上面那個感嘆號例子用來熟悉DPRC的概念仍是過於簡單。讓咱們看一個更復雜的例子,一個真正須要經過並行運行storm來計算的DRPC方法。例子就是計算Twitter上的URL的reach。
URL的reach就是這個URL暴漏給多少用戶。爲了計算reach,你須要:
1:獲取這個URL的全部twitter
2:獲取這些twitter的follower
3:去掉重複的follower
4:計算每一個URL的follower
一個真實的計算須要數千次的數據庫交互和上百萬的flowwer記錄的計算。這是很是很是碉堡的計算。但正如你所看到的,基於storm實現這個功能卻很是的簡單。在單臺機器上,reach要花上數分鐘來計算;可是在storm集羣上,你能夠在數秒鐘就計算出最難算的URL的reach。
在storm-starter上有一個reach topology(here)樣例,這裏告訴你如何定義一個reach topology。
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12)
.shuffleGrouping();
builder.addBolt(new PartialUniquer(), 6)
.fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2)
.fieldsGrouping(new Fields("id"));
這個topology計算一共須要四步:
1:GetTweeters獲取了tweeted這個URL的用戶。它將輸入stream[id, url]轉換成了一個輸出Stream[id, tweeter]。每個URL映射了多個用戶tuple。
2:獲取了每個tweeter的followers(粉絲)。它將輸入stream[id, tweeter]轉換成了一個輸出Stream [id, follower]。通過這個過程,因爲一個follower是同時tweet了同一個URL的多個用戶的粉絲,就會產生一些重複的follower tuple。
3.PartialUniquer將follower Stream按照follower和id分組,保證同一個follower會進入同一個task。因此每個PartialUniquer 的task都會接收到相互獨立的follower集合。當PartialUniquer接收了全部根據request id分配給它的follower tuples,它就會將去重以後的follower集合的數量發射出去。
4.最終,CountAggregator接收了每個PartialUniquer task發射的數量,而且經過計算總和來完成reach的計算過程。
下面來看一下PartialUniquer bolt:
public class PartialUniquer extends BaseBatchBolt {
BatchOutputCollector _collector;
Object _id;
Set<String> _followers = new HashSet<String>();
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_followers.add(tuple.getString(1));
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _followers.size()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "partial-count"));
}
}
PartialUniquer經過繼承BaseBatchBolt實現了IBatchBolt接口。批處理bolt提供了一流的API能夠將批量的tuples看成一個批次來處理。每個request id都會建立一個batch bolt實例,而storm負責在適當的時候清理這些實例。
每當PartialUniquer在execute方法中接收到一個follower tuple,它就將follower放到request id對應的set集合中。
當發射到這個bolt的全部tuples都被處理以後,batch bolts中的finishBatch方法將會被調用。 PartialUniquer發射了一個包含follower數量的tuple。
在後臺,CoordinatedBolt用來判斷bolt是否所有接受了指定request id的全部tuple。CoordinatedBolt利用直接Stream來管理此次協調。
這個topology的剩餘部分就顯而易見了。就像你看到的,reach計算的每一步都是並行計算,並且實現DRPC topology是多麼的簡單。
LinearDRPCTopologyBuilder只能處理線下的DRPC topologies:整個計算能夠分割爲多個獨立的順序步驟。它很難處理包含有bolt分支和bolt合併的複雜topology。目前,爲了實現複雜的功能,你只能經過直接使用CoordinatedBolt。
DRPCSpout發射了[args, return-info].return-info中包含DRPC的地址和端口,就像DRPC的id同樣。
構建一個topology包含:
DRPCSpout
PrepareRequest(準備請求:產生一個request id併爲return-info和參數分別建立一個Stream)
CoordinatedBolt wrappers and direct groupings(CoordinatedBolt封裝和直接分組)
JoinResult(將result-info加入結果)
ReturnResult(鏈接DRPC server 並返回結果)
LinearDRPCTopologyBuilder是一個很是好storm高級抽象的例子
KeyedFairBolt封裝同時處理多個請求
如何直接使用CoordinatedBolt
譯者:須要詳細瞭解CoordinatedBolt,我的推薦徐明明的一個博客http://xumingming.sinaapp.com/811/twitter-storm-code-analysis-coordinated-bolt/