前面說過了Storm的測試項目,那麼此時咱們更想本身寫一個小項目來練練手,首先咱們本身的Windows系統上首先應該安裝好maven,而後啓動Eclipse for JavaEE版本,接下來開始創建項目並開發java
注意,在開發過程當中,不管是Windows仍是Linux都要徹底關閉防火牆,避免網絡的問題apache
單擊"File"->"New"->"Maven Project"數組
接下來的界面默認便可,單擊Next服務器
下一步,繼續單擊Next便可網絡
而後,在Group Id輸入:org.apache.storm 在Artifact Id輸入:firststorm 這裏能夠本身定義,在Version中輸入版本號:0.9.6,這裏其實默認0.1.0沒有問題,這個和storm的版本號沒有任何關係,這裏是咱們項目的版本號,由於只是測試,輸入0.9.6是爲了更簡單;Package包名會自動根據輸入生成,咱們默認便可,而後單擊Finish,稍等右下角滾動條滾動完畢,一個基本的Maven項目就創建成功了,具體結構和上一個測試案例相同,這時在包org.apache.storm.firststorm下有一個默認的類App.java,由Maven自動生成,這個能夠忽略,也能夠刪除dom
而後打開項目根目錄下的pom.xml文件,這個就是構建項目的配置文件,咱們在dependencies標籤之間,添加一個節點,代碼以下:maven
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.6</version> <scope>provided</scope> </dependency>
加入位置以下圖所示,其餘的不用動便可ide
最終pom.xml的代碼以下:oop
1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 2 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 3 <modelVersion>4.0.0</modelVersion> 4 5 <groupId>org.apache.storm</groupId> 6 <artifactId>firststorm</artifactId> 7 <version>0.9.6</version> 8 <packaging>jar</packaging> 9 10 11 <name>firststorm</name> 12 <url>http://maven.apache.org</url> 13 14 <properties> 15 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 16 </properties> 17 18 <dependencies> 19 <dependency> 20 <groupId>junit</groupId> 21 <artifactId>junit</artifactId> 22 <version>3.8.1</version> 23 <scope>test</scope> 24 </dependency> 25 26 <dependency> 27 <groupId>org.apache.storm</groupId> 28 <artifactId>storm-core</artifactId> 29 <version>0.9.6</version> 30 <scope>provided</scope> 31 </dependency> 32 </dependencies> 33 </project>
更簡單的方法咱們能夠直接複製上一個案例中的pom.xml文件直接使用,如今咱們保存pom.xml文件,保存的時候maven會自動下載相關依賴並放到Maven Dependencies下,這些jar包能夠點擊下拉查看,而且會自動添加到項目classpath中,做爲編譯使用,等jar包所有下載完畢,如今開始編寫具體的計算邏輯了,在這個項目中咱們把全部的類都創建在包org.apache.storm.firststorm下測試
首先創建RandomSpout類做爲數據源,而且繼承於父類BaseRichSpout,肯定後能夠看到系統自動補全3個方法:nextTuple,open和declareOutputFields
咱們如今就須要重寫這3個方法,open方法是數據源的初始化,nextTuple的做用是把Tuple發送至下游,declareOutputFields用來定義輸出字段,下面咱們手動分配一個數組,而且隨機取裏面的元素,代碼以下:
1 package org.apache.storm.firststorm; 2 3 import java.util.Map; 4 import java.util.Random; 5 6 import backtype.storm.spout.SpoutOutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.OutputFieldsDeclarer; 9 import backtype.storm.topology.base.BaseRichSpout; 10 import backtype.storm.tuple.Fields; 11 import backtype.storm.tuple.Values; 12 13 public class RandomSpout extends BaseRichSpout { 14 15 private SpoutOutputCollector collector; 16 private static String[] words = {"Hadoop","Storm","Apache","Linux","Nginx","Tomcat","Spark"}; 17 18 19 public void nextTuple() { 20 String word = words[new Random().nextInt(words.length)]; 21 collector.emit(new Values(word)); 22 23 } 24 25 public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) { 26 this.collector = arg2; 27 } 28 29 public void declareOutputFields(OutputFieldsDeclarer arg0) { 30 arg0.declare(new Fields("randomstring")); 31 } 32 33 }
代碼很簡單,確定能夠看懂,而後新建一個類SenqueceBolt,繼承於BaseBasicBolt類,而且重寫方法execute和declareOutputFields,這個類就是用於執行具體的做業,準確的說是execute方法用來執行相關的計算,這裏只是簡單的輸出,代碼以下:
1 package org.apache.storm.firststorm; 2 3 import backtype.storm.topology.BasicOutputCollector; 4 import backtype.storm.topology.OutputFieldsDeclarer; 5 import backtype.storm.topology.base.BaseBasicBolt; 6 import backtype.storm.tuple.Tuple; 7 8 public class SenqueceBolt extends BaseBasicBolt { 9 10 public void execute(Tuple arg0, BasicOutputCollector arg1) { 11 String word = (String) arg0.getValue(0); 12 String out = "Hello " + word + "!"; 13 System.out.println(out); 14 } 15 16 public void declareOutputFields(OutputFieldsDeclarer arg0) { 17 18 } 19 20 }
最後創建一個類FirstStorm,這個類是主類,在main方法中定義Topology,而且綜合設置Spout和Bolt,從而調用其中的方法,這裏流式計算時間設置爲30s,代碼以下:
1 package org.apache.storm.firststorm; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.StormSubmitter; 6 import backtype.storm.generated.AlreadyAliveException; 7 import backtype.storm.generated.InvalidTopologyException; 8 import backtype.storm.topology.TopologyBuilder; 9 import backtype.storm.utils.Utils; 10 11 public class FirstStorm { 12 13 public static void main(String[] args) { 14 TopologyBuilder builder = new TopologyBuilder(); 15 builder.setSpout("spout", new RandomSpout()); 16 builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout"); 17 Config conf = new Config(); 18 conf.setDebug(false); 19 if(args != null && args.length > 0) { 20 conf.setNumWorkers(3); 21 try { 22 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 23 } catch (AlreadyAliveException e) { 24 // TODO Auto-generated catch block 25 e.printStackTrace(); 26 } catch (InvalidTopologyException e) { 27 // TODO Auto-generated catch block 28 e.printStackTrace(); 29 } 30 } else { 31 LocalCluster cluster = new LocalCluster(); 32 cluster.submitTopology("firststorm", conf, builder.createTopology()); 33 Utils.sleep(30000); 34 cluster.killTopology("firststorm"); 35 cluster.shutdown(); 36 } 37 } 38 39 }
到這裏一個簡單的storm項目就開發完畢了,而後能夠用本地模式運行,跑起來以後某一時刻輸出結果以下:
接下來咱們將這個項目放到Storm服務器集羣中運行,這裏不要把Storm的jar包加進來,由於運行的時候,Storm環境會自動加載並協調集羣運行,方法有不少,可使用插件上傳,也可使用本地Storm客戶端配置一下numbus.host進行提交,也能夠在服務器節點上執行,執行後nimbus會獲得任務並分發給各個supervisor去執行,首先咱們應該將項目打包,右擊項目,選擇Export
而後導出類型選擇Java下的JAR file,點擊Next
而後單擊Brower肯定輸出位置和文件名或者直接在輸入框輸入jar包的名稱,而後單擊Finish完成打包
打包以後咱們能夠在輸出位置看見一個jar文件
而後咱們將這個文件上傳到服務器,這裏上傳到了storm安裝目錄下,而後這個時候在主節點storm安裝目錄下執行: bin/storm nimbus & 在從節點目錄下分別執行 bin/storm supervisor & 啓動整個集羣的storm服務,也能夠執行 bin/storm ui & 啓動UI管理界面更直觀的看到執行結果,固然對於單機環境啓動或者不啓動storm服務均可以,這個時候,執行下面命令運行本次項目的程序:
bin/storm jar firststorm.jar org.apache.storm.firststorm.FirstStorm
這裏就是調用了FirstStorm類中的main方法,若是程序中對參數進行了處理,後面還能夠跟上參數,回車確認執行以後,系統會進行初始化集羣的工做,幾秒後任務開始執行,執行過程當中某一時刻的滾動輸出以下:
到這裏,第一個Storm入門項目的開發和測試運行都完畢了,更復雜的計算邏輯模式也基本相同,主要就是Maven項目中出現了更復雜的模塊和調用,整個運行的流程其實都是差很少的,如今就算步入Storm流式計算的殿堂的大門了,接下來的精彩還須要慢慢體會