Storm 性能優化

目錄


  1. 場景假設
  2. 調優步驟和方法
  3. Storm 的部分特性
  4. Storm 並行度
  5. Storm 消息機制
  6. Storm UI 解析
  7. 性能優化

場景假設


在介紹 Storm 的性能調優方法以前,假設一個場景:
項目組部署了3臺機器,計劃運行且僅運行 Storm(1.0.1) + Kafka(0.9.0.1) + Redis(3.2.1) 的小規模實驗集羣,集羣的配置狀況以下表:html

主機名 硬件配置 角色描述
hd01 2CPUs, 4G RAM, 2TB 機械硬盤 nimbus, supervisor, ui, kafka, zk
hd02 2CPUs, 4G RAM, 2TB 機械硬盤 supervisor, kafka, zk
hd03 2CPUs, 4G RAM, 2TB 機械硬盤 supervisor, kafka, zk

現有一個任務,須要實時計算訂單的各項彙總統計信息。訂單數據經過 kafka 傳輸。在 Storm 中建立了一個 topology 來執行此項任務,並採用 Storm kafkaSpout 讀取該 topic 的數據。kafka 和 Storm topology 的基本信息以下:java

  • kafka topic partitions = 3
  • topology 的配置狀況:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaSpout", new kafkaSpout(), 3);
builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout");
builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order"));

Config conf = new Config();
conf.setNumWorkers(2);
StormSubmitter.submitTopologyWithProgressBar("topology-name", conf, builder.createTopology());

那麼,在此假設下,Storm topology 的數據怎麼分發?性能如何調優?這就是下文要討論的內容,其中性能調優是最終目的,數據分發即 Storm 的消息機制,則是進行調優前的知識儲備。mysql

調優步驟和方法


Storm topology 的性能優化方法,總體來講,可依次劃分爲如下幾個步驟:linux

  1. 硬件配置的優化
  2. 代碼層面的優化
  3. topology 並行度的優化
  4. Storm 集羣配置參數和 topology 運行參數的優化

其中第一點不是討論的重點,無外乎增長機器的硬件資源,提升機器的硬件配置等,可是這一步卻也不能忽略,由於機器配置過低,極可能後面的步驟怎麼調優都無濟於事。redis

Storm 的一些特性和原理,是進行調優的必要知識儲備算法

Storm 的部分特性

目前 Storm 的最新版本爲 2.0.0-SNAPSHOT。該版本太新,未通過大量驗證和測試,所以本文的討論都基於 2.0 之前的版本。Storm 有以下幾個重要的特性:sql

  • DAG
  • 常駐內存,topology 啓動後除非 kill 掉不然一直運行
  • 提供 DRPC 服務
  • Pacemaker(1.0之後的新特性)心跳守護進程,常駐內存,比ZooKeeper性能更好
  • 採用了 ZeroMQ 和 Netty 做爲底層框架
  • 採用了 ACK/fail 方式的 tuple 追蹤機制
    而且 ack/fail 只能由建立該tuple的task所承載的spout觸發

瞭解這些機制對優化 Storm 的運行性能有必定幫助數據庫

Storm 並行度


Storm 是一個分佈式的實時計算軟件,各節點,各組件間的通訊依賴於 zookeeper。從組件的角度看,Storm 運做機制構建在 nimbus, supervisor, woker, executor, task, spout/bolt 之上,若是再加上 topology,有時也能夠稱這些組件爲 Concepts(概念)。詳見官網介紹文章 http://storm.apache.org/releases/2.0.0-SNAPSHOT/Concepts.html 和
http://storm.apache.org/releases/current/Tutorial.htmlapache

在介紹 Storm 並行度以前,先歸納地瞭解下 Storm 的幾個概念,此處假定讀者有必定的 Storm 背景知識,至少曾經跑過一個 topology 實例。api

nimbus 和 supervisor

nimbus 是 Storm 集羣的管理和調度進程,一個集羣啓動一個 nimbus,主要用於管理 topology,執行rebalance,管理 supervisor,分發 task,監控集羣的健康情況等。nimbus 依賴 zookeeper 來實現上述職責,nimbus 與 supervisor 等其餘組件並無直接的溝通。運行 nimbus 的節點成爲主節點,運行 supervisor 的節點成爲工做節點,nimbus 向 supervisor 分派任務,所以 Storm 集羣也是一個 master/slave 集羣。簡單來講,nimbus 就是工頭,supervisor 就是工人,nimbus 經過 zookeeper 來管理 supervisor。

