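The first step in developing for Storm is designing the Topology. To help newcomers get started, we first build a simple example whose job is to append a suffix to every word (ExclamationBolt below appends "!!!") and then print the result. The Topology of the whole example is a simple chain: TestWordSpout → ExclamationBolt → PrintBolt.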
The Topology consists of three parts:
TestWordSpout: the data source, which emits words
ExclamationBolt: appends the suffix to each word
PrintBolt: prints each word
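To make the data flow concrete without a cluster, here is a minimal plain-Java sketch that chains the three steps as ordinary method calls. It only models what each component does to a word; the class `PipelineSketch` and its methods are hypothetical names, and in Storm the real components run as separate tasks connected by streams rather than as nested calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Plain-Java model of the example's data flow (no Storm involved):
// TestWordSpout -> ExclamationBolt -> PrintBolt
public class PipelineSketch {
    // Same sample words as TestWordSpout
    public static final String[] WORDS = {"fdfs", "fdfs", "ffsdfs"};

    // Stand-in for TestWordSpout.nextTuple(): pick a random word
    public static String nextWord(Random random) {
        return WORDS[random.nextInt(WORDS.length)];
    }

    // Stand-in for ExclamationBolt.execute(): append the "!!!" suffix
    public static String exclaim(String word) {
        return word + "!!!";
    }

    // Stand-in for PrintBolt.execute(): build the line it logs
    public static String printLine(String word) {
        return word + ".......";
    }

    // Push n words through the three stages and collect the printed lines
    public static List<String> run(int n, long seed) {
        Random random = new Random(seed);
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            lines.add(printLine(exclaim(nextWord(random))));
        }
        return lines;
    }

    public static void main(String[] args) {
        for (String line : run(5, 42L)) {
            System.out.println(line);
        }
    }
}
```

Every printed line is either `fdfs!!!.......` or `ffsdfs!!!.......`, which is the same text PrintBolt writes to the log.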
Implementation:
1. Create a Maven project in IDEA and add the Maven dependencies:
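```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ysl</groupId>
    <artifactId>storm</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/storm/storm -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.4</version>
            <!-- The cluster already provides storm-core, so keep it out of the fat jar.
                 For local runs in IDEA you may need to drop this scope, or enable
                 IDEA's option to include Provided-scope dependencies. -->
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.ysl.WordsTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <!-- Bind the assembly to the package phase so that mvn clean install
                     also builds the *-jar-with-dependencies.jar -->
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```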
TestWordSpout:
```java
package com.ysl.spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Random;

public class TestWordSpout extends BaseRichSpout {
    private static final Logger logger = LoggerFactory.getLogger(TestWordSpout.class);
    // Sample words to emit
    private static final String[] WORDS = {"fdfs", "fdfs", "ffsdfs"};

    private SpoutOutputCollector collector = null;
    private Random random = null;

    // Called once when the spout task is initialized on a worker
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.random = new Random();
    }

    // Called repeatedly by Storm: emit one random word per second
    public void nextTuple() {
        Utils.sleep(1000);
        final String word = WORDS[random.nextInt(WORDS.length)];
        // No message id is passed, so Storm does not track or replay this tuple
        collector.emit(new Values(word));
    }

    // Downstream bolts read this tuple's single field as "word"
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
```
ExclamationBolt:
```java
package com.ysl.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class ExclamationBolt extends BaseRichBolt {
    private static final Logger logger = LoggerFactory.getLogger(ExclamationBolt.class);
    private OutputCollector collector = null;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    public void execute(Tuple tuple) {
        // Emit the word with the "!!!" suffix, anchored to the input tuple
        this.collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        // Mark the input tuple as processed
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
```
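ExclamationBolt anchors its output to the input (`emit(tuple, ...)`) and then acks the input. Storm's documentation on guaranteed message processing describes the acker tracking each tuple tree with a single 64-bit "ack val": the XOR of the ids of the tuples in the tree, which returns to zero once all of them have been acked. The sketch below is a simplified in-memory model of that idea, not Storm's code; `AckerSketch` and its methods are hypothetical. Note that TestWordSpout emits without a message id (the `emit(List<Object> tuple, Object messageId)` overload would supply one), so in this example Storm does not actually track the tree and the acks trigger no replay.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Simplified model of Storm's acker bookkeeping (not Storm's actual code).
// Per spout tuple, keep ackVal = XOR of the ids of all tuples in its tree.
// An id is XORed in when the tuple is emitted and XORed in again when it is
// acked, so ackVal becomes 0 once every emitted tuple has been acked.
public class AckerSketch {
    private final Map<Long, Long> ackVals = new HashMap<>();
    private final Random random = new Random();

    // Random non-zero 64-bit tuple id
    private long newId() {
        long id;
        do {
            id = random.nextLong();
        } while (id == 0L);
        return id;
    }

    // Spout emits a root tuple under a (hypothetical) message id
    public long emitRoot(long msgId) {
        long id = newId();
        ackVals.put(msgId, id);
        return id;
    }

    // Bolt emits a child anchored to a tuple of this tree, like emit(tuple, values)
    public long emitAnchored(long msgId) {
        long id = newId();
        ackVals.merge(msgId, id, (a, b) -> a ^ b);
        return id;
    }

    // Bolt acks a tuple, like collector.ack(tuple)
    public void ack(long msgId, long tupleId) {
        ackVals.merge(msgId, tupleId, (a, b) -> a ^ b);
    }

    public boolean isFullyProcessed(long msgId) {
        return ackVals.get(msgId) == 0L;
    }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        long msgId = 1L;
        long word = acker.emitRoot(msgId);          // TestWordSpout emits "fdfs"
        long exclaimed = acker.emitAnchored(msgId); // ExclamationBolt emits "fdfs!!!"
        acker.ack(msgId, word);                     // ExclamationBolt acks its input
        System.out.println(acker.isFullyProcessed(msgId)); // false: PrintBolt has not acked yet
        acker.ack(msgId, exclaimed);                // PrintBolt acks its input
        System.out.println(acker.isFullyProcessed(msgId)); // true
    }
}
```

The order matters in the model as it does in ExclamationBolt: the child is emitted (XORed in) before the parent is acked, so the value cannot reach zero while work is still pending.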
PrintBolt:
```java
package com.ysl.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class PrintBolt extends BaseRichBolt {
    private static final Logger logger = LoggerFactory.getLogger(PrintBolt.class);
    private OutputCollector collector = null;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    public void execute(Tuple tuple) {
        // Log the suffixed word, then ack the input tuple
        logger.info(tuple.getString(0) + ".......");
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Terminal bolt: it emits nothing, so there are no output fields to declare
    }
}
```
WordsTopology:
```java
package com.ysl;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import com.ysl.bolts.ExclamationBolt;
import com.ysl.bolts.PrintBolt;
import com.ysl.spouts.TestWordSpout;

public class WordsTopology {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // Spout "word" with a parallelism hint of 1
        topologyBuilder.setSpout("word", new TestWordSpout(), 1);
        // "exclaim" consumes the "word" stream via shuffle grouping
        topologyBuilder.setBolt("exclaim", new ExclamationBolt(), 1).shuffleGrouping("word");
        // "print" consumes the "exclaim" stream
        topologyBuilder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("exclaim");

        Config config = new Config();
        config.setDebug(true);

        if (args != null && args.length > 0) {
            // Cluster mode: args[0] is the topology name
            config.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, topologyBuilder.createTopology());
        } else {
            // Local mode: run in-process for 30 seconds, then kill the topology and shut down
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("test", config, topologyBuilder.createTopology());
            Utils.sleep(30000);
            localCluster.killTopology("test");
            localCluster.shutdown();
        }
    }
}
```
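Both bolts subscribe to their upstream component with `shuffleGrouping`. Storm's documentation describes shuffle grouping as distributing tuples randomly across the bolt's tasks such that each task gets an equal number of tuples. With a parallelism hint of 1 (the third argument of `setSpout`/`setBolt`), every tuple necessarily goes to the single task, so the grouping only matters once the hint is raised. The following is a simplified plain-Java model of that guarantee, not Storm's implementation; `ShuffleGroupingSketch` and the task ids are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Simplified model of a shuffle grouping: the target task list is shuffled,
// walked in order, and reshuffled once exhausted, so task choice is random
// while every task receives the same number of tuples.
public class ShuffleGroupingSketch {
    private final List<Integer> tasks;
    private final Random random;
    private int next = 0;

    public ShuffleGroupingSketch(List<Integer> targetTasks, long seed) {
        this.tasks = new ArrayList<>(targetTasks);
        this.random = new Random(seed);
        Collections.shuffle(this.tasks, this.random);
    }

    // Returns the task id that should receive the next tuple
    public int chooseTask() {
        if (next == tasks.size()) {
            Collections.shuffle(tasks, random);
            next = 0;
        }
        return tasks.get(next++);
    }

    public static void main(String[] args) {
        // Hypothetical: the "exclaim" bolt with a parallelism hint of 3 (task ids 1..3)
        ShuffleGroupingSketch grouping = new ShuffleGroupingSketch(Arrays.asList(1, 2, 3), 42L);
        int[] counts = new int[4];
        for (int i = 0; i < 300; i++) {
            counts[grouping.chooseTask()]++;
        }
        // prints "task1=100 task2=100 task3=100"
        System.out.printf("task1=%d task2=%d task3=%d%n", counts[1], counts[2], counts[3]);
    }
}
```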
2. Package and run
Package the application with Maven using the following command:
```bash
mvn clean install
```
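Storm can run a topology in two ways. The first is local mode, which suits debugging and development: simply run the main method directly in IDEA. In local mode the code sleeps for a fixed time (30 seconds), after which it kills the topology itself and shuts down the local cluster.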
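The second is remote cluster mode. Cluster mode first requires building a jar that contains your code and all the dependencies it relies on (the Storm jars themselves should not be included; they are added to the classpath automatically on the worker nodes). If you use Maven, the Maven Assembly Plugin can build this jar for you; see the Maven configuration above for details. This example depends on nothing except Storm, so the plain jar built by Maven is enough for the command below.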
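To run remotely, use the storm command to submit the topology to the Storm cluster (the last argument, testfrfr, becomes args[0], i.e. the topology name):
```bash
storm jar /home/workspace/storm/target/storm-1.0-SNAPSHOT.jar com.ysl.WordsTopology testfrfr
```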
After running the command above, log output like the following indicates that the topology was submitted successfully:
```text
346 [main] INFO backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
351 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar /home/workspace/storm/target/storm-1.0-SNAPSHOT.jar to assigned location: storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar
Start uploading file '/home/workspace/storm/target/storm-1.0-SNAPSHOT.jar' to 'storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar' (6196 bytes)
[==================================================] 6196 / 6196
File '/home/workspace/storm/target/storm-1.0-SNAPSHOT.jar' uploaded to 'storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar' (6196 bytes)
363 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar
363 [main] INFO backtype.storm.StormSubmitter - Submitting topology testfrfr in distributed mode with conf {"topology.workers":3,"topology.debug":true}
448 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology:
```
Killing a topology
To kill a topology, run:
```bash
storm kill {stormname}
```
where {stormname} is the name you gave the topology when submitting it to the Storm cluster.
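Storm does not kill the topology immediately. Instead, it first deactivates all of its spouts so that they stop emitting new tuples, then waits Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS seconds before killing all the worker processes. This gives the topology enough time to finish processing any tuples that were still in flight when the storm kill command was issued.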
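The kill sequence above (stop new input, allow a grace period for in-flight work, then stop the workers) follows the same pattern as a graceful `ExecutorService` shutdown in plain Java. The sketch below is only an analogy with hypothetical names, not how Storm is implemented: `shutdown()` plays the role of deactivating the spouts, `awaitTermination()` the message-timeout wait, and `shutdownNow()` the final kill. On a real cluster the wait can also be set explicitly with `storm kill {stormname} -w <seconds>`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java analogy of "storm kill": stop accepting new work (like deactivating
// the spouts), wait up to a timeout for in-flight work (like
// TOPOLOGY_MESSAGE_TIMEOUT_SECS), then force-stop whatever is left.
public class GracefulKillSketch {
    // Returns how many in-flight "tuples" finished before the kill
    public static int runAndKill(int inFlight, long taskMillis, long timeoutMillis) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < inFlight; i++) {
            workers.submit(() -> {
                try {
                    Thread.sleep(taskMillis);           // simulated tuple processing
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // killed before finishing
                }
            });
        }
        workers.shutdown();                             // "deactivate": no new work accepted
        try {
            if (!workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                workers.shutdownNow();                  // grace period over: kill the workers
            }
        } catch (InterruptedException e) {
            workers.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed with a long grace period: " + runAndKill(4, 100, 2000));
        System.out.println("completed with a short grace period: " + runAndKill(2, 5000, 50));
    }
}
```

As with a topology, work that outlasts the grace period is cut off, which is why the wait should be at least as long as the time a tuple tree normally needs to complete.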