Storm Trident 詳細介紹

1、概要
1.1 Storm(簡介)
     Storm是一個實時的可靠地分佈式流計算框架。
     具體就很少說了,舉個例子,它的一個典型的大數據實時計算應用場景:從Kafka消息隊列讀取消息(能夠是logs,clicks,sensor data);經過Storm對消息進行計算聚合等預處理;把處理結果持久化到NoSQL數據庫或者HDFS作進一步深刻分析。
1.2 Trident(簡介)
     Trident是對Storm的更高一層的抽象,除了提供一套簡單易用的流數據處理API以外,它以batch(一組tuples)爲單位進行處理,這樣一來,可使得一些處理更簡單和高效。
     咱們知道把Bolt的運行狀態僅僅保存在內存中是不可靠的,若是一個node掛掉,那麼這個node上的任務就會被從新分配,可是以前的狀態是沒法恢復的。所以,比較聰明的方式就是把storm的計算狀態信息持久化到database中,基於這一點,trident就變得尤其重要。由於在處理大數據時,咱們在與database打交道時一般會採用批處理的方式來避免給它帶來壓力,而trident偏偏是以 batch groups 的形式處理數據,並提供了一些聚合功能的API。

2、Trident API 實踐
     Trident其實就是一套API,但現階段網上關於Trident API中各個函數的用法含義資料很少,下面我就根據一些英文資料和本身的理解,詳細介紹一下Trident API各個函數的用法和含義。
2.1 each() 方法
     做用:操做batch中的每個tuple內容,通常與Filter或者Function函數配合使用。
     下面經過一個例子來介紹each()方法,假設咱們有一個FakeTweetsBatchSpout,它會模擬一個Stream,隨機產生一個個消息。咱們能夠經過設置這個Spout類的構造參數來改變這個Spout的batch Size的大小。
2.1.1 Filter類:過濾tuple
     一個經過actor字段過濾消息的Filter:
public static class PerActorTweetsFilter extends BaseFilter {
    String actor;
    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }
    @Override
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getString(0).equals(actor);
    }
}

   Topology:    java

topology.newStream("spout", spout)
	.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
	.each(new Fields("actor", "text"), new Utils.PrintFilter());

     從上面例子看到,each()方法有一些構造參數: 
第一個構造參數:做爲Field Selector,一個tuple可能有不少字段,經過設置Field,咱們能夠隱藏其它字段,僅僅接收指定的字段(其它字段實際還在)。  node

第二個是一個Filter:用來過濾掉除actor名叫"dave"外的其它消息。

2.1.2 Function類:加工處理tuple內容
     一個能把tuple中text內容變成大寫的Function:
public static class UppercaseFunction extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        collector.emit(new Values(tuple.getString(0).toUpperCase()));
    }
}

     Topology:  算法

topology.newStream("spout", spout)
	.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
	.each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))
	.each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());

    首先,UppercaseFunction函數的輸入是Fields("text", "actor"),其做用是把其中的"text"字段內容都變成大寫。  數據庫

  其次,它比Filter多出一個輸出字段,做用是每一個tuple在通過這個Function函數處理後,輸出字段都會被追加到tuple後面,在本例中,執行完Function以後的tuple內容多了一個"uppercased_text",而且這個字段排在最後面。

2.1.3 Field Selector與project
   咱們須要注意的是,上面每一個each()方法的第一個Field字段僅僅是隱藏掉沒有指定的字段內容,實際上被隱藏的字段依然還在tuple中,若是想要完全丟掉它們,咱們就須要用到project()方法。
   投影操做做用是僅保留Stream指定字段的數據,好比有一個Stream包含以下字段: [「a」, 「b」, 「c」, 「d」],運行以下代碼:
mystream.project(new Fields("b", "d"))  則輸出的流僅包含 [「b」, 「d」]字段。

2.2 parallelismHint()方法和partitionBy()
2.2.1 parallelismHint()
     指定Topology的並行度,即用多少線程執行這個任務。咱們能夠稍微改一下咱們的Filter,經過打印當前任務的partitionIndex來區分當前是哪一個線程。