supervisor 是一個工做進程,負責監聽 nimbus 分派的任務。當它接到任務後,會啓動一個 worker 進程,由 worker 運行 topology 的一個子集。爲何說是子集呢?由於當一個 topology 提交到集羣后,nimbus 便會根據該 topology 的配置(此處假定 numWorker=3),將 topology 分配給3個 worker 並行執行(正常狀況下是這樣,也有不是均勻分配的,好比有一個 supervisor 節點內存不足了)。若是恰好集羣有3個 supervisor,則每一個 supervisor 會啓動1個 worker,即一個節點啓動一個 worker(一個節點只能有一個 supervisor 有效運行)。所以,worker 進程運行的是 topology 的一個子集。supervisor 一樣經過 zookeeper 與 nimbus 進行交流,所以 nimbus 和 supervisor 均可以快速失敗/中止,由於全部的狀態信息都保存在本地文件系統的 zookeeper 中, 當失敗中止運行後,只須要從新啓動 nimbus 或 supervisor 進程以快速恢復。固然,若是集羣中正在工做的 supervisor 中止了,其上運行着的 topology 子集也會跟着中止,不過一旦 supervisor 啓動起來,topology 子集又馬上恢復正常了。


nimbus 和 supervisor 的協做關係
worker

worker 是一個JVM進程,由 supervisor 啓動和關閉。當 supervisor 接到任務後,會根據 topology 的配置啓動若干 worker,實際的任務執行便由 worker 進行。worker 進程會佔用固定的可由配置進行修改的內存空間(默認768M)。一般使用 conf.setNumWorkers() 函數來指定一個 topolgoy 的 worker 數量。

executor

executor 是一個線程,由 worker 進程派生(spawned)。executor 線程負責根據配置派生 task 線程,默認一個 executor 建立一個 task,可經過 setNumTask() 函數指定每一個 executor 的 task 數量。executor 將實例化後的 spout/bolt 傳遞給 task。

task

task 能夠說是 topology 最終的實際的任務執行者,每一個 task 承載一個 spout 或 bolt 的實例,並調用其中的 spout.nexTuple(),bolt.execute() 等方法,而 spout.nexTuple() 是數據的發射器,bolt.execute() 則是數據的接收方,業務邏輯的代碼基本上都在這兩個函數裏面處理了,所以能夠說 task 是最終搬磚的苦逼。

topology

topology 中文翻譯爲拓撲,相似於 hdfs 上的一個 mapreduce 任務。一個 topology 定義了運行一個 Storm 任務的全部必要元件,主要包括 spout 和 bolt,以及 spout 和 bolt 之間的流向關係。


topology 結構
並行度

什麼是並行度?在 Storm 的設定裏,並行度大致分爲3個方面:

  1. 一個 topology 指定多少個 worker 進程並行運行;
  2. 一個 worker 進程指定多少個 executor 線程並行運行;
  3. 一個 executor 線程指定多少個 task 並行運行。

通常來講,並行度設置越高,topology 運行的效率就越高,可是也不能一股腦地給出一個很高的值,還得考慮給每一個 worker 分配的內存的大小,還得平衡系統的硬件資源,以免浪費。
Storm 集羣能夠運行一個或多個 topology,而每一個 topology 包含一個或多個 worker 進程,每一個 worer 進程可派生一個或多個 executor 線程,而每一個 executor 線程則派生一個或多個 task,task 是實際的數據處理單元,也是 Storm 概念裏最小的工做單元, spout 或 bolt 的實例即是由 task 承載。


worker executor task 的關係

爲了更好地解釋 worker、executor 和 task 之間的工做機制,咱們用官網的一個簡單 topology 示例來介紹。先看此 topology 的配置:

Config conf = new Config();
conf.setNumWorkers(2); // 爲此 topology 配置兩個 worker 進程

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // blue-spout 並行度=2

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) // green-bolt 並行度=2
               .setNumTasks(4)  // 爲此 green-bolt 配置 4 個 task
               .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)  // yellow-bolt 並行度=6
               .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
    "mytopology",
    conf,
    topologyBuilder.createTopology()
);

從上面的代碼能夠知道:

  • 這個 topology 裝備了2個 worker 進程,也就是一樣的工做會有 2 個進程並行進行,能夠確定地說,2個 worker 確定比1個 worker 執行效率要高不少,可是並無2倍的差距;
  • 配置了一個 blue-spout,而且爲其指定了 2 個 executor,即並行度爲2;
  • 配置了一個 green-bolt,而且爲其指定了 2 個 executor,即並行度爲2;
  • 配置了一個 yellow-bolt,而且爲其指定了 6 個 executor,即並行度爲6;

你們看官方給出的下圖:


一個 topology 的結構圖示

