Storm經常使用操做命令java
一、任務提交命令:storm jar 【jar路徑】 【拓撲包名.拓撲類名】 【拓撲名稱】apache
storm jar /export/servers/storm/examples/storm-starter/storm-starter-topologies-1.0.3.jar org.apache.storm.starter.WordCountTopology wordcount數組
與hadoop不一樣的是:不須要指定輸入輸出路徑安全
hadoop jar /usr/local/wordcount.jar /data.txt /wcoutapp
二、殺死任務命令:storm kill 【拓撲名稱】 -w 10(執行kill命令時能夠經過-w [等待秒數]指定拓撲停用之後的等待時間)框架
storm kill topology-name -w 10maven
三、停用任務命令:storm deactive 【拓撲名稱】編輯器
storm deactive topology-nameide
咱們可以掛起或停用運行中的拓撲。當停用拓撲時,全部已分發的元組都會獲得處理,可是spouts的nextTuple方法不會被調用。銷燬一個拓撲,可使用kill命令。它會以一種安全的方式銷燬一個拓撲,首先停用拓撲,在等待拓撲消息的時間段內容許拓撲完成當前的數據流。函數
四、啓用任務命令:storm activate 【拓撲名稱】
storm activate topology-name
五、從新部署任務命令:storm rebalance 【拓撲名稱】
storm rebalance topology-name
再平衡使你重分配集羣任務。這是個很強大的命令。好比,你向一個運行中的集羣增長了節點。再平衡命令將會停用拓撲,而後在相應超時時間以後重分配worker,並重啓拓撲。
StormWordCount(重點掌握)
WordCount分析:
Java版本:
一、讀取文件中的數據,一行一行的讀取;
二、將讀到的數據進行切割;
三、對切割後的數組中的單詞進行計算。
Hadoop版本:
一、按行讀取文件中的數據;
二、在Mapper()函數中對每一行的數據進行切割,並輸出切割後的數據數組;
三、接收Mapper()中輸出的數據數組,在Reducer()函數中對數組中的單詞進行計算,將計算後的統計結果輸出。
Storm版本:
一、Spout從外部數據源中讀取數據,隨機發送一個元組對象出去;
二、SplitBolt接收Spout中輸出的元組對象,將元組中的數據切分紅單詞,並將切分後的單詞發射出去;
三、WordCountBolt接收SplitBolt中輸出的單詞數組,對裏面單詞的頻率進行累加,將累加後的結果輸出。
StormWordCount代碼實現及分析(重點掌握)
在IDEA中建立一個Maven項目,先在pom.xml添加依賴--->import changes
建立Maven項目步驟:
使用IDEA編輯器建立一個Maven項目
前提:假設您已經安裝好了IDEA編輯器,因爲編輯器自帶Maven插件,不須要單獨安裝maven。固然IDEA自己是支持安裝外部的maven的。
一、打開編輯器
筆者使用的是14.1 不是當前最新編輯器
二、建立maven項目第一步:
依次點擊軟件左上角的File->new->project
而後選擇maven,並點擊next。在這一步有一個須要注意的地方,就是爲你的項目選擇JDK或者SDK.若是您以前沒有配置過JDK,能夠點擊new按鈕,設置您JDK的home目錄。
三、填寫maven項目的groupid,和artifactid。而後點擊下一步
通常來說,groupid寫您的公司及部門或項目的名稱,好比:com.ahu
artifactid寫您的子項目或者子模塊的名字,好比當前項目是建立maven項目,咱們能夠將artifactid寫成:stormwordcount
version能夠不用修改
四、填寫項目名稱及指定項目所在的目錄
projectName:StormWordCount
location:任意地址---->好比:E:\StormWordCount
至此,建立maven項目完畢。
1 <dependencies> 2 <dependency> 3 <groupId>org.apache.storm</groupId> 4 <artifactId>storm-core</artifactId> 5 <version>0.9.5</version> 6 <!-- <scope>provided</scope>--> 7 </dependency> 8 </dependencies>
而後在寫相關代碼:
項目主要流程:
1 package com.ahu.storm; 2 3 4 import backtype.storm.Config; 5 import backtype.storm.LocalCluster; 6 import backtype.storm.StormSubmitter; 7 import backtype.storm.generated.AlreadyAliveException; 8 import backtype.storm.generated.InvalidTopologyException; 9 import backtype.storm.topology.TopologyBuilder; 10 import backtype.storm.tuple.Fields; 11 12 /** 13 * Created by ahu_lichang on 2017/5/18. 14 */ 15 public class WordCountTopologyMain { 16 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { 17 //一、準備一個TopologyBuilder 18 //storm框架支持多語言,在Java環境下建立一個拓撲,須要使用TopologyBuilder 19 TopologyBuilder topologyBuilder = new TopologyBuilder(); 20 //MySpout類,在已知的英文句子中,所及發送一條句子出去 21 topologyBuilder.setSpout("mySpout", new MySpout(), 2); 22 //MySplitBolt類,主要是將一行一行的文本內容切割成單詞 23 topologyBuilder.setBolt("mybolt1", new MySplitBolt(), 2).shuffleGrouping("mySpout"); 24 //MyCountBolt類,負責對單詞的頻率進行累加 25 topologyBuilder.setBolt("mybolt2", new MyCountBolt(), 4).fieldsGrouping("mybolt1", new Fields("word")); 26 /** 27 * i 28 * am 29 * lilei 30 * love 31 * hanmeimei 32 */ 33 //二、建立一個configuration,用來指定當前topology 須要的worker的數量 34 //啓動topology的配置信息 35 Config config = new Config(); 36 //定義你但願集羣分配多少個工做進程給你來執行這個topology 37 config.setNumWorkers(2); 38 39 //三、提交任務 -----兩種模式 本地模式和集羣模式 40 //這裏將拓撲名稱寫死了mywordcount,因此在集羣上打包運行的時候,不用寫拓撲名稱了!也可用arg[0] 41 StormSubmitter.submitTopology("mywordcount", config, topologyBuilder.createTopology()); 42 //LocalCluster localCluster = new LocalCluster(); 43 //localCluster.submitTopology("mywordcount",config,topologyBuilder.createTopology()); 44 } 45 }
MySpout的實現及生命週期:
1 package com.ahu.storm; 2 3 import backtype.storm.spout.SpoutOutputCollector; 4 import backtype.storm.task.TopologyContext; 5 import backtype.storm.topology.OutputFieldsDeclarer; 6 import backtype.storm.topology.base.BaseRichSpout; 7 import backtype.storm.tuple.Fields; 8 import backtype.storm.tuple.Values; 9 10 import java.util.Map; 11 12 /** 13 * Created by ahu_lichang on 2017/5/18. 14 */ 15 public class MySpout extends BaseRichSpout { 16 //用來收集Spout輸出的Tuple 17 SpoutOutputCollector collector; 18 19 //初始化方法 20 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 21 this.collector = collector; 22 } 23 24 //storm 框架在 while(true) 調用nextTuple方法 25 public void nextTuple() { 26 collector.emit(new Values("i am lilei love hanmeimei")); 27 } 28 29 //消息源能夠發射多條消息流stream.多條消息流能夠理解爲多種類型的數據 30 public void declareOutputFields(OutputFieldsDeclarer declarer) { 31 declarer.declare(new Fields("sentence")); 32 } 33 }
MySplitBolt的實現及生命週期:
1 package com.ahu.storm; 2 3 import backtype.storm.task.OutputCollector; 4 import backtype.storm.task.TopologyContext; 5 import backtype.storm.topology.OutputFieldsDeclarer; 6 import backtype.storm.topology.base.BaseRichBolt; 7 import backtype.storm.tuple.Fields; 8 import backtype.storm.tuple.Tuple; 9 import backtype.storm.tuple.Values; 10 11 import java.util.Map; 12 13 /** 14 * Created by ahu_lichang on 2017/5/18. 15 */ 16 public class MySplitBolt extends BaseRichBolt { 17 OutputCollector collector; 18 19 //初始化方法 20 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 21 this.collector = collector; 22 } 23 24 // 被storm框架 while(true) 循環調用 傳入參數tuple 25 //input內容是句子,execute方法將句子切割成單詞發出 26 public void execute(Tuple input) { 27 String line = input.getString(0); 28 String[] arrWords = line.split(" "); 29 for (String word : arrWords) { 30 collector.emit(new Values(word, 1)); 31 } 32 } 33 34 public void declareOutputFields(OutputFieldsDeclarer declarer) { 35 declarer.declare(new Fields("word", "num")); 36 } 37 }
MyCountBolt的實現及生命週期:
1 package com.ahu.storm; 2 3 4 import backtype.storm.task.OutputCollector; 5 import backtype.storm.task.TopologyContext; 6 import backtype.storm.topology.OutputFieldsDeclarer; 7 import backtype.storm.topology.base.BaseRichBolt; 8 import backtype.storm.tuple.Tuple; 9 10 import java.util.HashMap; 11 import java.util.Map; 12 13 /** 14 * Created by ahu_lichang on 2017/5/18. 15 */ 16 public class MyCountBolt extends BaseRichBolt { 17 OutputCollector collector; 18 //用來保存最後計算的結果key=單詞,value=單詞個數 19 Map<String, Integer> map = new HashMap<String, Integer>(); 20 21 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 22 this.collector = collector; 23 } 24 25 public void execute(Tuple input) { 26 String word = input.getString(0); 27 Integer num = input.getInteger(1); 28 System.out.println(Thread.currentThread().getId() + " word:" + word); 29 if (map.containsKey(word)) { 30 Integer count = map.get(word); 31 map.put(word, count + num); 32 } else { 33 map.put(word, num); 34 } 35 System.out.println("count:" + map); 36 } 37 38 public void declareOutputFields(OutputFieldsDeclarer declarer) { 39 //不輸出 40 } 41 }
兩種運行模式:
一、本地模式:直接在IDEA中的WordCountTopologyMain運行便可在控制檯觀察到輸出結果
二、集羣模式:
要打包運行。打包方法:
將jar包上傳到storm1上,去運行storm /root/stormwordcount.XXXX.jar com.ahu.storm.WordCountTopologyMain
注意:這樣打包運行的時候,會出錯:NoClassDefFoundError
這是由於打包的時候,有的jar包沒有打到裏面去,打包方式不對!須要在pom.xml指定一個Build,指定打包的方式,將全部的依賴都打成jar。
1 <build> 2 <plugins> 3 <plugin> 4 <artifactId>maven-assembly-plugin</artifactId> 5 <configuration> 6 <descriptorRefs> 7 <descriptorRef>jar-with-dependencies</descriptorRef> 8 </descriptorRefs> 9 <!-- <archive> 10 <manifest> 11 <mainClass>com.ahu.storm.hadoop.mapreduce.wordcount.WordCount</mainClass> 12 </manifest> 13 </archive>--> 14 </configuration> 15 <executions> 16 <execution> 17 <id>make-assembly</id> 18 <phase>package</phase> 19 <goals> 20 <goal>single</goal> 21 </goals> 22 </execution> 23 </executions> 24 </plugin> 25 </plugins> 26 </build>
這樣再打包運行,就不會出錯了!運行成功後,能夠在worker運行的機器上查看日誌:/export/servers/storm/logs/下查看,tail -100f worker-6701.log.1
Storm具體的任務執行流程圖
Stream Grouping詳解
Storm裏面有7種類型的stream grouping
l Shuffle Grouping: 隨機分組, 隨機派發stream裏面的tuple,保證每一個bolt接收到的tuple數目大體相同。
l Fields Grouping:按字段分組,好比按userid來分組,具備一樣userid的tuple會被分到相同的Bolts裏的一個task,而不一樣的userid則會被分配到不一樣的bolts裏的task。
l All Grouping:廣播發送,對於每個tuple,全部的bolts都會收到。
l Global Grouping:全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。
l Non Grouping:不分組,這stream grouping個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是同樣的效果, 有一點不一樣的是storm會把這個bolt放到這個bolt的訂閱者同一個線程裏面去執行。
l Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪一個task處理這個消息。只有被聲明爲Direct Stream的消息流能夠聲明這種分組方法。並且這種消息tuple必須使用emitDirect方法來發射。消息處理者能夠經過TopologyContext來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id)。
l Local or shuffle grouping:若是目標bolt有一個或者多個task在同一個工做進程中,tuple將會被隨機發生給這些tasks。不然,和普通的Shuffle Grouping行爲一致。