apache storm基本原理及使用總結

什麼是Apache Stormhtml

Apache Storm是一個分佈式實時大數據處理系統。Storm設計用於在容錯和水平可擴展方法中處理大量數據。它是一個流數據框架,具備最高的攝取率。雖然Storm是無狀態的,它經過Apache ZooKeeper管理分佈式環境和集羣狀態。經過Storm能夠並行地對實時數據執行各類操做。Storm易於部署和操做,而且它能夠保證每一個消息將經過拓撲至少處理一次。java

Apache Storm核心概念數據庫

Apache Storm從一端讀取​​實時數據的原始流,並將其傳遞經過一系列小處理單元,並在另外一端輸出處理/有用的信息。apache

下圖描述了Apache Storm的核心概念。安全

Apache Storm的組件網絡

Tuple數據結構

Tuple是Storm中的主要數據結構。它是有序元素的列表。默認狀況下,Tuple支持全部數據類型。一般,它被建模爲一組逗號分隔的值,並傳遞到Storm集羣。多線程

Stream架構

流是元組的無序序列。併發

Spouts

流的源。一般,Storm從原始數據源(如Twitter Streaming API,Apache Kafka隊列,Kestrel隊列等)接受輸入數據。不然,您能夠編寫spouts以從數據源讀取數據。「ISpout」是實現spouts的核心接口,一些特定的接口是IRichSpout,BaseRichSpout,KafkaSpout等。

Bolts

Bolts是邏輯處理單元。Spouts將數據傳遞到Bolts和Bolts過程,併產生新的輸出流。Bolts能夠執行過濾,聚合,加入,與數據源和數據庫交互的操做。Bolts接收數據併發射到一個或多個Bolts。 「IBolt」是實現Bolts的核心接口。一些常見的接口是IRichBolt,IBasicBolt等。

拓撲

Spouts和Bolts鏈接在一塊兒,造成拓撲結構。實時應用程序邏輯在Storm拓撲中指定。簡單地說,拓撲是有向圖,其中頂點是計算,邊緣是數據流。

簡單拓撲從spouts開始。Spouts將數據發射到一個或多個Bolts。Bolt表示拓撲中具備最小處理邏輯的節點,而且Bolts的輸出能夠發射到另外一個Bolts做爲輸入。

Storm保持拓撲始終運行,直到您終止拓撲。Apache Storm的主要工做是運行拓撲,並在給定時間運行任意數量的拓撲。

任務

如今你有一個關於Spouts和Bolts的基本想法。它們是拓撲的最小邏輯單元,而且使用單個Spout和Bolt陣列構建拓撲。應以特定順序正確執行它們,以使拓撲成功運行。Storm執行的每一個Spout和Bolt稱爲「任務」。簡單來講,任務是Spouts或Bolts的執行。

進程

拓撲在多個工做節點上以分佈式方式運行。Storm將全部工做節點上的任務均勻分佈。工做節點的角色是監聽做業,並在新做業到達時啓動或中止進程。

流分組

數據流從Spouts流到Bolts,或從一個Bolts流到另外一個Bolts。流分組控制元組在拓撲中的路由方式,並幫助咱們瞭解拓撲中的元組流。有以下分組:

  1. Shuffle grouping: Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
  2. Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.
  3. Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.
  4. All grouping: The stream is replicated across all the bolt's tasks. Use this grouping with care.
  5. Global grouping: The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id.
  6. None grouping: This grouping specifies that you don't care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible).
  7. Direct grouping: This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the [emitDirect](javadocs/org/apache/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).
  8. Local or shuffle grouping: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping.

Apache Storm集羣架構

Apache Storm的主要亮點是,它是一個容錯,快速,沒有「單點故障」(SPOF)分佈式應用程序。咱們能夠根據須要在多個系統中安裝Apache Storm,以增長應用程序的容量。

讓咱們看看Apache Storm集羣如何設計和其內部架構。下圖描述了集羣設計。

Apache Storm有兩種類型的節點,Nimbus(主節點)和Supervisor(工做節點)。Nimbus是Apache Storm的核心組件。Nimbus的主要工做是運行Storm拓撲。Nimbus分析拓撲並收集要執行的任務。而後,它將任務分配給可用的supervisor。

