Storm 入門的Demo教程

Storm介紹

Storm是Twitter開源的分佈式實時大數據處理框架,最先開源於github,從0.9.1版本以後,歸於Apache社區,被業界稱爲實時版Hadoop。隨着愈來愈多的場景對Hadoop的MapReduce高延遲沒法容忍,好比網站統計、推薦系統、預警系統、金融系統(高頻交易、股票)等等,大數據實時處理解決方案(流計算)的應用日趨普遍,目前已經是分佈式技術領域最新爆發點,而Storm更是流計算技術中的佼佼者和主流。java

Storm的核心組件

  • Nimbus:即Storm的Master,負責資源分配和任務調度。一個Storm集羣只有一個Nimbus。
  • Supervisor:即Storm的Slave,負責接收Nimbus分配的任務,管理全部Worker,一個Supervisor節點中包含多個Worker進程。
  • Worker:工做進程,每一個工做進程中都有多個Task。
  • Task:任務,在 Storm 集羣中每一個 Spout 和 Bolt 都由若干個任務(tasks)來執行。每一個任務都與一個執行線程相對應。
  • Topology:計算拓撲,Storm 的拓撲是對實時計算應用邏輯的封裝,它的做用與 MapReduce 的任務(Job)很類似,區別在於 MapReduce 的一個 Job 在獲得結果以後總會結束,而拓撲會一直在集羣中運行,直到你手動去終止它。拓撲還能夠理解成由一系列經過數據流(Stream Grouping)相互關聯的 Spout 和 Bolt 組成的的拓撲結構。
  • Stream:數據流(Streams)是 Storm 中最核心的抽象概念。一個數據流指的是在分佈式環境中並行建立、處理的一組元組(tuple)的無界序列。數據流能夠由一種可以表述數據流中元組的域(fields)的模式來定義。
  • Spout:數據源(Spout)是拓撲中數據流的來源。通常 Spout 會從一個外部的數據源讀取元組而後將他們發送到拓撲中。根據需求的不一樣,Spout 既能夠定義爲可靠的數據源,也能夠定義爲不可靠的數據源。一個可靠的 Spout可以在它發送的元組處理失敗時從新發送該元組,以確保全部的元組都能獲得正確的處理;相對應的,不可靠的 Spout 就不會在元組發送以後對元組進行任何其餘的處理。一個 Spout能夠發送多個數據流。
  • Bolt:拓撲中全部的數據處理均是由 Bolt 完成的。經過數據過濾(filtering)、函數處理(functions)、聚合(aggregations)、聯結(joins)、數據庫交互等功能,Bolt 幾乎可以完成任何一種數據處理需求。一個 Bolt 能夠實現簡單的數據流轉換,而更復雜的數據流變換一般須要使用多個 Bolt 並經過多個步驟完成。
  • Stream grouping:爲拓撲中的每一個 Bolt 的肯定輸入數據流是定義一個拓撲的重要環節。數據流分組定義了在 Bolt 的不一樣任務(tasks)中劃分數據流的方式。在 Storm 中有八種內置的數據流分組方式。
  • Reliability:可靠性。Storm 能夠經過拓撲來確保每一個發送的元組都能獲得正確處理。經過跟蹤由 Spout 發出的每一個元組構成的元組樹能夠肯定元組是否已經完成處理。每一個拓撲都有一個「消息延時」參數,若是 Storm 在延時時間內沒有檢測到元組是否處理完成,就會將該元組標記爲處理失敗,並會在稍後從新發送該元組。

Storm程序再Storm集羣中運行的示例圖以下:git

Topology

爲何把Topology單獨提出來呢,由於Topology是咱們開發程序主要的用的組件。
Topology和MapReduce很相像。
MapReduce是Map進行獲取數據,Reduce進行處理數據。
而Topology則是使用Spout獲取數據,Bolt來進行計算。
總的來講就是一個Topology由一個或者多個的Spout和Bolt組成。github

具體流程是怎麼走,能夠經過查看下面這張圖來進行了解。
示例圖:數據庫

注:圖片來源http://www.tianshouzhi.com/api/tutorials/storm/52。apache

