分佈式RPC
High level overview
LinearDRPCTopologyBuilder
Local mode DRPC
Remote mode DRPC
更復雜的例子
Non-linear DRPC topologies
LinearDRPCTopologyBuilder如何起做用
Advanced
分佈式 RPC(DRPC)的設計目標是充分利用 Storm 的計算能力實現高密度的並行實時計算。Storm 接收若干個函數參數做爲輸入流,而後經過 DRPC 輸出這些函數調用的結果。嚴格來講,DRPC 並不能算做是 Storm 的一個特性,由於它只是一種基於 Storm 原語 (Stream、Spout、Bolt、Topology) 實現的計算模式。雖然能夠將 DRPC 從 Storm 中打包出來做爲一個獨立的庫,可是與 Storm 集成在一塊兒顯然更有用。html
分佈式RPC是經過「DRPC server」協調處理的(Storm用一個包來實現該功能)。DRPC server 負責接收 RPC 請求,並將該請求發送到 Storm 中運行的 Topology,等待接收 Topology 發送的處理結果,並將該結果返回給發送請求的客戶端。所以,從客戶端的角度來講,DPRC 與普通的 RPC 調用並無什麼區別。例如,如下是一個使用參數 「http://twitter.com」 調用 「reach」 函數計算結果的例子:java
DRPCClient client = new DRPCClient("drpc-host", 3772); String result = client.execute("reach", "http://twitter.com");
分佈式RPC工做流示意圖以下所示:
git
客戶端經過向 DRPC 服務器發送待執行函數的名稱以及該函數的參數來獲取處理結果。實現該函數的拓撲使用一個DRPCSpout 從 DRPC server中接收一個函數調用流,DRPC Server會爲每一個函數調用都標記了一個惟一的 id,隨後拓撲會執行函數來計算結果,並在拓撲的最後使用一個名爲 ReturnResults 的 bolt 鏈接到 DRPC Server,根據函數調用的 id 來將函數調用的結果返回。github
Storm中提供了名爲LinearDRPCTopologyBuilder 的topology builder,它幾乎自動完成了DRPC的全部步驟,以下所示:
1.設置spout
2.向DRPC server返回運行結果。
3.給bolts提供了彙集元組的功能。
讓咱們一塊兒看一下簡單的例子,該例子是DRPC topology的一個實現並返回結果爲輸入附加字符串「!」。數據庫
public static class ExclaimBolt extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(1); collector.emit(new Values(tuple.getValue(0), input + "!")); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "result")); }} public static void main(String[] args) throws Exception { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation"); builder.addBolt(new ExclaimBolt(), 3); // ...}
正如你所見,當建立LinearDRPCTopologyBuilder 時,你須要讓topology知道 DRPC函數的名字。單個DRPC server負責不少函數的協調處理,且這些函數的功能不一樣。你聲明的第一個bolt將接受一個2元組,第一個域是請求id,第二個域是請求參數。LinearDRPCTopologyBuilder 中最後一個bolt會輸出形式爲[id,result]的2元組輸出流。最後,全部中間結果的元組的第一個域必須包括請求id。
在本例子中,ExclaimBolt 只是簡單地給第二個域附加字符串「!」。LinearDRPCTopologyBuilder 繼續和DRPC server通訊並將結果返回。apache
DRPC能夠以本地模式運行,下面以本地模式運行的例子:服務器
LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello")); cluster.shutdown(); drpc.shutdown();
首先你會建立一個 LocalDPRC 對象,該對象會在進程中模擬一個 DRPC 服務器,就像LocalCluster 在進程中模擬 Storm 集羣的功能同樣。而後,建立LocalCluster以本地模式運行topology 。LinearDRPCTopologyBuilder 有獨立的方法用於建立本地topologies 和遠程topologies 。在本地模式下,LocalDRPC 對象沒有綁定任何端口因此topology須要知道正在和它進行通訊的對象,這是方法createLocalTopology 接受LocalDRPC 對象做爲輸入參數的緣由。
在啓動拓撲後,你可使用 execute 方法來完成 DRPC 調用。app
在一個真實的集羣中使用 DRPC 有如下三個步驟:
1.啓動 DRPC Server;
2.配置 DRPC Server的地址;
3.將 DRPC topologies 提交到集羣運行。
能夠像 Nimbus、Supervisor 那樣使用 storm 命令來啓動 DRPC Serve,以下:
bin/storm drpc
接下來,你須要在集羣上配置 DRPC Server的地址。這是爲了讓 DRPCSpout 獲取從哪裏觸發函數調用的方法。能夠經過編輯 storm.yaml 或者添加拓撲配置的方式實現配置。配置 storm.yaml 的方式相似於下面這樣:jvm
drpc.servers: - "drpc1.foo.com" - "drpc2.foo.com"
最後,你能夠像其餘拓撲同樣使用 StormSubmitter 來啓動拓撲。如下是使用遠程模式構造拓撲的一個例子:分佈式
StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
createRemoteTopology 方法是用來建立集羣模式下運行的topologies 。
上面描述的exclamation DRPC是爲了說明DRPC的簡單例子。下面讓咱們共同窗習一下更復雜的例子-Storm集羣並行計算的DRPC函數調用,該例子是計算Twitter上URL的訪問。
URL訪問是指不一樣的人在Twitter上發的推文,你須要完成以下計算:
1.獲取全部tweeted了該URL的全部人。
2.獲取全部關注了1中的全部人。
3.2中全部人的set集合。
4.統計3中set集合的個數。
一次計算可能涉及上百次的數據庫調用和數以千萬計的關注記錄,這計算規模確實很大。正如你所見,實現Storm函數是很是簡單的。在單臺機器上,計算須要1分鐘;但在集羣中,即便最難計算的URL訪問也只需數秒。
一個簡單的訪問topology 能夠在storm-starter中找到。下面是定義訪問topology 的具體步驟:
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach"); builder.addBolt(new GetTweeters(), 3); builder.addBolt(new GetFollowers(), 12) .shuffleGrouping();builder.addBolt(new PartialUniquer(), 6) .fieldsGrouping(new Fields("id", "follower")); builder.addBolt(new CountAggregator(), 2) .fieldsGrouping(new Fields("id"));
topology 的執行按照如下四步驟:
1.GetTweeters 獲得tweeted了URL的用戶。它將[id,url]格式的輸入流轉換爲[id,tweeter]格式的輸出流。每一個url將映射爲多個tweeter元組。
2.GetFollowers獲得tweeters的關注者。它將[id,tweeter]格式的輸入流轉換爲[id,follower]格式的輸出流。這些任務中,可能有重複的元組,由於一些人可能關注了多我的都tweeted了一樣的URL。
3.PartialUniquer根據關注者id分組。這會致使一樣的關注者在一樣的任務中處理,因此每一個PartialUniquer 的任務將會接受多個互補的關注者集合。一旦PartialUniquer 接受了全部的關注者元組,它將會輸出關注者子集合元素的個數。
4.最後,CountAggregator 從PartialUniquer 接受部分count值而後累加完成整個計算,並返回結果。
下面看PartialUniquer bolt的代碼實現:
public class PartialUniquer extends BaseBatchBolt { BatchOutputCollector _collector; Object _id; Set<String> _followers = new HashSet<String>(); @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; _id = id; } @Override public void execute(Tuple tuple) { _followers.add(tuple.getString(1)); } @Override public void finishBatch() { _collector.emit(new Values(_id, _followers.size())); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "partial-count")); }}
PartialUniquer 經過繼承BaseBatchBolt 間接實現了IBatchBolt 接口。batch bolt提供了第一個類API,用於將一批元組做爲一個具體單元進行處理。對於每一個請求id,會建立一個新的batch bolt實例,在適當時候Storm也會清理這些實例。
當PartialUniquer 在execute方法中接受了一個關注者元組時,會將該元組添加到請求id的set集合中。
當任務中的一批元組處理完成後會調用batch bolts的finishBatch 方法,該方法輸出關注者id集合元素個數形式的一元組。在後臺,CoordinatedBolt 用來監測什麼時候給定bolt已經接受到了請求id全部的元組,它用direct stream來管理。
topology 中剩下的部分都能很容易明白,正如你看到的,每一個訪問計算步驟都是並行完成的,而且定義DRPC topology是很是簡單的。
LinearDRPCTopologyBuilder 只處理線性DRPC的topology,它的計算是一個序列步驟。不難想象功能需求將須要更復雜的topology ,它可能涉及到bolts的分支與整合。
DRPCSpout 發送[args,return-info]元組。return-info是DRPC server的主機名、端口號和生成的id。
topology的構造參數包括:
DRPCSpout
PrepareRequest (生成請求id和建立返回信息stream和參數stream)
CoordinatedBolt wrappers and direct groupings
JoinResult (返回信息進行join操做)
ReturnResult (鏈接到DRPC server並返回結果)
LinearDRPCTopologyBuilder 是一個很好的例子
KeyedFairBolt 同時處理多個請求。 如何直接使用CoordinatedBolt。