Filter:
public static class PerActorTweetsFilter extends BaseFilter {
    private int partitionIndex;
    private String actor;

    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        this.partitionIndex = context.getPartitionIndex();
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        boolean filter = tuple.getString(0).equals(actor);

        if (filter) {
            System.err.println("I am partition [" + partitionIndex + "] and I have kept a tweet by: " + actor);
        }

        return filter;
    }
}

Topology:  網絡

topology.newStream("spout", spout)
	.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).parallelismHint(5)
	.each(new Fields("actor", "text"), new Utils.PrintFilter());

     若是咱們指定執行Filter任務的線程數量爲5,那麼最終的執行結果會如何呢?看一下咱們的測試結果:  框架

I am partition [4] and I have kept a tweet by: dave
I am partition [3] and I have kept a tweet by: dave
I am partition [0] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [1] and I have kept a tweet by: dave
     咱們能夠很清楚的發現,一共有5個線程在執行Filter。
     若是咱們想要2個Spout和5個Filter怎麼辦呢?以下面代碼所示,實現很簡單。
topology.newStream("spout", spout).parallelismHint(2).shuffle()
	.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).parallelismHint(5)
	.each(new Fields("actor", "text"), new Utils.PrintFilter());

2.2.2 partitionBy()和重定向操做(repartitioning operation)  分佈式

     咱們注意到上面的例子中用到了shuffle(),shuffle()是一個重定向操做。那什麼是重定向操做呢?重定向定義了咱們的tuple如何被route到下一處理層,固然不一樣的層之間可能會有不一樣的並行度,shuffle()的做用是把tuple隨機的route下一層的線程中,而partitionBy()則根據咱們的指定字段按照一致性哈希算法route到下一層的線程中,也就是說,若是咱們用partitionBy()的話,同一個字段名的tuple會被route到同一個線程中。
     好比,若是咱們把上面代碼中的shuffle()改爲partitionBy(new Fields("actor")),猜一下結果會怎樣?
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
     測試結果正如咱們上面描述的那樣,相同字段的tuple被route到了同一個partition中。
重定向操做有以下幾種:
shuffle:經過隨機分配算法來均衡tuple到各個分區
broadcast:每一個tuple都被廣播到全部的分區,這種方式在drcp時很是有用,好比在每一個分區上作stateQuery
partitionBy:根據指定的字段列表進行劃分,具體作法是用指定字段列表的hash值對分區個數作取模運算,確保相同字段列表的數據被劃分到同一個分區
global:全部的tuple都被髮送到一個分區,這個分區用來處理整個Stream
batchGlobal:一個Batch中的全部tuple都被髮送到同一個分區,不一樣的Batch會去往不一樣的分區
Partition:經過一個自定義的分區函數來進行分區,這個自定義函數實現了 backtype.storm.grouping.CustomStreamGrouping

2.3 聚合(Aggregation)     
     咱們前面講過,Trident的一個很重要的特色就是它是以batch的形式處理tuple的。咱們能夠很容易想到的針對一個batch的最基本操做應該就是聚合。Trident提供了聚合API來處理batches,來看一個例子:

2.3.1 Aggregator:
public static class LocationAggregator extends BaseAggregator<Map<String, Integer>> {
    @Override
    public Map<String, Integer> init(Object batchId, TridentCollector collector) {
        return new HashMap<String, Integer>();
    }

    @Override
    public void aggregate(Map<String, Integer> val, TridentTuple tuple,
        TridentCollector collector) {
        String location = tuple.getString(0);
        val.put(location, MapUtils.getInteger(val, location, 0) + 1);
    }

    @Override
    public void complete(Map<String, Integer> val, TridentCollector collector) {
        collector.emit(new Values(val));
    }
}

Topology:  ide

topology.newStream("spout", spout)
	.aggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))
	.each(new Fields("location_counts"), new Utils.PrintFilter());

     這個aggregator很簡單:計算每個batch的location的數量。經過這個例子咱們能夠看到Aggregator接口:  函數

init(): 當剛開始接收到一個batch時執行
aggregate(): 在接收到batch中的每個tuple時執行
complete(): 在一個batch的結束時執行     

     咱們前面講過aggregate()方法是一個重定向方法,由於它會隨機啓動一個單獨的線程來進行這個聚合操做。
     下面咱們來看一下測試結果:
