大數據-Storm

Storm 流式處理框架

Storm是實時的,分佈式,高容錯的計算系統。java+cljourehtml

Storm常駐內存,數據在內存中處理不通過磁盤,數據經過網絡傳輸。java

底層java+cljoure構成,阿里使用java重構Storm構建Jstorm。node

數據處理分類web

  • 流式處理(異步)算法

    • 客戶端提交數據進行結算,不會等待計算結果apache

    • 數據追條處理:數據清洗或分析編程

    • 例:在數據統計分析中:數據存入隊列,storm從MQ獲取數據,storm將結果存入存儲層vim

  • 實時請求應答(同步)api

    • 客戶端提交請求後當即計算並將結果返回給客戶端數組

    • 圖片特徵提取: 客戶端向Storm的drpc發送請求,客戶端等待Storm響應結果

Storm與MR的比較

  • Storm流式處理,毫秒級響應,常駐內存

  • MR批處理TB級數據,分鐘級響應,反覆啓停

優點

高可靠:異常處理;消息可靠性保障機制

可維護性:提供UI監控接口

架構模型

  • Nimbus:接收請求,接收jar包並解析,任務分配,資源分配

  • zookeeper:存儲Nimbus分配的子任務,存儲任務數據,用於解耦架構

  • Supervisor:從zk中獲取子任務,啓動worker任務,啓停所管理的worker

  • Worker:執行具體任務(每一個worker對應一個Topology的子集),分爲spout任務和bolt任務,啓動executor,一個executor對應一個線程,默認一個executor執行一個task任務。任務所需數據從zookeeper中獲取。

storm與MR比對 MR Storm
主節點 ResourceManager Nimbus
從節點 NodeManager Supervisor
應用程序 Job Topology
工做進程 Child Worker
計算模型 Map/Reduce Spout/Bolt

編程模型

DAG有向無環圖,拓撲結構圖,做爲實時計算邏輯的封裝。一旦啓動會一直在系統中運行直到手動kill

Spout 數據源節點

  • 從外部讀取原始數據

  • 定義數據流,可定義多個數據流向不一樣Bolt 發數據,讀取外部數據源

  • nextTuple:被Storm線程不斷調用、主動從數據源拉取數據,再經過emit方法將數據生成Tuple,發送給以後的Bolt計算

Bolt 計算節點

  • execute:該方法接收到一個Tuple數據,真正業務邏輯

  • 定義數據流,將結果數據發出去

API

以wordcount爲例:Spout 獲取原始行數據,將行數據Tuple做爲發給bolt1,bolt1解析行數據爲單詞數據,bolt1將單詞數據轉爲Tuple發給bolt2,bolt2對單詞數據進行計數,將計數結果發送到存儲層。

Topology類

指定計算任務的各節點及節點關聯,提交任務

  • topologyBuilder的setSpout方法和setBolt方法構建各節點

    • 方法參數爲:id值,bolt類,併發度(線程),task數,數據來源節點,數據流,分組字段

    • shuffleGrouping是隨機分配數據

    • 多併發度

      • shuffleGrouping是將數據隨機分配給線程

      • fieldsGrouping是根據指定的若干字段分組到不一樣的線程

  • Config對象傳入配置參數

    • 超時時間 秒:conf.setMessageTimeoutSecs(3);

  • LocalCluster本地執行,對象提交任務名,參數對象,拓撲對象,最終執行任務

public class MyTopology {
   public static void main(String[] args) {
       //建立拓撲構建對象
       TopologyBuilder topologyBuilder=new TopologyBuilder();
       /**
        * 指定各節點,線程數量,上下游關聯
        * 對於多線程,注意上游的結果會隨機發給各個線程
        */
       //設置spout節點,指定(id值,spout類,併發度)
       topologyBuilder.setSpout("s1",new CountSpout(),1);
       //設置bolt節點,指定 (id值,bolt類,併發度,數據來源節點,分組字段)
       topologyBuilder.setBolt("b1",new CountBolt1(),2).setNumTasks(4).shuffleGrouping("s1");
       topologyBuilder.setBolt("b2",new CountBolt2(),1).fieldsGrouping("b1",new Fields("word"));
       //構建拓撲對象
       StormTopology stormTopology=topologyBuilder.createTopology();
       //設置配置信息,該對象實際是個map,向其中設置各類參數
       Config config=new Config();
       config.setNumWorkers(3);
       config.put("xxx","xxx");
       //獲取本地執行集羣對象
       LocalCluster lc=new LocalCluster();
       //本地執行:向集羣提交任務,任務名,參數,拓撲節點
       lc.submitTopology("job-count",config,stormTopology);
  }
}