能夠看出,這個圖片完好無損地還原了代碼裏設定的 topology 結構:

  • 圖左最大的灰色方框,表示這個 topology;
  • topology 裏面恰好有兩個白色方框,表示2個 worker 進程;
  • 每一個 worker 裏面的灰色方框表示 executor 線程,能夠看到2個 worker 方框裏各有5個 executor,爲何呢?由於代碼裏面指定的 spout 並行度=2,green-bolt並行度=2,yellow-bolt並行度=6,加起來恰好是10,而配置的 worker 數量爲2,那麼天然地,這10個 executor 會均勻地分配到2個 worker 裏面;
  • 每一個 executor 裏面的黃藍綠(寫着Task)的方框,就是最小的處理單元 task 了。你們仔細看綠色的 Task 方框,與其餘 Task 不一樣的是,兩個綠色方框同時出如今一個 executor 方框內。爲何會這樣呢?你們回到上文看 topology 的定義代碼,topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2).setNumTasks(4),這裏面的 setNumTasks(4) 表示爲該 green-bolt 指定了4個 task,且 executor 的並行度爲2,那麼天然地,這4個 task 會均勻地分配到2個 executor 裏面;
  • 圖右的三個圓圈,依次是藍色的 blue-spout,綠色的 green-bolt 和黃色的 yellow-bolt,而且用箭頭指示了三個組件之間的關係。spout 是數據的產生元件,而 green-bolt 則是數據的中間接收節點,yellow-bolt 則是數據的最後接收節點。這也是 DAG 的體現,有向的(箭頭不能往回走)無環圖。

參考
http://storm.apache.org/releases/1.0.1/Understanding-the-parallelism-of-a-Storm-topology.html
http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/

一個 topology 的代碼較完整例子
TopologyBuilder builder = new TopologyBuilder();

BrokerHosts hosts       = new ZkHosts(zkConns);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, clintId);
spoutConfig.scheme      = new SchemeAsMultiScheme(new StringScheme());

/** 指示 kafkaSpout 從 kafka topic 最後記錄的 ofsset 開始讀取數據 */
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("kafkaSpout", kafkaSpout, 3); // spout 並行度=3
builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout"); // FilterBolt 並行度=3
builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order")); // AlertBolt 並行度=3

Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(3); // 爲此 topology 配置3個 worker 進程
conf.setMaxSpoutPending(10000);

try {
    StormSubmitter.submitTopologyWithProgressBar(topology, conf, builder.createTopology());
} catch (Exception e) {
    e.printStackTrace();
}

Storm 消息機制


Storm 主要提供了兩種消息保證機制(Message Processing Guarantee)

  • 至少一次 At least once
  • 僅且一次 exactly once

其中 exactly once 是經過 Trident 方式實現的(exactly once through Trident)。兩種模式的選擇要視業務狀況而定,有些場景要求精確的僅且一次消費,好比訂單處理,決不能容許重複的處理訂單,由於極可能會致使訂單金額、交易手數等計算錯誤;有些場景容許必定的重複,好比頁面點擊統計,訪客統計等。總之,無論何種模式,Storm 都能保證數據不會丟失,開發者須要關心的是,如何保證數據不會重複消費。

At least once 的消息處理機制,在運用時須要格外當心,Storm 採用 ack/fail 機制來追蹤消息的流向,當一個消息(tuple)發送到下游時,若是超時未通知 spout,或者發送失敗,Storm 默認會根據配置策略進行重發,可經過調節重發策略來儘可能減小消息的重複發送。一個常見狀況是,Storm 集羣常常會超負載運行,致使下游的 bolt 未能及時 ack,從而致使 spout 不斷的重發一個 tuple,進而致使消息大量的重複消費。

在與 Kafka 集成時,經常使用 Storm 提供的 kafkaSpout 做爲 spout 消費 kafka 中的消息。Storm 提供的 kafkaSpout 默認有兩種實現方式:至少一次消費的 core Storm spouts 和僅且一次消費的 Trident spouts :(We support both Trident and core Storm spouts)。

在 Storm 裏面,消息的處理,經過兩個組件進行:spout 和 bolt。其中 spout 負責產生數據,bolt 負責接收並處理數據,業務邏輯代碼通常都寫入 bolt 中。能夠定義多個 bolt ,bolt 與 bolt 之間能夠指定單向連接關係。一般的做法是,在 spout 裏面讀取諸如 kafka,mysql,redis,elasticsearch 等數據源的數據,併發射(emit)給下游的 bolt,定義多個 bolt,分別進行多個不一樣階段的數據處理,好比第一個 bolt 負責過濾清洗數據,第二個 bolt 負責邏輯計算,併產生最終運算結果,寫入 redis,mysql,hdfs 等目標源。

