Storm筆記整理(三):Storm集羣安裝部署與Topology做業提交

[TOC]html


Storm分佈式集羣安裝部署

概述

Storm筆記整理(三):Storm集羣安裝部署與Topology做業提交

Storm集羣表面相似Hadoop集羣。但在Hadoop上你運行的是」MapReduce jobs」,在Storm上你運行的是」topologies」。」Jobs」和」topologies」是大不一樣的,一個關鍵不一樣是一個MapReduce的Job最終會結束,而一個topology永遠處理消息(或直到你kill它)。java

Storm集羣有兩種節點:控制(master)節點和工做者(worker)節點。控制節點運行一個稱之爲」Nimbus」的後臺程序,它相似於Haddop的」JobTracker」。Nimbus負責在集羣範圍內分發代碼、爲worker分配任務和故障監測。apache

注意:Hadoop 2.0之前使用JobTrack來進行Job的分發,但2.x以後就使用了全新的資源調度框架,即yarn,這點尤爲須要注意。api

每一個工做者節點運行一個稱之」Supervisor」的後臺程序。Supervisor監聽分配給它所在機器的工做,基於Nimbus分配給它的事情來決定啓動或中止工做者進程。每一個工做者進程執行一個topology的子集(也就是一個子拓撲結構);一個運行中的topology由許多跨多個機器的工做者進程組成。bash

一個Zookeeper集羣負責Nimbus和多個Supervisor之間的全部協調工做(一個完整的拓撲可能被分爲多個子拓撲並由多個supervisor完成)。app

此外,Nimbus後臺程序和Supervisor後臺程序都是快速失敗(fail-fast)和無狀態的;全部狀態維持在Zookeeper或本地磁盤。這意味着你能夠kill -9殺掉nimbus進程和supervisor進程,而後重啓,它們將恢復狀態並繼續工做,就像什麼也沒發生。這種設計使storm極其穩定。這種設計中Master並無直接和worker通訊,而是藉助一箇中介Zookeeper,這樣一來能夠分離master和worker的依賴,將狀態信息存放在zookeeper集羣內以快速恢復任何失敗的一方。框架

集羣安裝

能夠參考官方文檔:http://storm.apache.org/releases/1.0.6/Setting-up-a-Storm-cluster.htmlmaven

官方文檔對於配置中的解釋是很是清晰明瞭和容易理解的。分佈式

下載地址:https://storm.apache.org/downloads.html
須要確保已經安裝好了zookeeper環境,在個人環境中已經搭建好了zookeeper集羣環境。

1.解壓
[uplooking@uplooking01 soft]$ tar -zxvf apache-storm-1.0.2.tar.gz -C ../app/
[uplooking@uplooking01 app]$ mv apache-storm-1.0.2/ storm

2.修改配置文件
# storm-env.sh
export JAVA_HOME=/opt/jdk
export STORM_CONF_DIR="/home/uplooking/app/storm/conf"

# storm.yaml
storm.zookeeper.servers:
    - "uplooking01"
    - "uplooking02"
    - "uplooking03"

nimbus.seeds: ["uplooking01", "uplooking02"]

storm.local.dir: "/home/uplooking/data/storm"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

3.建立storm.local.dir
mkdir -p /home/uplooing/data/storm

4.配置環境變量
# .bash_profile
export STORM_HOME=/home/uplooking/app/storm
export PATH=$PATH:$STORM_HOME/bin
# 將其同步到其它節點
scp .bash_profile uplooking@uplooking02:/home/uplooking
scp .bash_profile uplooking@uplooking03:/home/uplooking

5.複製storm安裝目錄到其它節點
scp -r storm/ uplooking@uplooking02:/home/uplooking/app
scp -r storm/ uplooking@uplooking03:/home/uplooking/app

6.啓動storm集羣
# uplooking01
storm nimbus &
storm ui &

# uplooking02
storm nimbus &
storm supervisor &

# uplooking03
storm supervisor &

7.啓動logviewer(可選)
在全部從節點執行"nohup bin/storm logviewer >/dev/null 2>&1 &"啓動log後臺程序,並放到後臺執行。
(nimbus節點能夠不用啓動logviewer進程,由於logviewer進程主要是爲了方便查看任務的執行日誌,這些執行日誌都在supervisor節點上)。

由於啓動了storm ui,在地址欄中輸入:http://uplooking01:8080就能夠查看storm集羣的相關信息ide

Storm筆記整理(三):Storm集羣安裝部署與Topology做業提交

Storm筆記整理(三):Storm集羣安裝部署與Topology做業提交

同時查看其顯示的信息,對於咱們前面的配置也有一個十分直觀的體現。