Spout類(自定義)

繼承BaseRichSpout類,實際實現了IRichSpout接口,主要重寫如下方法

  • open()——初始化,參數爲:配置參數, 上下文,輸出對象

  • nextTuple()——業務代碼

  • declareOutputFields()——定義輸出結果

  • 關於數據輸出

    將SpoutOutputCollector輸出對象轉爲全局變量,經過SpoutOutputCollector輸出數據

    使用Values對象,存入多個結果(至關於一個list)

    經過OutputFieldsDeclarer對象,依次定義每一個結果的字段名(至關於給list字段命名,造成相似map的有序集合)

import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class CountSpout extends BaseRichSpout {
   /** 將輸出對象轉爲全局屬性,支持數據輸出 */
   private SpoutOutputCollector collector;
   //初始化
   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
       //將輸出對象轉爲全局屬性
       this.collector=collector;
  }
   //業務代碼:隨機輸出一行字符串
   @Override
   public void nextTuple() {
       int index=random.nextInt(lines.length);
       String line= lines[index];
       
       //輸出數據,構建value時可以設置多個數據,在Fields對象中定義字段名
       collector.emit(new Values(line,line));
  }
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
       //定義輸出數據,給每一個字段賦予字段名
       declarer.declare(new Fields("line1""line2"));
  }
}

Bolt類(自定義)

繼承BaseRichBolt類,實際實現了IRichBolt接口,主要重寫如下方法

  • prepare()——初始化

  • execute()——業務代碼

  • declareOutputFields()——定義輸出字段

  • 關於輸出數據,與spout相同

  • 關於輸入數據:經過input的各類方法獲取對應類型的數據,支持經過字段所在索引下標或字段名獲取指定位置的值

public class CountBolt1 extends BaseRichBolt {
/** 將輸出對象轉爲全局屬性,支持數據輸出 */
   private OutputCollector collector;
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
       this.collector=collector;
  }
   @Override
   public void execute(Tuple input) {
       /* 不一樣方法獲取不一樣類型的值,方法中經過索引下標或字段名來取值 */
       //經過索引取值
       String line1=input.getString(0);
       //經過字段名取值
       String line2=input.getStringByField("line1");

       //如下爲業務邏輯,將字符串拆成單詞數組,遍歷輸出
       String[] words=line1.split(" ");
       for (String word:words){
           //遍歷輸出,輸出兩個字段的數據
           collector.emit(new Values(word,word));
      }
  }
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
       declarer.declare(new Fields("word","word2"));
  }
}

分組策略

  • Shuffle Grouping:輪詢的方式,隨機派發tuple,每一個線程接收到數量大體相同。

  • Fields Grouping:按指定的字段分組,指定字段的值相同tuple,分配相同的task(處於同一個線程中)

  • 如下方式瞭解

    • All Grouping:廣播發送,全部下游的bolt線程都會收到。仍是task

    • Global Grouping:全局分組, 將tuple分配給 id最低的task,這個id是用戶指定的id值。

    • None Grouping:隨機分組

    • Direct Grouping:指向型分組

    • local or shuffle grouping:若目標bolt task與源bolt task在同一工做進程中,則tuple隨機分配給同進程中的tasks。不然普通的Shuffle分配 Grouping行爲一致

    • customGrouping:自定義

分佈式搭建

