在介紹 Storm 的性能調優方法以前,假設一個場景:
項目組部署了3臺機器,計劃運行且僅運行 Storm(1.0.1) + Kafka(0.9.0.1) + Redis(3.2.1) 的小規模實驗集羣,集羣的配置狀況以下表:html
主機名 | 硬件配置 | 角色描述 |
---|---|---|
hd01 | 2CPUs, 4G RAM, 2TB 機械硬盤 | nimbus, supervisor, ui, kafka, zk |
hd02 | 2CPUs, 4G RAM, 2TB 機械硬盤 | supervisor, kafka, zk |
hd03 | 2CPUs, 4G RAM, 2TB 機械硬盤 | supervisor, kafka, zk |
現有一個任務,須要實時計算訂單的各項彙總統計信息。訂單數據經過 kafka 傳輸。在 Storm 中建立了一個 topology 來執行此項任務,並採用 Storm kafkaSpout 讀取該 topic 的數據。kafka 和 Storm topology 的基本信息以下:java
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafkaSpout", new kafkaSpout(), 3); builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout"); builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order")); Config conf = new Config(); conf.setNumWorkers(2); StormSubmitter.submitTopologyWithProgressBar("topology-name", conf, builder.createTopology());
那麼,在此假設下,Storm topology 的數據怎麼分發?性能如何調優?這就是下文要討論的內容,其中性能調優是最終目的,數據分發即 Storm 的消息機制,則是進行調優前的知識儲備。mysql
Storm topology 的性能優化方法,總體來講,可依次劃分爲如下幾個步驟:linux
其中第一點不是討論的重點,無外乎增長機器的硬件資源,提升機器的硬件配置等,可是這一步卻也不能忽略,由於機器配置過低,極可能後面的步驟怎麼調優都無濟於事。redis
Storm 的一些特性和原理,是進行調優的必要知識儲備算法
目前 Storm 的最新版本爲 2.0.0-SNAPSHOT。該版本太新,未通過大量驗證和測試,所以本文的討論都基於 2.0 之前的版本。Storm 有以下幾個重要的特性:sql
瞭解這些機制對優化 Storm 的運行性能有必定幫助數據庫
Storm 是一個分佈式的實時計算軟件,各節點,各組件間的通訊依賴於 zookeeper。從組件的角度看,Storm 運做機制構建在 nimbus, supervisor, woker, executor, task, spout/bolt 之上,若是再加上 topology,有時也能夠稱這些組件爲 Concepts(概念)。詳見官網介紹文章 http://storm.apache.org/releases/2.0.0-SNAPSHOT/Concepts.html 和
http://storm.apache.org/releases/current/Tutorial.htmlapache
在介紹 Storm 並行度以前,先歸納地瞭解下 Storm 的幾個概念,此處假定讀者有必定的 Storm 背景知識,至少曾經跑過一個 topology 實例。api
nimbus 是 Storm 集羣的管理和調度進程,一個集羣啓動一個 nimbus,主要用於管理 topology,執行rebalance,管理 supervisor,分發 task,監控集羣的健康情況等。nimbus 依賴 zookeeper 來實現上述職責,nimbus 與 supervisor 等其餘組件並無直接的溝通。運行 nimbus 的節點成爲主節點,運行 supervisor 的節點成爲工做節點,nimbus 向 supervisor 分派任務,所以 Storm 集羣也是一個 master/slave 集羣。簡單來講,nimbus 就是工頭,supervisor 就是工人,nimbus 經過 zookeeper 來管理 supervisor。
supervisor 是一個工做進程,負責監聽 nimbus 分派的任務。當它接到任務後,會啓動一個 worker 進程,由 worker 運行 topology 的一個子集。爲何說是子集呢?由於當一個 topology 提交到集羣后,nimbus 便會根據該 topology 的配置(此處假定 numWorker=3),將 topology 分配給3個 worker 並行執行(正常狀況下是這樣,也有不是均勻分配的,好比有一個 supervisor 節點內存不足了)。若是恰好集羣有3個 supervisor,則每一個 supervisor 會啓動1個 worker,即一個節點啓動一個 worker(一個節點只能有一個 supervisor 有效運行)。所以,worker 進程運行的是 topology 的一個子集。supervisor 一樣經過 zookeeper 與 nimbus 進行交流,所以 nimbus 和 supervisor 均可以快速失敗/中止,由於全部的狀態信息都保存在本地文件系統的 zookeeper 中, 當失敗中止運行後,只須要從新啓動 nimbus 或 supervisor 進程以快速恢復。固然,若是集羣中正在工做的 supervisor 中止了,其上運行着的 topology 子集也會跟着中止,不過一旦 supervisor 啓動起來,topology 子集又馬上恢復正常了。
worker 是一個JVM進程,由 supervisor 啓動和關閉。當 supervisor 接到任務後,會根據 topology 的配置啓動若干 worker,實際的任務執行便由 worker 進行。worker 進程會佔用固定的可由配置進行修改的內存空間(默認768M)。一般使用 conf.setNumWorkers() 函數來指定一個 topolgoy 的 worker 數量。
executor 是一個線程,由 worker 進程派生(spawned)。executor 線程負責根據配置派生 task 線程,默認一個 executor 建立一個 task,可經過 setNumTask() 函數指定每一個 executor 的 task 數量。executor 將實例化後的 spout/bolt 傳遞給 task。
task 能夠說是 topology 最終的實際的任務執行者,每一個 task 承載一個 spout 或 bolt 的實例,並調用其中的 spout.nexTuple(),bolt.execute() 等方法,而 spout.nexTuple() 是數據的發射器,bolt.execute() 則是數據的接收方,業務邏輯的代碼基本上都在這兩個函數裏面處理了,所以能夠說 task 是最終搬磚的苦逼。
topology 中文翻譯爲拓撲,相似於 hdfs 上的一個 mapreduce 任務。一個 topology 定義了運行一個 Storm 任務的全部必要元件,主要包括 spout 和 bolt,以及 spout 和 bolt 之間的流向關係。
什麼是並行度?在 Storm 的設定裏,並行度大致分爲3個方面:
通常來講,並行度設置越高,topology 運行的效率就越高,可是也不能一股腦地給出一個很高的值,還得考慮給每一個 worker 分配的內存的大小,還得平衡系統的硬件資源,以免浪費。
Storm 集羣能夠運行一個或多個 topology,而每一個 topology 包含一個或多個 worker 進程,每一個 worer 進程可派生一個或多個 executor 線程,而每一個 executor 線程則派生一個或多個 task,task 是實際的數據處理單元,也是 Storm 概念裏最小的工做單元, spout 或 bolt 的實例即是由 task 承載。
爲了更好地解釋 worker、executor 和 task 之間的工做機制,咱們用官網的一個簡單 topology 示例來介紹。先看此 topology 的配置:
Config conf = new Config(); conf.setNumWorkers(2); // 爲此 topology 配置兩個 worker 進程 topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // blue-spout 並行度=2 topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) // green-bolt 並行度=2 .setNumTasks(4) // 爲此 green-bolt 配置 4 個 task .shuffleGrouping("blue-spout"); topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6) // yellow-bolt 並行度=6 .shuffleGrouping("green-bolt"); StormSubmitter.submitTopology( "mytopology", conf, topologyBuilder.createTopology() );
從上面的代碼能夠知道:
你們看官方給出的下圖:
能夠看出,這個圖片完好無損地還原了代碼裏設定的 topology 結構:
參考
http://storm.apache.org/releases/1.0.1/Understanding-the-parallelism-of-a-Storm-topology.html
http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/
TopologyBuilder builder = new TopologyBuilder(); BrokerHosts hosts = new ZkHosts(zkConns); SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, clintId); spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); /** 指示 kafkaSpout 從 kafka topic 最後記錄的 ofsset 開始讀取數據 */ spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); builder.setSpout("kafkaSpout", kafkaSpout, 3); // spout 並行度=3 builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout"); // FilterBolt 並行度=3 builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order")); // AlertBolt 並行度=3 Config conf = new Config(); conf.setDebug(false); conf.setNumWorkers(3); // 爲此 topology 配置3個 worker 進程 conf.setMaxSpoutPending(10000); try { StormSubmitter.submitTopologyWithProgressBar(topology, conf, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); }
Storm 主要提供了兩種消息保證機制(Message Processing Guarantee)
其中 exactly once 是經過 Trident 方式實現的(exactly once through Trident)。兩種模式的選擇要視業務狀況而定,有些場景要求精確的僅且一次消費,好比訂單處理,決不能容許重複的處理訂單,由於極可能會致使訂單金額、交易手數等計算錯誤;有些場景容許必定的重複,好比頁面點擊統計,訪客統計等。總之,無論何種模式,Storm 都能保證數據不會丟失,開發者須要關心的是,如何保證數據不會重複消費。
At least once 的消息處理機制,在運用時須要格外當心,Storm 採用 ack/fail 機制來追蹤消息的流向,當一個消息(tuple)發送到下游時,若是超時未通知 spout,或者發送失敗,Storm 默認會根據配置策略進行重發,可經過調節重發策略來儘可能減小消息的重複發送。一個常見狀況是,Storm 集羣常常會超負載運行,致使下游的 bolt 未能及時 ack,從而致使 spout 不斷的重發一個 tuple,進而致使消息大量的重複消費。
在與 Kafka 集成時,經常使用 Storm 提供的 kafkaSpout 做爲 spout 消費 kafka 中的消息。Storm 提供的 kafkaSpout 默認有兩種實現方式:至少一次消費的 core Storm spouts 和僅且一次消費的 Trident spouts :(We support both Trident and core Storm spouts)。
在 Storm 裏面,消息的處理,經過兩個組件進行:spout 和 bolt。其中 spout 負責產生數據,bolt 負責接收並處理數據,業務邏輯代碼通常都寫入 bolt 中。能夠定義多個 bolt ,bolt 與 bolt 之間能夠指定單向連接關係。一般的做法是,在 spout 裏面讀取諸如 kafka,mysql,redis,elasticsearch 等數據源的數據,併發射(emit)給下游的 bolt,定義多個 bolt,分別進行多個不一樣階段的數據處理,好比第一個 bolt 負責過濾清洗數據,第二個 bolt 負責邏輯計算,併產生最終運算結果,寫入 redis,mysql,hdfs 等目標源。
Storm 將消息封裝在一個 Tuple 對象裏,Tuple 對象經由 spout 產生後經過 emit() 方法發送給下游 bolt,下游的全部 bolt 也一樣經過 emit() 方法將 tuple 傳遞下去。一個 tuple 多是一行 mysql 記錄,也多是一行文件內容,具體視 spout 如何讀入數據源,並如何發射給下游。
以下圖,是一個 spout/bolt 的執行過程:
spout -> open(pending狀態) -> nextTuple -> emit -> bolt -> execute -> ack(spout) / fail(spout) -> message-provider 將該消息移除隊列(complete) / 將消息從新壓回隊列
上文說到,Storm 保證了數據不會丟失,ack/fail 機制即是實現此機制的法寶。Storm 在內部構建了一個 tuple tree 來表示每個 tuple 的流向,當一個 tuple 被 spout 發射給下游 bolt 時,默認會帶上一個 messageId,能夠由代碼指定但默認是自動生成的,當下遊的 bolt 成功處理 tuple 後,會經過 acker 進程通知 spout 調用 ack 方法,當處理超時或處理失敗,則會調用 fail 方法。當 fail 方法被調用,消息可能被重發,具體取決於重發策略的配置,和所使用的 spout。
對於一個消息,Storm 提出了『徹底處理』的概念。即一個消息是否被徹底處理,取決於這個消息是否被 tuple tree 裏的每個 bolt 徹底處理,當 tuple tree 中的全部 bolt 都徹底處理了這條消息後,纔會通知 acker 進程並調用該消息的原始發射 spout 的 ack 方法,不然會調用 fail 方法。
ack/fail 只能由建立該 tuple 的 task 所承載的 spout 觸發
默認狀況下,Storm 會在每一個 worker 進程裏面啓動1個 acker 線程,覺得 spout/bolt 提供 ack/fail 服務,該線程一般不太耗費資源,所以也無須配置過多,大多數狀況下1個就足夠了。
上文所說是在一個 worker 內的狀況,可是 Storm 是一個分佈式的並行計算框架,而實現並行的一個關鍵方式,即是一個 topology 能夠由多個 worker 進程分佈在多個 supervisor 節點並行地執行。那麼,多個 worker 之間必然是會有通訊機制的。nimbus 和 supervsor 之間僅靠 zookeeper 進行溝通,那麼爲什麼 worker 之間不經過 zookeeper 之類的中間件進行溝通呢?其中的一個緣由我想,應該是組件隔離的原則。worker 是 supervisor 管理下的一個進程,那麼 worker 若是也採用 zookeeper 進行溝通,那麼就有一種越級操做的嫌疑了。
你們看上圖,一個 worker 進程裝配了以下幾個元件:
Config conf = new Config(); conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 16); // 默認8 conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
參考
http://storm.apache.org/releases/1.0.1/Guaranteeing-message-processing.html
http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
參數名 | 說明 |
---|---|
Supervisors | 集羣中配置的 supervisor 數量 |
Used slots | 集羣中已用掉的 workers 數量 |
Free slots | 集羣中空閒的 workers 數量 |
Total slots | 集羣中總的的 workers 數量 |
Executors | 當前集羣中總的 Executor 線程數量,該值等於集羣中全部 topology 的全部 spout/bolt 配置的 executor 數量之和,其中默認狀況下每一個 worker 進程還會派生一個 acker executor 線程,也一併計算在內了 |
Tasks | 當前集羣中總的 task 數量,也是全部 executor 派生的 task 數量之和 |
Nimbus Summary
比較簡單,就略過了
Topology Summary
這部分也比較簡單,值得注意的是 Assigned Mem (MB),這裏值得是分配給該 topolgoy 下全部 worker 工做內存之和,單個 worker 的內存配置可由 Config.WORKER_HEAP_MEMORY_MB 和 Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB 指定,默認爲 768M,另外再加上默認 64M 的 logwritter 進程內存空間,則有 832M。
此處 fast-pay 的值爲 2496M = 3*832
此處也比較簡單,值得注意的是 slot 和 used slot 分別表示當前節點總的可用 worker 數,及已用掉的 worker 數。
可搜索和查看當前 topology 的各項配置參數
此處的大部分配置與上文中出現的意義同樣,值得注意的是
Num executors 和 Num tasks 的值。其中 Num executors 的數量等於當前 topology 下全部 spout/bolt 的並行度總和,再加上全部 worker 下的 acker executor 線程總數(默認狀況下一個 worker 派生一個 acker executor)。
按鈕 | 說明 |
---|---|
Activate | 激活此 topology |
Deactivate | 暫停此 topology 運行 |
Rebalance | 調整並行度並從新平衡資源 |
Kill | 關閉並刪除此 topology |
Debug | 調試此 topology 運行,須要設置 topology.eventlogger.executors 數量 > 0 |
Stop Debug | 中止調試 |
Change Log Level | 調整日誌級別 |
參數 | 說明 |
---|---|
Window | 時間窗口,好比"10m 0s"表示在topology啓動後10m 0s以內 |
Emitted | 此時間窗口內發射的總tuple數 |
Transferred | 此時間窗口內成功轉移到下一個bolt的tuple數 |
Complete latency (ms) | 此時間窗口內每一個tuple在tuple tree中徹底處理所花費的平均時間 |
Acked | 此時間窗口內成功處理的tuple數 |
Failed | 此時間窗口內處理失敗或超時的tuple數 |
參數 | 說明 |
---|---|
Id | topologoy 中 spout 的名稱,通常是在代碼裏設置的 |
Executors | 當前分配給此 spout 的 executor 線程總數 |
Tasks | 當前分配給此 spout 的 task 線程總數 |
Emitted | 截止當前發射的總tuple數 |
Transferred | 截止當前成功轉移到下一個bolt的tuple數 |
Complete latency (ms) | 截止當前每一個tuple在tuple tree中徹底處理所花費的平均時間 |
Acked | 截止當前成功處理的tuple數 |
Failed | 截止當前處理失敗或超時的tuple數 |
參數 | 說明 |
---|---|
Id | topologoy 中 bolt 的名稱,通常是在代碼裏設置的 |
Executors | 當前分配給此 bolt 的 executor 線程總數 |
Tasks | 當前分配給此 bolt 的 task 線程總數 |
Emitted | 截止當前發射的總tuple數 |
Transferred | 截止當前成功轉移到下一個bolt的tuple數 |
Complete latency (ms) | 截止當前每一個tuple在tuple tree中徹底處理所花費的平均時間 |
Capacity (last 10m) | 性能指標,取值越小越好,當接近1的時候,說明負載很嚴重,須要增長並行度,正常是在 0.0x 到 0.1 0.2 左右。該值計算方式爲 (number executed * average execute latency) / measurement time |
Execute latency (ms) | 截止當前成功處理的tuple數 |
Executed | 截止當前處理過的tuple數 |
Process latency (ms) | 截止當前單個 tuple 的平均處理時間,越小越好,正常也是 0.0x 級別;若是很大,能夠考慮增長並行度,但主要以 Capacity 爲準 |
Acked | 截止當前成功處理的tuple數 |
Failed | 截止當前處理失敗或超時的tuple數 |
這個頁面,大部分都比較簡單,就不一一說明了,值得注意的是下面這個 Tab:
這個Tab的參數,應該不用解釋了,可是要注意看,Emitted,Transferred 和 Acked 這幾個參數,看看是否全部的 executor 都均勻地分擔了 tuple 的處理工做。
這個頁面與 spout 頁面相似,也不贅述了。
參考:這個頁面經過 API 的方式,對 UI 界面的參數作了一些解釋
http://storm.apache.org/releases/1.0.1/STORM-UI-REST-API.html
Storm 提供了良好的 debug 措施,許多操做能夠再 UI 上完成,也能夠在命令行完成。好比 Change log level 在不重啓 topology 的狀況下動態修改日誌記錄的級別,在 UI 界面上查看某個 bolt 的日誌等,固然也能夠在命令行上操做。
# 幾個與 debug 相關的命令 bin/storm set_log_level [topology name]-l [logger name]=[LEVEL]:[TIMEOUT] bin/storm logviewer &
下面的參考文章寫的很詳細,你們有興趣能夠去閱讀一下,本文就再也不討論了。
參考
https://community.hortonworks.com/articles/36151/debugging-an-apache-storm-topology.html
上文說了這麼多,這才進入主題。
此處暫不討論
要優化代碼的性能,若是嚴謹一點,首先要有一個衡量代碼執行效率的方式。在數學上,一般使用大O函數來衡量一個算法的時間複雜度。咱們能夠考慮使用大O函數來近似地估計一個代碼片斷的執行時間:假定一行代碼花費1個單位時間,那麼代碼片斷的時間複雜度能夠近似地用大O表示爲 O(n),其中n表示代碼的行數或執行次數。固然,若是代碼裏引入了其餘的類和函數,或者處於循環體內,那麼其餘類、函數的代碼行數,以及循環體內代碼的重複執行次數也須要統計在內。這裏說到大O函數的概念,在實際中也不多用到,咱們每每會用第三方工具來較爲準確地計算代碼的實際執行時間,可是理解這個概念有助於優化咱們的代碼。有興趣的同窗能夠閱讀《算法概論》這本書。
這裏順便舉一個斐波那契數列的例子:
/** C代碼:用遞歸實現的斐波那契數列 */ int fibonacci(unsigned int n) { if (n <= 0) return 0; if (n == 1) return 1; return fibonacci(n - 1) + fibonacci(n - 2); }
/** C代碼:用循環實現的斐波那契數列 */ int fibonacci(unsigned int n) { int r, a, b; int i; int result[2] = {0, 1}; if (n < 2) return result[n]; a = 0; b = 1; r = 0; for (i = 2; i <= n; i++) { r = a + b; a = b; b = r; } return r; }
觀看兩個不一樣實現的例子,第一種遞歸的方式,當傳入的n很大時,代碼執行的時間將會呈指數級增加,這時T(n)接近於 O(2^n);第二種循環的方式,即便傳入很大的n,代碼也能夠在較短的時間內執行完畢,這時T(n)接近於O(n),爲何是O(n)呢,好比說n=1000,那麼整個算法的執行時間基本集中在那個 for 循環裏了,至關於執行了 for 循環內3行代碼1000屢次,因此差很少是n。這其實就是一種用空間換時間的概念,利用循環代替遞歸的方式,從而大大地優化了代碼的執行效率。
回到咱們的 Storm。代碼優化,歸結起來,應該有這幾種:
在技術層面進行優化,手法就很是多了,好比鏈接數據庫時,運用鏈接池,經常使用的鏈接池有 alibaba 的 druid,還有 redis 的鏈接池;好比合理地使用多線程,合理地優化JVM參數等等。這裏舉一個工做中可能會遇到的例子來介紹一下:
在配置了多個並行度的 bolt 中,存取 redis 數據時,若是不使用 redis 線程池,那麼極可能會遇到 topology 運行緩慢,spout 不斷重發,甚至直接掛掉的狀況。首先 redis 的單個實例並非線程安全的,其次在不使用 redis-pool 的狀況下,每次讀取 redis 都建立一個 redis 鏈接,同建立一個 mysql 鏈接同樣,在鏈接 redis 時所耗費的時間相較於 get/set 自己是很是巨大的。
/** * redis-cli 操做工具類 */ package net.mtide.dbtool; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class RedisCli { private static JedisPool pool = null; private final static Logger logger = LoggerFactory.getLogger(FilterBolt.class); /** * 同步初始化 JedisPool */ private static synchronized void initPool() { if (pool == null) { String hosts = "HOST"; String port = "PORT"; String pass = "PASS"; pool = new JedisPool(new JedisPoolConfig(), hosts, Integer.parseInt(port), 2000, pass); } } /** * 將鏈接 put back to pool * * @param jedis */ private static void returnResource(final Jedis jedis) { if (pool != null && jedis != null) { pool.returnResource(jedis); } } /** * 同步獲取 Jedis 實例 * * @return Jedis */ public synchronized static Jedis getJedis() { if (pool == null) { initPool(); } return pool.getResource(); } public static void set(final String key, final String value) { Jedis jedis = getJedis(); try { jedis.set(key, value); } catch (Exception e) { logger.error(e.toString()); } finally { returnResource(jedis); } } public static void set(final String key, final String value, final int seconds) { Jedis jedis = getJedis(); try { jedis.set(key, value); jedis.expire(key, seconds); } catch (Exception e) { logger.error(e.toString()); } finally { returnResource(jedis); } } public static String get(final String key) { String value = null; Jedis jedis = getJedis(); try { value = jedis.get(key); } catch (Exception e) { logger.error(e.toString()); } finally { returnResource(jedis); } return value; } public static List<String> mget(final String... keys) { List<String> value = null; Jedis jedis = getJedis(); try { value = jedis.mget(keys); } catch (Exception e) { logger.error(e.toString()); } finally { returnResource(jedis); } return value; } public static Long del(final String key) { Long value = null; Jedis jedis = getJedis(); try { value = jedis.del(key); } catch (Exception e) { logger.error(e.toString()); } finally { returnResource(jedis); } return value; } public static Long expire(final String key, final int seconds) { Long value = null; Jedis jedis = getJedis(); try { value = jedis.expire(key, seconds); } catch (Exception e) { logger.error(e.toString()); } finally { returnResource(jedis); } return value; } public static Long incr(final String key) { Long value = null; Jedis jedis = getJedis(); try { value = jedis.incr(key); } catch (Exception e) { logger.error(e.toString()); } finally { returnResource(jedis); } return value; } }
當一個配置了多個並行度的 topology 運行在集羣上時,若是 redis 操做不當,極可能會形成運行該 redis 的 bolt 長時間阻塞,從而形成 tuple 傳遞超時,默認狀況下 spout 在 fail 後會重發該 tuple,然而 redis 阻塞的問題沒有解決,重發不只不能解決問題,反而會加劇集羣的運行負擔,那麼 spout 重發愈來愈多,fail 的次數也愈來愈多, 最終致使數據重複消費愈來愈嚴重。上面貼出來的 RedisCli 工具類,能夠在多線程的環境下安全的使用 redis,從而解決了阻塞的問題。
有幾個手段能夠配置 topology 的並行度:
如今回到咱們的一開始的場景假設:
項目組部署了3臺機器,計劃運行一個 Storm(1.0.1) + Kafka(0.9.0.1) + Redis(3.2.1) 的小規模實驗集羣,每臺機器的配置爲 2CPUs,4G RAM
/** 初始配置 */ TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafkaSpout", new kafkaSpout(), 3); builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout"); builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order")); Config conf = new Config(); conf.setNumWorkers(2); StormSubmitter.submitTopologyWithProgressBar("topology-name", conf, builder.createTopology());
那麼問題是:
回答以下:
第一個問題:關於 worker 的並行度:worker 能夠分配到不一樣的 supervisor 節點,這也是 Storm 實現多節點並行計算的主要配置手段。據此, workers 的數量,能夠說是越多越好,但也不能形成浪費,並且也要看硬件資源是否足夠。因此主要考慮集羣各節點的內存狀況:默認狀況下,一個 worker 分配 768M 的內存,外加 64M 給 logwriter 進程;所以一個 worker 會耗費 832M 內存;題設的集羣有3個節點,每一個節點4G內存,除去 linux 系統、kafka、zookeeper 等的消耗,保守估計僅有2G內存可用來運行 topology,由此可知,當集羣只有一個 topology 在運行的狀況下,最多能夠配置6個 worker。
另外,咱們還能夠調節 worker 的內存空間。這取決於流過 topology 的數據量的大小以及各 bolt 單元的業務代碼的執行時間。若是數據量特別大,代碼執行時間較長,那麼能夠考慮增長單個 worker 的工做內存。有一點須要注意的是,一個 worker 下的全部 executor 和 task 都是共享這個 worker 的內存的,也就是假如一個 worker 分配了 768M 內存,3個 executor,6個 task,那麼這個 3 executor 和 6 task 實際上是共用這 768M 內存的,可是好處是能夠充分利用多核 CPU 的運算性能。
總結起來,worker 的數量,取值因素有:
機器的CPU、帶寬、磁盤性能等也會對 Storm 性能有影響,可是這些外在因素通常不影響 worker 數量的決策。
須要注意的是,Storm 在默認狀況下,每一個 supervisor 節點只容許最多4個 worker(slot)進程運行;若是所配置的 worker 數量超過這個限制,則須要在 storm 配置文件中修改。
第二個問題:關於 FilterBolt 的並行度:若是 spout 讀取的是 kafka 的數據,那麼正常狀況下,設置爲 topic 的分區數量便可。計算 kafkaSpout 的最佳取值,有一個最簡單的辦法,就是在 Storm UI裏面,點開 topology 的首頁,在 Spouts (All time) 下,查看如下幾個參數的值:
怎麼看這幾個參數呢?有幾個技巧:
kafka 只能保證同一分區下消息的順序性,當 spout 配置了多個 executor 的時候,不一樣分區的消息會均勻的分發到不一樣的 executor 上消費,那麼消息的總體順序性就難以保證了,除非將 spout 並行度設爲 1
第三個問題:關於 FilterBolt 的並行度:其取值也有一個簡單辦法,就是在 Storm UI裏面,點開 topology 的首頁,在 Bolts (All time) 下,查看如下幾個參數的值:
通常狀況下,按照該 bolt 的代碼時間複雜度,設置一個 spout 並行度的 1-3倍便可。
第四個問題:AlertBolt 的並行度同 FilterBolt。
第五個問題:shuffleGrouping 會將 tuple 均勻地隨機分發給下游 bolt,通常狀況下用它就是最好的了。
總之,要找出並行度的最佳取值,主要結合 Storm UI 來作決策。
/** tuple發送失敗重試策略,通常狀況下不須要調整 */ spoutConfig.retryInitialDelayMs = 0; spoutConfig.retryDelayMultiplier = 1.0; spoutConfig.retryDelayMaxMs = 60 * 1000; /** 此參數比較重要,可適當調大一點 */ /** 一般狀況下 spout 的發射速度會快於下游的 bolt 的消費速度,當下遊的 bolt 還有 TOPOLOGY_MAX_SPOUT_PENDING 個 tuple 沒有消費完時,spout 會停下來等待,該配置做用於 spout 的每一個 task。 */ conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10000) /** 調整分配給每一個 worker 的內存,關於內存的調節,上文已有描述 */ conf.put(Config.WORKER_HEAP_MEMORY_MB, 768); conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 768); /** 調整 worker 間通訊相關的緩衝參數,如下是一種推薦的配置 */ conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8); // 1.0 以上已移除 conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
能夠在 Storm UI 上查看當前集羣的 Topology Configuration
能夠直接採用 rebalance 命令(也能夠在 Storm UI上操做)從新配置 topology 的並行度:
storm rebalance TOPOLOGY-NAME -n 5 -e SPOUT/BOLT1-NAME=3 -e SPOUT/BOLT2-NAME=10