Storm 將消息封裝在一個 Tuple 對象裏,Tuple 對象經由 spout 產生後經過 emit() 方法發送給下游 bolt,下游的全部 bolt 也一樣經過 emit() 方法將 tuple 傳遞下去。一個 tuple 多是一行 mysql 記錄,也多是一行文件內容,具體視 spout 如何讀入數據源,並如何發射給下游。

以下圖,是一個 spout/bolt 的執行過程:


spout/bolt 的執行過程

spout -> open(pending狀態) -> nextTuple -> emit -> bolt -> execute -> ack(spout) / fail(spout) -> message-provider 將該消息移除隊列(complete) / 將消息從新壓回隊列

ACK/Fail

上文說到,Storm 保證了數據不會丟失,ack/fail 機制即是實現此機制的法寶。Storm 在內部構建了一個 tuple tree 來表示每個 tuple 的流向,當一個 tuple 被 spout 發射給下游 bolt 時,默認會帶上一個 messageId,能夠由代碼指定但默認是自動生成的,當下遊的 bolt 成功處理 tuple 後,會經過 acker 進程通知 spout 調用 ack 方法,當處理超時或處理失敗,則會調用 fail 方法。當 fail 方法被調用,消息可能被重發,具體取決於重發策略的配置,和所使用的 spout。

對於一個消息,Storm 提出了『徹底處理』的概念。即一個消息是否被徹底處理,取決於這個消息是否被 tuple tree 裏的每個 bolt 徹底處理,當 tuple tree 中的全部 bolt 都徹底處理了這條消息後,纔會通知 acker 進程並調用該消息的原始發射 spout 的 ack 方法,不然會調用 fail 方法。

ack/fail 只能由建立該 tuple 的 task 所承載的 spout 觸發

默認狀況下,Storm 會在每一個 worker 進程裏面啓動1個 acker 線程,覺得 spout/bolt 提供 ack/fail 服務,該線程一般不太耗費資源,所以也無須配置過多,大多數狀況下1個就足夠了。


ack/fail 示意
Worker 間通訊

上文所說是在一個 worker 內的狀況,可是 Storm 是一個分佈式的並行計算框架,而實現並行的一個關鍵方式,即是一個 topology 能夠由多個 worker 進程分佈在多個 supervisor 節點並行地執行。那麼,多個 worker 之間必然是會有通訊機制的。nimbus 和 supervsor 之間僅靠 zookeeper 進行溝通,那麼爲什麼 worker 之間不經過 zookeeper 之類的中間件進行溝通呢?其中的一個緣由我想,應該是組件隔離的原則。worker 是 supervisor 管理下的一個進程,那麼 worker 若是也採用 zookeeper 進行溝通,那麼就有一種越級操做的嫌疑了。


Worker 間通訊

你們看上圖,一個 worker 進程裝配了以下幾個元件:

  • 一個 receive 線程,該線程維護了一個 ArrayList,負責接收其餘 worker 的 sent 線程發送過來的數據,並將數據存儲到 ArrayList 中。數據首先存入 receive 線程的一個緩衝區,可經過 topology.receiver.buffer.size (此項配置在 Storm 1.0 版本之後被刪除了)來配置該緩衝區存儲消息的最大數量,默認爲8(個數,而且得是2的倍數),而後才被推送到 ArrayList 中。receive 線程接收數據,是經過監聽 TCP的端口,該端口有 storm 配置文件中 supervisor.slots.prots 來配置,好比 6700;
  • 一個 sent 線程,該線程維護了一個消息隊列,負責將隊裏中的消息發送給其餘 worker 的 receive 線程。一樣具備緩衝區,可經過 topology.transfer.buffer.size 來配置緩衝區存儲消息的最大數量,默認爲1024(個數,而且得是2的倍數)。當消息達到此閾值時,便會被髮送到 receive 線程中。sent 線程發送數據,是經過一個隨機分配的TCP端口來進行的。
  • 一個或多個 executor 線程。executor 內部一樣擁有一個 receive buffer 和一個 sent buffer,其中 receive buffer 接收來自 receive 線程的的數據,sent buffer 向 sent 線程發送數據;而 task 線程則介於 receive buffer 和 sent buffer 之間。receive buffer 的大小可經過 Conf.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE 參數配置,sent buffer 的大小可經過 Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE 配置,兩個參數默認都是 1024(個數,而且得是2的倍數)。
Config conf = new Config(); conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 16); // 默認8 conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);

參考
http://storm.apache.org/releases/1.0.1/Guaranteeing-message-processing.html
http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/

Storm UI 解析


首頁
  • Cluster Summary

    Cluster Summary
