```java
public static class PerActorTweetsFilter extends BaseFilter {
    String actor;

    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getString(0).equals(actor);
    }
}
```
Topology:
```java
topology.newStream("spout", spout)
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
    .each(new Fields("actor", "text"), new Utils.PrintFilter());
```
As the example above shows, each() takes a couple of arguments:
The first argument acts as a Field Selector: a tuple may carry many fields, and by naming fields here we hide the rest so that the operation only receives the ones we specify (the other fields are still present in the tuple; see the sketch below).
The second argument is the Filter itself (here PerActorTweetsFilter), which is applied to the selected fields.
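To make the field-selection behavior concrete, here is a minimal sketch reusing the spout, PerActorTweetsFilter, and Utils.PrintFilter from the example above: the filter is given only the "actor" field, so tuple.getString(0) refers to the actor, while the PrintFilter that follows still sees the full tuple.

```java
// Sketch: the filter receives only the "actor" field (index 0).
// The hidden "text" field is still part of the tuple, so the
// PrintFilter downstream can still print both fields.
topology.newStream("spout", spout)
    .each(new Fields("actor"), new PerActorTweetsFilter("dave"))
    .each(new Fields("actor", "text"), new Utils.PrintFilter());
```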
Besides Filters, Trident also provides Functions. The Function below uppercases a field:

```java
public static class UppercaseFunction extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        collector.emit(new Values(tuple.getString(0).toUpperCase()));
    }
}
```
Topology:
```java
topology.newStream("spout", spout)
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
    .each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))
    .each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());
```
First, the input to UppercaseFunction is Fields("text", "actor"), and its job is to convert the content of the "text" field to uppercase.
Second, unlike a Filter, a Function declares output fields. After each tuple is processed by the Function, the emitted values are appended to the end of the tuple; in this example the resulting tuple gains an extra "uppercased_text" field, placed last.

To see how parallelism behaves, we extend PerActorTweetsFilter so that it reports which partition it is running in:

```java
public static class PerActorTweetsFilter extends BaseFilter {
    private int partitionIndex;
    private String actor;

    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        this.partitionIndex = context.getPartitionIndex();
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        boolean filter = tuple.getString(0).equals(actor);
        if (filter) {
            System.err.println("I am partition [" + partitionIndex + "] and I have kept a tweet by: " + actor);
        }
        return filter;
    }
}
```
Topology:
```java
topology.newStream("spout", spout)
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).parallelismHint(5)
    .each(new Fields("actor", "text"), new Utils.PrintFilter());
```
If we set the parallelism of the Filter to 5 threads, what does the result look like? Here is what our test produced:
I am partition [4] and I have kept a tweet by: dave

Now give the spout a parallelism hint of 2 and add a shuffle() before the Filter:

```java
topology.newStream("spout", spout).parallelismHint(2).shuffle()
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).parallelismHint(5)
    .each(new Fields("actor", "text"), new Utils.PrintFilter());
```
2.2.2 partitionBy() and repartitioning operations
Notice that the example above uses shuffle(), which is a repartitioning operation. So what is a repartitioning operation? It defines how tuples are routed to the next processing layer, and of course different layers may have different degrees of parallelism. shuffle() routes tuples randomly to the threads of the next layer, whereas partitionBy() routes tuples to the next layer's threads by consistent hashing on the specified fields. In other words, with partitionBy(), tuples carrying the same value for those fields are always routed to the same thread.

Next, let's look at aggregation with a simple custom aggregator:

```java
public static class LocationAggregator extends BaseAggregator<Map<String, Integer>> {
    @Override
    public Map<String, Integer> init(Object batchId, TridentCollector collector) {
        return new HashMap<String, Integer>();
    }

    @Override
    public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {
        String location = tuple.getString(0);
        val.put(location, MapUtils.getInteger(val, location, 0) + 1);
    }

    @Override
    public void complete(Map<String, Integer> val, TridentCollector collector) {
        collector.emit(new Values(val));
    }
}
```
Topology:
```java
topology.newStream("spout", spout)
    .aggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))
    .each(new Fields("location_counts"), new Utils.PrintFilter());
```
This aggregator is simple: it counts the number of tuples per location within each batch. The example also shows the Aggregator interface:
init(): called when a new batch first arrives; its return value is the initial aggregation state.
aggregate(): called for every tuple in the batch, updating that state.
complete(): called once the batch has been fully processed; this is where the result is emitted.

Now let's combine partitionBy() with partitionAggregate():

```java
topology.newStream("spout", spout)
    .partitionBy(new Fields("location"))
    .partitionAggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts")).parallelismHint(3)
    .each(new Fields("location_counts"), new Utils.PrintFilter());
```
Let's analyze this. First, partitionBy() repartitions the tuples by their location field, so tuples with the same location are guaranteed to be processed by the same thread. Second, note that partitionAggregate(), unlike aggregate(), is not a repartitioning operation: it simply aggregates the batches within the current partition. Since we repartitioned by location, the test data contains four distinct locations, and there are only three partitions, we can predict that one partition will end up handling batches for two locations. The final test results are as follows:
[{France=10, Spain=5}]

Finally, let's use groupBy() together with the built-in Count aggregator. groupBy() groups the tuples in each batch by the given fields, and the aggregation then runs once per group:

```java
topology.newStream("spout", spout)
    .groupBy(new Fields("location"))
    .aggregate(new Fields("location"), new Count(), new Fields("count"))
    .each(new Fields("location", "count"), new Utils.PrintFilter());
```
Let's take a look at the execution results first:
[France, 25]
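For reference, the Count used above is Storm's built-in Count aggregator. Purely to illustrate the init/aggregate/complete interface described earlier, here is a sketch of a counting aggregator written in the same BaseAggregator style as LocationAggregator; this is not the library's actual implementation, just an assumed equivalent for explanation.

```java
// Sketch only: a per-group counter in the same BaseAggregator style as
// LocationAggregator above. The topology uses Storm's built-in Count;
// this class merely illustrates init/aggregate/complete.
public static class CountAggregator extends BaseAggregator<long[]> {
    @Override
    public long[] init(Object batchId, TridentCollector collector) {
        return new long[] { 0L };            // fresh counter per batch/group
    }

    @Override
    public void aggregate(long[] val, TridentTuple tuple, TridentCollector collector) {
        val[0]++;                            // count every tuple seen
    }

    @Override
    public void complete(long[] val, TridentCollector collector) {
        collector.emit(new Values(val[0]));  // emit the final count
    }
}
```

Used after groupBy(new Fields("location")), complete() runs once per location group and would emit the same kind of [location, count] result shown above.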