上一篇【 基礎概念詳細介紹 】 博文鏈接:https://my.oschina.net/u/2342969/blog/874052html
本文將對Topologies (拓撲)進行詳細的解釋,此博文須要有必定storm基礎,如需瞭解基礎,請閱讀上一篇博文,其中將會包含內容以下:java
TopologyBuilder爲storm聲明的拓撲暴露的java API,用於執行這個拓撲, 在java中可使用此類構建拓撲。最終,拓撲們會是 Thrift 構造,因爲 Thrift 很複雜,TopologyBuilder能夠大大的簡化建立拓撲,apache
生產環境例子以下:編程
// 建立TopologyBuilder TopologyBuilder builder = new TopologyBuilder(); // 爲拓撲定義一個spout(最後一個參數是指定並行數,非必填) builder.setSpout("1", new TestWordSpout(true), 5); builder.setSpout("2", new TestWordSpout(true), 3); // 爲拓撲定義一個bolt(最後一個參數是指定並行數,非必填) builder.setBolt("3", new TestWordCounter(), 3) //爲bolt定義流分組 .fieldsGrouping("1", new Fields("word")) .fieldsGrouping("2", new Fields("word")); builder.setBolt("4", new TestGlobalCount()) .globalGrouping("1"); Map conf = new HashMap();//爲拓撲定義配置 conf.put(Config.TOPOLOGY_WORKERS, 4); //爲拓撲配置工做進程數 //提交拓撲 StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());
本地例子以下:bash
// 建立TopologyBuilder TopologyBuilder builder = new TopologyBuilder(); // 爲拓撲定義一個spout(最後一個參數是指定並行數,非必填) builder.setSpout("1", new TestWordSpout(true), 5); builder.setSpout("2", new TestWordSpout(true), 3); // 爲拓撲定義一個bolt(最後一個參數是指定並行數,非必填) builder.setBolt("3", new TestWordCounter(), 3) //爲bolt定義流分組 .fieldsGrouping("1", new Fields("word")) .fieldsGrouping("2", new Fields("word")); builder.setBolt("4", new TestGlobalCount()) .globalGrouping("1"); Map conf = new HashMap();//爲拓撲定義配置 conf.put(Config.TOPOLOGY_WORKERS, 4); //爲拓撲配置工做進程數 conf.put(Config.TOPOLOGY_DEBUG, true);// 打開DEBUG模式 LocalCluster cluster = new LocalCluster();// 建立本地集羣(new 這個類便可) //向本地集羣提交拓撲 cluster.submitTopology("mytopology", conf, builder.createTopology()); //線程睡眠10秒 Utils.sleep(10000); //關閉集羣 cluster.shutdown();
TopologyBuilder是使用setSpout和setBolt方法影響組件ID到組件,那些方法生成的對象能夠被用於聲明那些組件的輸入。maven
在生產運行拓撲相似於在本地運行,如下說明一些步驟:性能
Config conf = new Config(); conf.setNumWorkers(20); conf.setMaxSpoutPending(5000); StormSubmitter.submitTopology("mytopology", conf, topology);
<plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.path.to.main.Class</mainClass> </manifest> </archive> </configuration> </plugin>
運行 mvn assembly:assembly 便可獲得合適的jar包。在集羣classpath中已經存在storm,請確保已經排除storm.jar。ui
storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3
storm jar 命令能夠提交拓撲到集羣而且配置StormSubmitter類鏈接正確的集羣,在上面例子中,上傳後的allmycode.jar,storm jar 命令調用org.me.MyTopology中的main方法,將arg1,arg2,arg3三個參數傳入main方法,spa
有時間將寫一篇如何搭建storm開發環境, 裏面會詳細講解如何經過此命令運行拓撲的..net
每一個拓撲能夠設置不少配置,全部的配置能夠查看官方文檔的Config類http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/Config.html
以 TOPOLOGY 開頭的配置屬性能夠被重寫,其他的集羣配置沒法被重寫,下面介紹幾個拓撲經常使用設置:
使用一下命令能夠停止拓撲。
storm kill {stormname}
{stormname}使用提交拓撲時設定的名字。
它不能馬上停止這個拓撲,它會馬上中止全部spout,阻止它發送更多的元組,storm等待 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 配置的超時時長,中止此拓撲全部工做進程,這樣子能夠給被停止的拓撲必定時間去處理完剩下的元組。
目前僅支持的操做就是中止這個拓撲,從新提交拓撲到集羣。官方準備實現 storm swap 命令去替換正在運行的拓撲,確保最大限度的減小更新時間以及新老兩個拓撲處理元組能夠平滑的過渡。
最好的監控方式是使用 storm UI,它提供了任務發生的全部錯誤信息,詳細的數據吞吐量,運行在拓撲中每一個組件的延遲性能。 也能夠查看集羣中運行日誌。