參數名 說明
Supervisors 集羣中配置的 supervisor 數量
Used slots 集羣中已用掉的 workers 數量
Free slots 集羣中空閒的 workers 數量
Total slots 集羣中總的的 workers 數量
Executors 當前集羣中總的 Executor 線程數量,該值等於集羣中全部 topology 的全部 spout/bolt 配置的 executor 數量之和,其中默認狀況下每一個 worker 進程還會派生一個 acker executor 線程,也一併計算在內了
Tasks 當前集羣中總的 task 數量,也是全部 executor 派生的 task 數量之和
  • Nimbus Summary
    比較簡單,就略過了

  • Topology Summary


    Topology Summary

這部分也比較簡單,值得注意的是 Assigned Mem (MB),這裏值得是分配給該 topolgoy 下全部 worker 工做內存之和,單個 worker 的內存配置可由 Config.WORKER_HEAP_MEMORY_MB 和 Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB 指定,默認爲 768M,另外再加上默認 64M 的 logwritter 進程內存空間,則有 832M。
此處 fast-pay 的值爲 2496M = 3*832

  • Supervisor Summary

    Supervisor Summary

此處也比較簡單,值得注意的是 slot 和 used slot 分別表示當前節點總的可用 worker 數,及已用掉的 worker 數。

  • Nimbus Configuration

    Nimbus Configuration

可搜索和查看當前 topology 的各項配置參數

topology 頁面
  • Topology summary

    Topology summary

此處的大部分配置與上文中出現的意義同樣,值得注意的是
Num executors 和 Num tasks 的值。其中 Num executors 的數量等於當前 topology 下全部 spout/bolt 的並行度總和,再加上全部 worker 下的 acker executor 線程總數(默認狀況下一個 worker 派生一個 acker executor)。

  • Topology actions

    Topology actions
按鈕 說明
Activate 激活此 topology
Deactivate 暫停此 topology 運行
Rebalance 調整並行度並從新平衡資源
Kill 關閉並刪除此 topology
Debug 調試此 topology 運行,須要設置 topology.eventlogger.executors 數量 > 0
Stop Debug 中止調試
Change Log Level 調整日誌級別
  • Topology stats

    Topology stats
參數 說明
Window 時間窗口,好比"10m 0s"表示在topology啓動後10m 0s以內
Emitted 此時間窗口內發射的總tuple數
Transferred 此時間窗口內成功轉移到下一個bolt的tuple數
Complete latency (ms) 此時間窗口內每一個tuple在tuple tree中徹底處理所花費的平均時間
Acked 此時間窗口內成功處理的tuple數
Failed 此時間窗口內處理失敗或超時的tuple數
  • Spouts (All time)

    Spouts (All time)
參數 說明
Id topologoy 中 spout 的名稱,通常是在代碼裏設置的
Executors 當前分配給此 spout 的 executor 線程總數
Tasks 當前分配給此 spout 的 task 線程總數
Emitted 截止當前發射的總tuple數
Transferred 截止當前成功轉移到下一個bolt的tuple數
Complete latency (ms) 截止當前每一個tuple在tuple tree中徹底處理所花費的平均時間
Acked 截止當前成功處理的tuple數
Failed 截止當前處理失敗或超時的tuple數
  • Bolts (All time)

    Bolts (All time)
參數 說明
Id topologoy 中 bolt 的名稱,通常是在代碼裏設置的
Executors 當前分配給此 bolt 的 executor 線程總數
Tasks 當前分配給此 bolt 的 task 線程總數
Emitted 截止當前發射的總tuple數
Transferred 截止當前成功轉移到下一個bolt的tuple數
Complete latency (ms) 截止當前每一個tuple在tuple tree中徹底處理所花費的平均時間
Capacity (last 10m) 性能指標,取值越小越好,當接近1的時候,說明負載很嚴重,須要增長並行度,正常是在 0.0x 到 0.1 0.2 左右。該值計算方式爲 (number executed * average execute latency) / measurement time
Execute latency (ms) 截止當前成功處理的tuple數
Executed 截止當前處理過的tuple數
Process latency (ms) 截止當前單個 tuple 的平均處理時間,越小越好,正常也是 0.0x 級別;若是很大,能夠考慮增長並行度,但主要以 Capacity 爲準
Acked 截止當前成功處理的tuple數
Failed 截止當前處理失敗或超時的tuple數
spout 頁面

這個頁面,大部分都比較簡單,就不一一說明了,值得注意的是下面這個 Tab:

  • Executors (All time)

    Executors (All time)

這個Tab的參數,應該不用解釋了,可是要注意看,Emitted,Transferred 和 Acked 這幾個參數,看看是否全部的 executor 都均勻地分擔了 tuple 的處理工做。

bolt 頁面

這個頁面與 spout 頁面相似,也不贅述了。

