Trident API(翻譯)

Trident API Overview

 

Trident 的核心數據模型是「流」(Stream),進行數據處理的時候,將數據做爲一系列的batch(批)來進行。流被分割成多個partition分佈在集羣中的不一樣節點上來運行,並且對流的操做也是在流的各個partition上並行運行的。html

Trident 中有五類操做:java

  • 針對每一個小分區(partition)的本地操做,這類操做不會產生網絡數據傳輸(each、map、faltmap、partitionAggregate等)
  • 針對一個數據流的從新分區操做,這類操做不會改變數據流中的內容,可是會產生必定的網絡傳輸(shuffle、partition等)
  • 經過網絡數據傳輸進行的聚合操做(Aggregate)
  • 針對數據流的分組操做(groupBy)
  • 融合與聯結操做(merge、join)

Partition-local operations(本地分區操做)

本地分區操做是在每一個batch partition上獨立運行的操做,其中不涉及網絡數據傳輸。git

Functions

Functions函數負責接收一個input fields的集合並選擇輸出更多的tuple或者不輸出tuple。輸出tuple的fields會被添加到原始數據流的輸入域中。若是一個function不輸出tuple,那麼原始的輸入tuple就會被直接過濾掉。不然,每一個輸出 tuple 都會複製一份輸入tuple。假設你有下面這樣的函數:github

 

public class MyFunction extends BaseFunction {
    public void execute(TridentTuple tuple, TridentCollector collector) {
        for(int i=0; i < tuple.getInteger(0); i++) {
            collector.emit(new Values(i));
        }
    }
}

假設你有一個名爲 「mystream」 的數據流,這個流中包含下面幾個 tuple,每一個 tuple 中包含有 "a"、"b"、"c" 三個域:算法

[1, 2, 3] 
[4, 1, 6] 
[3, 0, 8]

若是你運行這段代碼:apache

mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))

那麼最終輸出的結果 tuple 就會包含有 "a"、"b"、"c"、"d"4 個域,就像下面這樣:windows

[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]

Filters

過濾器負責判斷輸入的 tuple 是否須要保留。如下面的過濾器爲例:網絡

public class MyFilter extends BaseFilter {
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
    }
}

經過使用這段代碼:併發

mystream.each(new Fields("b", "a"), new MyFilter())

就能夠將下面這樣帶有 "a"、"b"、"c"三個域的 tupleapp

[1, 2, 3]
[2, 1, 1]
[2, 3, 4]

最終轉化成這樣的結果 tuple:

[2, 1, 1]

map and flatMap

map對stream中的tuple應用map函數,並返回結果流。這能夠用於對tuples進行one-one(一對一)的轉換(transformation)操做。

舉例,若是你想將一個stream中的單詞轉換成大寫,你能夠定義一個mapping函數,以下:

 

public class UpperCase extends MapFunction {
 @Override
 public Values execute(TridentTuple input) {
   return new Values(input.getString(0).toUpperCase());
 }
}

 

mapping函數應用到stream上並生成一個由大寫單詞組成的stream。

mystream.map(new UpperCase());


flatMap與map相似,可是被用來對stream中的values進行one-to-many(一對多)操做,而後會將resulting elements(結果元素)flatten平壓至一個新的stream中。

public class Split extends FlatMapFunction {
  @Override
  public Iterable<Values> execute(TridentTuple input) {
    List<Values> valuesList = new ArrayList<>();
    for (String word : input.getString(0).split(" ")) {
      valuesList.add(new Values(word));
    }
    return valuesList;
  }
}

flatMap函數被應用在一個句子stream中,生成一個單詞stream。

mystream.flatMap(new Split())

固然這些操做能夠被鏈接在一塊兒,能夠從一個sentences stream中得到一個大寫單詞的stream

mystream.flatMap(new Split()).map(new UpperCase())

peek

peek能夠用於在每一個trident tuples流經stream時執行附加的操做(主要是爲了輸出一些信息,並不改變tuples或者fields)。這對於調試查看在管道中某個點上的tuples是有用的。

舉例,接下來的代碼將打印將單詞轉換成大寫單詞後groupBy操做的結果。

mystream.flatMap(new Split()).map(new UpperCase()).peek(new Consumer() {
	@Override
	public void accept(TridentTuple input) {
		System.out.println(input.getString(0));
	}
}).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),
		new Fields("count"));

min and minBy

min和minBy操做將返回在trident stream中一個batch of tuples的每一個partition的最小值。

假設,一個trident流中包含fields["device-id", "count"],下面是每一個partition of tuples。

Partition 0:
[123, 2]
[113, 54]
[23,  28]
[237, 37]
[12,  23]
[62,  17]
[98,  42]

Partition 1:
[64,  18]
[72,  54]
[2,   28]
[742, 71]
[98,  45]
[62,  12]
[19,  174]


