storm trident

簡介

Storm是一個實時流計算框架,Trident是對storm的一個更高層次的抽象,Trident最大的特色以batch的形式處理stream。java

     一些最基本的操做函數有Filter、Function,Filter能夠過濾掉tuple,Function能夠修改tuple內容,輸出0或多個tuple,並能把新增的字段追加到tuple後面。算法

     聚合有partitionAggregate和Aggregator接口。partitionAggregate對當前partition中的tuple進行聚合,它不是重定向操做。Aggregator有三個接口:CombinerAggregator, ReducerAggregator,Aggregator,它們屬於重定向操做,它們會把stream重定向到一個partition中進行聚合操做。sql

     重定向操做會改變數據流向,但不會改變數據內容,重定向操會產生網絡傳輸,可能影響一部分效率。而Filter、Function、partitionAggregate則屬於本地操做,不會產生網絡傳輸。數據庫

     GroupBy會根據指定字段,把整個stream切分紅一個個grouped stream,若是在grouped stream上作聚合操做,那麼聚合就會發生在這些grouped stream上而不是整個batch。若是groupBy後面跟的是aggregator,則是聚合操做,若是跟的是partitionAggregate,則不是聚合操做。api

Trident主要有5類操做:網絡

一、做用在本地的操做,不產生網絡傳輸。併發

二、對數據流的重分佈,不改變流的內容,可是產生網絡傳輸。框架

三、聚合操做,有可能產生網絡傳輸。ide

四、做用在分組流(grouped streams)上的操做。函數

五、Merge和join

partition

概念

partition中文意思是分區,有人將partition理解爲Storm裏面的task,即併發的基本執行單位。我理解應該是像數據庫裏面的分區,是將一個batch的數據分區,分紅多個partition,或者能夠理解爲多個子batch,而後多個partition能夠併發處理。這裏關鍵的區別是:partition是數據,不是執行的代碼。你把數據(tuple)分區之後,若是你沒有多個task(併發度)來處理這些分區後的數據,那分區也是沒有做用的。因此這裏的關係是這樣的:先有batch,由於Trident內部是基於batch來實現的;而後有partition;分區後再分配併發度,而後才能進行併發處理。併發度的分配是利用parallelismHint來實現的。

操做

既然有partition的概念,那麼也就有partition的操做。Trident提供的分區操做,相似於Storm裏面講的grouping。分區操做有:

重分區操做經過運行一個函數改變元組在任務之間的分佈,也能夠調整分區的數量(好比重分區以後將並行度調大),重分區須要網絡傳輸的參與。重分區函數包含如下這幾個:

  1. shuffle:使用隨機輪詢算法在全部目標分區間均勻分配元組;
  2. broadcast:每一個元組複製到全部的目標分區。這在DRPC中很是有用,例如,須要對每一個分區的數據作一個stateQuery操做;
  3. partitionBy:接收一些輸入字段,根據這些字段輸入字段進行語義分區。經過對字段取hash值或者取模來選擇目標分區。partitionBy保證相同的字段必定被分配到相同的目標分區;
  4. global:全部的元組分配到相同的分區,該分區是流種全部batch決定的;
  5. batchGlobal:同一個batch中的元組被分配到相同的目標分區,不一樣batch的元組有可能被分配到不一樣的目標分區;
  6. partition:接收一個自定義的分區函數,自定義分區函數須要實現backtype.storm.grouping.CustomStreamGrouping接口。

注意,除了這裏明確提出來的分區操做,Trident裏面還有aggregate()函數隱含有分區的操做,它用的是global()操做,這個在後面接收聚合操做的時候還會再介紹。

API

each() 方法

     做用:操做batch中的每個tuple內容,通常與Filter或者Function函數配合使用。

     下面經過一個例子來介紹each()方法,假設咱們有一個FakeTweetsBatchSpout,它會模擬一個Stream,隨機產生一個個消息。咱們能夠經過設置這個Spout類的構造參數來改變這個Spout的batch Size的大小。

    1.Filter類:過濾tuple

     一個經過actor字段過濾消息的Filter:

public static class PerActorTweetsFilter extends BaseFilter {
  String actor;

