Storm常見模式——分佈式RPC

Storm常見模式——分佈式RPC

本文翻譯自:https://github.com/nathanmarz/storm/wiki/Distributed-RPC,做爲學習Storm DRPC的資料,轉載必須以超連接形式標明文章原始出處及本文翻譯連接html

分佈式RPC(distributed RPC,DRPC)用於對Storm上大量的函數調用進行並行計算過程。對於每一次函數調用,Storm集羣上運行的拓撲接收調用函數的參數信息做爲輸入流,並將計算結果做爲輸出流發射出去。java

DRPC自己算不上Storm的特性,它是經過Storm的基本元素:streams,spouts,bolts,topologies而衍生的一個模式。DRPC能夠單獨做爲一個獨立於Storm的庫發佈,但因爲其重要性仍是和Storm捆綁在了一塊兒。git

整體概述

DRPC經過DRPC Server來實現,DRPC Server的總體工做過程以下:github

  1. 接收到一個RPC調用請求;web

  2. 發送請求到Storm上的拓撲;數據庫

  3. 從Storm上接收計算結果;app

  4. 將計算結果返回給客戶端。jvm

以上過程,在client客戶端看來,一個DRPC調用看起來和通常的RPC調用沒什麼區別。下面代碼是client經過DRPC調用「reach」函數,參數爲「http://twitter.com」:分佈式

DRPCClient client = new DRPCClient("drpc-host", 3772); String result = client.execute("reach", "http://twitter.com");

DRPC內部工做流程以下:ide

  1. Client向DRPC Server發送被調用執行的DRPC函數名稱及參數。

  2. Storm上的topology經過DRPCSpout實現這一函數,從DPRC Server接收到函數調用流;

  3. DRPC Server會爲每次函數調用生成惟一的id;

  4. Storm上運行的topology開始計算結果,最後經過一個ReturnResults的Bolt鏈接到DRPC Server,發送指定id的計算結果;

  5. DRPC Server經過使用以前爲每一個函數調用生成的id,將結果關聯到對應的發起調用的client,將計算結果返回給client。

 

LinearDRPCTopologyBuilder

Storm提供了一個topology builder——LinearDRPCTopologyBuilder,它能夠自動完成幾乎全部的DRPC步驟。包括:

  1. 構建spout;

  2. 向DRPC Server返回結果;

  3. 爲Bolt提供函數用於對tuples進行彙集。

下面是一個簡單的例子,這個DRPC拓撲只是簡單的在輸入參數後追加「!」後返回:

複製代碼

public static class ExclaimBolt extends BaseBasicBolt {     public void execute(Tuple tuple, BasicOutputCollector collector) {         String input = tuple.getString(1);         collector.emit(new Values(tuple.getValue(0), input + "!"));     }     public void declareOutputFields(OutputFieldsDeclarer declarer) {         declarer.declare(new Fields("id", "result"));     } } public static void main(String[] args) throws Exception {     LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");     builder.addBolt(new ExclaimBolt(), 3);     // ... }

複製代碼

由上述例子可見,咱們只需不多的工做便可完成拓撲。當建立LinearDRPCTopologyBuilder的時候,須要指定拓撲中DRPC函數的名稱exclamation。一個DRPC Server能夠協調多個函數,每一個函數有不一樣的函數名稱。拓撲中的第一個bolt的輸入是個字段:第一個是請求的id號;第二個是請求的參數。

LinearDRPCTopologyBuilder同時須要最後一個bolt發射一個包含兩個字段的輸出流:第一個字段是請求id;第二個字段是計算結果。所以,全部的中間tuples必須包含請求id做爲第一個字段。

例子中,ExclaimBolt在輸入tuple的第二個字段後面追加「!」LinearDRPCTopologyBuilder負責處理其他的協調工做:與DRPC Server創建鏈接,發送結果給DRPC Server

本地模式DRPC

DRPC能夠以本地模式運行,下面的代碼是如何在本地模式運行上面的例子:

複製代碼

LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello")); cluster.shutdown(); drpc.shutdown();

複製代碼

首先建立一個LocalDRPC對象,該對象在本地模擬一個DRPC Server,正如LocalCluster在本地模擬一個Storm集羣同樣。而後建立一個LocalCluster對象在本地模式下運行拓撲。LinearDRPCTopologyBuilder含有單獨的方法用於建立本地拓撲和遠程拓撲。

本地模式下,LocalDRPC並不綁定任何端口,所以Storm的拓撲須要瞭解要通信的對象——這就是爲何createLocalTopology方法須要以LocalDRPC對象做爲輸入。

加載完拓撲以後,經過對LocalDRPC調用execute方法,就能夠執行DRPC函數調用了。

遠程模式DRPC

在實際的Storm集羣上運行DRPC也同樣很簡單。只需完成如下步驟:

  1. 啓動DRPC Server(s);

  2. 配置DRPC Server(s)地址;

