The word-count topology that runs on Storm consists of three components:
WordReader (spout): reads the data from a file line by line.
WordNormalizer (bolt): splits each line into individual words.
WordCounter (bolt): counts the words.
The data flows as follows: WordReader → WordNormalizer → WordCounter.
A Storm topology only ever contains two kinds of components: spouts, which connect to a data source, read the data, and pass it to a bolt; and bolts, which process the data and either output a result or pass it on to the next bolt.
Create a Maven project; the pom file looks like this:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>storm.book</groupId> <artifactId>Getting-Started</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <slf4j.version>1.7.12</slf4j.version> <logback.version>1.1.3</logback.version> </properties> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.7</version> <!-- 打包時不講storm打入jar包中 --> <scope>provided</scope> </dependency> <!-- logback --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>${logback.version}</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>${logback.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <!-- 插件將依賴包打入jar包 --> <artifactId> maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef> jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>sfpay.TopologyMain</mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build> </project>
Create WordReader (Spout). The spout reads the data from a file.
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader implements IRichSpout {

    private TopologyContext context;
    private FileReader fileReader;
    private SpoutOutputCollector collector;
    private boolean completed;

    /**
     * conf is the configuration object created when the topology is defined;
     * TopologyContext contains all the topology data; SpoutOutputCollector
     * lets us emit the data that the bolts will process.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Called once, after the spout has been created
        System.out.println("WordReader.open(Map conf, TopologyContext context, SpoutOutputCollector collector)");
        this.context = context;
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        this.collector = collector;
    }

    /**
     * Emits the tuples that the bolts will process.<br>
     * Called periodically in the same loop as ack() and fail(). When there is
     * no work to do, it must release control of the thread so the other
     * methods get a chance to run.
     */
    @Override
    public void nextTuple() {
        // Read the file here and emit the data line by line
        System.out.println("WordReader.nextTuple()");
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //e.printStackTrace();
            }
            return;
        }
        String str;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((str = reader.readLine()) != null) {
                // Values is an ArrayList implementation whose elements are the
                // constructor arguments; the line itself doubles as the message id,
                // which is what enables ack()/fail()
                this.collector.emit(new Values(str), str);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        // Called when a message has been processed successfully
        System.out.println("WordReader.ack(Object msgId):" + msgId);
    }

    @Override
    public void fail(Object msgId) {
        // Called when a message has failed
        System.out.println("WordReader.fail(Object msgId):" + msgId);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        System.out.println("WordReader.declareOutputFields(OutputFieldsDeclarer declarer)");
        declarer.declare(new Fields("line"));
    }

    @Override
    public void close() {
        // Called when the topology is stopped
        System.out.println("WordReader.close()");
    }

    @Override
    public void activate() {
        System.out.println("WordReader.activate()");
    }

    @Override
    public void deactivate() {
        System.out.println("WordReader.deactivate()");
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        System.out.println("WordReader.getComponentConfiguration()");
        return null;
    }
}
Create WordNormalizer (Bolt), which splits the lines coming from the spout into words. For example, the line "Hello Storm" is emitted as the two tuples hello and storm.
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer implements IRichBolt {

    private OutputCollector collector;
    private TopologyContext context;
    private Map stormConf;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // Called on initialization
        System.out.println("WordNormalizer.prepare(Map stormConf, TopologyContext context, OutputCollector collector)");
        this.stormConf = stormConf;
        this.context = context;
        this.collector = collector;
    }

    /**
     * Called once for each tuple received; may emit several tuples in turn.
     */
    @Override
    public void execute(Tuple input) {
        // Receives tuples from the spout or from another bolt
        System.out.println("WordNormalizer.execute(Tuple input)");
        String sentence = input.getString(0); // first field of the incoming tuple
        String[] words = sentence.split(" ");
        for (String word : words) {
            // Trim the word, lowercase it, then emit it
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
        // Acknowledge the tuple
        collector.ack(input);
    }

    @Override
    public void cleanup() {
        System.out.println("WordNormalizer.cleanup()");
    }

    /**
     * Declares the bolt's output fields.
     *
     * This bolt only emits the "word" field.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        System.out.println("WordNormalizer.declareOutputFields(OutputFieldsDeclarer declarer)");
        declarer.declare(new Fields("word")); // declares that the bolt emits a field named "word"
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        System.out.println("WordNormalizer.getComponentConfiguration()");
        return null;
    }
}
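One detail worth noting: the emit above is unanchored, so the word tuples are not tied to the spout's tuple tree, and a failure in WordCounter would never reach WordReader.fail(). If end-to-end reliability is wanted, a minimal change (a sketch using the overload of OutputCollector.emit that takes an anchor tuple) is:

// Anchored emit: the word tuple joins the tuple tree rooted at "input",
// so a downstream failure can propagate back to the spout's fail()
collector.emit(input, new Values(word));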
Create WordCounter (Bolt), which counts the words.
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class WordCounter implements IRichBolt {

    private Map<String, Integer> counters;
    private OutputCollector collector;
    private String name;
    private Integer id;

    /**
     * Initialization
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        System.out.println("WordCounter.prepare(Map stormConf, TopologyContext context, OutputCollector collector)");
        this.collector = collector;
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    /**
     * Counts each word
     */
    @Override
    public void execute(Tuple input) {
        System.out.println("WordCounter.execute(Tuple input)");
        String str = input.getString(0);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            counters.put(str, counters.get(str) + 1);
        }
        collector.ack(input);
    }

    /**
     * When this bolt shuts down (i.e. the cluster is stopped), print the word counts
     */
    @Override
    public void cleanup() {
        // cleanup() is called when the bolt is destroyed
        System.out.println("WordCounter.cleanup()");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: it declares no output fields
        System.out.println("WordCounter.declareOutputFields(OutputFieldsDeclarer declarer)");
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        System.out.println("WordCounter.getComponentConfiguration()");
        return null;
    }
}
Create the topology in TopologyMain, which defines how the data flows through the components. A topology can run in two modes: local mode and remote (cluster) mode.
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;
import spouts.WordReader;

public class TopologyMain {

    public static void main(String[] args) {
        // Define the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader(), 3);
        builder.setBolt("word-normalizer", new WordNormalizer(), 3).shuffleGrouping("word-reader");
        // fieldsGrouping on "word" routes every occurrence of the same word
        // to the same WordCounter task, so the per-task counts are correct
        builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-normalizer", new Fields("word"));
        StormTopology topology = builder.createTopology();

        // Configuration
        Config conf = new Config();
        conf.put("wordsFile", "/opt/storm/words.txt");
        conf.setDebug(false);

        if (args != null && args.length > 0) {
            // With arguments: production mode, submit to the cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, topology);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // Without arguments: local mode
            // Run the topology
            LocalCluster cluster = new LocalCluster();
            // conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
            cluster.submitTopology("Getting-Started-Topologie", conf, topology);
            try {
                Thread.sleep(5000);
                cluster.shutdown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
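To see the flow end to end, here is a hypothetical local-mode run (the file contents below are made-up sample input, not part of the project). Suppose /opt/storm/words.txt contains:

hello storm
hello world

After five seconds the local cluster shuts down and WordCounter.cleanup() prints one line per word (entry order depends on the HashMap). With a spout parallelism of 1 that would be:

hello: 2
storm: 1
world: 1

Note that the topology above actually sets the spout's parallelism hint to 3, and every WordReader instance opens and reads the same file, so each line is emitted three times and all counts triple; for a single-file source, a parallelism hint of 1 avoids the duplication.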
Configure the logback log file. Storm uses logback as its logging implementation; here we simply set the log output level.
<?xml version="1.0" encoding= "UTF-8"?> <configuration> <root level="ERROR" /> </configuration>
The project structure is as follows:
Package, submit, and run
Our topology uses Storm's API, but since the Storm cluster already ships these classes, they must not be included when Maven packages the jar; that is why the storm-core dependency is marked provided.
Dependencies the Storm cluster does not provide, however, do need to be packaged in. For example, as mentioned earlier, Storm's best data source is message middleware; if a later example uses RocketMQ, its client must be bundled. By default Maven only packages our own code and none of the dependency jars, so we use the Maven Assembly plugin to build a jar with dependencies, as in the sketch below.
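For illustration, such a dependency is declared with the default compile scope so the assembly plugin bundles it into the fat jar. The RocketMQ coordinates and version below are assumptions for the sketch; check the repository for the ones you actually need:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <!-- example version only; pick the one matching your cluster -->
    <version>4.3.2</version>
    <!-- no <scope> means "compile", so the assembly plugin packs it in -->
</dependency>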
Packaging command:
mvn assembly:assembly
Two jars are produced in the target directory: one with the dependencies bundled and one without.
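The jar with dependencies is the one to submit to the cluster. With the coordinates above, the assembly plugin names it Getting-Started-0.0.1-SNAPSHOT-jar-with-dependencies.jar by default, and submission uses the standard storm jar command; the trailing argument is the topology name, which TopologyMain passes to StormSubmitter as args[0] (the name here is just an example):

storm jar target/Getting-Started-0.0.1-SNAPSHOT-jar-with-dependencies.jar sfpay.TopologyMain word-count-topology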