  public PerActorTweetsFilter(String actor) {
    this.actor = actor;
  }
  @Override
  public boolean isKeep(TridentTuple tuple) {
    return tuple.getString(0).equals(actor);
  }
}

Topology:

topology.newStream("spout", spout)
  .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
  .each(new Fields("actor", "text"), new Utils.PrintFilter());

從上面例子看到,each()方法有一些構造參數

  • 第一個構造參數:做爲Field Selector,一個tuple可能有不少字段,經過設置Field,咱們能夠隱藏其它字段,僅僅接收指定的字段(其它字段實際還在)。
  • 第二個是一個Filter:用來過濾掉除actor名叫"dave"外的其它消息。

     2.Function類:加工處理tuple內容

     一個能把tuple中text內容變成大寫的Function:

public static class UppercaseFunction extends BaseFunction {
  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
    collector.emit(new Values(tuple.getString(0).toUpperCase()));
  }
}

Topology:

topology.newStream("spout", spout)
  .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
  .each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))
  .each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());

首先,UppercaseFunction函數的輸入是Fields("text", "actor"),其做用是把其中的"text"字段內容都變成大寫。

  其次,它比Filter多出一個輸出字段,做用是每一個tuple在通過這個Function函數處理後,輸出字段都會被追加到tuple後面,在本例中,執行完Function以後的tuple內容多了一個"uppercased_text",而且這個字段排在最後面。

    3. Field Selector與project

   咱們須要注意的是,上面每一個each()方法的第一個Field字段僅僅是隱藏掉沒有指定的字段內容,實際上被隱藏的字段依然還在tuple中,若是想要完全丟掉它們,咱們就須要用到project()方法。

   投影操做做用是僅保留Stream指定字段的數據,好比有一個Stream包含以下字段: [「a」, 「b」, 「c」, 「d」],運行以下代碼:

mystream.project(new Fields("b", "d"))

則輸出的流僅包含 [「b」, 「d」]字段。

aggregation的介紹

首先聚合操做分兩種:partitionAggregate(),以及aggregate()。

    1.partitionAggregate

partitionAggregate()的操做是在partition上,一個batch的tuple被分紅多個partition後,每一個partition都會單獨運行partitionAggregate中指定的聚合操做。分區聚合在一批tuple的每個分區上運行一個函數。與函數不一樣的是,分區聚合的輸出元組會覆蓋掉輸入元組。請看以下示例:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

假設你有一個包含a,b兩個字段的輸入流,元組的分區狀況以下:

Partition 0:
["a", 1]
["b", 2]

Partition 1:
["a", 3]
["c", 8]

Partition 2:
["e", 1]
["d", 9]
["d", 10]

運行上面的那一行代碼將會輸出以下的元組,這些元組只包含一個sum字段: 

Partition 0:
[3]

Partition 1:
[11]

Partition 2:
[20]

    2.aggregate

aggregate()隱含了一個global分區操做,也就是它作的是全局聚合操做。它針對的是整個batch的聚合計算。

這兩種聚合操做,均可以傳入不一樣的aggregator實現具體的聚合任務。Trident中有三種aggregator接口,分別爲:ReducerAggregator,CombinerAggregator,Aggregator。

下面是CombinerAggregator接口的定義:

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}

CombinerAggregator返回只有一個字段的一個元組。CombinerAggregator在每一個輸入元組上運行init函數,而後經過combine函數聚合結果值直到只剩下一個元組。若是分區中沒有任何元組,CombinerAggregator將返回zero函數中定義的元組。好比,下面是Count聚合器的實現:

public class Count implements CombinerAggregator<Long> {
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    public Long zero() {
        return 0L;
    }
}

ReducerAggregator接口的定義以下:

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}

ReducerAggregator經過init函數獲得一個初始的值,而後對每一個輸入元組調用reduce方法計算值,產生一個元組做爲輸出。好比Count的ReducerAggregator實現以下:

public class Count implements ReducerAggregator<Long> {
    public Long init() {
        return 0L;
    }

    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + 1;
    }
}