  3. 向Storm集羣提交DRPC拓撲。

首先,經過storm腳本啓動DRPC Server:

bin/storm drpc

而後,在Storm集羣中配置DRPC Server地址,這就是DRPCSpout讀取函數調用請求的地方。這一步的配置能夠經過storm.yaml文件或者拓撲的配置來完成。經過storm.yaml文件的配置方式以下:

drpc.servers:   - "drpc1.foo.com"   - "drpc2.foo.com"

最後,經過StormSubmitter啓動DRPC拓撲。爲了以遠程模式運行上面的例子,代碼以下:

StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

createRemoteTopology被用於爲Storm集羣建立合適的拓撲。

一個複雜的例子

上面的exclamation只是一個簡單的DRPC例子。下面經過一個複雜的例子介紹如何在Storm集羣內進行DRPC——計算Twitter上每一個URL的到達度(reach),也就是每一個URL暴露給的不一樣人的個數。

爲了完成這一計算,須要完成如下步驟:

  1. 獲取全部點選了(tweet)該URL的人;

  2. 獲取步驟1中全部人的關注者(followers,粉絲);

  3. 對全部關注者followers進行去重;

  4. 對步驟3中的關注者人數進行求和。

一個簡單的URL到達度計算可能涉及成千上萬次數據庫調用以及數以百萬的followers記錄,計算量很是大。有了Storm,將很容易實現這一計算過程。單機上可能須要運行幾分鐘才能完成,在Storm集羣上,即便是最難計算的URL也只須要幾秒鐘。

這個例子的代碼在storm-starter:點擊這裏。這裏是如何建立拓撲的代碼:

複製代碼

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach"); builder.addBolt(new GetTweeters(), 3); builder.addBolt(new GetFollowers(), 12)         .shuffleGrouping(); builder.addBolt(new PartialUniquer(), 6)         .fieldsGrouping(new Fields("id", "follower")); builder.addBolt(new CountAggregator(), 2)         .fieldsGrouping(new Fields("id"));

複製代碼

拓撲的執行分爲如下四步:
  1. GetTweeters:獲取全部tweet了指定URL的用戶列表,這個Bolt將輸入流[id, url]轉換成輸出流[id, tweeter],每一個url元組被映射爲多個tweeter元組。

  2. GetFollowers:獲取步驟1中全部用戶列表的followers,這個Bolt將輸入流[id, twetter]轉換成輸出流[id, follower],當某我的同時是多我的的關注者follower,並且這些人都tweet了指定的URL,那麼將產生重複的follower元組。

  3. PartialUniquer:將全部followers按照follower id分組,使得同一個follower在同一個task中被處理。這個Bolt接收follower並進行去重計數。

  4. CountAggregator:從各個PartialUniquer中接收各部分的計數結果,累加後完成到達度計算。

下面是PartialUniquer這個Bolt的代碼實現:

複製代碼

public class PartialUniquer extends BaseBatchBolt {     BatchOutputCollector _collector;     Object _id;     Set<String> _followers = new HashSet<String>();          @Override     public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {         _collector = collector;         _id = id;     }     @Override     public void execute(Tuple tuple) {         _followers.add(tuple.getString(1));     }          @Override     public void finishBatch() {         _collector.emit(new Values(_id, _followers.size()));     }     @Override     public void declareOutputFields(OutputFieldsDeclarer declarer) {         declarer.declare(new Fields("id", "partial-count"));     } }

複製代碼

PartialUniquer經過繼承BaseBatchBolt實現了IBatchBolt接口,batch bolt提供了API用於將一批tuples做爲總體來處理。每一個請求id會建立一個新的batch bolt實例,同時Storm負責這些實例的清理工做。

PartialUniquer接收到一個follower元組時執行execute方法,將follower添加到請求id對應的HashSet集合中。

Batch bolt同時提供了finishBatch方法用於當這個task已經處理完全部的元組時調用。PartialUniquer發射一個包含當前task所處理的follower ids子集去重後個數的元組。

在內部實現上,CoordinatedBolt用於檢測指定的bolt是否已經收到指定請求id的全部tuples元組。CoordinatedBolt使用direct streams管理實現這一協做過程。

拓撲的其餘部分易於理解。到達度的每一步的計算過程都是並行進行的,經過DRPC實現也是很是容易的。

Non-linear DRPC拓撲

LinearDRPCTopologyBuilder只能處理線性的」DRPC拓撲——正如到達度這樣能夠經過一系列步驟序列來完成的計算。不難想象,DRPC調用中包含有更復雜的帶有分支和合並Bolt的拓撲。目前,必須本身直接使用CoordinatedBolt來完成這種非線性拓撲的計算。

LinearDRPCTopologyBuilder工做過程

  • DRPCSpout發射[args, return-info],其中return-info包含DRPC Server的主機和端口號,以及DRPC Server爲該次請求生成的惟一id號;

  • 構造一個Storm拓撲包含如下部分:

    • DRPCSpout

    • PrepareRequest(生成一個請求id,爲return info建立一個流,爲args建立一個流)

    • CoordinatedBolt wrappers以及direct groupings

    • JoinResult(將結果與return info拼接起來)

    • ReturnResult(鏈接到DRPC Server,返回結果)

  • LinearDRPCTopologyBuilder是創建在Storm基本元素之上的高層抽象。

高級進階

  • KeyedFairBolt用於組織同一時刻多請求的處理過程;

  • 如何直接使用CoordinatedBolt

相關文章
相關標籤/搜索