大數據系列——Storm安裝和API

1. 實時計算

  • 有別於傳統的離線批處理操做(對不少數據的集合進行的操做)
  • 實時處理,說白就是針對一條一條的數據/記錄進行操做
  • 實時計算計算的是無界數據

2. 有界數據和無界數據

2.1 有界數據

  • 離線計算面臨的操做數據都是有界限的,不管是1G、1T、1P、1EB、1NB
  • 數據的有界必然會致使計算的有界

2.2 無界數據

  • 實時計算面臨的操做數據是源源不斷的向水流同樣,是沒有界限的
  • 數據的無界必然致使計算的無界

3. 計算中心和計算引擎

在大數據領域中存在三大計算中心三大計算引擎java

3.1 三大計算中心

  • 離線計算計算中心(mapreduce)
  • 實時計算中心(storm flink...)
  • 準實時計算中心(spark)

3.2 三大計算引擎

  • 交互式查詢計算引擎(hive sparksql)
  • 圖計算計算引擎
  • 機器學習計算引擎

4. Storm簡介

  • 免費 開源 分佈式 實時計算系統
  • 處理無界的數據流
  • Tiwtter開源的cloujre
  • Storm能實現高頻數據和大規模數據的實時處理
  • 官網資料顯示storm的一個節點1秒鐘可以處理100萬個100字節的消息(IntelE5645@2.4Ghz的CPU,24GB的內存)
  • storm是毫秒級的實時處理框架

Apache Storm是Twitter開源的一個相似於Hadoop的實時數據處理框架,它原來是由BackType開發,後BackType被Twitter收購,將Storm做爲Twitter的實時數據分析系統。sql

5. hadoop與storm的計算

  • 數據來源shell

    • hadoop數據庫

      • HADOOP處理的是HDFS上TB級別的數據(歷史數據)
    • stormapache

      • STORM是處理的是實時新增的某一筆數據(實時數據)
  • 處理過程安全

    • hadoopbash

      • HADOOP是分MAP階段到REDUCE階段
      • HADOOP最後是要結束的
    • storm架構

      • STORM是由用戶定義處理流程,流程中能夠包含多個步驟,每一個步驟能夠是數據源(SPOUT)或處理邏輯(BOLT)
      • STORM是沒有結束狀態,到最後一步時,就停在那,直到有新數據進入時再從頭開始
  • 處理速度框架

    • hadoopssh

      • HADOOP是以處理HDFS上TB級別數據爲目的,處理速度慢
    • storm

      • STORM是隻要處理新增的某一筆數據便可,能夠作到很快 (毫秒級的響應)
  • 適用場景

    • HADOOP是在要處理批量數據時用的 ,不講究時效性
    • STORM是要處理某一新增數據時用的,要講時效性

6. Storm的架構

  • Spout

    • Storm認爲每一個stream都有一個stream源,也就是原始元組的源頭,因此它將這個源頭稱爲Spout
    • 消息源,是消息生產者,他會從一個外部源讀取數據並向topology裏面面發出消息
  • Bolt

    • 消息處理者,全部的消息處理邏輯被封裝在bolts裏面,處理輸入的數據流併產生新的輸出數據流,可執行過濾,聚合,查詢數據庫等操做
  • 數據流
  • Task 每個Spout和Bolt會被看成不少task在整個集羣裏面執行,每個task對應到一個線程.
  • Stream groupings: 消息分發策略,定義一個Topology的其中一步是定義每一個tuple接受什麼樣的流做爲輸入,stream grouping就是用來定義一個stream應該如何分配給Bolts們.

7. Storm集羣的安裝

  • 準備安裝文件

    ​ apache-storm-1.0.2.tar.gz

  • 解壓
[root@uplooking01 /soft]
    tar -zxvf apache-storm-1.0.2.tar.gz -C /opt
    mv apache-storm-1.0.2/ storm
  • 配置storm

storm-env.sh

[root@uplooking01 /soft]
    export JAVA_HOME=/opt/jdk
    export STORM_CONF_DIR="/opt/storm/conf"

storm.yaml

[root@uplooking01 /opt/storm/conf]


storm.zookeeper.servers:
  - "uplooking03"
  - "uplooking04"
  - "uplooking05"

#配置兩個主節點,實現主節點的單點故障
nimbus.seeds: ["uplooking01", "uplooking02"]
storm.local.dir: "/opt/storm/storm-local"
#配置從節點的槽數
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
  • 分發到其餘節點
[root@uplooking01 /]
    scp -r /opt/storm uplooking02:/opt
    scp -r /opt/storm uplooking03:/opt
    scp -r /opt/storm uplooking04:/opt
    scp -r /opt/storm uplooking05:/opt
  • 啓動storm
[root@uplooking01 /]  
    #啓動主進程和ui進程
    nohup /opt/storm/bin/storm nimbus >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm ui >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking02 /]
    #啓動主進程(numbus)
    nohup /opt/storm/bin/storm numbus >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
#啓動從節點進程(supervisor)
[root@uplooking03 /]
    nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking04 /]
    nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking05 /]
    nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &

8. Storm集羣的啓動腳本

#!/bin/bash
#啓動nimbus

