Storm之trident聚合操做介紹

Trident主要有5類操做:網絡

一、做用在本地的操做,不產生網絡傳輸。併發

二、對數據流的重分佈,不改變流的內容,可是產生網絡傳輸。ide

三、聚合操做,有可能產生網絡傳輸。oop

四、做用在分組流(grouped streams)上的操做。優化

五、Mergejoinspa

 

這裏主要介紹一下34,但願對你們有所幫助,若有錯誤請指正!orm

首先說幾個名詞:對象

PartitionStorm中併發的最小執行單元是task;在tridentpartition至關於task的角色。接口

Grouped streams對數據流作groupBy操做後,將key相同的流組織在一塊兒,造成若干組流。事務

Global aggregation沒有groupBy的聚合,即全局聚合。

AggregatorTrident中定義的用於實現聚合方法的接口。

下面開始介紹:

做用在tridentStream對象上的與聚合相關的主要方法:

aggregate

partitionAggregate

persistentAggregate

groupBy

partitionBy

partitionPersist

parallelismHint

做爲聚合操做的一個參數,實現聚合功能的主要接口有:

Aggregator<T>

CombinerAggregator<T>

ReducerAggregator<T>

----------------------華麗麗的分割線----------------------------------

使用過Hive的人都知道,不含group by的聚合SQL在轉化成Hadoop做業後,在編譯時就肯定了只能有1reduce,由於全局聚合在彙總階段只能由1個計算單元完成。一樣的道理,當aggregate方法用於無groupByglobal aggregation時,每一個批次(batch)的流也只能在1partition中執行(使用AggregatorReducerAggregator接口,而CombinerAggregator例外,後面會講到)。當咱們使用aggregate計算global aggregation時,若是經過parallelismHint設置了併發數爲ntrident的作法是經過輪循的方式讓不一樣的批次依次在npartition中執行,實際上仍是在串行執行,意義不大。所以使用aggregateglobal aggregation時,並不能實現併發的功能,只適用於數據量不大的場景,這時候最好把併發設成1,不然對資源是一種浪費。

值得一提的是,若是實現了自定義分組的CustomStreamGrouping接口,後面再跟global aggregation,例如:
trident.newStream(「TRIDENT_SPOUT」, new  MySpout())
       .partition(new  MyCustomStreamGrouping())
       .aggregate(new MyAggregator(), new  Fields(「out1」))
       .parallelismHint(10);

這時候實際上咱們的自定義分組是不起做用的,由於上面已經說明,此時併發並無真正開啓,而是採起的輪循策略。只有將aggregate換成partitionAggregate,自定義的分組纔會起做用。

使用aggregate作分組聚合是它的強項,此時能夠充分發揮併發的特性。可是須要注意,假設併發度設置爲10,而咱們groupBykey的不一樣值實際上只有2個,那勢必有不少partition在空跑,形成資源浪費。

partitionAggregate一般用於global aggregation時的本地化聚合,相似於Hadoop中的map階段。partitionAggregate是在每個partition內獨立調用本身的聚合操做,互不干涉。最後還須要把局部聚合值emit出來,經過網絡傳輸供後面的aggregate作全局聚合。經過這種策略,能夠實現global aggregation的併發。partitionAggregate的前面不能跟groupBy方法,由於groupBy方法返回的GroupedStream對象沒有partitionAggregate方法。

Aggregator<T>接口是三種實現聚合功能的接口中最通用的一種。Aggregator<T>要實現5個方法:

prepare只在啓動拓撲時調用1次。若是設置了併發度,則每個partition調用1次。
cleanup
只在正常關閉拓撲時調用1次。若是設置了併發度,則每個partition調用1次。
init
對於global aggregation,每一個批次調用1次。若是使用的是partitionAggregate,則每個批次的每個partition調用1次。對於Grouped Streams,每一個相同的key組成的數據流調用1次。須要注意的是,若是使用的是事務型spout,同時某個批次處理失敗後致使該批次消息重發,則在接下來處理時init有可能會調用屢次。所以init裏面的代碼邏輯應該要支持同一批的重複調用。
aggregate
1tuple調用1次。
complete
對於global aggregation,每一個批次調用1次。若是使用的是partitionAggregate,則每個批次的每個partition調用1次。對於Grouped Streams,每一個相同的key組成的數據流調用1次。

再說一下CombinerAggregator<T>,它比較有趣,前面提到使用aggregateglobal aggregation沒法開啓併發。可是當CombinerAggregator<T>aggregate配合使用時,例如:
trident.newStream(「TRIDENT_SPOUT」, new  MySpout())
       .parallelismHint(10)
       .aggregate(new MyCombinerAggregator(), new Fields(「out1」));

Trident會把拓撲自動拆分紅2bolt,第一個bolt作局部聚合,相似於Hadoop中的map;第二個bolt經過接收網絡傳輸過來的局部聚合值最後作一個全局聚合,相似於Hadoop中的reduce。在上面的例子中,局部聚合開啓了10個併發,這就實現了使用aggregateglobal aggregation時真正開啓併發。固然,使用partitionAggregate能夠實現一樣的功能。相似於:
trident.newStream(「TRIDENT_SPOUT」, new  MySpout())
       .partitionAggregate(new  MyAggregator(), new  Fields(「out1」))
       .parallelismHint(10)
       .aggregate(new Fields(「out1」), new MyAggregator(), new  Fields(「out2」));
有三點須要注意:
1
、自動優化後的第一個bolt是本地化操做,所以它能夠和它前面或者後面挨着的全部each合併在同一個bolt裏面。
2
parallelismHint(n)要寫在aggregate的前面,若是寫在aggregate後面,將致使本地化操做的第一個bolt的併發度爲1,而全局聚合的第二個bolt的併發度爲n,而實際上第二個bolt並不能真正開啓併發,只是前面提到的輪循而已。
3
、綜合12,把parallelismHint(n)寫在aggregate的前面會致使spout同時開啓n的併發度,所以要注意本身實現的spout類是否支持併發發送。

CombinerAggregator<T>須要實現3個方法:
init
每條tuple調用1次,對tuple作預處理。
combine
每條tuple調用1次,和以前的聚合值作combine。若是是第一條tuple則和zero返回的值作combine
zero
當沒有數據流時的處理邏輯。
整個CombinerAggregator<T>會在每批次結束時將combine的結果作一次emit

persistentAggregate是實現聚合的另一種方式。前面介紹的聚合能夠當作是對每一個批次的數據作本批次內的聚合計算,至於批次之間如何merge須要本身處理。而persistentAggregate能夠當作是對源源不斷髮送過來數據流作一個總的聚合,每一個批次的聚合值只是一箇中間狀態,經過與trident新提出的state概念結合,實現中間狀態的持久化,同時支持事務性。persistentAggregate不能使用Aggregator<T>,只能使用CombinerAggregator<T>或者ReducerAggregator<T>

關於state接口,它的使用場景很是多,這裏先不作詳細介紹。它能夠做爲Stream.stateQuery的參數按批次對持久化的數據作查詢;也能夠配合Stream.partitionPersist按批次作持久化操做,相似於IBatchBolt<T>.finishBatch所能實現的功能。

EOF

相關文章
相關標籤/搜索