基於Maven構建開發第一個Storm項目

  前面說過了Storm的測試項目,那麼此時咱們更想本身寫一個小項目來練練手,首先咱們本身的Windows系統上首先應該安裝好maven,而後啓動Eclipse for JavaEE版本,接下來開始創建項目並開發java

  注意,在開發過程當中,不管是Windows仍是Linux都要徹底關閉防火牆,避免網絡的問題apache

  單擊"File"->"New"->"Maven Project"數組

  

  接下來的界面默認便可,單擊Next服務器

  

  下一步,繼續單擊Next便可網絡

  

  而後,在Group Id輸入:org.apache.storm 在Artifact Id輸入:firststorm 這裏能夠本身定義,在Version中輸入版本號:0.9.6,這裏其實默認0.1.0沒有問題,這個和storm的版本號沒有任何關係,這裏是咱們項目的版本號,由於只是測試,輸入0.9.6是爲了更簡單;Package包名會自動根據輸入生成,咱們默認便可,而後單擊Finish,稍等右下角滾動條滾動完畢,一個基本的Maven項目就創建成功了,具體結構和上一個測試案例相同,這時在包org.apache.storm.firststorm下有一個默認的類App.java,由Maven自動生成,這個能夠忽略,也能夠刪除dom

  

  而後打開項目根目錄下的pom.xml文件,這個就是構建項目的配置文件,咱們在dependencies標籤之間,添加一個節點,代碼以下:maven

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.6</version>
    <scope>provided</scope>
</dependency>

  加入位置以下圖所示,其餘的不用動便可ide

  

  最終pom.xml的代碼以下:oop

 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3   <modelVersion>4.0.0</modelVersion>
 4 
 5   <groupId>org.apache.storm</groupId>
 6   <artifactId>firststorm</artifactId>
 7   <version>0.9.6</version>
 8   <packaging>jar</packaging>
 9   
10 
11   <name>firststorm</name>
12   <url>http://maven.apache.org</url>
13 
14   <properties>
15     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
16   </properties>
17 
18   <dependencies>
19     <dependency>
20       <groupId>junit</groupId>
21       <artifactId>junit</artifactId>
22       <version>3.8.1</version>
23       <scope>test</scope>
24     </dependency>
25     
26     <dependency>
27       <groupId>org.apache.storm</groupId>
28       <artifactId>storm-core</artifactId>
29       <version>0.9.6</version>
30       <scope>provided</scope>
31     </dependency>
32   </dependencies>
33 </project>

  更簡單的方法咱們能夠直接複製上一個案例中的pom.xml文件直接使用,如今咱們保存pom.xml文件,保存的時候maven會自動下載相關依賴並放到Maven Dependencies下,這些jar包能夠點擊下拉查看,而且會自動添加到項目classpath中,做爲編譯使用,等jar包所有下載完畢,如今開始編寫具體的計算邏輯了,在這個項目中咱們把全部的類都創建在包org.apache.storm.firststorm下測試

  首先創建RandomSpout類做爲數據源,而且繼承於父類BaseRichSpout,肯定後能夠看到系統自動補全3個方法:nextTuple,open和declareOutputFields

  

  咱們如今就須要重寫這3個方法,open方法是數據源的初始化,nextTuple的做用是把Tuple發送至下游,declareOutputFields用來定義輸出字段,下面咱們手動分配一個數組,而且隨機取裏面的元素,代碼以下:

 1 package org.apache.storm.firststorm;
 2 
 3 import java.util.Map;
 4 import java.util.Random;
 5 
 6 import backtype.storm.spout.SpoutOutputCollector;
 7 import backtype.storm.task.TopologyContext;
 8 import backtype.storm.topology.OutputFieldsDeclarer;
 9 import backtype.storm.topology.base.BaseRichSpout;