參考:這個頁面經過 API 的方式,對 UI 界面的參數作了一些解釋
http://storm.apache.org/releases/1.0.1/STORM-UI-REST-API.html

Storm debug


Storm 提供了良好的 debug 措施,許多操做能夠再 UI 上完成,也能夠在命令行完成。好比 Change log level 在不重啓 topology 的狀況下動態修改日誌記錄的級別,在 UI 界面上查看某個 bolt 的日誌等,固然也能夠在命令行上操做。

# 幾個與 debug 相關的命令 bin/storm set_log_level [topology name]-l [logger name]=[LEVEL]:[TIMEOUT] bin/storm logviewer &

下面的參考文章寫的很詳細,你們有興趣能夠去閱讀一下,本文就再也不討論了。

參考
https://community.hortonworks.com/articles/36151/debugging-an-apache-storm-topology.html

性能調優


上文說了這麼多,這才進入主題。

一、合理地配置硬件資源

此處暫不討論

二、優化代碼的執行性能

要優化代碼的性能,若是嚴謹一點,首先要有一個衡量代碼執行效率的方式。在數學上,一般使用大O函數來衡量一個算法的時間複雜度。咱們能夠考慮使用大O函數來近似地估計一個代碼片斷的執行時間:假定一行代碼花費1個單位時間,那麼代碼片斷的時間複雜度能夠近似地用大O表示爲 O(n),其中n表示代碼的行數或執行次數。固然,若是代碼裏引入了其餘的類和函數,或者處於循環體內,那麼其餘類、函數的代碼行數,以及循環體內代碼的重複執行次數也須要統計在內。這裏說到大O函數的概念,在實際中也不多用到,咱們每每會用第三方工具來較爲準確地計算代碼的實際執行時間,可是理解這個概念有助於優化咱們的代碼。有興趣的同窗能夠閱讀《算法概論》這本書。

這裏順便舉一個斐波那契數列的例子:

/** C代碼:用遞歸實現的斐波那契數列 */
int fibonacci(unsigned int n)
{
    if (n <= 0) return 0;
    if (n == 1) return 1;
    return fibonacci(n - 1) + fibonacci(n - 2);
}
/** C代碼:用循環實現的斐波那契數列 */
int fibonacci(unsigned int n)
{
    int r, a, b;
    int i;
    int result[2] = {0, 1};

    if (n < 2) return result[n];

    a = 0;
    b = 1;
    r = 0;
    for (i = 2; i <= n; i++)
    {
        r = a + b;
        a = b;
        b = r;
    }
    return r;
}

觀看兩個不一樣實現的例子,第一種遞歸的方式,當傳入的n很大時,代碼執行的時間將會呈指數級增加,這時T(n)接近於 O(2^n);第二種循環的方式,即便傳入很大的n,代碼也能夠在較短的時間內執行完畢,這時T(n)接近於O(n),爲何是O(n)呢,好比說n=1000,那麼整個算法的執行時間基本集中在那個 for 循環裏了,至關於執行了 for 循環內3行代碼1000屢次,因此差很少是n。這其實就是一種用空間換時間的概念,利用循環代替遞歸的方式,從而大大地優化了代碼的執行效率。

回到咱們的 Storm。代碼優化,歸結起來,應該有這幾種:

  • 在算法層面進行優化
  • 在業務邏輯層面進行優化
  • 在技術層面進行優化
  • 特定於 Storm,合理地規劃 topology,即安排多少個 bolt,每一個 bolt 作什麼,連接關係如何

在技術層面進行優化,手法就很是多了,好比鏈接數據庫時,運用鏈接池,經常使用的鏈接池有 alibaba 的 druid,還有 redis 的鏈接池;好比合理地使用多線程,合理地優化JVM參數等等。這裏舉一個工做中可能會遇到的例子來介紹一下:

在配置了多個並行度的 bolt 中,存取 redis 數據時,若是不使用 redis 線程池,那麼極可能會遇到 topology 運行緩慢,spout 不斷重發,甚至直接掛掉的狀況。首先 redis 的單個實例並非線程安全的,其次在不使用 redis-pool 的狀況下,每次讀取 redis 都建立一個 redis 鏈接,同建立一個 mysql 鏈接同樣,在鏈接 redis 時所耗費的時間相較於 get/set 自己是很是巨大的。

/**
 * redis-cli 操做工具類
 */