Supervisor將有一個或多個工做進程。Supervisor將任務委派給工做進程。工做進程將根據須要產生儘量多的執行器並運行任務。Apache Storm使用內部分佈式消息傳遞系統來進行Nimbus和管理程序之間的通訊。Storm普遍使用Thrift協議進行內部通訊和數據定義。Storm拓撲只是Thrift Structs。在Apache Storm中運行拓撲的Storm Nimbus是一個Thrift服務。

Nimbus(主節點)

Nimbus是Storm集羣的主節點。集羣中的全部其餘節點稱爲工做節點。主節點負責在全部工做節點之間分發數據,向工做節點分配任務和監視故障。

Supervisor(工做節點)

遵循指令的節點被稱爲Supervisors。Supervisor有多個工做進程,它管理工做進程以完成由nimbus分配的任務。

Worker process(工做進程)

工做進程將執行與特定拓撲相關的任務。工做進程不會本身運行任務,而是建立執行者(Executor)並要求他們執行特定的任務。工做進程將有多個執行者。

Executor(執行者)

執行者只是工做進程產生的單個線程。執行者運行一個或多個任務,但僅用於特定的spout或bolt。

Task(任務)

任務執行實際的數據處理。因此,它是一個spout或bolt。

ZooKeeper framework(ZooKeeper框架)

Apache的ZooKeeper的是使用羣集(節點組)本身和維護具備強大的同步技術共享數據之間進行協調的服務。Nimbus是無狀態的,因此它依賴於ZooKeeper來監視工做節點的狀態。

ZooKeeper幫助supervisor與nimbus交互,它負責維持nimbus,supervisor的狀態。

Storm是無狀態的。即便無狀態性質有它本身的缺點,它實際上幫助Storm以最好的可能和最快的方式處理實時數據。

Storm雖然不是徹底無狀態的。它將其狀態存儲在Apache ZooKeeper中。因爲狀態在Apache ZooKeeper中可用,故障的網絡能夠從新啓動,並從它離開的地方工做。一般,像monitor這樣的服務監視工具將監視Nimbus,並在出現任何故障時從新啓動它。

Apache Storm工做流程

一個工做的Storm集羣應該有一個Nimbus和一個或多個supervisors。另外一個重要的節點是Apache ZooKeeper,它將用於nimbus和supervisors之間的協調。

如今讓咱們仔細看看Apache Storm的工做流程 −

  • 最初,nimbus將等待「Storm拓撲」提交給它。
  • 一旦提交拓撲,它將處理拓撲並收集要執行的全部任務和任務將被執行的順序。
  • 而後,nimbus將任務均勻分配給全部可用的supervisors。
  • 在特定的時間間隔,全部supervisor將向nimbus發送心跳以通知它們仍然運行着。
  • 當supervisor終止而且不向心跳發送心跳時,則nimbus將任務分配給另外一個supervisor。
  • 當nimbus自己終止時,supervisor將在沒有任何問題的狀況下對已經分配的任務進行工做。
  • 一旦全部的任務都完成後,supervisor將等待新的任務進去。
  • 同時,終止nimbus將由服務監控工具自動從新啓動。
  • 從新啓動的網絡將從中止的地方繼續。一樣,終止supervisor也能夠自動從新啓動。因爲網絡管理程序和supervisor均可以自動從新啓動,而且二者將像之前同樣繼續,所以Storm保證至少處理全部任務一次。
  • 一旦處理了全部拓撲,則網絡管理器等待新的拓撲到達,而且相似地,管理器等待新的任務。

默認狀況下,Storm集羣中有兩種模式:

  • 本地模式 -此模式用於開發,測試和調試,由於它是查看全部拓撲組件協同工做的最簡單方法。在這種模式下,咱們能夠調整參數,使咱們可以看到咱們的拓撲如何在不一樣的Storm配置環境中運行。在本地模式下,storm拓撲在本地機器上在單個JVM中運行。
  • 生產模式 -在這種模式下,咱們將拓撲提交到工做Storm集羣,該集羣由許多進程組成,一般運行在不一樣的機器上。如在storm的工做流中所討論的,工做集羣將無限地運行,直到它被關閉。

