storm從入門到放棄教程(3)--初運行Topologies (拓撲)

概述

上一篇【 基礎概念詳細介紹 】 博文鏈接:https://my.oschina.net/u/2342969/blog/874052html

     本文將對Topologies (拓撲)進行詳細的解釋,此博文須要有必定storm基礎,如需瞭解基礎,請閱讀上一篇博文,其中將會包含內容以下:java

  1. TopologyBuilder
  2. 如何在生產集羣運行一個拓撲
  3. 本地運行拓撲

主體

 簡單使用

      TopologyBuilder爲storm聲明的拓撲暴露的java API,用於執行這個拓撲, 在java中可使用此類構建拓撲。最終,拓撲們會是 Thrift  構造,因爲 Thrift   很複雜,TopologyBuilder能夠大大的簡化建立拓撲,apache

生產環境例子以下:編程

// 建立TopologyBuilder
    TopologyBuilder builder = new TopologyBuilder();
    // 爲拓撲定義一個spout(最後一個參數是指定並行數,非必填)
    builder.setSpout("1", new TestWordSpout(true), 5);
    builder.setSpout("2", new TestWordSpout(true), 3);
    // 爲拓撲定義一個bolt(最後一個參數是指定並行數,非必填)
    builder.setBolt("3", new TestWordCounter(), 3)
            //爲bolt定義流分組
            .fieldsGrouping("1", new Fields("word"))
            .fieldsGrouping("2", new Fields("word"));
    builder.setBolt("4", new TestGlobalCount())
            .globalGrouping("1");

    Map conf = new HashMap();//爲拓撲定義配置
    conf.put(Config.TOPOLOGY_WORKERS, 4); //爲拓撲配置工做進程數
    
    //提交拓撲
    StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());

本地例子以下:bash

// 建立TopologyBuilder
    TopologyBuilder builder = new TopologyBuilder();
    // 爲拓撲定義一個spout(最後一個參數是指定並行數,非必填)
    builder.setSpout("1", new TestWordSpout(true), 5);
    builder.setSpout("2", new TestWordSpout(true), 3);
    // 爲拓撲定義一個bolt(最後一個參數是指定並行數,非必填)
    builder.setBolt("3", new TestWordCounter(), 3)
            //爲bolt定義流分組
            .fieldsGrouping("1", new Fields("word"))
            .fieldsGrouping("2", new Fields("word"));
    builder.setBolt("4", new TestGlobalCount())
            .globalGrouping("1");

    Map conf = new HashMap();//爲拓撲定義配置
    conf.put(Config.TOPOLOGY_WORKERS, 4); //爲拓撲配置工做進程數
    conf.put(Config.TOPOLOGY_DEBUG, true);// 打開DEBUG模式
    LocalCluster cluster = new LocalCluster();// 建立本地集羣(new 這個類便可)
    //向本地集羣提交拓撲
    cluster.submitTopology("mytopology", conf, builder.createTopology());
    //線程睡眠10秒
    Utils.sleep(10000);
    //關閉集羣
    cluster.shutdown();

TopologyBuilder是使用setSpout和setBolt方法影響組件ID到組件,那些方法生成的對象能夠被用於聲明那些組件的輸入。maven

在生產運行拓撲

部署步驟

      在生產運行拓撲相似於在本地運行,如下說明一些步驟:性能

  1. 使用java編程的話, 用TopologyBuilder定義拓撲(參考上一節)
  2. 使用StormSubmitter類提交拓撲到集羣,StormSubmitter提交拓撲須要 拓撲、拓撲名稱,拓撲的配置。例子以下:
    Config conf = new Config();
    conf.setNumWorkers(20);
    conf.setMaxSpoutPending(5000);
    StormSubmitter.submitTopology("mytopology", conf, topology);

     

  3. 將代碼打包成jar,須要包含代碼的依賴喲(除storm 依賴外,storm   加入了集羣classpath中),若是使用的maven,可使用這個插件--Maven Assembly Plugin,只須要在 pom.xml 添加以下依賴便可:
    <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>  
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.path.to.main.Class</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>

    運行 mvn assembly:assembly   便可獲得合適的jar包。在集羣classpath中已經存在storm,請確保已經排除storm.jar。ui

  4. 使用storm客戶端提交拓撲到集羣,指定jar包路徑,運行類名,任意參數。例子以下:
    storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3

    storm jar 命令能夠提交拓撲到集羣而且配置StormSubmitter類鏈接正確的集羣,在上面例子中,上傳後的allmycode.jar,storm jar 命令調用org.me.MyTopology中的main方法,將arg1,arg2,arg3三個參數傳入main方法,spa

    有時間將寫一篇如何搭建storm開發環境, 裏面會詳細講解如何經過此命令運行拓撲的..net

經常使用配置

     每一個拓撲能夠設置不少配置,全部的配置能夠查看官方文檔的Config類http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/Config.html 

以 TOPOLOGY  開頭的配置屬性能夠被重寫,其他的集羣配置沒法被重寫,下面介紹幾個拓撲經常使用設置:

  1. Config.TOPOLOGY_WORKERS: 這個配置是設置工做進程數用於執行拓撲,好比設置25個進程數,集羣中就會有25個java進程去執行全部的任務,若是一個拓撲中有150個並行任務,每一個工做進程會有6個任務線程運行任務。
  2. Config.TOPOLOGY_ACKER_EXECUTORS: 這個配置是設置檢查者數量,有配置數量的執行器去跟蹤元組樹,直到一個spout元組被徹底處理。檢查者是storm可靠性的一個總體結構。若是次配置未設置,storm將會默認使用Config.TOPOLOGY_WORKERS的值,當它的值設置爲0時,默認全部元組均被徹底處理,失去了storm的可靠性。後續將會有專門的博客講述storm的可靠性機制。請你們多多關注,收藏
  3. Config.TOPOLOGY_MAX_SPOUT_PENDING: 這個配置是設置一個單一spout任務運行期間等待的最大spout數(一個元組被認定成功或者失敗爲一個期間),爲了防止隊列爆滿,很是推薦設置此配置。
  4. Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS: 這個配置是設置一個spout元組被認定爲失敗以前,徹底處理完畢的時間。這個配合默認是30秒,這個默認配置能夠知足大部分拓撲。
  5. Config.TOPOLOGY_SERIALIZATIONS: 這個配置能夠爲storm註冊更多的序列器,爲元組自定義類型

停止拓撲

      使用一下命令能夠停止拓撲。

storm kill {stormname}

{stormname}使用提交拓撲時設定的名字。

它不能馬上停止這個拓撲,它會馬上中止全部spout,阻止它發送更多的元組,storm等待 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS  配置的超時時長,中止此拓撲全部工做進程,這樣子能夠給被停止的拓撲必定時間去處理完剩下的元組。

更新運行中的拓撲

    目前僅支持的操做就是中止這個拓撲,從新提交拓撲到集羣。官方準備實現 storm swap 命令去替換正在運行的拓撲,確保最大限度的減小更新時間以及新老兩個拓撲處理元組能夠平滑的過渡。

監控拓撲

     最好的監控方式是使用 storm UI,它提供了任務發生的全部錯誤信息,詳細的數據吞吐量,運行在拓撲中每一個組件的延遲性能。 也能夠查看集羣中運行日誌。

相關文章
相關標籤/搜索