圖片有三種模式,解釋以下:
第一種比較簡單,就是由一個Spout獲取數據,而後交給一個Bolt進行處理;
第二種稍微複雜點,由一個Spout獲取數據,而後交給一個Bolt進行處理一部分,而後在交給下一個Bolt進行處理其餘部分。
第三種則比較複雜,一個Spout能夠同時發送數據到多個Bolt,而一個Bolt也能夠接受多個Spout或多個Bolt,最終造成多個數據流。可是這種數據流必須是有方向的,有起點和終點,否則會形成死循環,數據永遠也處理不完。就是Spout發給Bolt1,Bolt1發給Bolt2,Bolt2又發給了Bolt1,最終造成了一個環狀。api

Storm 集羣安裝

以前已經寫過了,這裏就不在說明了。
博客地址:http://www.panchengming.com/2018/01/26/pancm70/數組

Storm Hello World

前面講了一些Storm概念,可能在理解上不太清楚,那麼這裏咱們就用一個Hello World代碼示例來體驗下Storm運做的流程吧。框架

環境準備

在進行代碼開發以前,首先得作好相關的準備。
本項目是使用Maven構建的,使用Storm的版本爲1.1.1。
Maven的相關依賴以下:maven

<!--storm相關jar  -->
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.1</version>
    <scope>provided</scope>
 </dependency>

具體流程

在寫代碼的時候,咱們先來明確要用Storm作什麼。
那麼第一個程序,就簡單的輸出下信息。
具體步驟以下:分佈式

  1. 啓動topology,設置好Spout和Bolt。
  2. 將Spout獲取的數據傳遞給Bolt。
  3. Bolt接受Spout的數據進行打印。

Spout

那麼首先開始編寫Spout類。通常是實現 IRichSpout 或繼承BaseRichSpout該類,而後實現該方法。
這裏咱們繼承BaseRichSpout這個類,該類須要實現這幾個主要的方法:

1、open

open()方法中是在ISpout接口中定義,在Spout組件初始化時被調用。
有三個參數,它們的做用分別是:

  1. Storm配置的Map;
  2. topology中組件的信息;
  3. 發射tuple的方法;

代碼示例:

@Override
    public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
        System.out.println("open:"+map.get("test"));
        this.collector = collector;
    }
2、nextTuple

nextTuple()方法是Spout實現的核心。
也就是主要執行方法,用於輸出信息,經過collector.emit方法發射。

這裏咱們的數據信息已經寫死了,因此這裏咱們就直接將數據進行發送。
這裏設置只發送兩次。
代碼示例:

@Override
    public void nextTuple() {
        if(count<=2){
            System.out.println("第"+count+"次開始發送數據...");
            this.collector.emit(new Values(message));
        }
        count++;
    }
3、declareOutputFields

declareOutputFields是在IComponent接口中定義,用於聲明數據格式。
即輸出的一個Tuple中,包含幾個字段。

由於這裏咱們只發射一個,因此就指定一個。若是是多個,則用逗號隔開。
代碼示例:

@Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        System.out.println("定義格式...");
        declarer.declare(new Fields(field));
    }
4、ack

ack是在ISpout接口中定義,用於表示Tuple處理成功。

代碼示例:

@Override
    public void ack(Object obj) {
        System.out.println("ack:"+obj);
    }
5、fail

fail是在ISpout接口中定義,用於表示Tuple處理失敗。

代碼示例:

@Override
    public void fail(Object obj) {
        System.out.println("失敗:"+obj);
    }
6、close

close是在ISpout接口中定義,用於表示Topology中止。

代碼示例:

@Override
    public void close() {
        System.out.println("關閉...");
    }

至於還有其餘的,這裏就不在一一列舉了。

Bolt

Bolt是用於處理數據的組件,主要是由execute方法來進行實現。通常來講須要實現 IRichBolt 或繼承BaseRichBolt該類,而後實現其方法。
須要實現方法以下:

1、prepare

在Bolt啓動前執行,提供Bolt啓動環境配置的入口。
參數基本和Sqout同樣。
通常對於不可序列化的對象進行實例化。
這裏的咱們就簡單的打印下

@Override
    public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
        System.out.println("prepare:"+map.get("test"));
        this.collector=collector;
    }

注:若是是能夠序列化的對象,那麼最好是使用構造函數。

2、execute

