3、Storm入門之Topology(拓撲結構)

在這一章,你將學到如何在同一個Storm拓撲結構內的不一樣組件之間傳遞元組,以及如何向一個運行中的Storm集羣發佈一個拓撲。

數據流組

設計一個拓撲時,你要作的最重要的事情之一就是定義如何在各組件之間交換數據(數據流是如何被bolts消費的)。一個數據流組指定了每一個bolt會消費哪些數據流,以及如何消費它們。java

NOTE:一個節點可以發佈一個以上的數據流,一個數據流組容許咱們選擇接收哪一個。git

數據流組在定義拓撲時設置,就像咱們在第二章看到的:github

···
    builder.setBolt("word-normalizer", new WordNormalizer())
           .shuffleGrouping("word-reader");
···

在前面的代碼塊裏,一個boltTopologyBuilder對象設定, 而後使用隨機數據流組指定數據源。數據流組一般將數據源組件的ID做爲參數,取決於數據流組的類型不一樣還有其它可選參數。apache

NOTE:每一個InputDeclarer能夠有一個以上的數據源,並且每一個數據源能夠分到不一樣的組。
編程

隨機數據流組(隨機分組)

隨機流組是最經常使用的數據流組。它只有一個參數(數據源組件),而且數據源會向隨機選擇的bolt發送元組,保證每一個消費者收到近似數量的元組。緩存

隨機數據流組用於數學計算這樣的原子操做。然而,若是操做不能被隨機分配,就像第二章爲單詞計數的例子,你就要考慮其它分組方式了。服務器

域數據流組(字段分組)

域數據流組容許你基於元組的一個或多個域控制如何把元組發送給bolts。它保證擁有相同域組合的值集發送給同一個bolt。回到單詞計數器的例子,若是你用word域爲數據流分組,word-normalizer bolt將只會把相同單詞的元組發送給同一個word-counterbolt實例。app

···
    builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGrouping("word-normalizer", new Fields("word"));
···

NOTE: 在域數據流組中的全部域集合必須存在於數據源的域聲明中。分佈式

所有數據流組(所有分組)

所有數據流組,爲每一個接收數據的實例複製一份元組副本。這種分組方式用於向bolts發送信號。好比,你要刷新緩存,你能夠向全部的bolts發送一個刷新緩存信號。在單詞計數器的例子裏,你可使用一個所有數據流組,添加清除計數器緩存的功能(見拓撲示例ide

    public void execute(Tuple input) {
        String str = null;
        try{
            if(input.getSourceStreamId().equals("signals")){
                str = input.getStringByField("action");
                if("refreshCache".equals(str))
                    counters.clear();
            }
        }catch (IllegalArgumentException e){
            //什麼也不作
        }
        ···
    }

咱們添加了一個if分支,用來檢查源數據流。Storm容許咱們聲明具名數據流(若是你不把元組發送到一個具名數據流,默認發送到名爲」default「的數據流)。這是一個識別元組的極好的方式,就像這個例子中,咱們想識別signals同樣。 在拓撲定義中,你要向word-counter bolt添加第二個數據流,用來接收從signals-spout數據流發送到全部bolt實例的每個元組。

   builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGroupint("word-normalizer",new Fields("word"))
           .allGrouping("signals-spout","signals");

signals-spout的實現請參考git倉庫

自定義數據流組(自定義分組)

你能夠經過實現backtype.storm.grouping.CustormStreamGrouping接口建立自定義數據流組,讓你本身決定哪些bolt接收哪些元組。

讓咱們修改單詞計數器示例,使首字母相同的單詞由同一個bolt接收。

public class ModuleGrouping mplents CustormStreamGrouping, Serializable{
        int numTasks = 0;
        @Override
        public List<Integer> chooseTasks(List<Object> values) {
            List<Integer> boltIds = new ArrayList<Integer>();
            if(values.size()>0){
                String str = values.get(0).toString();
                if(str.isEmpty()){
                    boltIds.add(0);
                }else{
                    boltIds.add(str.charAt(0) % numTasks);
                }
            }
            return boltIds;
        }
        @Override
        public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) {
            numTasks = targetTasks.size();
        }
    }

這是一個CustomStreamGrouping的簡單實現,在這裏咱們採用單詞首字母字符的整數值與任務數的餘數,決定接收元組的bolt

按下述方式word-normalizer修改便可使用這個自定義數據流組。

 builder.setBolt("word-normalizer", new WordNormalizer())
           .customGrouping("word-reader", new ModuleGrouping());

直接數據流組(直接分組)

這是一個特殊的數據流組,數據源能夠用它決定哪一個組件接收元組。與前面的例子相似,數據源將根據單詞首字母決定由哪一個bolt接收元組。要使用直接數據流組,在WordNormalizer bolt中,使用emitDirect方法代替emit

public void execute(Tuple input) {
        ...
        for(String word : words){
            if(!word.isEmpty()){
                ...
                collector.emitDirect(getWordCountIndex(word),new Values(word));
            }
        }
        //對元組作出應答
        collector.ack(input);
    }
    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if(word.isEmpty()){
            return 0;
        }else{
            return word.charAt(0) % numCounterTasks;
        }
    }

prepare方法中計算任務數

public void prepare(Map stormConf, TopologyContext context, 
                OutputCollector collector) {
        this.collector = collector;
        this.numCounterTasks = context.getComponentTasks("word-counter");
    }