最經常使用的聚合器的接口是Aggregator,它的定義以下:

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
    void complete(T state, TridentCollector collector);
}

Aggregator可以發射任意數量,任意字段的元組。而且能夠在執行期間的任什麼時候候發射元組,它的執行流程以下:

  1. 處理batch以前調用init方法,init函數的返回值是一個表示聚合狀態的對象,該對象會傳遞到aggregate和complete函數;
  2. 每一個在batch分區中的元組都會調用aggregate方法,該方法可以更新聚合狀態而且發射元組;
  3. 當batch分區中的全部元組都被aggregate函數處理完時調用complete函數。

下面是使用Aggregator接口實現的Count聚合器:

public class CountAgg extends BaseAggregator<CountState> {
    static class CountState {
        long count = 0;
    }

    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count+=1;
    }

    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}

有些時候,咱們須要通知執行不少個聚合器,則可使用以下的鏈式調用執行:

mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()

上面的代碼將會在每個分區執行Count和Sum聚合器,輸出結果是包含count和sum兩個字段的元組。

最重要的區別是CombinerAggregator,它是先在partition上作partial aggregate,而後再將這些部分聚合結果經過global分區到一個總的分區,在這個總的分區上對結果進行彙總。

groupBy()分組操做

首先它包含兩個操做,一個是分區操做,一個是分組操做。

若是後面是partitionAggregate()的話,就只有分組操做:在每一個partition上分組,分完組後,在每一個分組上進行聚合;

若是後面是aggregate()的話,先根據partitionBy分區,在每一個partition上分組,,分完組後,在每一個分組上進行聚合。

parallelismHint併發度的介紹

它設置它前面全部操做的併發度,直到遇到某個repartition操做爲止。

topology.newStream("spout", spout)
      .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
      .parallelismHint(5)
      .each(new Fields("actor", "text"), new Utils.PrintFilter());

意味着:parallelismHit以前的spout,each都是5個相同的操做一塊兒併發,對,一共有5個spout同時發射數據,其實parallelismHint後面的each操做,也是5個併發。分區操做是做爲Bolt劃分的分界點的。

若是想單獨設置Spout怎麼辦?要在Spout以後,Bolt以前增長一個ParallelismHint,而且還要增長一個分區操做:

topology.newStream("spout", spout)
	  .parallelismHint(2)
	  .shuffle()
	  .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
	  .parallelismHint(5)
	  .each(new Fields("actor", "text"), new Utils.PrintFilter());

不少人只是設置了Spout的併發度,而沒有調用分區操做,這樣是達不到效果的,由於Trident是不會自動進行分區操做的。像我以前介紹的,先分區,再設置併發度。若是Spout不設置併發度,只設置shuffle,默認是1個併發度,這樣後面設置5個併發度不會影響到Spout,由於併發度的影響到shuffle分區操做就中止了。

例子

groupBy+aggregate+parallelismHint

package com.demo;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;



public class MyAgg extends BaseAggregator<Map<String, Integer>> {
	

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	
	/**
	 * 屬於哪一個分區
	 */
	private int partitionId;

	/**
	 * 分區數量
	 */
	private int numPartitions;
	private String batchId;
	
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		partitionId = context.getPartitionIndex();
		numPartitions = context.numPartitions();
		
	}

	public void aggregate(Map<String, Integer> val, TridentTuple tuple,
			TridentCollector collector) {
		String word = tuple.getString(0);
		Integer value = val.get(word);
		if (value == null) {
			value = 0;
		}
		value++;
		// 把數據保存到一個map對象中
		val.put(word, value);
		System.err.println("I am partition [" + partitionId
				+ "] and I have kept a tweet by: " + numPartitions + " " + word + " " +batchId);
	}

	public void complete(Map<String, Integer> val, TridentCollector collector) {
		collector.emit(new Values(val));
	}

	public Map<String, Integer> init(Object arg0, TridentCollector arg1) {
		this.batchId = arg0.toString();
		return new HashMap<String, Integer>();
	}

}
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
				new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
		spout.setCycle(false);
		TridentTopology tridentTopology = new TridentTopology();
		tridentTopology
				.newStream("spout", spout)
				.shuffle()
				.groupBy(new Fields("sentence"))
				.aggregate(new Fields("sentence"), new MyAgg(),
						new Fields("Map"))
			    .parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 2:0