execute()方法是Bolt實現的核心。
也就是執行方法,每次Bolt從流接收一個訂閱的tuple,都會調用這個方法。
從tuple中獲取消息可使用 tuple.getString()tuple.getStringByField();這兩個方法。我的推薦第二種,能夠經過field來指定接收的消息。
注:若是繼承的是IRichBolt,則須要手動ack。這裏就不用了,BaseRichBolt會自動幫咱們應答。
代碼示例:

@Override
    public void execute(Tuple tuple) {
//      String msg=tuple.getString(0);
        String msg=tuple.getStringByField("test");
        //這裏咱們就不作消息的處理,只打印
        System.out.println("Bolt第"+count+"接受的消息:"+msg); 
        count++;
        /**
         * 
         * 沒次調用處理一個輸入的tuple,全部的tuple都必須在必定時間內應答。
         * 能夠是ack或者fail。不然,spout就會重發tuple。
         */
//      collector.ack(tuple);
    }
3、declareOutputFields

和Spout的同樣。
由於到了這裏就再也不輸出了,因此就什麼都沒寫。

@Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {        
    }
cleanup

cleanup是IBolt接口中定義,用於釋放bolt佔用的資源。
Storm在終止一個bolt以前會調用這個方法。
由於這裏沒有什麼資源須要釋放,因此就簡單的打印一句就好了。

@Override
    public void cleanup() {
        System.out.println("資源釋放");
    }

Topology

這裏咱們就是用main方法進行提交topology。
不過在提交topology以前,須要進行相應的設置。
這裏我就不一一細說了,代碼的註釋已經很詳細了。
代碼示例:

import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    
    /**
     * 
    * Title: App
    * Description:
    * storm測試 
    * Version:1.0.0  
    * @author pancm
    * @date 2018年3月6日
     */
    public class App {
        
        private static final String str1="test1"; 
        private static final String str2="test2"; 
    
        public static void main(String[] args)  {
            // TODO Auto-generated method stub
            //定義一個拓撲
            TopologyBuilder builder=new TopologyBuilder();
            //設置一個Executeor(線程),默認一個
            builder.setSpout(str1, new TestSpout());
            //設置一個Executeor(線程),和一個task
            builder.setBolt(str2, new TestBolt(),1).setNumTasks(1).shuffleGrouping(str1);
            Config conf = new Config();
            conf.put("test", "test");
            try{
              //運行拓撲
           if(args !=null&&args.length>0){ //有參數時,表示向集羣提交做業,並把第一個參數當作topology名稱
             System.out.println("遠程模式");
                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
          } else{//沒有參數時,本地提交
        //啓動本地模式
         System.out.println("本地模式");
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("111" ,conf,  builder.createTopology() );
        Thread.sleep(10000);
    //  關閉本地集羣
        cluster.shutdown();
          }
            }catch (Exception e){
                e.printStackTrace();
            }   
        }
    }

運行該方法,輸出結果以下:

本地模式
定義格式...
open:test
第1次開始發送數據...
第2次開始發送數據...
prepare:test
Bolt第1接受的消息:這是個測試消息!
Bolt第2接受的消息:這是個測試消息!
資源釋放
關閉...

到這裏,是否是基本上對Storm的運做有些瞭解了呢。
這個demo達到了上述的三種模式圖中的第一種,一個Spout傳輸數據, 一個Bolt處理數據。

那麼若是咱們想達到第二種模式呢,那又該如何作呢?
假如咱們想統計下在一段文本中的單詞出現頻率的話,咱們只需執行一下步驟就能夠了。
1.首先將Spout中的message消息進行更改成數組,並依次將消息發送到TestBolt。
2.而後TestBolt將獲取的數據進行分割,將分割的數據發送到TestBolt2。
3.TestBolt2對數據進行統計,在程序關閉的時候進行打印。
4.Topology成功配置而且啓動以後,等待20秒左右,關閉程序,而後獲得輸出的結果。

代碼示例以下:

Spout
用於發送消息。