Partition 2:
[27,  94]
[82,  23]
[9,   86]
[53,  71]
[74,  37]
[51,  49]
[37,  98]

 

minBy操做講對每一個partition中的fields named count取最小值,而且輸出這個最小值的tuple。

mystream.minBy(new Fields("count"))

上面代碼執行後各partition的結果:

Partition 0:
[123, 2]


Partition 1:
[62,  12]


Partition 2:
[82,  23]

你能夠看在org.apache.storm.trident.Stream類中查看min和minBy操做。

相關例子能夠從下面的連接查看 TridentMinMaxOfDevicesTopology 和 TridentMinMaxOfVehiclesTopology

max and maxBy

max和maxBy操做將返回在trident stream中一個batch of tuples的每一個partition的最大值。

假設,一個trident流中包含fields["device-id", "count"],下面是每一個partition of tuples。

maxBy操做講對每一個partition中的fields named count取最大值,而且輸出這個最大值的tuple。

mystream.maxBy(new Fields("count"))

上面代碼執行後各partition的結果:

Partition 0:
[113, 54]


Partition 1:
[19,  174]


Partition 2:
[37,  98]

你能夠看在org.apache.storm.trident.Stream類中查看max和maxBy操做。

相關例子能夠從下面的連接查看  TridentMinMaxOfDevicesTopology 和 TridentMinMaxOfVehiclesTopology

Windowing

相關window分類可參照window頁面。

Trident流能夠處理在某個相同學口中batch的tuples,並將聚合結果發送給下一個操做。有兩種windowing,分別是基於processing時間或者tuples數量:1.Tumbling window翻滾窗口2。Sliding window滑動窗口

Tumbling window

元組根據processing時間或者tuples數量分組在某個窗口中。任何元組只屬於一個窗口。

Sliding window

Tuples are grouped in windows and window slides for every sliding interval. A tuple can belong to more than one window.

在每一個滑動間隔中,元組在窗口和窗口滑動中分組。一個元組能夠屬於多個窗口。

Common windowing API

通用windowing API:

 

public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,Aggregator aggregator, Fields functionFields)

windowConfig定義窗口的屬性:window length和window sliding length。

WindowsStoreFactory用來儲存接受到的tuples,而且聚合values。

partitionAggregate

partitionAggregate會在一個batch of tuples的每一個partition上執行function。與上面的函數不一樣,由partitionAggregate發送出的tuples會將替換輸入tuples。如下面這段代碼爲例:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"));


假如輸入流中包含有 "a"、"b" 兩個域而且有如下幾個tuple塊:

Partition 0:
["a", 1]
["b", 2]

Partition 1:
["a", 3]
["c", 8]

Partition 2:
["e", 1]
["d", 9]
["d", 10]


通過上面的代碼以後,輸出就會變成帶有一個名爲 "sum"的域的數據流,其中的tuple就是這樣的:

Partition 0:
[3]

Partition 1:
[11]

Partition 2:
[20]


Storm 有三個用於定義聚合器的接口:CombinerAggregatorReducerAggregator 以及 Aggregator

這是 CombinerAggregator 接口,整個CombinerAggregator<T>會在每批次結束時將combine的結果作一次emit:

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);//每條tuple調用一次,對tuple作預處理。
    T combine(T val1, T val2);//每條tuple調用一次,和以前的聚合值作combine。若是是partition中沒有tuple則返回zero值做爲combine的結果。
    T zero();//當partition中沒有數據流時,處理邏輯。
}

 

CombinerAggregator 會將帶有一個field的一個單獨的tuple返回做爲輸出。CombinerAggregator會在每一個輸入tuple上運行初始化函數init,而後使用組合函數來組合全部輸入的值。若是在某個分區中沒有 tuple, CombinerAggregator 就會輸出zero 方法的結果。例如,下面是 Count 的實現代碼:

public class Count implements CombinerAggregator<Long> {
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    public Long zero() {
        return 0L;
    }
}

 

若是你使用aggregate方法來代替partitionAggregate方法,你就會發現CombinerAggregators的好處了。在這種狀況下,Trident會在發送tuple以前經過分區聚合操做來優化計算過程。

ReducerAggregator的接口:

public interface ReducerAggregator<T> extends Serializable {
    T init();//用來初始化reduce函數中的參數值curr。執行一次
    T reduce(T curr, TridentTuple tuple);//每條tuple調用1次,與curr進行聚合操做。
}

整個ReducerAggregator<T>會在每batch結束時將reduce的結果作一次emit。

ReducerAggregator會使用init方法來產生一個初始化的值,而後使用該值對每一個輸入tuple進行遍歷,並最終生成並輸出一個單獨的tuple,這個tuple中就包含有咱們須要的計算結果值。例如,下面是將Count定義爲ReducerAggregator的代碼:

