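Storm Notes: Summary of Key Technical Points

Contents

· Overview

· Building a Cluster Manually

    · Introduction

    · Installing Python

    · Configuration Files

    · Startup and Testing

· Application Deployment

    · Parameter Configuration

    · Storm Commands

· Principles

    · Storm Architecture

    · Storm Components

    · Stream Grouping

    · Daemon Fault Tolerance

    · Guaranteeing Message Processing

    · Message Transport

· API

    · WordCount Example

    · Application Deployment Modes

    · Component Interfaces

    · Component Implementation Classes

    · Data Connection Modes

    · Common Topology Patterns

    · Logs (Cluster Mode)

    · Setting Parallelism

    · Tick Timer Mechanism

    · Serialization

    · Integration with Other Systems

· Performance Tuning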


 

 

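Overview

1. Apache Storm is a distributed real-time computation framework open-sourced by Twitter.

2. Storm is developed mainly in Java and Clojure: Java defines the skeleton and Clojure implements the core logic.

3. Storm application (Topology): a Spout is the tap that continuously reads messages and sends them out; a Bolt is each junction in the pipe, forwarding the message stream according to a Stream Grouping strategy.

4. Storm features

    a) Ease of use: as long as you follow the Topology, Spout, and Bolt programming model, you can build highly scalable applications without dealing with the underlying RPC, redundancy between Workers, or data partitioning.

    b) Fault tolerance: the daemons (Nimbus, Supervisor, etc.) are stateless, with their state kept in ZooKeeper, so they can be restarted at will; when a Worker fails or a machine goes down, Storm automatically assigns a new Worker to replace the failed one.

    c) Scalability: scales linearly.

    d) Integrity: the Acker mechanism ensures that no data is lost; the transaction mechanism ensures that data is accurate.

5. Storm use cases

    a) Stream processing: the most basic use; Storm processes the messages that keep flowing in and writes the results to some store.

    b) Continuous computation: Storm keeps the computation running until the user kills it.

    c) Distributed RPC: Storm can be used as a distributed RPC framework.

6. Comparison with Spark Streaming

    a) Combining with historical data: Spark Streaming cuts the stream into small time slices (a few seconds to a few minutes) and processes each slice in a batch-like way, so the same logic and algorithms serve both batch and real-time processing, which makes joint analysis of historical and real-time data convenient.

    b) Latency: Storm processes one event at a time as it arrives, whereas Spark Streaming processes the events that fall within a time window, so Storm's latency is very low and Spark Streaming's is comparatively higher.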

Building a Cluster Manually

Introduction

1. Environment:

Role          Host name
Nimbus        centos1
Supervisor    centos2, centos3

2. JDK and a ZooKeeper cluster are assumed to be installed already.

Installing Python

1. [Nimbus, Supervisor] Log in as root and install Python under /usr/local/python.

tar zxvf Python-2.7.13.tgz # on Ubuntu, install the dependencies first: sudo apt-get install build-essential zlib1g-dev
cd Python-2.7.13/
./configure --prefix=/usr/local/python
make
sudo make install

2. [Nimbus, Supervisor] Make the python command available.

ln -s /usr/local/python/bin/python /usr/bin/python # symbolic link
python -V # verify

Configuration Files

3. [Nimbus] Unpack Storm and edit conf/storm.yaml.

tar zxvf apache-storm-1.1.0.tar.gz -C /opt/app
cd /opt/app/apache-storm-1.1.0
vi conf/storm.yaml
storm.zookeeper.servers:
    - "centos1"
    - "centos2"
storm.zookeeper.port: 2181
nimbus.seeds: ["centos1"]
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
storm.local.dir: "/opt/data/storm.local.dir"
ui.port: 8080

4. [Nimbus] Copy the Storm directory from Nimbus to each Supervisor.

scp -r /opt/app/apache-storm-1.1.0 hadoop@centos2:/opt/app
scp -r /opt/app/apache-storm-1.1.0 hadoop@centos3:/opt/app

Startup and Testing

5. [Nimbus, Supervisor] Set the Storm environment variables.

export STORM_HOME=/opt/app/apache-storm-1.1.0
export PATH=$PATH:$STORM_HOME/bin

6. [Nimbus] Start the daemons.

nohup bin/storm nimbus 1>/dev/null 2>&1 &
nohup bin/storm ui 1>/dev/null 2>&1 &
nohup bin/storm logviewer 1>/dev/null 2>&1 &
jps
nimbus # Nimbus daemon
core # Storm UI daemon
logviewer # LogViewer daemon

7. [Supervisor] Start the daemons.

nohup bin/storm supervisor 1>/dev/null 2>&1 &
nohup bin/storm logviewer 1>/dev/null 2>&1 &
jps
supervisor # Supervisor daemon
logviewer # LogViewer daemon

8. [Nimbus] Test.

storm jar teststorm.jar teststorm.WordCountTopology wordcount

9. Monitoring page.

http://centos1:8080

Storm UI

10. [Nimbus] Stop the daemons.

kill -s TERM ${PID} # PID is the process ID of each daemon

Application Deployment

Parameter Configuration

1. Ways to configure

    a) External Component Specific Configuration: through the methods of the SpoutDeclarer and BoltDeclarer objects returned by TopologyBuilder's setSpout and setBolt methods.

    b) Internal Component Specific Configuration: override the getComponentConfiguration method of the Spout or Bolt and return a Map.

    c) Topology Specific Configuration: pass options on the command line, "bin/storm -c conf1=v1 -c conf2=v2".

    d) storm.yaml: "$STORM_HOME/conf/storm.yaml".

    e) defaults.yaml: "$STORM_HOME/lib/storm-core-x.y.z.jar/defaults.yaml".

2. Parameter precedence (lowest to highest):

    defaults.yaml

    < storm.yaml

    < Topology Specific Configuration

    < Internal Component Specific Configuration

    < External Component Specific Configuration
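
The precedence chain above amounts to merging the configuration layers from lowest to highest priority, with later layers overwriting earlier ones. The following is a minimal plain-Java sketch of that idea, not Storm's actual implementation: the class `ConfigPrecedenceSketch`, its helper methods, and the values in each layer are hypothetical and chosen only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: Storm performs this merge internally.
class ConfigPrecedenceSketch {

    /** Merges layers ordered from lowest to highest priority; later layers win. */
    @SafeVarargs
    static Map<String, Object> resolve(Map<String, Object>... layersLowToHigh) {
        Map<String, Object> effective = new LinkedHashMap<>();
        for (Map<String, Object> layer : layersLowToHigh) {
            effective.putAll(layer);
        }
        return effective;
    }

    /** Builds one layer from alternating key/value arguments. */
    static Map<String, Object> layer(Object... keyValues) {
        Map<String, Object> layer = new LinkedHashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            layer.put((String) keyValues[i], keyValues[i + 1]);
        }
        return layer;
    }

    public static void main(String[] args) {
        Map<String, Object> effective = resolve(
                layer("topology.debug", false, "topology.message.timeout.secs", 30), // defaults.yaml
                layer("topology.message.timeout.secs", 60),                          // storm.yaml
                layer("topology.debug", true, "topology.max.spout.pending", 1000),   // topology specific
                layer("topology.max.spout.pending", 500),                            // internal component
                layer("topology.max.spout.pending", 200));                           // external component
        // Each key ends up with the value from the highest-priority layer that sets it.
        System.out.println(effective);
    }
}
```

In this sketch the external component layer decides `topology.max.spout.pending`, while `topology.message.timeout.secs` keeps the storm.yaml value because no higher layer sets it.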

Storm Commands

Commonly used commands; see the official documentation for details.

    a) storm jar topology-jar-path class ...

    b) storm kill topology-name [-w wait-time-secs]

    c) storm activate topology-name

    d) storm deactivate topology-name

    e) storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*

    f) storm classpath

    g) storm localconfvalue conf-name

    h) storm remoteconfvalue conf-name

    i) storm nimbus

    j) storm supervisor

    k) storm ui

    l) storm get-errors topology-name

    m) storm kill_workers

    n) storm list

    o) storm logviewer

    p) storm set_log_level -l [logger name]=[log level][:optional timeout] -r [logger name] topology-name

    q) storm version

    r) storm help [command]

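Principles

Storm Architecture

Storm Components

Name               Description
Nimbus             Handles resource allocation and task scheduling; similar to Hadoop's JobTracker.
Supervisor         Accepts the tasks assigned by Nimbus and starts and stops the Worker processes it manages; similar to Hadoop's TaskTracker.
Worker             A process that runs the processing logic of specific components.
Executor           A physical thread inside a Worker process. Tasks of the same Spout/Bolt may share one physical thread; an Executor only runs Tasks that belong to the same Spout/Bolt.
Task               The actual work each Spout/Bolt performs; also the unit at which grouping between nodes is done.
Topology           A real-time application is logically packaged as a Topology object, similar to a Hadoop job. Unlike a job, a Topology keeps running until it is explicitly killed.
Spout              a) Produces the source stream in a Topology.
                   b) Typically reads from a data source (such as an MQ) and emits the data from its nextTuple method for Bolts to consume.
                   c) Can declare one or more streams through OutputFieldsDeclarer's declareStream method and emit to a given stream through SpoutOutputCollector's emit method.
Bolt               a) Receives data from Spouts in a Topology and processes it.
                   b) Complex logic can be split across several Bolts.
                   c) On receiving a message, its execute method is called; filtering, merging, writing to a database, and so on happen there.
                   d) Can declare one or more streams through OutputFieldsDeclarer's declareStream method and emit to a given stream through OutputCollector's emit method.
Tuple              The basic unit of message passing.
Stream             A continuous sequence of Tuples forms a Stream.
Stream Grouping    The partitioning of messages; 7 grouping strategies are built in.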

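Stream Grouping

1. A Stream Grouping defines how a stream is partitioned among the Tasks of a Bolt.

2. The 7 built-in Stream Grouping strategies

    a) Shuffle grouping: Tuples are distributed randomly so that each Bolt Task receives the same number of Tuples.

    b) Fields grouping: groups by one or more fields of the Tuple; Tuples with the same values for the grouping fields go to the same Bolt Task.

    c) All grouping: the stream is replicated to every Task of the Bolt; use with care.

    d) Global grouping: the entire stream goes to the single Bolt Task with the lowest ID.

    e) None grouping: the grouping does not matter; currently equivalent to Shuffle grouping.

    f) Direct grouping: the Spout/Bolt that produces a Tuple decides which Task of the consuming Bolt receives it, using OutputCollector's emitDirect method.

    g) Local or shuffle grouping: if the target Bolt has one or more Tasks in the same Worker process as the Task producing the Tuple, the Tuple is sent to those in-process Tasks; otherwise it behaves like Shuffle grouping.

3. Custom Stream Grouping strategy: implement the CustomStreamGrouping interface.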
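
To make the difference between a few of these strategies concrete, here is a small plain-Java sketch of how a target task could be chosen. It is an illustration under stated assumptions, not Storm's grouping code: the class `GroupingSketch` and its methods are hypothetical, and hashing the grouping values modulo the task count is only one plausible way to obtain the "same fields, same task" property.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical illustration; Storm's own groupers differ in detail.
class GroupingSketch {

    /** Fields grouping: equal grouping values always map to the same task index. */
    static int fieldsGrouping(List<?> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    /** Shuffle grouping: a uniformly random task index. */
    static int shuffleGrouping(int numTasks) {
        return ThreadLocalRandom.current().nextInt(numTasks);
    }

    /** Global grouping: always the task with the lowest id. */
    static int globalGrouping(int[] taskIds) {
        int lowest = taskIds[0];
        for (int id : taskIds) {
            lowest = Math.min(lowest, id);
        }
        return lowest;
    }

    public static void main(String[] args) {
        int numTasks = 4;
        for (String word : Arrays.asList("storm", "bolt", "storm")) {
            System.out.println(word
                    + " -> fields task " + fieldsGrouping(Arrays.asList(word), numTasks)
                    + ", shuffle task " + shuffleGrouping(numTasks));
        }
        System.out.println("global -> task " + globalGrouping(new int[] {7, 3, 5}));
    }
}
```

Both occurrences of "storm" land on the same fields-grouping index, which is what lets a counting Bolt keep a per-word total in local memory, while the shuffle index may differ from run to run.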

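Daemon Fault Tolerance

1. Worker: if a Worker fails, the Supervisor restarts it; if it keeps failing, Nimbus reassigns the Worker's resources.

2. Node: if a node machine fails, the Tasks on that node time out and Nimbus reassigns them to other nodes.

3. Nimbus and Supervisor: both are fail-fast (the process self-destructs on failure) and stateless (state is kept in ZooKeeper or on disk), so restarting the process is enough after a failure. Worker processes are not affected by a Nimbus or Supervisor failure, but moving Workers across nodes is.

4. Nimbus: Nimbus HA was introduced in Storm v1.0.0.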

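Guaranteeing Message Processing

1. MessageId: Storm lets the user assign a MessageId (of type Object) to each new Tuple a Spout emits; several Tuples may share one MessageId, which marks them as the same message unit.

2. Reliability: a message counts as fully processed when, within the Tuple timeout, the Spout Tuple bound to the MessageId and every Tuple derived from it have been processed by all the Bolts they should reach in the Topology. Storm uses Ackers to solve Tuple reliability (Bolts call OutputCollector's ack and fail methods to tell Storm whether a Tuple succeeded or failed).

3. Tuple timeout: set with "topology.message.timeout.secs"; the default is 30 seconds.

4. Anchoring

    a) On their way from the Spout through the Bolts, Tuples form a Tuple tree. In WordCount, for example, each line Tuple emitted by the Spout is a root and the word Tuples split from it are its children.

    b) Anchored: if an anchored Tuple is not acked downstream, the Spout at the root of the tree is notified and can re-emit the Tuple later.

    c) API usage for anchoring:

// Spout
collector.emit(new Values(content1), uniqueMessageId);
// Bolt
collector.emit(tuple, new Values(content2));
collector.ack(tuple);

    d) Unanchored: if an unanchored Tuple is not acked downstream, the Spout at the root does not re-emit it.

    e) API usage without anchoring:

// Bolt
collector.emit(new Values(content));
collector.ack(tuple);

    f) Multi-anchoring: one output Tuple can be anchored to several input Tuples. Multi-anchoring breaks the tree structure and produces a directed acyclic graph (DAG).

    g) API usage for multi-anchoring:

// Bolt
List<Tuple> anchors = new ArrayList<>();
anchors.add(tuple1);
anchors.add(tuple2);
collector.emit(anchors, new Values(content));
// Ack every input tuple the output was anchored to
collector.ack(tuple1);
collector.ack(tuple2);

    h) ack and fail: every Tuple must be acked or failed. Storm tracks each Tuple in memory, so Tuples that are never acked or failed eventually exhaust the task's memory.

    i) Acker tasks: a Topology has a set of special Acker tasks that track each Tuple tree or DAG. Their number is set with "topology.acker.executors" or "Config.TOPOLOGY_ACKER_EXECUTORS"; when it is not set, Storm 1.x uses as many Acker executors as the Topology has Workers. Increase the value when throughput is high.

5. Turning reliability off: if reliability matters little, it can be disabled to improve performance.

    a) Option 1: set "Config.TOPOLOGY_ACKER_EXECUTORS" to 0.

    b) Option 2: use the unanchored API style.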
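
Storm's documentation describes how an Acker can track an arbitrarily large Tuple tree with a fixed amount of memory: it keeps one 64-bit value per Spout message and XORs into it the id of every Tuple that is created and every Tuple that is acked; the value returns to zero exactly when each created Tuple has also been acked. The sketch below models that bookkeeping in plain Java. It is a simplified illustration, not Storm's code: `AckerSketch` and its methods are hypothetical names, and the small fixed ids stand in for the random 64-bit ids Storm generates.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of an Acker task; not Storm's implementation.
class AckerSketch {

    // One 64-bit "ack val" per spout message, regardless of tree size.
    private final Map<Object, Long> ackVals = new HashMap<>();

    /** The Spout emitted a root tuple for this message id. */
    void track(Object messageId, long rootTupleId) {
        ackVals.put(messageId, rootTupleId);
    }

    /**
     * A Bolt acked one tuple after emitting the given anchored child tuples.
     * Returns true once every tuple in the tree has been both created and acked.
     */
    boolean ack(Object messageId, long ackedTupleId, long... anchoredChildIds) {
        Long current = ackVals.get(messageId);
        if (current == null) {
            throw new IllegalStateException("Unknown message id: " + messageId);
        }
        long value = current ^ ackedTupleId;
        for (long childId : anchoredChildIds) {
            value ^= childId;
        }
        if (value == 0L) {
            ackVals.remove(messageId); // tree complete: the Spout's ack(msgId) would fire now
            return true;
        }
        ackVals.put(messageId, value);
        return false;
    }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        acker.track("msg-1", 11L);                             // Spout emits the line tuple (id 11)
        System.out.println(acker.ack("msg-1", 11L, 21L, 22L)); // split bolt acks it, anchoring two word tuples
        System.out.println(acker.ack("msg-1", 21L));           // count bolt acks the first word
        System.out.println(acker.ack("msg-1", 22L));           // count bolt acks the second word
    }
}
```

The three calls print false, false, and true: the tree is complete only after both word Tuples are acked. An unanchored emit would simply never pass child ids to the Acker, which is why such Tuples are not replayed.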

Message Transport

Starting with Storm 0.9.0, Netty is used as the messaging transport and ZeroMQ is no longer required.

API

WordCount Example

1. WordCountTopology.java

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountTopology {

    private static final Logger logger = LoggerFactory.getLogger(WordCountTopology.class);

    public static void main(String[] args) throws InterruptedException {
        final String inputFile = "/opt/app/apache-storm-1.1.0/LICENSE";
        final String outputDir = "/opt/workspace/wordcount";

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(FileReadSpout.class.getSimpleName(), new FileReadSpout(inputFile));
        builder.setBolt(LineSplitBolt.class.getSimpleName(), new LineSplitBolt())
                .shuffleGrouping(FileReadSpout.class.getSimpleName());
        // 2 executors running 4 tasks, so 4 output files are produced in the end
        builder.setBolt(WordCountBolt.class.getSimpleName(), new WordCountBolt(outputDir), 2)
                .setNumTasks(4)
                .fieldsGrouping(LineSplitBolt.class.getSimpleName(), new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            // Cluster mode: the first argument is the topology name
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                logger.error("Failed to submit " + WordCountTopology.class.getName() + ".", e);
            }
        } else {
            // Local mode: run in-process for 30 seconds, then shut down
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(WordCountTopology.class.getSimpleName(), conf, builder.createTopology());
            Thread.sleep(30 * 1000);
            cluster.shutdown();
        }
    }

}

2. FileReadSpout.java

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class FileReadSpout extends BaseRichSpout {

    private static final long serialVersionUID = 8543601286964250940L;

    private String inputFile;

    private BufferedReader reader;

    private SpoutOutputCollector collector;

    public FileReadSpout(String inputFile) {
        this.inputFile = inputFile;
    }

    @Override
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            reader = new BufferedReader(new FileReader(inputFile));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Cannot find file [" + inputFile + "].", e);
        }
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        try {
            // Emits every remaining line in one call; no message id is attached,
            // so these tuples are not tracked for replay.
            String line = null;
            while ((line = reader.readLine()) != null) {
                collector.emit(new Values(line));
            }
        } catch (IOException e) {
            throw new RuntimeException("Encountered a file read error.", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void close() {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                // Ignore
            }
        }
        super.close();
    }

}

3. LineSplitBolt.java

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class LineSplitBolt extends BaseRichBolt {

    private static final long serialVersionUID = -2045688041930588092L;

    private OutputCollector collector;

    @Override
    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String line = tuple.getStringByField("line");
        String[] words = line.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                // Unanchored emit: the word tuple is not tied to the input line
                collector.emit(new Values(word, 1));
            }
        }

        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    @Override
    public void cleanup() {
        super.cleanup();
    }

}

4. WordCountBolt.java

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 8239697869626573368L;

    private static final Logger logger = LoggerFactory.getLogger(WordCountBolt.class);

    private String outputDir;

    private OutputCollector collector;

    private Map<String, Integer> wordCounter;

    public WordCountBolt(String outputDir) {
        this.outputDir = outputDir;
    }

    @Override
    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        wordCounter = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {
        // Accumulate the running count for each word seen by this task
        String word = tuple.getStringByField("word");
        Integer count = tuple.getIntegerByField("count");
        Integer wordCount = wordCounter.get(word);
        if (wordCount == null) {
            wordCounter.put(word, count);
        } else {
            wordCounter.put(word, count + wordCount);
        }

        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public void cleanup() {
        // Write the final counts when the task shuts down
        if (wordCounter != null) {
            outputResult(wordCounter);
            wordCounter.clear();
        }
        super.cleanup();
    }

    private void outputResult(Map<String, Integer> wordCounter) {
        // Each task writes its own file with a random name
        String filePath = outputDir + "/" + UUID.randomUUID().toString();
        RandomAccessFile randomAccessFile = null;
        try {
            randomAccessFile = new RandomAccessFile(filePath, "rw");
            for (Map.Entry<String, Integer> entry : wordCounter.entrySet()) {
                randomAccessFile.writeChars(entry.getKey());
                randomAccessFile.writeChar('\t');
                randomAccessFile.writeChars(String.valueOf(entry.getValue()));
                randomAccessFile.writeChar('\n');
            }
        } catch (IOException e) {
            logger.error("Failed to write file [" + filePath + "].", e);
        } finally {
            if (randomAccessFile != null) {
                try {
                    randomAccessFile.close();
                } catch (IOException e) {
                    logger.warn("Failed to close output stream.", e);
                }
            }
        }
    }

}

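Application Deployment Modes

Ways to deploy an application (submit a Topology)

    a) Local mode: simulates a Storm cluster in-process; used for development and testing.

    b) Cluster mode: used in production.

Component Interfaces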

1. IComponent

package org.apache.storm.topology;

import java.io.Serializable;
import java.util.Map;

/**
 * Common methods for all possible components in a topology. This interface is used
 * when defining topologies using the Java API. 
 */
public interface IComponent extends Serializable {

    /**
     * Declare the output schema for all the streams of this topology.
     *
     * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
     */
    void declareOutputFields(OutputFieldsDeclarer declarer);

    /**
     * Declare configuration specific to this component. Only a subset of the "topology.*" configs can
     * be overridden. The component configuration can be further overridden when constructing the 
     * topology using {@link TopologyBuilder}
     *
     */
    Map<String, Object> getComponentConfiguration();

}

2. ISpout

package org.apache.storm.spout;

import org.apache.storm.task.TopologyContext;
import java.util.Map;
import java.io.Serializable;

/**
 * ISpout is the core interface for implementing spouts. A Spout is responsible
 * for feeding messages into the topology for processing. For every tuple emitted by
 * a spout, Storm will track the (potentially very large) DAG of tuples generated
 * based on a tuple emitted by the spout. When Storm detects that every tuple in
 * that DAG has been successfully processed, it will send an ack message to the Spout.
 *
 * If a tuple fails to be fully processed within the configured timeout for the
 * topology (see {@link org.apache.storm.Config}), Storm will send a fail message to the spout
 * for the message.
 *
 * When a Spout emits a tuple, it can tag the tuple with a message id. The message id
 * can be any type. When Storm acks or fails a message, it will pass back to the
 * spout the same message id to identify which tuple it's referring to. If the spout leaves out
 * the message id, or sets it to null, then Storm will not track the message and the spout
 * will not receive any ack or fail callbacks for the message.
 *
 * Storm executes ack, fail, and nextTuple all on the same thread. This means that an implementor
 * of an ISpout does not need to worry about concurrency issues between those methods. However, it 
 * also means that an implementor must ensure that nextTuple is non-blocking: otherwise 
 * the method could block acks and fails that are pending to be processed.
 */
public interface ISpout extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster.
     * It provides the spout with the environment in which the spout executes.
     *
     * This includes the:
     *
     * @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.
     * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
     * @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
     */
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * Called when an ISpout is going to be shutdown. There is no guarantee that close
     * will be called, because the supervisor kill -9's worker processes on the cluster.
     *
     * The one context where close is guaranteed to be called is a topology is
     * killed when running Storm in local mode.
     */
    void close();
    
    /**
     * Called when a spout has been activated out of a deactivated mode.
     * nextTuple will be called on this spout soon. A spout can become activated
     * after having been deactivated when the topology is manipulated using the 
     * `storm` client. 
     */
    void activate();
    
    /**
     * Called when a spout has been deactivated. nextTuple will not be called while
     * a spout is deactivated. The spout may or may not be reactivated in the future.
     */
    void deactivate();

    /**
     * When this method is called, Storm is requesting that the Spout emit tuples to the 
     * output collector. This method should be non-blocking, so if the Spout has no tuples
     * to emit, this method should return. nextTuple, ack, and fail are all called in a tight
     * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
     * to have nextTuple sleep for a short amount of time (like a single millisecond)
     * so as not to waste too much CPU.
     */
    void nextTuple();

    /**
     * Storm has determined that the tuple emitted by this spout with the msgId identifier
     * has been fully processed. Typically, an implementation of this method will take that
     * message off the queue and prevent it from being replayed.
     */
    void ack(Object msgId);

    /**
     * The tuple emitted by this spout with the msgId identifier has failed to be
     * fully processed. Typically, an implementation of this method will put that
     * message back on the queue to be replayed at a later time.
     */
    void fail(Object msgId);
}

3. IBolt

package org.apache.storm.task;

import org.apache.storm.tuple.Tuple;
import java.util.Map;
import java.io.Serializable;

/**
 * An IBolt represents a component that takes tuples as input and produces tuples
 * as output. An IBolt can do everything from filtering to joining to functions
 * to aggregations. It does not have to process a tuple immediately and may
 * hold onto tuples to process later.
 *
 * A bolt's lifecycle is as follows:
 *
 * IBolt object created on client machine. The IBolt is serialized into the topology
 * (using Java serialization) and submitted to the master machine of the cluster (Nimbus).
 * Nimbus then launches workers which deserialize the object, call prepare on it, and then
 * start processing tuples.
 *
 * If you want to parameterize an IBolt, you should set the parameters through its
 * constructor and save the parameterization state as instance variables (which will
 * then get serialized and shipped to every task executing this bolt across the cluster).
 *
 * When defining bolts in Java, you should use the IRichBolt interface which adds
 * necessary methods for using the Java TopologyBuilder API.
 */
public interface IBolt extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster.
     * It provides the bolt with the environment in which the bolt executes.
     *
     * This includes the:
     * 
     * @param stormConf The Storm configuration for this bolt. This is the configuration provided to the topology merged in with cluster configuration on this machine.
     * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
     * @param collector The collector is used to emit tuples from this bolt. Tuples can be emitted at any time, including the prepare and cleanup methods. The collector is thread-safe and should be saved as an instance variable of this bolt object.
     */
    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

    /**
     * Process a single tuple of input. The Tuple object contains metadata on it
     * about which component/stream/task it came from. The values of the Tuple can
     * be accessed using Tuple#getValue. The IBolt does not have to process the Tuple
     * immediately. It is perfectly fine to hang onto a tuple and process it later
     * (for instance, to do an aggregation or join).
     *
     * Tuples should be emitted using the OutputCollector provided through the prepare method.
     * It is required that all input tuples are acked or failed at some point using the OutputCollector.
     * Otherwise, Storm will be unable to determine when tuples coming off the spouts
     * have been completed.
     *
     * For the common case of acking an input tuple at the end of the execute method,
     * see IBasicBolt which automates this.
     * 
     * @param input The input tuple to be processed.
     */
    void execute(Tuple input);

    /**
     * Called when an IBolt is going to be shutdown. There is no guarantee that cleanup
     * will be called, because the supervisor kill -9's worker processes on the cluster.
     *
     * The one context where cleanup is guaranteed to be called is when a topology
     * is killed when running Storm in local mode.
     */
    void cleanup();
}

4. IRichSpout

package org.apache.storm.topology;

import org.apache.storm.spout.ISpout;

/**
 * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
 * to use to implement components of the topology.
 *
 */
public interface IRichSpout extends ISpout, IComponent {

}

5. IRichBolt

package org.apache.storm.topology;

import org.apache.storm.task.IBolt;

/**
 * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
 * to use to implement components of the topology.
 *
 */
public interface IRichBolt extends IBolt, IComponent {

}

6. IBasicBolt

package org.apache.storm.topology;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import java.util.Map;

public interface IBasicBolt extends IComponent {
    void prepare(Map stormConf, TopologyContext context);
    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     * 
     * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
     */
    void execute(Tuple input, BasicOutputCollector collector);
    void cleanup();
}

7. IStateSpout (not yet finished inside Storm)

package org.apache.storm.state;

import org.apache.storm.task.TopologyContext;
import java.io.Serializable;
import java.util.Map;

public interface IStateSpout extends Serializable {
    void open(Map conf, TopologyContext context);
    void close();
    void nextTuple(StateSpoutOutputCollector collector);
    void synchronize(SynchronizeOutputCollector collector);
}

8. IRichStateSpout (not yet finished inside Storm)

package org.apache.storm.topology;

import org.apache.storm.state.IStateSpout;


public interface IRichStateSpout extends IStateSpout, IComponent {

}
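
The ISpout Javadoc above says that ack typically takes a message off the queue while fail puts it back to be replayed. A minimal plain-Java sketch of that bookkeeping follows. It uses no Storm classes, so it runs on its own; `ReplayingSourceSketch` and its methods are hypothetical names that only mirror the shape of nextTuple, ack, and fail, and an in-memory queue is assumed as the data source.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for a reliable Spout; no Storm classes are used.
class ReplayingSourceSketch {

    private final Queue<String> queue = new ArrayDeque<>();       // messages waiting to be emitted
    private final Map<Object, String> pending = new HashMap<>();  // emitted but not yet acked or failed
    private long nextMsgId = 0;

    void offer(String message) {
        queue.add(message);
    }

    /** Like nextTuple(): never blocks; returns the message id used, or null when idle. */
    Long nextTuple() {
        String message = queue.poll();
        if (message == null) {
            return null;
        }
        Long msgId = nextMsgId++;
        pending.put(msgId, message);
        // A real spout would call collector.emit(new Values(message), msgId) here.
        return msgId;
    }

    /** Fully processed: forget the message. */
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    /** Failed or timed out: put the message back so a later nextTuple() replays it. */
    void fail(Object msgId) {
        String message = pending.remove(msgId);
        if (message != null) {
            queue.add(message);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    int queuedCount() {
        return queue.size();
    }

    public static void main(String[] args) {
        ReplayingSourceSketch source = new ReplayingSourceSketch();
        source.offer("a");
        source.offer("b");
        Long first = source.nextTuple();
        Long second = source.nextTuple();
        source.ack(first);   // "a" is done
        source.fail(second); // "b" goes back on the queue
        System.out.println("pending=" + source.pendingCount() + ", queued=" + source.queuedCount());
    }
}
```

Because Storm calls nextTuple, ack, and fail on the same thread, a real Spout built this way would not need synchronization around the two collections.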

Component Implementation Classes

1. BaseComponent

package org.apache.storm.topology.base;

import org.apache.storm.topology.IComponent;
import java.util.Map;

public abstract class BaseComponent implements IComponent {
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }    
}

2. BaseRichSpout

package org.apache.storm.topology.base;

import org.apache.storm.topology.IRichSpout;

public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }
}

3. BaseRichBolt

package org.apache.storm.topology.base;

import org.apache.storm.topology.IRichBolt;

public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
    @Override
    public void cleanup() {
    }    
}

4. BaseBasicBolt

package org.apache.storm.topology.base;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IBasicBolt;
import java.util.Map;

public abstract class BaseBasicBolt extends BaseComponent implements IBasicBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void cleanup() {
    }    
}

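Data Connection Modes

1. Direct Connection

    a) Use case: well suited to situations where the message emitters are known devices or device groups. A known device is one that is known when the Topology starts and stays unchanged for the Topology's lifetime. For devices that change, a coordinator can notify the Topology to create new Spout connections.

    b) Direct connection architecture diagram

    c) Device-group direct connection architecture diagram

    d) Coordinator-based direct connection

2. Enqueued Messages

Common Topology Patterns

1. BasicBolt

    a) Meaning: Storm automatically acks the input Tuple after the Bolt's execute method returns.

    b) How: implement the org.apache.storm.topology.IBasicBolt interface.

2. Stream Join

    a) Meaning: combines two or more streams into one new stream based on certain fields.

    b) How:

builder.setBolt("join", new Joiner(), parallelism)
        .fieldsGrouping("1", new Fields("1-joinfield1", "1-joinfield2"))
        .fieldsGrouping("2", new Fields("2-joinfield1", "2-joinfield2"))
        .fieldsGrouping("3", new Fields("3-joinfield1", "3-joinfield2"));

3. Batching

    a) Meaning: process a group of Tuples together instead of one at a time.

    b) How: keep references to the Tuples in a member variable of the Bolt for batch processing, then ack the whole batch once it has been processed.

4. TopN

    a) Meaning: compute the top N by some metric (such as number of occurrences) and output the result at regular intervals, for example trending topics or most-clicked images on a microblog.

    b) How: to handle a high-volume stream, first compute partial top N lists in parallel across several Bolt Tasks, then merge the results in a single Bolt that computes the global top N.

builder.setBolt("rank", new RankBolt(), parallelism)
        .fieldsGrouping("spout", new Fields("count"));
builder.setBolt("merge_rank", new MergeRank())
        .globalGrouping("rank");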
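
The two-stage TopN idea can be tried outside Storm. In the plain-Java sketch below, each partial map stands for the state of one rank Task and the merge step stands for the single merging Bolt. The class `TopNSketch`, its methods, and the sample counts are hypothetical; the sketch also assumes that every key is routed to exactly one rank Task, which is what makes merging the partial lists give the correct global result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of "parallel partial TopN, then one global merge".
class TopNSketch {

    /** Returns the n entries with the highest counts, ties broken by key, in rank order. */
    static Map<String, Integer> topN(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        Map<String, Integer> top = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : entries.subList(0, Math.min(n, entries.size()))) {
            top.put(entry.getKey(), entry.getValue());
        }
        return top;
    }

    /** Merge step: combines the partial rankings and ranks once more. */
    static Map<String, Integer> mergeTopN(List<Map<String, Integer>> partials, int n) {
        Map<String, Integer> combined = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            combined.putAll(partial); // keys are disjoint across partials by assumption
        }
        return topN(combined, n);
    }

    /** Builds a count map from alternating key/count arguments. */
    static Map<String, Integer> counts(Object... keyValues) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            counts.put((String) keyValues[i], (Integer) keyValues[i + 1]);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> rankTask1 = counts("storm", 5, "bolt", 3, "spout", 1);
        Map<String, Integer> rankTask2 = counts("acker", 6, "tuple", 4, "stream", 2);
        List<Map<String, Integer>> partials = Arrays.asList(topN(rankTask1, 2), topN(rankTask2, 2));
        System.out.println(mergeTopN(partials, 2)); // prints {acker=6, storm=5}
    }
}
```

Each rank Task forwards at most N entries, so the merging Bolt handles a small, bounded amount of data no matter how large the input stream is.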

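Logs (Cluster Mode)

1. Submission log: "$STORM_HOME/logs/nimbus.log".

2. Runtime logs

    a) Configuring logging: Storm UI or "$STORM_HOME/logback/cluster.xml" (Storm 0.9.x; releases from 0.10.0 onward use log4j2 and keep the file under "$STORM_HOME/log4j2/").

    b) Viewing logs: Storm UI or "$STORM_HOME/logs/worker-port.log" on each node (port is the actual port number).

3. Logging framework conflict: Storm ships its own SLF4J binding (logback in 0.9.x, log4j2 from 0.10.0). A second binding on the classpath, such as slf4j-log4j12, conflicts with it, so exclude the log4j artifacts that other dependencies pull in through Maven:

<dependency>
    <groupId>...</groupId>
    <artifactId>...</artifactId>
    <version>...</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>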

並行度設置

1. 組件與並行度關係:

    a) 一個運行的Topology由集羣中各機器運行的多個Worker進程組成,一個Worker進程僅屬於一個Topology;

    b) 一個Worker進程中包含一或多個Executor線程;

    c) 一個Executor線程中包含一或多個Task,Task是Spout或Bolt;

    d) 默認,一個Executor對應一個Task。

2. 示例

    a) 代碼(提交時設置並行度)

Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2);
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout");
topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
               .shuffleGrouping("green-bolt");
StormSubmitter.submitTopology(
        "mytopology",
        conf,
        topologyBuilder.createTopology()
    );

    b) Parallelism: mytopology has 2 worker processes, 10 executor threads (2 blue-spout + 2 green-bolt + 6 yellow-bolt) and 12 tasks (2 blue-spout + 4 green-bolt + 6 yellow-bolt); each of the 2 green-bolt executors runs 2 bolt tasks.

    c) Changing parallelism at runtime: reconfigure to 5 worker processes, 3 blue-spout executors and 10 yellow-bolt executors.

storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
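The executor and task counts of the example at submission time can be re-derived with a few lines of plain Java. This is only arithmetic under the rules listed above (executors equal the parallelism hint, tasks default to the executor count unless setNumTasks is called); ParallelismSketch is a hypothetical name and does not use the Storm API.

```java
// Hypothetical helper that re-derives the counts of the example topology.
final class ParallelismSketch {
    // Tasks of a component: the setNumTasks value, or the executor count when it is not set.
    static int tasksOf(int executors, Integer numTasks) {
        return numTasks != null ? numTasks : executors;
    }

    // Returns {total executors, total tasks} for the given components.
    static int[] totals(int[] executors, Integer[] numTasks) {
        int totalExecutors = 0;
        int totalTasks = 0;
        for (int i = 0; i < executors.length; i++) {
            totalExecutors += executors[i];
            totalTasks += tasksOf(executors[i], numTasks[i]);
        }
        return new int[] {totalExecutors, totalTasks};
    }

    public static void main(String[] args) {
        int[] executors = {2, 2, 6};          // blue-spout, green-bolt, yellow-bolt
        Integer[] numTasks = {null, 4, null}; // only green-bolt calls setNumTasks(4)
        int[] result = totals(executors, numTasks);
        System.out.println(result[0] + " executors, " + result[1] + " tasks");
    }
}
```

Running main prints "10 executors, 12 tasks", matching the counts stated for mytopology.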

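tick Timer Mechanism

1. Scenario: run some business logic on a schedule, for example compute statistics every 5 minutes and save the result to a database.

2. Principle: a system component of the topology sends tick tuples periodically; when a bolt receives a tick tuple, it triggers the corresponding business logic.

3. Code: modify the WordCount code to output the counts every 5 seconds.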

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 8239697869626573368L;

    private static final Logger logger = LoggerFactory.getLogger(WordCountBolt.class);

    private String outputDir;

    private OutputCollector collector;

    private Map<String, Integer> wordCounter;

    public WordCountBolt(String outputDir) {
        this.outputDir = outputDir;
    }

    @Override
    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        wordCounter = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {    // tick tuple: write out the counts and start a new window
            outputResult(wordCounter);
            wordCounter.clear();
        } else {    // normal tuple: accumulate the count for the word
            String word = tuple.getStringByField("word");
            Integer count = tuple.getIntegerByField("count");
            Integer wordCount = wordCounter.get(word);
            if (wordCount == null) {
                wordCounter.put(word, count);
            } else {
                wordCounter.put(word, count + wordCount);
            }
        }

        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public void cleanup() {
        if (wordCounter != null) {
            wordCounter.clear();
        }
        super.cleanup();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        // Ask Storm to send this bolt a tick tuple every 5 seconds
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
        return conf;
    }

    // Writes the current counts as "word<TAB>count" lines to a new, randomly named file
    private void outputResult(Map<String, Integer> wordCounter) {
        String filePath = outputDir + "/" + UUID.randomUUID().toString();
        RandomAccessFile randomAccessFile = null;
        try {
            randomAccessFile = new RandomAccessFile(filePath, "rw");
            for (Map.Entry<String, Integer> entry : wordCounter.entrySet()) {
                randomAccessFile.writeChars(entry.getKey());
                randomAccessFile.writeChar('\t');
                randomAccessFile.writeChars(String.valueOf(entry.getValue()));
                randomAccessFile.writeChar('\n');
            }
        } catch (IOException e) {
            logger.error("Failed to write file [" + filePath + "].", e);
        } finally {
            if (randomAccessFile != null) {
                try {
                    randomAccessFile.close();
                } catch (IOException e) {
                    logger.warn("Failed to close output stream.", e);
                }
            }
        }
    }

}

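Serialization

1. Purpose: Storm is a distributed system, so tuple objects must be serialized and deserialized when they are passed between tasks.

2. Supported types:

    a) Java primitive types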

    b) String

    c) byte[]

    d) ArrayList

    e) HashMap

    f) HashSet

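    g) Clojure collections

    h) Custom serialization

3. Serialization framework: Kryo, which is flexible and fast.

4. Serialization style: dynamically typed, i.e. the fields of a tuple need no declared types.

5. Custom serialization: refer to the official documentation when needed.

6. Serialization of unknown types

    a) Storm falls back to Java serialization for classes that have no registered serializer; if the object cannot be serialized that way either, an exception is thrown.

    b) Setting the parameter "topology.fall.back.on.java.serialization" to false turns off the Java serialization fallback.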
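What "cannot be serialized" means for the Java fallback can be seen with the JDK alone. The sketch below is not Storm's code path (Storm goes through Kryo first); it only shows, with hypothetical payload classes, that plain Java serialization rejects a class that does not implement Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical illustration of when plain Java serialization accepts an object.
final class JavaSerializationSketch {
    // Implements Serializable, so Java serialization accepts it.
    static final class GoodPayload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String word = "storm";
    }

    // Does not implement Serializable, so Java serialization rejects it.
    static final class BadPayload {
        final String word = "storm";
    }

    // Returns true if plain Java serialization can write the object.
    static boolean javaSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A custom type placed in a tuple therefore needs either a registered serializer or a Serializable implementation, and with the fallback turned off only the registered serializer remains.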

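Integration with Other Systems

Storm already provides APIs for integrating with the following systems; refer to the official documentation when needed.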

1. Apache Kafka

2. Apache HBase

3. Apache HDFS

4. Apache Hive

5. Apache Solr

6. Apache Cassandra

7. JDBC

8. JMS

9. Redis

10. Event Hubs

11. Elasticsearch

12. MQTT

13. MongoDB

14. OpenTSDB

15. Kinesis

16. Druid

17. Kestrel

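Performance Tuning

1. Do not perform time-consuming operations in a spout

    a) Background: a spout is single-threaded; with acking enabled, the spout's nextTuple, ack and fail methods all run on that one thread (JStorm starts 3 threads to run these 3 methods separately).

    b) If nextTuple is very slow, the spout cannot respond in time to the messages the acker sends to trigger ack or fail; they may be discarded after the ack timeout, and the spout then treats the tuple as failed.

    c) If ack or fail is very slow, nextTuple gets to run less often and emits less data, which lowers the throughput of the topology.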

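2. Watch the data balance of fields grouping

If, after grouping by a field, some values of that field account for very many tuples and others for very few, the tasks of the next-level bolt receive unbalanced loads, and overall performance is limited by the nodes that handle the largest share of the data.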
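The skew is easy to reproduce with a small plain-Java sketch. It assumes a partitioner that hashes the grouping value and takes it modulo the number of tasks, which is the general idea behind fields grouping but not Storm's exact code; FieldsGroupingSkewSketch and its methods are hypothetical names.

```java
import java.util.List;

// Hypothetical illustration of how a hot key overloads one downstream task.
final class FieldsGroupingSkewSketch {
    // Assumed partitioner: hash of the grouping value modulo the number of tasks.
    static int taskFor(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    // Counts how many tuples each downstream task would receive.
    static int[] load(List<String> keys, int numTasks) {
        int[] perTask = new int[numTasks];
        for (String key : keys) {
            perTask[taskFor(key, numTasks)]++;
        }
        return perTask;
    }
}
```

Because every tuple with the same key lands on the same task, a stream in which one key makes up 80% of the tuples puts at least 80% of the load on a single task, no matter how many tasks the bolt has.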

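3. Prefer local or shuffle grouping

    a) Principle: with local or shuffle grouping, tuples are transferred inside the worker whenever the target bolt has tasks in the same worker process; the transfer only goes through the Disruptor queue, with no network or serialization overhead.

    b) Conclusion: when the processing itself is cheap and network and serialization overhead dominate, use local or shuffle grouping instead of shuffle grouping.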
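The selection rule behind local or shuffle grouping can be sketched as follows: if the target bolt has tasks in the same worker, shuffle among those only, otherwise fall back to a normal shuffle over all tasks. LocalOrShuffleSketch and chooseTask are hypothetical names, not Storm's implementation.

```java
import java.util.List;
import java.util.Random;

// Hypothetical illustration of the "local first, otherwise shuffle" rule.
final class LocalOrShuffleSketch {
    // Picks a target task id: prefer tasks in the same worker, else shuffle over all tasks.
    static int chooseTask(List<Integer> localTasks, List<Integer> allTasks, Random random) {
        List<Integer> candidates = localTasks.isEmpty() ? allTasks : localTasks;
        return candidates.get(random.nextInt(candidates.size()));
    }
}
```

The trade-off is that load is only balanced among the local tasks, so workers that receive more input also do more downstream work.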

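4. Set MaxSpoutPending sensibly

    a) Background: with acking enabled, the spout keeps the tuples that have been emitted but not yet acked in a RotatingMap.

    b) How to set it: the maximum number of such pending tuples is set through the parameter "topology.max.spout.pending" or through the setMaxSpoutPending method of the declarer returned by TopologyBuilder.setSpout.

    c) Method: consult further references for concrete tuning values.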
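The effect of the limit can be sketched as a bounded pending map: emission pauses once the number of un-acked tuples reaches the cap and resumes when an ack or fail frees a slot. This is a simplified model with hypothetical names (PendingThrottleSketch, tryEmit, complete), using a HashMap rather than Storm's RotatingMap.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a spout throttled by a cap on pending tuples.
final class PendingThrottleSketch {
    private final int maxPending;
    private final Map<Long, String> pending = new HashMap<>(); // message id -> payload
    private long nextId = 0;

    PendingThrottleSketch(int maxPending) {
        this.maxPending = maxPending;
    }

    // Emits only while fewer than maxPending tuples await ack or fail.
    // Returns the message id, or -1 if the spout has to wait.
    long tryEmit(String payload) {
        if (pending.size() >= maxPending) {
            return -1;
        }
        long id = nextId++;
        pending.put(id, payload);
        return id;
    }

    // An ack or a fail for the given message id frees one slot.
    void complete(long id) {
        pending.remove(id);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A cap that is too low starves the bolts, while a cap that is too high lets tuples queue up until they time out, so the value is usually found by measuring.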

5. Netty tuning

Parameter                                      Default (defaults.yaml)
storm.messaging.transport                      org.apache.storm.messaging.netty.Context
storm.messaging.netty.server_worker_threads    1
storm.messaging.netty.client_worker_threads    1
storm.messaging.netty.buffer_size              5242880 (5 MB)
storm.messaging.netty.transfer.batch.size      262144 (256 KB)

6. JVM tuning

Use the parameter "worker.childopts", for example:

worker.childopts: "-Xms2g -Xmx2g"

 

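Author: netoxi
Source: http://www.cnblogs.com/netoxi
Copyright of this article is shared by the author and cnblogs (博客園). Reposting is welcome; unless the author agrees otherwise, this notice must be kept and a link to the original must be placed in a prominent position on the article page. Corrections and discussion are welcome.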