Storm使用經驗分享

1.使用組件的並行度代替線程池或額外的線程

Storm自身是一個分佈式、多線程的框架,對每一個Spout和Bolt,咱們均可以設置其併發度;它也支持經過rebalance命令來動態調整併發度,把負載分攤到多個Worker上。

若是本身在組件內部採用線程池作一些計算密集型的任務,好比JSON解析,有可能使得某些組件的資源消耗特別高,其餘組件又很低,致使Worker之間資源消耗不均衡,這種狀況在組件並行度比較低的時候更明顯。

好比某個Bolt設置了1個並行度,但在Bolt中又啓動了線程池,這樣致使的一種後果就是,集羣中分配了這個Bolt的Worker進程可能會把機器的資源都給消耗光了,影響到其餘Topology在這臺機器上的任務的運行。若是真有計算密集型的任務,咱們能夠把組件的併發度設大,Worker的數量也相應提升,讓計算分配到多個節點上。

爲了不某個Topology的某些組件把整個機器的資源都消耗光的狀況,除了不在組件內部啓動線程池來作計算之外,也能夠經過CGroup控制每一個Worker的資源使用量。

不要在組件內部使用使用額外的線程,好比啓動了額外的線程或Timer去處理邏輯,Storm並不保證額外的線程中處理數據的線程安全。

2.不要用DRPC批量處理大數據

RPC提供了應用程序和Storm Topology之間交互的接口,可供其餘應用直接調用,使用Storm的併發性來處理數據,而後將結果返回給調用的客戶端。這種方式在數據量不大的狀況下,一般不會有問題,而當須要處理批量大數據的時候,問題就比較明顯了。

(1)處理數據的Topology在超時以前可能沒法返回計算的結果。

(2)批量處理數據,可能使得集羣的負載短暫偏高,處理完畢後,又下降回來,負載均衡性差。

批量處理大數據不是Storm設計的初衷,Storm考慮的是時效性和批量之間的均衡,更多地看中前者。須要準實時地處理大數據量,能夠考慮Spark等批量框架。

3.不要在Spout中處理耗時的操做

Spout中nextTuple方法會發射數據流,在啓用Ack的狀況下,fail方法和ack方法會被觸發。須要明確一點,在Storm中Spout是單線程(JStorm的Spout分了3個線程,分別執行nextTuple方法、fail方法和ack方法)。若是nextTuple方法很是耗時,某個消息被成功執行完畢後,Acker會給Spout發送消息,Spout若沒法及時消費,可能形成ACK消息超時後被丟棄,而後Spout反而認爲這個消息執行失敗了,形成邏輯錯誤。反之若fail方法或者ack方法的操做耗時較多,則會影響Spout發射數據的量,形成Topology吞吐量下降。

4.注意fieldsGrouping的數據均衡性

fieldsGrouping是根據一個或者多個Field對數據進行分組,不一樣的目標Task收到不一樣的數據,而同一個Task收到的數據會相同。假設某個Bolt根據用戶ID對數據進行fieldsGrouping,若是某一些用戶的數據特別多,而另一些用戶的數據又比較少,那麼就可能使得下一級處理Bolt收到的數據不均衡,整個處理的性能就會受制於某些數據量大的節點。能夠加入更多的分組條件或者更換分組策略,使得數據具備均衡性。

5.優先使用localOrShuffleGrouping

localOrShuffleGrouping是指若是目標Bolt中的一個或者多個Task和當前產生數據的Task在同一個Worker進程裏面,那麼就走內部的線程間通訊,將Tuple直接發給在當前Worker進程的目的Task。不然,同shuffleGrouping。

localOrShuffleGrouping的數據傳輸性能優於shuffleGrouping,由於在Worker內部傳輸,只須要經過Disruptor隊列就能夠完成,沒有網絡開銷和序列化開銷。所以在數據處理的複雜度不高,而網絡開銷和序列化開銷佔主要地位的狀況下,能夠優先使用localOrShuffleGrouping來代替shuffleGrouping。

6.設置合理的MaxSpoutPending值