public class Count implements ReducerAggregator<Long> {
    public Long init() {
        return 0L;
    }

    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + 1;
    }
}


ReducerAggregator 一樣能夠用於 persistentAggregate,你會在後面看到這一點。

最經常使用的聚合器接口仍是下面的 Aggregator接口:

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
    void complete(T state, TridentCollector collector);
}


其中父接口Operation 還有兩個方法

public interface Operation extends Serializable {
    void prepare(Map conf, TridentOperationContext context);
    void cleanup();
}


Aggregator<T>接口,實現了上面五個方法:

  •  prepare:只在啓動topolopy時調用1次,若是設置了併發度,則在每個partition中調用一次;
  •  cleanup:只在正常關閉topolopy時調用1次,若是設置了併發度,則在每個partition中調用1次;
  •  init:對於global aggregation來講,每一個批次調用1次。若是使用的時partitionAggregate則每一個批次的每個partition調用一次。對於Group Streams來講,每一個相同的key組成的數據流調用一次。須要注意的是,若是使用的是事務型的spout,同時某個批次處理失敗致使該批次消息從新發送,則在接下來處理時,initu有可能調用屢次,因此init裏面代碼邏輯要支持同一批的重複調用。
  •  aggregate:每一個tuple調用1次;
  •  complete:對於global aggregation來講,每一個批次調用一次。若是使用的是partitionAggregate,則每個批次的每個partition 調用1次。對於Grouped Streams來講,每一個相同的key組成的數據流調用1次。

 

須要特別注意的是:當使用沒有group by 的Aggregator或者ReducerAggregation計算global aggretation時,每一個batch的數據流只能在1個partition(至關於storm的task)中執行,即便設置了parallelismHint的併發數n>1,實際上也只能輪循的叫不一樣批次aggregation執行,也就至關於串行執行,因此反而浪費了資源。

使用aggregation作global aggregation沒法啓動併發,可是當配合CombinerAggregator<T>時候能夠,Trident會把拓撲自動拆分紅2個bolt,第一個bolt作局部聚合,相似於Hadoop中的map;第二個bolt經過接收網絡傳輸過來的局部聚合值最後作一個全局聚合。自動優化後的第一個bolt是本地化操做,所以它能夠和它前面或者後面挨着的全部each合併在同一個bolt裏面。

 

 

trident.newStream(「trident_spout」, new MySpout())  
	.partitionAggregate(new MyAggregator(), new Fields(「testoutput1」))  
	.parallelismHint(5)  
	.aggregate(new Fields(「out1」), new MyAggregator(), new Fields(「testoutput2」));


parallelismHint(n)要寫在aggregate的前面,若是寫在aggregate後面,將致使本地化操做的第一個bolt的併發度爲1,而全局聚合的第二個bolt的併發度爲n,而實際上第二個bolt並不能真正開啓併發,只是前面提到的輪循而已。

把parallelismHint(n)寫在aggregate的前面會致使spout同時開啓n的併發度,所以要注意本身實現的spout類是否支持併發發送。

 

 下面是使用Aggregator來進行Count的一個實現

 

public class CountAgg extends BaseAggregator<CountState> {
    static class CountState {
        long count = 0;
    }

    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count+=1;
    }

    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}

某些時候你須要同時計算multiple aggregators時,使用以下的方式進行鏈接:

mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()

 這段代碼會在每一個分區上分別執行 Count 和 Sum 聚合器,而輸出中只會包含一個帶有 ["count", "sum"] 域的單獨的 tuple。

stateQuery and partitionPersist

stateQuery 與 partitionPersist 會分別查詢、更新 state 數據源。你能夠參考 Trident State 文檔 來了解如何使用它們。

projection

projection 方法只會保留操做中指定的域。若是你有一個帶有 ["a", "b", "c", "d"] 域的數據流,經過執行這段代碼:

mystream.project(new Fields("b", "d"))

就會使得輸出數據流中只包含有 ["b","d"] 域。

Repartitioning operations

重分區操做會執行一個用來改變在不一樣的任務間分配 tuple 的方式的函數。在重分區的過程當中分區的數量也可能會發生變化(例如,重分區以後的並行度就有可能會增大)。重分區會產生必定的網絡數據傳輸。下面是重分區操做的幾個函數:

  1. shuffle:經過隨機輪詢算法來從新分配目標區塊的全部 tuple。
  2. broadcast:每一個 tuple 都會被複制到全部的目標區塊中。這個函數在 DRPC 中頗有用 —— 好比,你可使用這個函數來獲取每一個區塊數據的查詢結果。
  3. partitionBy:該函數會接收一組域做爲參數,並根據這些域來進行分區操做。能夠經過對這些域進行哈希化,並對目標分區的數量取模的方法來選取目標區塊。partitionBy 函數可以保證來自同一組域的結果總會被髮送到相同的目標區間。
  4. global:這種方式下全部的 tuple 都會被髮送到同一個目標分區中,並且數據流中的全部的塊都會由這個分區處理。
  5. batchGlobal:同一個 batch 塊中的全部 tuple 會被髮送到同一個區塊中。固然,在數據流中的不一樣區塊仍然會分配到不一樣的區塊中。
  6. partition:這個函數使用自定義的分區方法,該方法會實現 org.apache.storm.grouping.CustomStreamGrouping 接口。