package net.mtide.dbtool;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisCli {

    private static JedisPool pool = null;

    private final static Logger logger = LoggerFactory.getLogger(FilterBolt.class);

    /**
     * 同步初始化 JedisPool
     */
    private static synchronized void initPool() {
        if (pool == null) {
            String hosts = "HOST";
            String port  = "PORT";
            String pass  = "PASS";
            pool = new JedisPool(new JedisPoolConfig(), hosts, Integer.parseInt(port), 2000, pass);
        }
    }

    /**
     * 將鏈接 put back to pool
     * 
     * @param jedis
     */
    private static void returnResource(final Jedis jedis) {
        if (pool != null && jedis != null) {
            pool.returnResource(jedis);
        }
    }

    /**
     * 同步獲取 Jedis 實例
     * 
     * @return Jedis
     */
    public synchronized static Jedis getJedis() {
        if (pool == null) {
            initPool();
        }
        return pool.getResource();
    }

    public static void set(final String key, final String value) {
        Jedis jedis = getJedis();
        try {
            jedis.set(key, value);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }
    }

    public static void set(final String key, final String value, final int seconds) {
        Jedis jedis = getJedis();
        try {
            jedis.set(key, value);
            jedis.expire(key, seconds);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }
    }

    public static String get(final String key) {
        String value = null;

        Jedis jedis = getJedis();
        try {
            value = jedis.get(key);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }

        return value;
    }

    public static List<String> mget(final String... keys) {
        List<String> value = null;

        Jedis jedis = getJedis();
        try {
            value = jedis.mget(keys);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }

        return value;
    }

    public static Long del(final String key) {
        Long value = null;

        Jedis jedis = getJedis();
        try {
            value = jedis.del(key);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }

        return value;
    }

    public static Long expire(final String key, final int seconds) {
        Long value = null;

        Jedis jedis = getJedis();
        try {
            value = jedis.expire(key, seconds);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }

        return value;
    }

    public static Long incr(final String key) {
        Long value = null;

        Jedis jedis = getJedis();
        try {
            value = jedis.incr(key);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }

        return value;
    }
}
View Code

當一個配置了多個並行度的 topology 運行在集羣上時,若是 redis 操做不當,極可能會形成運行該 redis 的 bolt 長時間阻塞,從而形成 tuple 傳遞超時,默認狀況下 spout 在 fail 後會重發該 tuple,然而 redis 阻塞的問題沒有解決,重發不只不能解決問題,反而會加劇集羣的運行負擔,那麼 spout 重發愈來愈多,fail 的次數也愈來愈多, 最終致使數據重複消費愈來愈嚴重。上面貼出來的 RedisCli 工具類,能夠在多線程的環境下安全的使用 redis,從而解決了阻塞的問題。

三、合理的配置並行度

有幾個手段能夠配置 topology 的並行度:

  • conf.setNumWorkers() 配置 worker 的數量
  • builder.setBolt("NAME", new Bolt(), 並行度) 設置 executor 數量
  • spout/bolt.setNumTask() 設置 spout/bolt 的 task 數量

如今回到咱們的一開始的場景假設:

項目組部署了3臺機器,計劃運行一個 Storm(1.0.1) + Kafka(0.9.0.1) + Redis(3.2.1) 的小規模實驗集羣,每臺機器的配置爲 2CPUs,4G RAM

/** 初始配置 */ TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafkaSpout", new kafkaSpout(), 3); builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout"); builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order")); Config conf = new Config(); conf.setNumWorkers(2); StormSubmitter.submitTopologyWithProgressBar("topology-name", conf, builder.createTopology());

那麼問題是:

  1. setNumWorkers 應該取多少?取決於哪些因素?
  2. kafkaSpout 的並行度應該取多少?取決於哪些因素?
  3. FilterBolt 的並行度應該取多少?取決於哪些因素?
  4. AlertBolt 的並行度應該取多少?取決於哪些因素?
  5. FilterBolt 用 shuffleGrouping 是最好的嗎?
  6. AlertBolt 用 fieldsGrouping 是最好的嗎?

回答以下:
第一個問題:關於 worker 的並行度:worker 能夠分配到不一樣的 supervisor 節點,這也是 Storm 實現多節點並行計算的主要配置手段。據此, workers 的數量,能夠說是越多越好,但也不能形成浪費,並且也要看硬件資源是否足夠。因此主要考慮集羣各節點的內存狀況:默認狀況下,一個 worker 分配 768M 的內存,外加 64M 給 logwriter 進程;所以一個 worker 會耗費 832M 內存;題設的集羣有3個節點,每一個節點4G內存,除去 linux 系統、kafka、zookeeper 等的消耗,保守估計僅有2G內存可用來運行 topology,由此可知,當集羣只有一個 topology 在運行的狀況下,最多能夠配置6個 worker。
另外,咱們還能夠調節 worker 的內存空間。這取決於流過 topology 的數據量的大小以及各 bolt 單元的業務代碼的執行時間。若是數據量特別大,代碼執行時間較長,那麼能夠考慮增長單個 worker 的工做內存。有一點須要注意的是,一個 worker 下的全部 executor 和 task 都是共享這個 worker 的內存的,也就是假如一個 worker 分配了 768M 內存,3個 executor,6個 task,那麼這個 3 executor 和 6 task 實際上是共用這 768M 內存的,可是好處是能夠充分利用多核 CPU 的運算性能。