10 import backtype.storm.tuple.Fields;
11 import backtype.storm.tuple.Values;
12 
13 public class RandomSpout extends BaseRichSpout {
14     
15     private SpoutOutputCollector collector;
16     private static String[] words = {"Hadoop","Storm","Apache","Linux","Nginx","Tomcat","Spark"};
17     
18 
19     public void nextTuple() {
20         String word = words[new Random().nextInt(words.length)];
21         collector.emit(new Values(word));
22 
23     }
24 
25     public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
26         this.collector = arg2;
27     }
28 
29     public void declareOutputFields(OutputFieldsDeclarer arg0) {
30         arg0.declare(new Fields("randomstring"));
31     }
32 
33 }

  代碼很簡單,確定能夠看懂,而後新建一個類SenqueceBolt,繼承於BaseBasicBolt類,而且重寫方法execute和declareOutputFields,這個類就是用於執行具體的做業,準確的說是execute方法用來執行相關的計算,這裏只是簡單的輸出,代碼以下:

 1 package org.apache.storm.firststorm;
 2 
 3 import backtype.storm.topology.BasicOutputCollector;
 4 import backtype.storm.topology.OutputFieldsDeclarer;
 5 import backtype.storm.topology.base.BaseBasicBolt;
 6 import backtype.storm.tuple.Tuple;
 7 
 8 public class SenqueceBolt extends BaseBasicBolt {
 9 
10     public void execute(Tuple arg0, BasicOutputCollector arg1) {
11         String word = (String) arg0.getValue(0);
12         String out = "Hello " + word + "!";
13         System.out.println(out);
14     }
15 
16     public void declareOutputFields(OutputFieldsDeclarer arg0) {
17         
18     }
19 
20 }

  最後創建一個類FirstStorm,這個類是主類,在main方法中定義Topology,而且綜合設置Spout和Bolt,從而調用其中的方法,這裏流式計算時間設置爲30s,代碼以下:

 1 package org.apache.storm.firststorm;
 2 
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.StormSubmitter;
 6 import backtype.storm.generated.AlreadyAliveException;
 7 import backtype.storm.generated.InvalidTopologyException;
 8 import backtype.storm.topology.TopologyBuilder;
 9 import backtype.storm.utils.Utils;
10 
11 public class FirstStorm {
12 
13     public static void main(String[] args) {
14         TopologyBuilder builder = new TopologyBuilder();
15         builder.setSpout("spout", new RandomSpout());
16         builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
17         Config conf = new Config();
18         conf.setDebug(false);
19         if(args != null && args.length > 0) {
20             conf.setNumWorkers(3);
21             try {
22                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
23             } catch (AlreadyAliveException e) {
24                 // TODO Auto-generated catch block
25                 e.printStackTrace();
26             } catch (InvalidTopologyException e) {
27                 // TODO Auto-generated catch block
28                 e.printStackTrace();
29             }
30         } else {
31             LocalCluster cluster = new LocalCluster();
32             cluster.submitTopology("firststorm", conf, builder.createTopology());
33             Utils.sleep(30000);
34             cluster.killTopology("firststorm");
35             cluster.shutdown();
36         }
37     }
38 
39 }

  到這裏一個簡單的storm項目就開發完畢了,而後能夠用本地模式運行,跑起來以後某一時刻輸出結果以下:

  

  接下來咱們將這個項目放到Storm服務器集羣中運行,這裏不要把Storm的jar包加進來,由於運行的時候,Storm環境會自動加載並協調集羣運行,方法有不少,可使用插件上傳,也可使用本地Storm客戶端配置一下numbus.host進行提交,也能夠在服務器節點上執行,執行後nimbus會獲得任務並分發給各個supervisor去執行,首先咱們應該將項目打包,右擊項目,選擇Export

  

  而後導出類型選擇Java下的JAR file,點擊Next

  

  而後單擊Brower肯定輸出位置和文件名或者直接在輸入框輸入jar包的名稱,而後單擊Finish完成打包

  

  打包以後咱們能夠在輸出位置看見一個jar文件

  

  而後咱們將這個文件上傳到服務器,這裏上傳到了storm安裝目錄下,而後這個時候在主節點storm安裝目錄下執行: bin/storm nimbus & 在從節點目錄下分別執行 bin/storm supervisor & 啓動整個集羣的storm服務,也能夠執行 bin/storm ui & 啓動UI管理界面更直觀的看到執行結果,固然對於單機環境啓動或者不啓動storm服務均可以,這個時候,執行下面命令運行本次項目的程序:

bin/storm jar firststorm.jar org.apache.storm.firststorm.FirstStorm

  這裏就是調用了FirstStorm類中的main方法,若是程序中對參數進行了處理,後面還能夠跟上參數,回車確認執行以後,系統會進行初始化集羣的工做,幾秒後任務開始執行,執行過程當中某一時刻的滾動輸出以下:

  

  到這裏,第一個Storm入門項目的開發和測試運行都完畢了,更復雜的計算邏輯模式也基本相同,主要就是Maven項目中出現了更復雜的模塊和調用,整個運行的流程其實都是差很少的,如今就算步入Storm流式計算的殿堂的大門了,接下來的精彩還須要慢慢體會

相關文章
相關標籤/搜索