環境搭建

  • 安裝

    tar -zxvf apache-storm-0.10.0.tar.gz

    mv apache-storm-0.10.0 storm-0.10.0

  • 修改配置

    vim /opt/sxt/storm-0.10.0/conf/storm.yaml

    #指定zk地址
    storm.zookeeper.servers:
        - "node1"
        - "node2"
        - "node3"
       
    #指定nimbus的節點
    nimbus.host: "node1"

    #指定開放的端口(默認如下4個)
    supervisor.slots.ports:
      - 6700
      - 6701
      - 6702
      - 6703
  • 拷貝到其餘節點

    cd /opt/sxt 
    scp -r storm-0.10.0/ node3:`pwd`

啓動+命令

啓動集羣

  • 啓動zookeeper集羣

    zkServer.sh start

  • 啓動nimbus節點 cd /opt/sxt/storm-0.10.0

    • 各節點建立logs目錄:mkdir logs

    • 主節點啓動:

      • 啓動nimbus

        nohup ./bin/storm nimbus >> logs/nimbus.out 2>&1 &

      • 啓動監控ui

        nohup ./bin/storm ui >> logs/ui.out 2>&1 &

    • 從節點啓動supervisor

    • nohup ./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &

    • 注:項目的日誌也就存放中logs目錄下的各文件中

      能夠遠程查看:例如文件wc-1-1571317142-worker-6700.log

      http://node1:8080/topology.html?wc-1-1571317142-worker-6700.log

  • 網頁登陸設置在主節點上的UI監控地址 http://node1:8080

關閉集羣

#關閉nimbus
kill `ps aux | egrep '(daemon.nimbus)|(storm.ui.core)' | fgrep -v egrep | awk '{print $2}'`
#關閉supervisor
kill `ps aux | fgrep storm | fgrep -v 'fgrep' | awk '{print $2}'`
#關閉zk
zkServer.sh stop

運行項目

從IDE中導出項目jar文件(field標籤 > project structure > 指定主類建立jar配置 > bulid標籤 > build project > 從項目目錄中找到jar文件) 導入到/opt/sxt下

載入項目命令:./bin/storm jar /opt/sxt/wc.jar com.shsxt.storm.wordcount.WordCountTP wc

依次指定jar文件目錄,主類,topology名

中止項目命令:./bin/storm kill wc (注意指定topology)

內部文件結構

  • 本地目錄

    /opt/sxt/storm-0.10.0/storm-local 目錄下存放項目執行時的數據

    內部爲一個目錄樹結構

  • zookeeper目錄

    進入zk客戶端 zkCli.sh

    查看storm的目錄 ls /storm 內部時一個目錄樹

    查看指定的數據 get /xxx/xxx

併發機制

  • worker進程

    • 具體執行任務的進程

    • 一個topology包含若干個worker執行,worker執行topology的子集

    • 一個worker須要綁定一個網絡端口,默認一個節點最多運行4個worker,經過配置多於4個端口開放更多worker

  • Executor線程

    • execute是由worker進程建立的線程,worker包含至少一個executor線程

    • 一個線程中可以執行一個或多個task任務,默認一個線程執行一個task

    • spout / boltk 與線程是一對多的關係,一個spout 能夠包含多個線程,但線程只對應一個spout,監控線程除外。

  • task

    • 數據處理的最小單元

    • 每一個task對應spout或bolt的一個併發度

關於再平衡熱rebalance

  • 在topology生命週期中,worker與executor的數量可變,task的數量不可變

  • 多個線程沒法同時執行一個task,只能一個線程執行一個task,所以線程數應該小於task數,設置多的線程是無效的。

  • 注意:系統對每一個worker都會設置一個監控線程,監控線程數量與worker相同

  • 命令修改

    ./bin/storm/rebalance wc -n 5 -e blue-spout=3 -e yellow-bolt=10

    (調整wc任務 5個worker 3個spout線程 10個bolt線程 )

  • api設置

    • worker數

      經過配置文件制定worker數量

      config.setNumWorkers(3);

    • executor數與task數量

      線程數設置 setBolt(String id, IRichBolt bolt, Number 線程數);和setSpout()方法

      設置任務數 setNumTasks(task數)

      topologyBuilder.setBolt("wcSplit",new WordCountSplit(),2).setNumTasks(4).shuffleGrouping("wcSpout");

