storm翻譯(3)Distributed RPC(分佈式遠程調用)

原文地址:http://storm.apache.org/documentation/Distributed-RPC.htmlhtml

分佈式RPC的目的是在storm進行大量的實時計算時,可以並行的調用storm上的函數。Storm topology能夠將函數參數做爲輸入Stream,而且將被調用方法產生的結果做爲返回發送出去。java

與其說DRPCstorm的一個特色,不如說它只是storm基本概念如steamsspoutsboltstopologies的一種表達方式。DRPC能夠獨立於storm做爲一個庫發佈,但在和storm捆綁在一塊兒時將會很是有用。git

高級概述

分佈式RPC被「DRPC server」實現(storm包中已經有了對應的實例)。DRPC server協調接收一個RPC請求並將這個請求發送給storm topology,而後接收storm topology算出的結果,再將結果發送給等待中的客戶端。從客戶端的視角來看,分佈式RPC調用過程跟普通的RPC調用過程同樣。舉例:這裏有一個客戶端調用「reach」方法,輸入參數是http://twitter.com,而後獲得計算結果的例子。github

DRPCClient client = new DRPCClient("drpc-host", 3772);數據庫

String result = client.execute("reach", "http://twitter.com");apache

這個DRPC的工做流能夠描述爲:api

                 

一個客戶端向DRPC Server發送了想要調用的方法名稱和方法參數。實現了這個方法的topology用一個DRPCSpoutDRPC Server接收了函數調用Stream。每一個遠程方法在DRPC Server上都有一個惟一的IDTopology計算出結果以後,使用一個ReturnResultsbolt鏈接DPRC Server後,將結果交給它。DRPC Server根據方法ID匹配出結果,而後喚醒等待的客戶端,將結果發送給客戶端app

LinearDRPCTopologyBuilder(線性)

Storm中有一個LinearDRPCTopologyBuilder topology 生成,已經自動實現了DPRC調用過程當中的絕大部分。包括:jvm

1:配置spout分佈式

2:將結果返回給DPRC Server

3:爲bolt提供了簡單的tuple之間的聚合操做

讓咱們看一個簡單的例子,這裏有一個DPRC topology的實現,他能夠給輸入的參數後面添加一個「!」。

public static class ExclaimBolt extends BaseBasicBolt {

    public void execute(Tuple tuple, BasicOutputCollector collector) {

        String input = tuple.getString(1);

        collector.emit(new Values(tuple.getValue(0), input + "!"));

    }

 

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("id", "result"));

    }

}

 

public static void main(String[] args) throws Exception {

    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");

    builder.addBolt(new ExclaimBolt(), 3);

// ...

}

就像咱們看到的,代碼很是簡單。當咱們建立一個LinearDRPCTopologyBuilder時,你告訴它實現了DRPC功能的topology的名字。一個DRPC server能夠配置不少topology名稱,這些名稱不能重複。第一個bolt不在代碼中,應該是LinearDRPCTopologyBuilder內部的譯者注)會接受2tuples,第一個屬性是請求id,第二個是方法的參數。LinearDRPCTopologyBuilder中最後一個bolt返回了2tuples的輸出StreamStream的格式爲[id, result],固然了,過程當中產生的全部tuple的第一個屬性都是請求id

在這個例子中,ExclaimBolt只是簡單的在tuple的第二種屬性上添加了一個「!」。LinearDRPCTopologyBuilder完成了鏈接DRPC server並返回結果的其餘過程。

本地DRPC模式

DRPC能夠在本地運行,例子:

LocalDRPC drpc = new LocalDRPC();

LocalCluster cluster = new LocalCluster();

 

cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

 

System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));

 

cluster.shutdown();

drpc.shutdown();

第一步,建立一個LocalDRPC對象。這個隊形模擬DPRC server的運行,就像LocalCluster模擬集羣運行同樣。而後你建立一個LocalCluster在本地運行topologyLinearDRPCTopologyBuilder有獨立的方法分佈建立本地topology和遠程topology。本地運行的LocalDRPC不須要綁定端口,由於在本地topology不須要端口來傳遞對象。這就是爲何createLocalTopologyLocalDRPC做爲輸入。

啓動以後,你就能夠看到DRPC調用過程在本地執行。

遠程DRPC模式

在實際的集羣上使用DRPC也很是簡單,只須要三步:

1:啓動DRPC server

2:配置DRPC server地址

3:將DRPC topology提交到storm集羣

啓動DRPC server就行啓動NimbusUI同樣簡單
bin/storm drpc

接下來,你須要爲storm集羣配置DRPC的地址,才能DRPCSpout讓知道在哪裏讀取方法調用。能夠在storm.yaml中配置或者經過topology配置。在storm.yaml中配置以下

drpc.servers:
  - "drpc1.foo.com"
  - "drpc2.foo.com"

最後,你要經過StormSubmitter啓動DPRC topology,就想啓動任何topology同樣。遠程模式運行上面的例子你能夠:

StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

createRemoteTopology用來爲storm集羣建立topology

一個更復雜的例子

上面那個感嘆號例子用來熟悉DPRC的概念仍是過於簡單。讓咱們看一個更復雜的例子,一個真正須要經過並行運行storm來計算的DRPC方法。例子就是計算Twitter上的URLreach

URLreach就是這個URL暴漏給多少用戶。爲了計算reach,你須要:

1:獲取這個URL的全部twitter

2:獲取這些twitterfollower

3:去掉重複的follower

4:計算每一個URLfollower

一個真實的計算須要數千次的數據庫交互和上百萬的flowwer記錄的計算。這是很是很是碉堡的計算。但正如你所看到的,基於storm實現這個功能卻很是的簡單。在單臺機器上,reach要花上數分鐘來計算;可是在storm集羣上,你能夠在數秒鐘就計算出最難算的URLreach

storm-starter上有一個reach topologyhere)樣例,這裏告訴你如何定義一個reach topology

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");

builder.addBolt(new GetTweeters(), 3);

builder.addBolt(new GetFollowers(), 12)

        .shuffleGrouping();

builder.addBolt(new PartialUniquer(), 6)

        .fieldsGrouping(new Fields("id", "follower"));

builder.addBolt(new CountAggregator(), 2)

.fieldsGrouping(new Fields("id"));

這個topology計算一共須要四步:

1:GetTweeters獲取了tweeted這個URL的用戶。它將輸入stream[id, url]轉換成了一個輸出Stream[id, tweeter]。每個URL映射了多個用戶tuple

2:獲取了每個tweeterfollowers(粉絲)。它將輸入stream[id, tweeter]轉換成了一個輸出Stream [id, follower]。通過這個過程,因爲一個follower是同時tweet了同一個URL的多個用戶的粉絲,就會產生一些重複的follower tuple

3.PartialUniquerfollower Stream按照followerid分組,保證同一個follower會進入同一個task。因此每個PartialUniquer task都會接收到相互獨立的follower集合。當PartialUniquer接收了全部根據request id分配給它的follower tuples,它就會將去重以後的follower集合的數量發射出去。

4.最終,CountAggregator接收了每個PartialUniquer task發射的數量,而且經過計算總和來完成reach的計算過程。

下面來看一下PartialUniquer bolt

public class PartialUniquer extends BaseBatchBolt {

    BatchOutputCollector _collector;

    Object _id;

    Set<String> _followers = new HashSet<String>();

 

    @Override

    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {

        _collector = collector;

        _id = id;

    }

 

    @Override

    public void execute(Tuple tuple) {

        _followers.add(tuple.getString(1));

    }

 

    @Override

    public void finishBatch() {

        _collector.emit(new Values(_id, _followers.size()));

    }

 

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("id", "partial-count"));

    }

}

PartialUniquer經過繼承BaseBatchBolt實現了IBatchBolt接口。批處理bolt提供了一流的API能夠將批量的tuples看成一個批次來處理。每個request id都會建立一個batch bolt實例,而storm負責在適當的時候清理這些實例。

每當PartialUniquerexecute方法中接收到一個follower tuple,它就將follower放到request id對應的set集合中。

當發射到這個bolt的全部tuples都被處理以後,batch bolts中的finishBatch方法將會被調用。 PartialUniquer發射了一個包含follower數量的tuple

在後臺,CoordinatedBolt用來判斷bolt是否所有接受了指定request id的全部tupleCoordinatedBolt利用直接Stream來管理此次協調。

這個topology的剩餘部分就顯而易見了。就像你看到的,reach計算的每一步都是並行計算,並且實現DRPC topology是多麼的簡單。

Non-linear(非線性)DRPC topologies

LinearDRPCTopologyBuilder只能處理線下的DRPC topologies:整個計算能夠分割爲多個獨立的順序步驟。它很難處理包含有bolt分支和bolt合併的複雜topology。目前,爲了實現複雜的功能,你只能經過直接使用CoordinatedBolt

LinearDRPCTopologyBuilder如何工做

DRPCSpout發射了[args, return-info].return-info中包含DRPC的地址和端口,就像DRPCid同樣。

構建一個topology包含:

  • DRPCSpout

  • PrepareRequest(準備請求:產生一個request id併爲return-info和參數分別建立一個Stream

  • CoordinatedBolt wrappers and direct groupingsCoordinatedBolt封裝和直接分組)

  • JoinResult(將result-info加入結果)

  • ReturnResult(鏈接DRPC server 並返回結果)

LinearDRPCTopologyBuilder是一個很是好storm高級抽象的例子

高級

KeyedFairBolt封裝同時處理多個請求

如何直接使用CoordinatedBolt

譯者:須要詳細瞭解CoordinatedBolt,我的推薦徐明明的一個博客http://xumingming.sinaapp.com/811/twitter-storm-code-analysis-coordinated-bolt/

相關文章
相關標籤/搜索