上一頁java
package bolts; import java.util.ArrayList; import java.util.List; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class WordNormalizer implements IRichBolt{ private OutputCollector collector; public void cleanup(){} /** * The bolt will receive the line from the * words file and process it to Normalize this line * * The normalize will be put the words in lower case * and split the line to get all words in this */ public void execute(Tuple input) { String sentence = input.getString(0); String[]words= sentence.split(" "); for(String word:words){ word =word.trim(); if(!word.isEmpty()){ word =word.toLowerCase(); //Emit the word List a =new ArrayList(); a.add(input); collector.emit(a,new Values(word)); } } // Acknowledge the tuple collector.ack(input); } public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) { this.collector=collector; } /** * The bolt will only emit the field "word" */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
提示:在這個類中,每調用一次execute()方法,會發送多個元組。例如,當execute()方法收到「This is the Storm book」這個句子時,該方法會發送5個新元組。web
第二個bolt,WordCounter,負責統計每一個單詞個數。當topology結束時(cleanup()方法被調用時),顯示每一個單詞的個數。數據庫
提示:第二個bolt中什麼也不發送,本例中,將數據添加到一個map對象中,可是現實生活中,bolt能夠將數據存儲到一個數據庫中。app
package bolts; import java.util.HashMap; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class WordCounter implements IRichBolt{ Integer id; String name; Map<String,Integer>counters; private OutputCollector collector; /** * At the end of the spout (when the cluster is shutdown * We will show the word counters */ @Override public void cleanup(){ System.out.println("-- Word Counter ["+name+"-"+id+"]--"); for(Map.Entry<String,Integer>entry: counters.entrySet()){ System.out.println(entry.getKey()+": "+entry.getValue()); } } /** * On each word We will count */ @Override public void execute(Tuple input) { String str =input.getString(0); /** * If the word dosn't exist in the map we will create * this, if not We will add 1 */ if(!counters.containsKey(str)){ counters.put(str,1); }else{ Integer c =counters.get(str) +1; counters.put(str,c); } //Set the tuple as Acknowledge collector.ack(input); } /** * On create */ @Override public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) { this.counters=newHashMap<String,Integer>(); this.collector=collector; this.name=context.getThisComponentId(); this.id=context.getThisTaskId(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {} }
execute()方法使用一個映射(Map類型)採集單詞並統計這些單詞個數。當topology結束的時候,cleanup()方法被調用而且打印出counter映射。(這僅僅是個例子,一般狀況下,當topology關閉時,你應該使用cleanup()方法關閉活動連接和其餘資源。)ide
在主類中,你將建立topology和一個LocalCluster對象,LocalCluster對象使你能夠在本地測試和調試topology。LocalCluster結合Config對象容許你嘗試不一樣的集羣配置。例如,若是不慎使用一個全局變量或者類變量,當配置不一樣數量的worker測試topology的時候,你將會發現這個錯誤。(關於config對象在第三章會有更多介紹)測試
提示:全部的topology結點應該能夠在進程間沒有數據共享的情形下獨立運行(也就是說沒有全局或者類變量),由於當topology運行在一個真實的集羣上時,這些進程可能運行在不一樣的機器上。ui
你將使用TopologyBuilder建立topology,TopologyBuilder會告訴Storm怎麼安排節點順序、它們怎麼交換數據。this
TopologyBuilder builder =new TopologyBuilder(); builder.setSpout("word-reader",new WordReader()); builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader"); builder.setBolt("word-counter",new WordCounter(),2).fieldsGrouping("word-normalizer",new Fields("word"));
本例中spout和bolt之間使用隨機分組(shuffleGrouping)鏈接,這種分組類型告訴Storm以隨機分佈的方式從源節點往目標節點發送消息。spa
接着,建立一個包含topology配置信息的Config對象,該配置信息在運行時會與集羣配置信息合併,而且經過prepare()方法發送到全部節點。.net
Config conf =new Config(); conf.put("wordsFile",args[0]); conf.setDebug(false);
將wordFile屬性設置爲將要被spout讀取的文件名稱(文件名在args參數中傳入),並將debug屬性設置爲true,由於你在開發過程當中,當debug爲true時,Storm會打印節點間交換的全部消息和其餘調試數據,這些信息有助於理解topology是如何運行的。
前面提到,你將使用LocalCluster來運行topology。在一個產品環境中,topology會持續運行,可是在本例中,你僅需運行topology幾秒鐘就能看到結果。
LocalCluster cluster =new LocalCluster(); cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createTopology()); Thread.sleep(1000); cluster.shutdown();
使用createTopology和submitTopology建立、運行topology,睡眠兩秒(topology運行在不一樣的線程中),而後經過關閉集羣來中止topology。
例2-3將上面代碼拼湊到一塊兒。
例2-3.src/main/java/TopologyMain.java
import spouts.WordReader; import bolts.WordCounter; import bolts.WordNormalizer; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; public class TopologyMain{ public static void main(String[]args)throws InterruptedException{ //Topology definition TopologyBuilder builder =new TopologyBuilder(); builder.setSpout("word-reader",new WordReader()); builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader"); builder.setBolt("word-counter",new WordCounter(),2).fieldsGrouping("word-normalizer",new Fields("word")); //Configuration Config conf =new Config(); conf.put("wordsFile",args[0]); conf.setDebug(false); //Topology run conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1); LocalCluster cluster =new LocalCluster(); cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createTopology()); Thread.sleep(1000); cluster.shutdown(); } }
如今開始準備運行第一個topology!若是你新建一個文本文件(src/main/resources/words.txt)而且每行一個單詞,則能夠經過以下命令運行這個topology:
mvn exec:java -Dexec.mainClass=」TopologyMain」 -Dexec.args=」src/main/resources/words.txt」
例如,若是你使用以下words.txt文件:
Storm
test
are
great
is
an
Storm
simple
application
but
very
powerful
really
Storm
is
great
在日誌中,你將會看到相似以下信息:
is: 2
application: 1
but: 1
great: 1
test: 1
simple: 1
Storm: 3
really: 1
are: 1
great: 1
an: 1
powerful: 1
very: 1
在本例中,你只使用了每一個結點的一個單一實例,假如此時有一個很是大的日誌文件怎麼去統計每一個單詞的個數?此時能夠很方便地改系統中節點數量來並行工做,如建立WordCounter的兩個實例:
1
|
builder.setBolt(
"word-counter"
,
new
WordCounter(),
2
).shuffleGrouping(
"word-normalizer"
);
|
從新運行這個程序,你將看到:
– Word Counter [word-counter-2] –
application: 1
is: 1
great: 1
are: 1
powerful: 1
Storm: 3
– Word Counter [word-counter-3] –
really: 1
is: 1
but: 1
great: 1
test: 1
simple: 1
an: 1
very: 1
太棒了!改變並行度,so easy(固然,在實際生活中,每一個實例運行在不一樣的機器中)。但仔細一看彷佛還有點問題:「is」和「great」這兩個單詞在每一個WordCounter實例中都被計算了一次。Why?當使用隨機分組(shuffleGrouping)時,Storm以隨機分佈的方式向每一個bolt實例發送每條消息。在這個例子中,將相同的單詞發送到同一個WordCounter實例是更理想的。爲了實現這個,你能夠將shuffleGrounping(「word-normalizer」)改爲fieldsGrouping(「word-normalizer」,new Fields(「word」))。嘗試一下並從新運行本程序來確認結果。後面的章節你將看到更多關於分組和消息流的內容。
本章咱們討論了Storm的本地操做模式和遠程操做模式的不一樣,以及用Storm開發的強大和簡便。同時也學到了更多關於Storm的基本概念,咱們將在接下來的章節深刻解釋這些概念。