Aggregation operations

Trident 使用 aggregate 方法和 persistentAggregate 方法來對數據流進行聚類操做。其中,aggregate 方法會分別對數據流中的每一個 batch 進行處理,而 persistentAggregate 方法則會對數據流中的全部 batch 執行聚類處理,並將結果存入某個 state 中。

在數據流上執行 aggregate 方法會執行一個全局的聚類操做。在你使用 ReducerAggregator 或者 Aggregator 時,數據流首先會被從新分區成一個單獨的分區,而後聚類函數就會在該分區上執行操做。而在你使用 CombinerAggregator 時,Trident 首先會計算每一個分區的部分聚類結果,而後將這些結果重分區到一個單獨的分區中,最後在網絡數據傳輸完成以後結束這個聚類過程。CombinerAggregator 比其餘的聚合器的運行效率更高,在聚類時應該儘量使用CombinerAggregator

下面是一個使用 aggregate 來獲取一個 batch 的全局計數值的例子:

mystream.aggregate(new Count(), new Fields("count"))

與 partitionAggregate同樣,aggregate的聚合器也能夠進行鏈式處理。然而,若是你在一個處理鏈中同時使用了CombinerAggregator 和non-CombinerAggregator,Trident 就不能對部分聚類操做進行優化了。

想要了解更多使用 persistentAggregate 的方法,能夠參考 Trident State 文檔 一文。

Operations on grouped streams

經過對指定的域執行 partitionBy 操做,groupBy 操做能夠將數據流進行重分區,使得相同的域的 tuple 分組能夠彙集在一塊兒。例如,下面是一個 groupBy 操做的示例:

若是你在分組數據流上執行聚合操做,聚合器會在每一個分組(而不是整個區塊)上運行。persistentAggregate 一樣能夠在一個分組數據裏上運行,這種狀況下聚合結果會存儲在 MapState 中,其中的 key 就是分組的域名。

和其餘操做同樣,對分組數據流的聚合操做也能夠以鏈式的方式執行。

Merges and joins

Trident API 的最後一部分是聯結不一樣的數據流的操做。聯結數據流最簡單的方式就是將全部的數據流融合到一個流中。你可使用 TridentTopology 的 merge 方法實現該操做,好比這樣:

topology.merge(stream1, stream2, stream3);

Trident 會將融合後的新數據流的域命名爲爲第一個數據流的輸出域。

聯結數據流的另一種方法是使用 join。像 SQL 那樣的標準 join 操做只能用於有限的輸入數據集,對於無限的數據集就沒有用武之地了。Trident 中的 join 只會應用於每一個從 spout 中輸出的小 batch。

下面是兩個流的 join 操做的示例,其中一個流含有 [「key」, 「val1」, 「val2」] 域,另一個流含有 [「x」, 「val1」] 域:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

上面的例子會使用 "key" 和 "x" 做爲 join 的域來聯結 stream1 和 stream2。Trident 要求先定義好新流的輸出域,由於輸入流的域可能會覆蓋新流的域名。從 join 中輸出的 tuple 中會包含:

  1. join 域的列表。在這個例子裏,輸出的 "key" 域與 stream1 的 "key" 域以及 stream2 的 "x" 域對應。
  2. 來自全部流的非 join 域的列表。這個列表是按照傳入 join 方法的流的順序排列的。在這個例子裏,"a" 和 "b" 域與 stream1 的 "val1" 和 "val2" 域對應;而 "c" 域則與 stream2 的 「val1」 域相對應。

在對不一樣的 spout 發送出的流進行 join 時,這些 spout 上會按照他們發送 batch 的方式進行同步處理。也就是說,一個處理中的 batch 中含有每一個 spout 發送出的 tuple。

到這裏你大概仍然會對如何進行窗口 join 操做感到困惑。窗口操做(包括平滑窗口、滾動窗口等 —— 譯者注)主要是指將當前的 tuple 與過去若干小時時間段內的 tuple 聯結起來的過程。

你可使用 partitionPersist 和 stateQuery 來實現這個過程。過去一段時間內的 tuple 會以 join 域爲關鍵字被保存到一個 state 源中。而後就可使用 stateQuery 查詢 join 域來實現這個「聯結」(join)的過程。

相關文章
相關標籤/搜索