總結起來,worker 的數量,取值因素有:

  • 節點數量,及其內存容量
  • 數據量的大小和代碼執行時間

機器的CPU、帶寬、磁盤性能等也會對 Storm 性能有影響,可是這些外在因素通常不影響 worker 數量的決策。

須要注意的是,Storm 在默認狀況下,每一個 supervisor 節點只容許最多4個 worker(slot)進程運行;若是所配置的 worker 數量超過這個限制,則須要在 storm 配置文件中修改。

第二個問題:關於 FilterBolt 的並行度:若是 spout 讀取的是 kafka 的數據,那麼正常狀況下,設置爲 topic 的分區數量便可。計算 kafkaSpout 的最佳取值,有一個最簡單的辦法,就是在 Storm UI裏面,點開 topology 的首頁,在 Spouts (All time) 下,查看如下幾個參數的值:

  • Emitted 已發射出去的tuple數
  • Transferred 已轉移到下一個bolt的tuple數
  • Complete latency (ms) 每一個tuple在tuple tree中徹底處理所花費的平均時間
  • Acked 成功處理的tuple數
  • Failed 處理失敗或超時的tuple數

Paste_Image.png

怎麼看這幾個參數呢?有幾個技巧:

  • 正常狀況下 Failed 值爲0,若是不爲0,考慮增長該 spout 的並行度。這是最重要的一個判斷依據;
  • 正常狀況下,Emitted、Transferred和Acked這三個值應該是相等或大體相等的,若是相差太遠,要麼該 spout 負載過重,要麼下游負載太重,須要調節該 spout 的並行度,或下游 bolt 的並行度;
  • Complete latency (ms) 時間,若是很長,十秒以上就已經算很長的了。固然具體時間取決於代碼邏輯,bolt 的結構,機器的性能等。

kafka 只能保證同一分區下消息的順序性,當 spout 配置了多個 executor 的時候,不一樣分區的消息會均勻的分發到不一樣的 executor 上消費,那麼消息的總體順序性就難以保證了,除非將 spout 並行度設爲 1

第三個問題:關於 FilterBolt 的並行度:其取值也有一個簡單辦法,就是在 Storm UI裏面,點開 topology 的首頁,在 Bolts (All time) 下,查看如下幾個參數的值:

  • Capacity (last 10m) 取值越小越好,當接近1的時候,說明負載很嚴重,須要增長並行度,正常是在 0.0x 到 0.1 0.2 左右
  • Process latency (ms) 單個 tuple 的平均處理時間,越小越好,正常也是 0.0x 級別;若是很大,能夠考慮增長並行度,但主要以 Capacity 爲準

Paste_Image.png

通常狀況下,按照該 bolt 的代碼時間複雜度,設置一個 spout 並行度的 1-3倍便可。

第四個問題:AlertBolt 的並行度同 FilterBolt。

第五個問題:shuffleGrouping 會將 tuple 均勻地隨機分發給下游 bolt,通常狀況下用它就是最好的了。

總之,要找出並行度的最佳取值,主要結合 Storm UI 來作決策。

四、優化配置參數
/** tuple發送失敗重試策略,通常狀況下不須要調整 */ spoutConfig.retryInitialDelayMs = 0; spoutConfig.retryDelayMultiplier = 1.0; spoutConfig.retryDelayMaxMs = 60 * 1000; /** 此參數比較重要,可適當調大一點 */ /** 一般狀況下 spout 的發射速度會快於下游的 bolt 的消費速度,當下遊的 bolt 還有 TOPOLOGY_MAX_SPOUT_PENDING 個 tuple 沒有消費完時,spout 會停下來等待,該配置做用於 spout 的每一個 task。 */ conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10000) /** 調整分配給每一個 worker 的內存,關於內存的調節,上文已有描述 */ conf.put(Config.WORKER_HEAP_MEMORY_MB, 768); conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 768); /** 調整 worker 間通訊相關的緩衝參數,如下是一種推薦的配置 */ conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8); // 1.0 以上已移除 conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);

能夠在 Storm UI 上查看當前集羣的 Topology Configuration

五、rebalance

能夠直接採用 rebalance 命令(也能夠在 Storm UI上操做)從新配置 topology 的並行度:

storm rebalance TOPOLOGY-NAME -n 5 -e SPOUT/BOLT1-NAME=3 -e SPOUT/BOLT2-NAME=10
相關文章
相關標籤/搜索