I am partition [1] and I have kept a tweet by: 2 d 2:0
I am partition [0] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0

groupBy+partitionAggregate+parallelismHint

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
				new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
		spout.setCycle(false);
		TridentTopology tridentTopology = new TridentTopology();
		tridentTopology
				.newStream("spout", spout)
				.shuffle()
				.groupBy(new Fields("sentence"))
				.partitionAggregate(new Fields("sentence"), new MyAgg(),
						new Fields("Map")))
				.toStream()
			    .parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 2:0
I am partition [1] and I have kept a tweet by: 2 d 2:0
I am partition [0] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0

因爲shuffle已經把tuple平均分配給5個partition了,用groupBy+partitionAggregate來聚合又沒有partitionBy分區的做用,因此,直接在5個分區上進行聚合,結果就是每一個分區各有一個tuple。

而用groupBy+aggregate,雖然也是shuffle,可是因爲具備partitiononBy分區的做用,值相同的tuple都分配到同一個分區,結果就是每一個分區根據不一樣的值來作匯聚。

aggregate+parallelismHint(沒有groupBy)

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
				new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
		spout.setCycle(false);
		TridentTopology tridentTopology = new TridentTopology();
		tridentTopology
				.newStream("spout", spout)
				.shuffle()
				.aggregate(new Fields("sentence"), new MyAgg(),
						new Fields("Map"))
			    .parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 2:0
I am partition [0] and I have kept a tweet by: 2 d 2:0
I am partition [1] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0

partitionAggregate+parallelismHint(沒有groupBy操做)

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
				new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));
		spout.setCycle(false);
		TridentTopology tridentTopology = new TridentTopology();
		tridentTopology
				.newStream("spout", spout)
				.shuffle()
				.partitionAggregate(new Fields("sentence"), new MyAgg(),
						new Fields("Map"))
				.toStream()
			    .parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0
I am partition [0] and I have kept a tweet by: 2 a 1:0
I am partition [1] and I have kept a tweet by: 2 a 2:0
I am partition [0] and I have kept a tweet by: 2 d 2:0
I am partition [0] and I have kept a tweet by: 2 e 3:0
I am partition [1] and I have kept a tweet by: 2 f 3:0

咱們能夠發現,partitionAggregate加上groupBy,或者不加上groupBy,對結果都同樣:groupBy對於partitionAggregate沒有影響。可是對於aggregate來講,加上groupBy,就不是作全局聚合了,而是對分組作聚合;不加上groupBy,就是作全局聚合。

若是spout設置並行度,可是沒有加shuffle,不會起做用,分區默認爲1,;若是不設置並行度而且沒有加shuffle,分區默認爲1。

Merge和Joins

api的最後一部分即是如何把各類流匯聚到一塊兒。最簡單的方式就是把這些流匯聚成一個流。咱們能夠這麼作:   

topology.merge(stream1, stream2, stream3);

Trident指定新的合併以後的流中的字段爲stream1中的字段。
另外一種合併流的方式就是join。一個標準的join就像是一個sql,必須有標準的輸入,所以,join只針對符合條件的Stream。join應用在來自Spout的每個小Batch中。

下面的例子中,stream1流包含key,val1,val2三個字段,stream2流包含x,val1兩個字段:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

stream1流的key字段與stream2流的x字段組join操做,另外,Trident要求全部新流的輸出字段被重命名,由於輸入流可能包含相同的字段名稱。鏈接流發射的元組將會包含:

  1. 鏈接字段的列表。在上面的例子中,字段key對應stream1的key,stream2的x;
  2. 來自全部流的全部非鏈接字段的列表,按照傳遞到鏈接方法的順序排序。在上面的例子中,字段a與字段b對應stream1的val1和val2,c對應於stream2的val1.

     當join的是來源於不一樣Spout的stream時,這些Spout在發射數據時須要同步,一個Batch所包含的tuple會來自各個Spout。   

相關文章
相關標籤/搜索