在拓撲定義中指定數據流將被直接分組:

builder.setBolt("word-counter", new WordCounter(),2)
           .directGrouping("word-normalizer");

全局數據流組(全局分組)

全局數據流組把全部數據源建立的元組發送給單一目標實例(即擁有最低ID的任務)。

不分組(無分組)

寫做本書時(Stom0.7.1版),這個數據流組至關於隨機數據流組。也就是說,使用這個數據流組時,並不關心數據流是如何分組的。

LocalCluster VS StormSubmitter

到目前爲止,你已經用一個叫作LocalCluster的工具在你的本地機器上運行了一個拓撲。Storm的基礎工具,使你可以在本身的計算機上方便的運行和調試不一樣的拓撲。可是你怎麼把本身的拓撲提交給運行中的Storm集羣呢?Storm有一個有趣的功能,在一個真實的集羣上運行本身的拓撲是很容易的事情。要實現這一點,你須要把LocalCluster換成StormSubmitter並實現submitTopology方法, 它負責把拓撲發送給集羣。

下面是修改後的代碼:

//LocalCluster cluster = new LocalCluster();
    //cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf, 
    //builder.createTopology());
    StormSubmitter.submitTopology("Count-Word-Topology-With_Refresh-Cache", conf,
            builder.createTopology());
    //Thread.sleep(1000);
    //cluster.shutdown();

NOTE: 當你使用StormSubmitter時,你就不能像使用LocalCluster時同樣經過代碼控制集羣了。

接下來,把源碼壓縮成一個jar包,運行Storm客戶端命令,把拓撲提交給集羣。若是你已經使用了Maven, 你只須要在命令行進入源碼目錄運行:mvn package

如今你生成了一個jar包,使用storm jar命令提交拓撲(關於如何安裝Storm客戶端請參考附錄A)。命令格式:storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3

對於這個例子,在拓撲工程目錄下面運行:

storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/resources/words.txt

經過這些命令,你就把拓撲發佈集羣上了。

若是想中止或殺死它,運行:

storm kill Count-Word-Topology-With-Refresh-Cache

NOTE:拓撲名稱必須保證唯一性。

NOTE:如何安裝Storm客戶端,參考附錄A

DRPC拓撲

有一種特殊的拓撲類型叫作分佈式遠程過程調用(DRPC),它利用Storm的分佈式特性執行遠程過程調用(RPC)(見下圖)。Storm提供了一些用來實現DRPC的工具。第一個是DRPC服務器,它就像是客戶端和Storm拓撲之間的鏈接器,做爲拓撲的spout的數據源。它接收一個待執行的函數和函數參數,而後對於函數操做的每個數據塊,這個服務器都會經過拓撲分配一個請求ID用來識別RPC請求。拓撲執行最後的bolt時,它必須分配RPC請求ID和結果,使DRPC服務器把結果返回正確的客戶端。

NOTE:單實例DRPC服務器可以執行許多函數。每一個函數由一個唯一的名稱標識。

Storm提供的第二個工具(已在例子中用過)是LineDRPCTopologyBuilder,一個輔助構建DRPC拓撲的抽象概念。生成的拓撲建立DRPCSpouts——它鏈接到DRPC服務器並向拓撲的其它部分分發數據——幷包裝bolts,使結果從最後一個bolt返回。依次執行全部添加到LinearDRPCTopologyBuilder對象的bolts

做爲這種類型的拓撲的一個例子,咱們建立了一個執行加法運算的進程。雖然這是一個簡單的例子,可是這個概念能夠擴展到複雜的分佈式計算。

bolt按下面的方式聲明輸出:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id","result"));
    }

由於這是拓撲中唯一的bolt,它必須發佈RPC ID和結果。execute方法負責執行加法運算。

public void execute(Tuple input) {
        String[] numbers = input.getString(1).split("\\+");
        Integer added = 0;
        if(numbers.length<2){
            throw new InvalidParameterException("Should be at least 2 numbers");
        }
        for(String num : numbers){
            added += Integer.parseInt(num);
        }
        collector.emit(new Values(input.getValue(0),added));
    }

包含加法bolt的拓撲定義以下:

public static void main(String[] args) {
        LocalDRPC drpc = new LocalDRPC();
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");
        builder.addBolt(AdderBolt(),2);
        Config conf = new Config();
        conf.setDebug(true);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("drpcder-topology", conf,
            builder.createLocalTopology(drpc));
        String result = drpc.execute("add", "1+-1");
        checkResult(result,0);
        result = drpc.execute("add", "1+1+5+10");
        checkResult(result,17);
        cluster.shutdown();
        drpc.shutdown();
    }

建立一個LocalDRPC對象在本地運行DRPC服務器。接下來,建立一個拓撲構建器(譯者注:LineDRpctopologyBuilder對象),把bolt添加到拓撲。運行DRPC對象(LocalDRPC對象)的execute方法測試拓撲。

NOTE:使用DRPCClient類鏈接遠程DRPC服務器。DRPC服務器暴露了Thrift API,所以能夠跨語言編程;而且不管是在本地仍是在遠程運行DRPC服務器,它們的API都是相同的。 對於採用Storm配置的DRPC配置參數的Storm集羣,調用構建器對象的createRemoteTopology向Storm集羣提交一個拓撲,而不是調用createLocalTopology

相關文章
相關標籤/搜索