提交Topology做業到集羣

Topology開發與打包

使用前面的計算總和的例子:

package cn.xpleaf.bigdata.storm.remote;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Date;
import java.util.Map;

/**
 * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。
 * <p>
 * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯
 * MapReduce的組件:Mapper和Reducer、數據是Writable,經過一個main中的job將兩者關聯
 * <p>
 * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些不必的方法進行了重寫,但其重寫的代碼沒有實現任何功能。
 * 咱們稱這爲適配器模式
 */
public class StormSumTopology {

    /**
     * 數據源
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private SpoutOutputCollector collector; // 發送tuple的組件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * 接收數據的核心方法
         */
        @Override
        public void nextTuple() {
            long num = 0;
            while (true) {
                num++;
                StormUtil.sleep(1000);
                System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num);
                this.collector.emit(new Values(num));
            }
        }

        /**
         * 是對發送出去的數據的描述schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("order_cost"));
        }
    }

    /**
     * 計算和的Bolt節點
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * 處理數據的核心方法
         */
        @Override
        public void execute(Tuple input) {
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;

            System.out.println("商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost);
            StormUtil.sleep(1000);
        }

        /**
         * 若是當前bolt爲最後一個處理單元,該方法能夠不用管
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 構建拓撲,至關於在MapReduce中構建Job
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * 設置spout和bolt的dag(有向無環圖)
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt())
                .shuffleGrouping("id_order_spout"); // 經過不一樣的數據流轉方式,來指定數據的上游組件
        // 使用builder構建topology
        StormTopology topology = builder.createTopology();
        String topologyName = StormSumTopology.class.getSimpleName();  // 拓撲的名稱
        Config config = new Config();   // Config()對象繼承自HashMap,但自己封裝了一些基本的配置

        // 啓動topology,本地啓動使用LocalCluster,集羣啓動使用StormSubmitter
        if (args == null || args.length < 1) {  // 沒有參數時使用本地模式,有參數時使用集羣模式
            LocalCluster localCluster = new LocalCluster(); // 本地開發模式,建立的對象爲LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}

能夠看到區別在於後面做業的提供方式,使用集羣的方式爲:StormSubmitter.submitTopology(topologyName, config, topology);

這裏使用Maven的方式進行打包,確保pom.xml中已經配置了storm-core依賴的可見範圍和相關的打包插件:

<!--依賴-->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
    <!--可見範圍爲provided時,打包時不會對依賴進行打包,但在本地測試開發時應該註釋掉,不然程序沒法運行-->
    <!--另外不須要打包storm的依賴是由於,集羣中已經有storm的相關依賴jar包了-->
    <scope>provided</scope>
</dependency>

<!--打包插件-->
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <!-- 將依賴也一塊兒打包 -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <!-- 能夠在這裏指定運行的主類,這樣在打包爲jar包後就能夠不用指定須要運行的類 -->
                <mainClass>

                </mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

在idea中配置maven打包的命令:

clean package -DskipTests

以後就能夠打包並上傳到咱們的集羣環境中了。

提交做業

[uplooking@uplooking01 storm]$ cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
-bash: cn.xpleaf.bigdata.storm.remote.StormSumTopology: command not found
[uplooking@uplooking01 storm]$ storm jar storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
Running: /opt/jdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/uplooking/app/storm -Dstorm.log.dir=/home/uplooking/app/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/uplooking/app/storm/lib/log4j-over-slf4j-1.6.6.jar:/home/uplooking/app/storm/lib/reflectasm-1.10.1.jar:/home/uplooking/app/storm/lib/disruptor-3.3.2.jar:/home/uplooking/app/storm/lib/clojure-1.7.0.jar:/home/uplooking/app/storm/lib/objenesis-2.1.jar:/home/uplooking/app/storm/lib/log4j-slf4j-impl-2.1.jar:/home/uplooking/app/storm/lib/slf4j-api-1.7.7.jar:/home/uplooking/app/storm/lib/log4j-core-2.1.jar:/home/uplooking/app/storm/lib/storm-core-1.0.2.jar:/home/uplooking/app/storm/lib/storm-rename-hack-1.0.2.jar:/home/uplooking/app/storm/lib/kryo-3.0.3.jar:/home/uplooking/app/storm/lib/asm-5.0.3.jar:/home/uplooking/app/storm/lib/log4j-api-2.1.jar:/home/uplooking/app/storm/lib/servlet-api-2.5.jar:/home/uplooking/app/storm/lib/minlog-1.3.0.jar:storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar:/home/uplooking/app/storm/conf:/home/uplooking/app/storm/bin -Dstorm.jar=storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
842  [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8973061592627522790:-5130577098800003128
934  [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
1036 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: /home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
1064 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
1064 [main] INFO  o.a.s.StormSubmitter - Submitting topology StormSumTopology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-8973061592627522790:-5130577098800003128"}
1710 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: StormSumTopology

注意看輸出,jar包被上傳到/home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar,後面能夠在leader節點中查看到有該jar包:

[uplooking@uplooking02 inbox]$ pwd
/home/uplooking/data/storm/nimbus/inbox
[uplooking@uplooking02 inbox]$ ls
stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar

由於此時uplooking01節點不是leader,因此在其上面是沒有該jar包的,這點須要注意。

概念驗證

能夠在storm ui中查看此時的集羣狀態信息:

Storm筆記整理(三):Storm集羣安裝部署與Topology做業提交

再查看詳細的Topology信息:

Storm筆記整理(三):Storm集羣安裝部署與Topology做業提交

再查看spout或者bolt的詳細信息:

Storm筆記整理(三):Storm集羣安裝部署與Topology做業提交

能夠看到是在uplooking02上運行的Executors,此時能夠到該節點上查看輸出信息:

[uplooking@uplooking02 6700]$ pwd
/home/uplooking/app/storm/logs/workers-artifacts/StormSumTopology-1-1523548000/6700
[uplooking@uplooking02 6700]$ tail -5 worker.log
2018-04-13 00:39:56.636 STDIO [INFO] 商城網站到目前20180413003956的商品總交易額5054610
2018-04-13 00:39:57.636 STDIO [INFO] 當前時間20180413003957產生的訂單金額:3181
2018-04-13 00:39:57.637 STDIO [INFO] 商城網站到目前20180413003957的商品總交易額5057790
2018-04-13 00:39:58.638 STDIO [INFO] 當前時間20180413003958產生的訂單金額:3182
2018-04-13 00:39:58.639 STDIO [INFO] 商城網站到目前20180413003958的商品總交易額5060971

須要注意的是,此時在uplooking03上是沒有這些信息的,由於集羣將做業交給了uplooking02上的supervisor來運行。此外還須要知道的是,在uplooking02的data目錄下也能夠查看到有前面的jar包,其是由nimbus分發過來的:

[uplooking@uplooking02 StormSumTopology-1-1523548000]$ pwd
/home/uplooking/data/storm/supervisor/stormdist/StormSumTopology-1-1523548000
[uplooking@uplooking02 StormSumTopology-1-1523548000]$ ls
stormcode.ser  stormconf.ser  stormjar.jar

可是在uplooking03上也是沒有的。

另外也能夠在uplooking02上使用jps命令查看到有worker進程:

[uplooking@uplooking02 ~]$ jps
2224 QuorumPeerMain
1858 Elasticsearch
27427 logviewer
2291 NameNode
27972 LogWriter
27988 worker
25878 nimbus
28006 Jps
26054 supervisor
2552 DFSZKFailoverController
2365 DataNode
2462 JournalNode

對於輸出信息的查看,其實也能夠在storm ui上直接進行查看,上面的界面,點擊6700的連接,就能夠進行查看,可是前提是須要先在uplooking02上運行了logviewer

storm logviewer &

查看到的輸出以下:

Storm筆記整理(三):Storm集羣安裝部署與Topology做業提交

集羣健壯性驗證

由前面能夠知道,目前worker運行在uplooking02上,若是在此節點上直接將該進程kill掉,那麼其又會自動進行重啓:

[uplooking@uplooking02 ~]$ jps | grep worker
27988 worker
[uplooking@uplooking02 ~]$ kill -9 27988
[uplooking@uplooking02 ~]$ jps | grep worker
kill 27988: 沒有那個進程
[uplooking@uplooking02 ~]$ kill 27988: 沒有那個進程

[uplooking@uplooking02 ~]$ jps | grep worker
28235 worker

固然若是真的但願停掉Topology做業,有兩種方式:

第一種是在storm ui的topology界面中進行操做:
    Topology actions中有Kill的操做,點擊便可

第二種是在命令行中使用命令進行操做:
    [uplooking@uplooking01 ~]$ storm kill
    Syntax: [storm kill topology-name [-w wait-time-secs]]
    -w後接秒數,表示多少秒後將中止該topology做業

再作進一步的驗證,若是把三臺主機上除了了worker進程(nimbus、supervisor等)都關掉,那麼此時worker是能夠繼續正常運行的,數據也會正常產生,只是此時不一樣的是,不可以再向storm集羣中添加做業了。

相關文章
相關標籤/搜索