import java.util.Map;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    /**
     * 
    * Title: TestSpout
    * Description:
    * 發送信息
    * Version:1.0.0  
    * @author pancm
    * @date 2018年3月6日
     */
    public class TestSpout extends BaseRichSpout{
    
        private static final long serialVersionUID = 225243592780939490L;
    
        private SpoutOutputCollector collector;
        private static final String field="word";
        private int count=1;
        private String[] message =  {
    "My nickname is xuwujing",
    "My blog address is http://www.panchengming.com/",
    "My interest is playing games"
    };
        
        /**
     * open()方法中是在ISpout接口中定義,在Spout組件初始化時被調用。
     * 有三個參數:
     * 1.Storm配置的Map;
     * 2.topology中組件的信息;
     * 3.發射tuple的方法;
     */
        @Override
        public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
            System.out.println("open:"+map.get("test"));
            this.collector = collector;
        }
    
    /**
     * nextTuple()方法是Spout實現的核心。
     * 也就是主要執行方法,用於輸出信息,經過collector.emit方法發射。
     */
        @Override
        public void nextTuple() {
                
            if(count<=message.length){
                System.out.println("第"+count +"次開始發送數據...");
                this.collector.emit(new Values(message[count-1]));
            }
            count++;
        }
    
    
        /**
     * declareOutputFields是在IComponent接口中定義,用於聲明數據格式。
     * 即輸出的一個Tuple中,包含幾個字段。
     */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            System.out.println("定義格式...");
            declarer.declare(new Fields(field));
        }
    
        /**
         * 當一個Tuple處理成功時,會調用這個方法
         */
        @Override
        public void ack(Object obj) {
            System.out.println("ack:"+obj);
        }
        
        /**
         * 當Topology中止時,會調用這個方法
         */
        @Override
        public void close() {
            System.out.println("關閉...");
        }
        
        /**
         * 當一個Tuple處理失敗時,會調用這個方法
         */
        @Override
        public void fail(Object obj) {
            System.out.println("失敗:"+obj);
        }
        
    }

TestBolt

用於分割單詞。

import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    
    /**
     * 
    * Title: TestBolt
    * Description: 
    * 對單詞進行分割
    * Version:1.0.0  
    * @author pancm
    * @date 2018年3月16日
     */
    public class TestBolt extends BaseRichBolt{
    
        /**
         * 
         */
        private static final long serialVersionUID = 4743224635827696343L;
        
        private OutputCollector collector;
       
        /**
    * 在Bolt啓動前執行,提供Bolt啓動環境配置的入口
    * 通常對於不可序列化的對象進行實例化。
    * 注:若是是能夠序列化的對象,那麼最好是使用構造函數。
    */
        @Override
        public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
            System.out.println("prepare:"+map.get("test"));
            this.collector=collector;
        }
      
        /**
         * execute()方法是Bolt實現的核心。
         * 也就是執行方法,每次Bolt從流接收一個訂閱的tuple,都會調用這個方法。
         */
        @Override
        public void execute(Tuple tuple) {
            String msg=tuple.getStringByField("word");
        System.out.println("開始分割單詞:"+msg);
    String[] words = msg.toLowerCase().split(" ");
    for (String word : words) {
    this.collector.emit(new Values(word));//向下一個bolt發射數據
    } 
        
        }
    
        /**
         * 聲明數據格式
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("count"));
        }
        
        /**
     * cleanup是IBolt接口中定義,用於釋放bolt佔用的資源。
     * Storm在終止一個bolt以前會調用這個方法。
         */
        @Override
        public void cleanup() {
            System.out.println("TestBolt的資源釋放");
        }
    }

Test2Bolt
用於統計單詞出現次數。