數據傳輸

  • worker之間數據傳輸使用Netty架構

    • 接收線程監聽tcp端口,並將數據存入緩衝區域

    • 發送線程使用傳輸隊列發送數據

  • worker的內部通信採用Disruptor的隊列處理機制

    數據從緩衝區讀取通過:接收隊列——工做線程——發送隊列——發送線程

  • tuple 在Storm中的數據的傳輸單元

  • Stream 數據連通的管道,多個管道時須要指定Stream的id

線程數包括:spout線程數,blot線程數,acker(每一個work1個監控線程)

容錯機制

宕機機制

  • nimbus宕機

    • 無狀態策略,全部的狀態信息都存放在Zookeeper中來管理,worker正常運行,只是沒法提交新的topology

    • 快速失敗策略:自檢到異常後自毀並重啓

  • supervisor宕機

    • 無狀態策略,supervisor的全部的狀態信息都存放在Zookeeper中來管理

    • 快速失敗策略:自檢到異常後自毀並重啓

    • 節點故障狀態下,若全部該節點上的任務超時,nimbus會將該節點的task分配給其餘的節點

  • worker宕機

    • supervisor會重啓worker進程

    • 若worker屢次失敗且沒法向nimbus發送心跳,nimbus會將該worker的分配到其餘supervisor上執行。

數據跟蹤acker

消息完整性

spout中發出的tuple及其由tuple產生的子tuple,構成一個樹形結構,只有當數中的全部消息都被正確處理,才能保證數據的完整性。

爲了保證數據的完整性,對於處理失敗的tuple須要請求上游的spout重發數據。

消息重發的機制:at least(至少一次),oncely (有且只有一次)

acker機制

acker跟蹤每一個spout發出的tuple。

acker記錄每一個trulp的id,數據處理後,經過亦或算法清空id,若id未清空則代表這條數據沒有處理完成。

  • 自定義acker類,對數據進行監控

    • 在topology中添加具有acker功能的spout做爲acker類

      topologyBuilder.setSpout("myack",new MyAckSpout());

    • 定義acker類

      • 實現IRichSpout接口,重寫ack(成功)和fail(失敗)方法,對發出的數據進程緩存

      • 設置全局緩存,數據發送時存入緩存,處理成功刪除緩存,處理失敗,從緩存取數據重發

      public class MyAckSpout implements IRichSpout {
         SpoutOutputCollector collector;
         //數據緩存
         Map<Object,String> map =  new HashMap<>();
         //設置全局緩存id
         long id = 0;
         @Override
         public void nextTuple() {
             //發送數據,並存入消息id
             collector.emit(new Values("你好"),id);
             //數據存入緩存
             map.put(id,line);
             //索引自增處理
             id++; }

         //數據完整處理調用ack方法,參數爲消息id索引
         @Override
         public void ack(Object msgId) {
             //將緩存中的數據清除
             map.remove(msgId); }

         //數據沒有完整處理調用fail方法,參數爲消息id索引
         @Override
         public void fail(Object msgId) {
             //重發消息
             collector.emit(new Values(map.get(msgId)),msgId);
        }
      //省略如下重寫方法open,declareOutputFields,getComponentConfiguration,close,activate,deactivate
      }
  • bolt 中代碼響應,execute方法中

    • 方法末尾添加數據處理的響應

      處理成功:collection.ack(input)

      處理失敗:collection.fail(input) 注意:超時也數據失敗

    • 向下遊發送數據時,指定acker是否繼續跟蹤下游數據

      跟蹤:collector.emit(input,new Values(split[i]));

      不跟蹤:collector.emit(new Values(split[i]));

      public void execute(Tuple input) {
         String xxx = input.getStringByField("aaa");
         //結束跟蹤
         collector.emit(new Values(xxx));
         //繼續collector.emit(new Values(xxx));
      //處理成功
         collector.ack(input);
         //處理失敗,collector.fail(input);
      }

實時計算整合

採集層(flume):實現日誌收集,使用負載均衡策略

消息隊列(kafka):做用是解耦及不一樣速度系統緩衝

實時處理單元(Storm):用Storm來進行數據處理,最終數據流入DB中

展現單元(DB+web):數據可視化,使用WEB框架展現

整合kafka

相關文章
相關標籤/搜索