[{USA=3, Spain=1, UK=1}]
[{USA=3, Spain=2}]
[{France=1, USA=4}]
[{USA=4, Spain=1}]
[{USA=5}]
     咱們能夠看到打印的結果,其中每一條的和都是5,這是由於咱們的Spout的每一個batch中tuple數量設置的是5,因此每一個線程的計算結果也會是5。 除此以外,Trident還提供了其它兩個Aggregator接口: CombinerAggregator, ReducerAggregator,具體使用方法請參考Trident API。

2.3.2 partitionAggregate():
     若是咱們將上面的Topology稍微改造一下,猜一下結果會是如何?
topology.newStream("spout", spout)
	.partitionBy(new Fields("location"))
	.partitionAggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts")).parallelismHint(3)
	.each(new Fields("location_counts"), new Utils.PrintFilter());

     咱們一塊兒來分析一下,首先partitionBy()方法將tuples按其location字段重定向到下一處理邏輯,並且相同location字段的tuple必定會被分配到同一個線程中處理。其次,partitionAggregate()方法,注意它與Aggregate不一樣,它不是一個重定向方法,它僅僅是對當前partition上的各個batch執行聚合操做。由於咱們根據location進行了重定向操做,測試數據一共有4個location,而當前一共有3個partition,所以能夠猜想咱們的最終測試結果中,有一個partition會處理兩個location的batch,最終測試結果以下:  測試

[{France=10, Spain=5}]
[{USA=63}]
[{UK=22}]
     須要注意的是,partitionAggregate雖然也是聚合操做,但與上面的Aggregate徹底不一樣,它不是一個重定向操做。

2.4 groupBy
     咱們能夠看到上面幾個例子的測試結果,其實咱們一般想要的是每一個location的數量是多少,那該怎麼處理呢?看下面這個Topology:
topology.newStream("spout", spout)
	.groupBy(new Fields("location"))
	.aggregate(new Fields("location"), new Count(), new Fields("count"))
	.each(new Fields("location", "count"), new Utils.PrintFilter());

     咱們先看一下執行的結果: 

[France, 25]
[UK, 2]
[USA, 25]
[Spain, 44]
[France, 26]
[UK, 3]
     上面這段代碼計算出了每一個location的數量,即便咱們的Count函數沒有指定並行度。這就是groupBy()起的做用,它會根據指定的字段建立一個GroupedStream,相同字段的tuple都會被重定向到一塊兒,匯聚成一個group。groupBy()以後是aggregate,與以前的聚合整個batch不一樣,此時的aggregate會單獨聚合每一個group。咱們也能夠這麼認爲,groupBy會把Stream按照指定字段分紅一個個stream group,每一個group就像一個batch同樣被處理。
     不過須要注意的是,groupBy()自己並非一個重定向操做,但若是它後面跟的是aggregator的話就是,跟的是partitionAggregate的話就不是。
3、總結       Storm是一個實時流計算框架,Trident是對storm的一個更高層次的抽象,Trident最大的特色以batch的形式處理stream。      一些最基本的操做函數有Filter、Function,Filter能夠過濾掉tuple,Function能夠修改tuple內容,輸出0或多個tuple,並能把新增的字段追加到tuple後面。      聚合有partitionAggregate和Aggregator接口。partitionAggregate對當前partition中的tuple進行聚合,它不是重定向操做。Aggregator有三個接口:CombinerAggregator, ReducerAggregator,Aggregator,它們屬於重定向操做,它們會把stream重定向到一個partition中進行聚合操做。      重定向操做會改變數據流向,但不會改變數據內容,重定向操會產生網絡傳輸,可能影響一部分效率。而Filter、Function、partitionAggregate則屬於本地操做,不會產生網絡傳輸。      GroupBy會根據指定字段,把整個stream切分紅一個個grouped stream,若是在grouped stream上作聚合操做,那麼聚合就會發生在這些grouped stream上而不是整個batch。若是groupBy後面跟的是aggregator,則是聚合操做,若是跟的是partitionAggregate,則不是聚合操做。
相關文章
相關標籤/搜索