for nimbusHost in  `cat /opt/shell/nimbus.host`
do
#-T 進制分配僞終端 通常自動化腳本不須要分配僞終端
ssh -T  root@${nimbusHost}    << eeooff
    nohup /opt/storm/bin/storm nimbus >/dev/null 2>&1 &
eeooff
done

#啓動supervisor
for supervisorHost in  `cat /opt/shell/supervisor.host`
do
#-T 進制分配僞終端 通常自動化腳本不須要分配僞終端
ssh -T  root@${supervisorHost}    << eeooff
        nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
eeooff
done


#啓動logviewer
for logviewerHost in  `cat /opt/shell/logviewer.host`
do
#-T 進制分配僞終端 通常自動化腳本不須要分配僞終端
ssh -T  root@${logviewerHost}    << eeooff
        nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
eeooff
done


#啓動ui
for uiHost in  `cat /opt/shell/ui.host`
do
#-T 進制分配僞終端 通常自動化腳本不須要分配僞終端
ssh -T  root@${uiHost}    << eeooff
        nohup /opt/storm/bin/storm ui >/dev/null 2>&1 &
eeooff
done

9. Storm實現數字累加

  • 編寫Spout
public class MySpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    //初始化累加的數字
    int num = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        collector.emit(new Values(num));
        num++;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("mynum"));
    }
}
  • 編寫Bolt
public class MyBolt extends BaseRichBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    }

    @Override
    public void execute(Tuple tuple) {
        Integer num = tuple.getIntegerByField("mynum");
        System.out.println(num);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}
  • 編寫Topology
public class MyTopology {
    public static void main(String[] args) {
        //建立自定義的spout
        MySpout mySpout = new MySpout();
        //建立自定義的bolt
        MyBolt myBolt = new MyBolt();
        //建立topology名稱
        String topologyName = "MyNumTopology";
        //建立topology的配置對象
        Map conf = new Config();

        //建立topology的構造器
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //爲topology設置spout和bolt
        topologyBuilder.setSpout("myspout", mySpout);
        topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout");

        //建立本地的topology提交器
        StormTopology stormTopology = topologyBuilder.createTopology();
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(topologyName, conf, stormTopology);
    }
}

10. 多個Bolt的問題

  • 定義下一個Bolt
public class MyBolt02 extends BaseRichBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    }

    @Override
    public void execute(Tuple tuple) {
        System.out.println(tuple.getIntegerByField("mynum02") + ".....");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}
  • 第一個Bolt中給第二個Bolt發射數據
public class MyBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        Integer num = tuple.getIntegerByField("mynum");
        System.out.println(num);
        collector.emit(new Values(num));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("mynum02"));
    }
}
  • 在Topology中配置第二個Bolt
public class MyTopology {
    public static void main(String[] args) {
        //建立自定義的spout
        MySpout mySpout = new MySpout();
        //建立自定義的bolt
        MyBolt myBolt = new MyBolt();

        MyBolt02 myBolt02 = new MyBolt02();
        //建立topology名稱
        String topologyName = "MyNumTopology";
        //建立topology的配置對象
        Map conf = new Config();

        //建立topology的構造器
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //爲topology設置spout和bolt
        topologyBuilder.setSpout("myspout", mySpout);
        topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout");
        topologyBuilder.setBolt("mybolt02", myBolt02).shuffleGrouping("mybolt");

        //建立本地的topology提交器
        StormTopology stormTopology = topologyBuilder.createTopology();
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(topologyName, conf, stormTopology);
    }
}

11. 提交做業到集羣

StormSubmitter.submitTopology(topologyName, conf, stormTopology);

12. Storm的並行度

在storm中的並行度說的就是一個進程的運行須要多少個線程來參與,若是storm運行的線程個數+1,則並行度+1

Worker :

  • worker是一個進程級別的概念,能夠經過jps查看的到
  • worker是一個Topology實例的子集,也就是說一個Topology的實例在supervisor中運行,能夠在一個或者多個supervisor中啓動一個或者多個worker進程
  • 一個worker進程只能爲一個Topology實例服務
  • 因此Topology和worker的關係===>1:N
  • 進程是由多個線程來組成,這裏的線程就是Executor
  • conf.setNumWorkers(int workers)
  • 因此worker和executor的關係===>1:N
  • 每個executor線程具體幹活是由一個個task任務的實例來完成的
  • 在builer.setSpout/setBolt的第三個參數設置
  • Task真正在topology幹活的實例,一個executor線程,默認狀況下對應了1個task的實例的
  • Executor和Task的關係===>1:N
  • builder.setSpout().setNumTasks(tasks)//設置的是spout對應的executor擁有幾個task實例builder.setBolt().setNumTasks(tasks)//設置的是bolt對應的executor擁有幾個task實例

13. Storm中的消息確認機制

  • 在spout中若是發送消息時指定messageId則表明開啓消息確認機制,若是不指定messageID則表明不開啓消息確認機制
  • 若是Spout中開啓了消息確認機制則在bolt中須要用ack()方法來確認消息接收成功
  • 在Soput中重寫響應的fail()和ack()方法來處理消息成功或者失敗的回調邏輯
  • Storm默認若是不確認消息接收成功則30s以後返回消息失敗
  • 消息確認機制要慎重使用(效率換取安全)
相關文章
相關標籤/搜索