在啓用Ack的狀況下,Spout中有個RotatingMap用來保存Spout已經發送出去,但尚未等到Ack結果的消息。RotatingMap的最大個數是有限制的,爲p*num-tasks。其中p是topology.max.spout.pending值,也就是MaxSpoutPending(也能夠由TopologyBuilder在setSpout經過setMaxSpoutPending方法來設定),num-tasks是Spout的Task數。若是不設置MaxSpoutPending的大小或者設置得太大,可能消耗掉過多的內存致使內存溢出,設置過小則會影響Spout發射Tuple的速度。

7.設置合理的Worker數

Worker數越多,性能越好?並非!

這是因爲一方面,每新增長一個Worker進程,都會將一些本來線程間的內存通訊變爲進程間的網絡通訊,這些進程間的網絡通訊還須要進行序列化與反序列化操做,這些下降了吞吐率。

另外一方面,每新增長一個Worker進程,都會額外地增長多個線程(Netty發送和接收線程、心跳線程、System Bolt線程以及其餘系統組件對應的線程等),這些線程切換消耗了很多CPU,sys 系統CPU消耗佔比增長,在CPU總使用率受限的狀況下,下降了業務線程的使用效率。

8.平衡吞吐量和時效性

Storm的數據傳輸默認使用Netty。在數據傳輸性能方面,有以下的參數能夠調整:

storm.messaging.netty.server_worker_threads和storm.messaging.netty.client_worker_threads分別爲接收消息線程和發送消息線程的數量。

netty.transfer.batch.size是指每次 Netty Client向 Netty Server發送的數據的大小,若是須要發送的Tuple消息大於netty.transfer.batch.size,則Tuple消息會按照netty.transfer.batch.size進行切分,而後屢次發送。

storm.messaging.netty.buffer_size爲每次批量發送的Tuple序列化以後的TaskMessage消息的大小。

storm.messaging.netty.flush.check.interval.ms表示當有TaskMessage須要發送的時候, Netty Client檢查能夠發送數據的頻率。下降storm.messaging.netty.flush.check.interval.ms的值,能夠提升時效性。增長netty.transfer.batch.size和storm.messaging.netty.buffer_size的值,能夠提高網絡傳輸的吐吞量,使得網絡的有效載荷提高(減小TCP包的數量,而且TCP包中的有效數據量增長),一般時效性就會下降一些。所以須要根據自身的業務狀況,合理在吞吐量和時效性直接的平衡。

除了這些參數,咱們怎麼找到Storm中性能的瓶頸,能夠經過以下的一些途徑來進行:

在Storm的UI中,對每一個Topology都提供了相應的統計信息,其中有3個參數對性能來講參考意義比較明顯,包括Execute latency、Process latency和Capacity。

分別看一下這3個參數的含義和做用。

(1)Execute latency:消息的平均處理時間,單位爲毫秒。

(2)Process latency:消息從收到到被ack掉所花的時間,單位爲毫秒。若是沒有啓用Acker機制,那麼Process latency的值爲0。

(3)Capacity:計算公式爲Capacity = Bolt或者Executor調用execute方法處理的消息數量 * 消息平均執行時間 / 時間區間。這個值越接近1,說明Bolt或者Executor基本一直在調用execute方法,所以並行度不夠,須要擴展這個組件的Executor數量。

爲了在Storm中達到高性能,咱們在設計和開發Topology的時候,須要注意如下原則:

(1)模塊和模塊之間解耦,模塊之間的層次清晰,每一個模塊能夠獨立擴展,而且符合流水線的原則。

(2)無狀態設計,無鎖設計,水平擴展支持。

(3)爲了達到高的吞吐量,延遲會加大;爲了低延遲,吞吐量可能下降,須要在兩者之間平衡。

(4)性能的瓶頸永遠在熱點,解決熱點問題。

(5)優化的前提是測量,而不是主觀臆測。收集相關數據,再動手,事半功倍。

 

參考:

https://www.w3cschool.cn/apache_storm/apache_storm_introduction.html

https://zhuanlan.zhihu.com/p/20504669

http://storm.apache.org/index.html

相關文章
相關標籤/搜索