什麼是Apache Stormhtml
Apache Storm是一個分佈式實時大數據處理系統。Storm設計用於在容錯和水平可擴展方法中處理大量數據。它是一個流數據框架,具備最高的攝取率。雖然Storm是無狀態的,它經過Apache ZooKeeper管理分佈式環境和集羣狀態。經過Storm能夠並行地對實時數據執行各類操做。Storm易於部署和操做,而且它能夠保證每一個消息將經過拓撲至少處理一次。java
Apache Storm核心概念數據庫
Apache Storm從一端讀取實時數據的原始流,並將其傳遞經過一系列小處理單元,並在另外一端輸出處理/有用的信息。apache
下圖描述了Apache Storm的核心概念。安全
Apache Storm的組件網絡
Tuple數據結構
Tuple是Storm中的主要數據結構。它是有序元素的列表。默認狀況下,Tuple支持全部數據類型。一般,它被建模爲一組逗號分隔的值,並傳遞到Storm集羣。多線程
Stream架構
流是元組的無序序列。併發
Spouts
流的源。一般,Storm從原始數據源(如Twitter Streaming API,Apache Kafka隊列,Kestrel隊列等)接受輸入數據。不然,您能夠編寫spouts以從數據源讀取數據。「ISpout」是實現spouts的核心接口,一些特定的接口是IRichSpout,BaseRichSpout,KafkaSpout等。
Bolts
Bolts是邏輯處理單元。Spouts將數據傳遞到Bolts和Bolts過程,併產生新的輸出流。Bolts能夠執行過濾,聚合,加入,與數據源和數據庫交互的操做。Bolts接收數據併發射到一個或多個Bolts。 「IBolt」是實現Bolts的核心接口。一些常見的接口是IRichBolt,IBasicBolt等。
拓撲
Spouts和Bolts鏈接在一塊兒,造成拓撲結構。實時應用程序邏輯在Storm拓撲中指定。簡單地說,拓撲是有向圖,其中頂點是計算,邊緣是數據流。
簡單拓撲從spouts開始。Spouts將數據發射到一個或多個Bolts。Bolt表示拓撲中具備最小處理邏輯的節點,而且Bolts的輸出能夠發射到另外一個Bolts做爲輸入。
Storm保持拓撲始終運行,直到您終止拓撲。Apache Storm的主要工做是運行拓撲,並在給定時間運行任意數量的拓撲。
任務
如今你有一個關於Spouts和Bolts的基本想法。它們是拓撲的最小邏輯單元,而且使用單個Spout和Bolt陣列構建拓撲。應以特定順序正確執行它們,以使拓撲成功運行。Storm執行的每一個Spout和Bolt稱爲「任務」。簡單來講,任務是Spouts或Bolts的執行。
進程
拓撲在多個工做節點上以分佈式方式運行。Storm將全部工做節點上的任務均勻分佈。工做節點的角色是監聽做業,並在新做業到達時啓動或中止進程。
流分組
數據流從Spouts流到Bolts,或從一個Bolts流到另外一個Bolts。流分組控制元組在拓撲中的路由方式,並幫助咱們瞭解拓撲中的元組流。有以下分組:
emit
method in OutputCollector (which returns the task ids that the tuple was sent to).Apache Storm集羣架構
Apache Storm的主要亮點是,它是一個容錯,快速,沒有「單點故障」(SPOF)分佈式應用程序。咱們能夠根據須要在多個系統中安裝Apache Storm,以增長應用程序的容量。
讓咱們看看Apache Storm集羣如何設計和其內部架構。下圖描述了集羣設計。
Apache Storm有兩種類型的節點,Nimbus(主節點)和Supervisor(工做節點)。Nimbus是Apache Storm的核心組件。Nimbus的主要工做是運行Storm拓撲。Nimbus分析拓撲並收集要執行的任務。而後,它將任務分配給可用的supervisor。
Supervisor將有一個或多個工做進程。Supervisor將任務委派給工做進程。工做進程將根據須要產生儘量多的執行器並運行任務。Apache Storm使用內部分佈式消息傳遞系統來進行Nimbus和管理程序之間的通訊。Storm普遍使用Thrift協議進行內部通訊和數據定義。Storm拓撲只是Thrift Structs。在Apache Storm中運行拓撲的Storm Nimbus是一個Thrift服務。
Nimbus(主節點)
Nimbus是Storm集羣的主節點。集羣中的全部其餘節點稱爲工做節點。主節點負責在全部工做節點之間分發數據,向工做節點分配任務和監視故障。
Supervisor(工做節點)
遵循指令的節點被稱爲Supervisors。Supervisor有多個工做進程,它管理工做進程以完成由nimbus分配的任務。
Worker process(工做進程)
工做進程將執行與特定拓撲相關的任務。工做進程不會本身運行任務,而是建立執行者(Executor)並要求他們執行特定的任務。工做進程將有多個執行者。
Executor(執行者)
執行者只是工做進程產生的單個線程。執行者運行一個或多個任務,但僅用於特定的spout或bolt。
Task(任務)
任務執行實際的數據處理。因此,它是一個spout或bolt。
ZooKeeper framework(ZooKeeper框架)
Apache的ZooKeeper的是使用羣集(節點組)本身和維護具備強大的同步技術共享數據之間進行協調的服務。Nimbus是無狀態的,因此它依賴於ZooKeeper來監視工做節點的狀態。
ZooKeeper幫助supervisor與nimbus交互,它負責維持nimbus,supervisor的狀態。
Storm是無狀態的。即便無狀態性質有它本身的缺點,它實際上幫助Storm以最好的可能和最快的方式處理實時數據。
Storm雖然不是徹底無狀態的。它將其狀態存儲在Apache ZooKeeper中。因爲狀態在Apache ZooKeeper中可用,故障的網絡能夠從新啓動,並從它離開的地方工做。一般,像monitor這樣的服務監視工具將監視Nimbus,並在出現任何故障時從新啓動它。
Apache Storm工做流程
一個工做的Storm集羣應該有一個Nimbus和一個或多個supervisors。另外一個重要的節點是Apache ZooKeeper,它將用於nimbus和supervisors之間的協調。
如今讓咱們仔細看看Apache Storm的工做流程 −
默認狀況下,Storm集羣中有兩種模式:
Storm使用經驗分享
1.使用組件的並行度代替線程池或額外的線程
Storm自身是一個分佈式、多線程的框架,對每一個Spout和Bolt,咱們均可以設置其併發度;它也支持經過rebalance命令來動態調整併發度,把負載分攤到多個Worker上。
若是本身在組件內部採用線程池作一些計算密集型的任務,好比JSON解析,有可能使得某些組件的資源消耗特別高,其餘組件又很低,致使Worker之間資源消耗不均衡,這種狀況在組件並行度比較低的時候更明顯。
好比某個Bolt設置了1個並行度,但在Bolt中又啓動了線程池,這樣致使的一種後果就是,集羣中分配了這個Bolt的Worker進程可能會把機器的資源都給消耗光了,影響到其餘Topology在這臺機器上的任務的運行。若是真有計算密集型的任務,咱們能夠把組件的併發度設大,Worker的數量也相應提升,讓計算分配到多個節點上。
爲了不某個Topology的某些組件把整個機器的資源都消耗光的狀況,除了不在組件內部啓動線程池來作計算之外,也能夠經過CGroup控制每一個Worker的資源使用量。
不要在組件內部使用使用額外的線程,好比啓動了額外的線程或Timer去處理邏輯,Storm並不保證額外的線程中處理數據的線程安全。
2.不要用DRPC批量處理大數據
RPC提供了應用程序和Storm Topology之間交互的接口,可供其餘應用直接調用,使用Storm的併發性來處理數據,而後將結果返回給調用的客戶端。這種方式在數據量不大的狀況下,一般不會有問題,而當須要處理批量大數據的時候,問題就比較明顯了。
(1)處理數據的Topology在超時以前可能沒法返回計算的結果。
(2)批量處理數據,可能使得集羣的負載短暫偏高,處理完畢後,又下降回來,負載均衡性差。
批量處理大數據不是Storm設計的初衷,Storm考慮的是時效性和批量之間的均衡,更多地看中前者。須要準實時地處理大數據量,能夠考慮Spark等批量框架。
3.不要在Spout中處理耗時的操做
Spout中nextTuple方法會發射數據流,在啓用Ack的狀況下,fail方法和ack方法會被觸發。須要明確一點,在Storm中Spout是單線程(JStorm的Spout分了3個線程,分別執行nextTuple方法、fail方法和ack方法)。若是nextTuple方法很是耗時,某個消息被成功執行完畢後,Acker會給Spout發送消息,Spout若沒法及時消費,可能形成ACK消息超時後被丟棄,而後Spout反而認爲這個消息執行失敗了,形成邏輯錯誤。反之若fail方法或者ack方法的操做耗時較多,則會影響Spout發射數據的量,形成Topology吞吐量下降。
4.注意fieldsGrouping的數據均衡性
fieldsGrouping是根據一個或者多個Field對數據進行分組,不一樣的目標Task收到不一樣的數據,而同一個Task收到的數據會相同。假設某個Bolt根據用戶ID對數據進行fieldsGrouping,若是某一些用戶的數據特別多,而另一些用戶的數據又比較少,那麼就可能使得下一級處理Bolt收到的數據不均衡,整個處理的性能就會受制於某些數據量大的節點。能夠加入更多的分組條件或者更換分組策略,使得數據具備均衡性。
5.優先使用localOrShuffleGrouping
localOrShuffleGrouping是指若是目標Bolt中的一個或者多個Task和當前產生數據的Task在同一個Worker進程裏面,那麼就走內部的線程間通訊,將Tuple直接發給在當前Worker進程的目的Task。不然,同shuffleGrouping。
localOrShuffleGrouping的數據傳輸性能優於shuffleGrouping,由於在Worker內部傳輸,只須要經過Disruptor隊列就能夠完成,沒有網絡開銷和序列化開銷。所以在數據處理的複雜度不高,而網絡開銷和序列化開銷佔主要地位的狀況下,能夠優先使用localOrShuffleGrouping來代替shuffleGrouping。
6.設置合理的MaxSpoutPending值
在啓用Ack的狀況下,Spout中有個RotatingMap用來保存Spout已經發送出去,但尚未等到Ack結果的消息。RotatingMap的最大個數是有限制的,爲p*num-tasks。其中p是topology.max.spout.pending值,也就是MaxSpoutPending(也能夠由TopologyBuilder在setSpout經過setMaxSpoutPending方法來設定),num-tasks是Spout的Task數。若是不設置MaxSpoutPending的大小或者設置得太大,可能消耗掉過多的內存致使內存溢出,設置過小則會影響Spout發射Tuple的速度。
7.設置合理的Worker數
Worker數越多,性能越好?並非!
這是因爲一方面,每新增長一個Worker進程,都會將一些本來線程間的內存通訊變爲進程間的網絡通訊,這些進程間的網絡通訊還須要進行序列化與反序列化操做,這些下降了吞吐率。
另外一方面,每新增長一個Worker進程,都會額外地增長多個線程(Netty發送和接收線程、心跳線程、System Bolt線程以及其餘系統組件對應的線程等),這些線程切換消耗了很多CPU,sys 系統CPU消耗佔比增長,在CPU總使用率受限的狀況下,下降了業務線程的使用效率。
8.平衡吞吐量和時效性
Storm的數據傳輸默認使用Netty。在數據傳輸性能方面,有以下的參數能夠調整:
storm.messaging.netty.server_worker_threads和storm.messaging.netty.client_worker_threads分別爲接收消息線程和發送消息線程的數量。
netty.transfer.batch.size是指每次 Netty Client向 Netty Server發送的數據的大小,若是須要發送的Tuple消息大於netty.transfer.batch.size,則Tuple消息會按照netty.transfer.batch.size進行切分,而後屢次發送。
storm.messaging.netty.buffer_size爲每次批量發送的Tuple序列化以後的TaskMessage消息的大小。
storm.messaging.netty.flush.check.interval.ms表示當有TaskMessage須要發送的時候, Netty Client檢查能夠發送數據的頻率。下降storm.messaging.netty.flush.check.interval.ms的值,能夠提升時效性。增長netty.transfer.batch.size和storm.messaging.netty.buffer_size的值,能夠提高網絡傳輸的吐吞量,使得網絡的有效載荷提高(減小TCP包的數量,而且TCP包中的有效數據量增長),一般時效性就會下降一些。所以須要根據自身的業務狀況,合理在吞吐量和時效性直接的平衡。
除了這些參數,咱們怎麼找到Storm中性能的瓶頸,能夠經過以下的一些途徑來進行:
在Storm的UI中,對每一個Topology都提供了相應的統計信息,其中有3個參數對性能來講參考意義比較明顯,包括Execute latency、Process latency和Capacity。
分別看一下這3個參數的含義和做用。
(1)Execute latency:消息的平均處理時間,單位爲毫秒。
(2)Process latency:消息從收到到被ack掉所花的時間,單位爲毫秒。若是沒有啓用Acker機制,那麼Process latency的值爲0。
(3)Capacity:計算公式爲Capacity = Bolt或者Executor調用execute方法處理的消息數量 * 消息平均執行時間 / 時間區間。這個值越接近1,說明Bolt或者Executor基本一直在調用execute方法,所以並行度不夠,須要擴展這個組件的Executor數量。
爲了在Storm中達到高性能,咱們在設計和開發Topology的時候,須要注意如下原則:
(1)模塊和模塊之間解耦,模塊之間的層次清晰,每一個模塊能夠獨立擴展,而且符合流水線的原則。
(2)無狀態設計,無鎖設計,水平擴展支持。
(3)爲了達到高的吞吐量,延遲會加大;爲了低延遲,吞吐量可能下降,須要在兩者之間平衡。
(4)性能的瓶頸永遠在熱點,解決熱點問題。
(5)優化的前提是測量,而不是主觀臆測。收集相關數據,再動手,事半功倍。
參考:
https://www.w3cschool.cn/apache_storm/apache_storm_introduction.html
https://zhuanlan.zhihu.com/p/20504669
http://storm.apache.org/index.html