import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    
    /**
     * 
    * Title: Test2Bolt
    * Description:
    * 統計單詞出現的次數 
    * Version:1.0.0  
    * @author pancm
    * @date 2018年3月16日
     */
    public class Test2Bolt extends BaseRichBolt{
    
        /**
         * 
         */
        private static final long serialVersionUID = 4743224635827696343L;
        
        
        /**
         * 保存單詞和對應的計數
         */
        private HashMap<String, Integer> counts = null;
         
        private long count=1;
        /**
    * 在Bolt啓動前執行,提供Bolt啓動環境配置的入口
    * 通常對於不可序列化的對象進行實例化。
    * 注:若是是能夠序列化的對象,那麼最好是使用構造函數。
    */
        @Override
        public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
            System.out.println("prepare:"+map.get("test"));
            this.counts=new HashMap<String, Integer>();
        }
      
        /**
         * execute()方法是Bolt實現的核心。
         * 也就是執行方法,每次Bolt從流接收一個訂閱的tuple,都會調用這個方法。
         * 
         */
        @Override
        public void execute(Tuple tuple) {
            String msg=tuple.getStringByField("count");
            System.out.println("第"+count+"次統計單詞出現的次數");
            /**
             * 若是不包含該單詞,說明在該map是第一次出現
             * 不然進行加1
             */
            if (!counts.containsKey(msg)) {
                counts.put(msg, 1);
            } else {
                counts.put(msg, counts.get(msg)+1);
            }
            count++;
        }
    
        
        /**
     * cleanup是IBolt接口中定義,用於釋放bolt佔用的資源。
     * Storm在終止一個bolt以前會調用這個方法。
         */
        @Override
        public void cleanup() {
            System.out.println("===========開始顯示單詞數量============");
            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
            System.out.println("===========結束============");
           System.out.println("Test2Bolt的資源釋放");
        }
        
        /**
         * 聲明數據格式
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer arg0) {
            
        }
    }

Topology

主程序入口。

import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    /**
     * 
    * Title: App
    * Description:
    * storm測試 
    * Version:1.0.0  
    * @author pancm
    * @date 2018年3月6日
     */
    public class App {
        
        private static final String test_spout="test_spout"; 
        private static final String test_bolt="test_bolt"; 
        private static final String test2_bolt="test2_bolt"; 
    
        public static void main(String[] args)  {
            //定義一個拓撲
            TopologyBuilder builder=new TopologyBuilder();
            //設置一個Executeor(線程),默認一個
            builder.setSpout(test_spout, new TestSpout(),1);
            //shuffleGrouping:表示是隨機分組
            //設置一個Executeor(線程),和一個task
            builder.setBolt(test_bolt, new TestBolt(),1).setNumTasks(1).shuffleGrouping(test_spout);
            //fieldsGrouping:表示是按字段分組
            //設置一個Executeor(線程),和一個task
            builder.setBolt(test2_bolt, new Test2Bolt(),1).setNumTasks(1).fieldsGrouping(test_bolt, new Fields("count"));
            Config conf = new Config();
            conf.put("test", "test");
            try{
              //運行拓撲
           if(args !=null&&args.length>0){ //有參數時,表示向集羣提交做業,並把第一個參數當作topology名稱
             System.out.println("運行遠程模式");
                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
          } else{//沒有參數時,本地提交
        //啓動本地模式
            System.out.println("運行本地模式");
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Word-counts" ,conf,  builder.createTopology() );
        Thread.sleep(20000);
    //  //關閉本地集羣
        cluster.shutdown();
          }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

輸出結果:

運行本地模式
定義格式...
open:test
第1次開始發送數據...
第2次開始發送數據...
第3次開始發送數據...
prepare:test
prepare:test
開始分割單詞:My nickname is xuwujing
開始分割單詞:My blog address is http://www.panchengming.com/
開始分割單詞:My interest is playing games
第1次統計單詞出現的次數
第2次統計單詞出現的次數
第3次統計單詞出現的次數
第4次統計單詞出現的次數
第5次統計單詞出現的次數
第6次統計單詞出現的次數
第7次統計單詞出現的次數
第8次統計單詞出現的次數
第9次統計單詞出現的次數
第10次統計單詞出現的次數
第11次統計單詞出現的次數
第12次統計單詞出現的次數
第13次統計單詞出現的次數
第14次統計單詞出現的次數
===========開始顯示單詞數量============
address: 1
interest: 1
nickname: 1
games: 1
is: 3
xuwujing: 1
playing: 1
my: 3
blog: 1
http://www.panchengming.com/: 1
===========結束============
Test2Bolt的資源釋放
TestBolt的資源釋放
關閉...

上述的是本地模式運行,若是想在Storm集羣中進行使用,只須要將程序打包爲jar,而後將程序上傳到storm集羣中,
輸入:

storm jar xxx.jar xxx xxx
說明:第一個xxx是storm程序打包的包名,第二個xxx是運行主程序的路徑,第三個xxx則表示主程序輸入的參數,這個能夠隨意。

若是是使用maven打包的話,則須要在pom.xml加上

<plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass>com.pancm.storm.App</mainClass>
              </manifest>
            </archive>
          </configuration>
      </plugin>

成功運行程序以後,能夠在Storm集羣的UI界面查看該程序的狀態。

到此,本文結束,謝謝閱讀! 本篇文章源碼地址: https://github.com/xuwujing/java-study

相關文章
相關標籤/搜索