[case44] A look at storm trident's operations

This article mainly takes a look at storm trident's operations.

function filter projection

Function

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Function.java

public interface Function extends EachOperation {
    /**
     * Performs the function logic on an individual tuple and emits 0 or more tuples.
     *
     * @param tuple The incoming tuple
     * @param collector A collector instance that can be used to emit tuples
     */
    void execute(TridentTuple tuple, TridentCollector collector);
}
  • Function defines an execute method; the fields it emits are appended to the input tuple (a sketch follows)
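
For illustration, a minimal Function sketch in the style of the Trident docs (class and field names are hypothetical): given one integer input field n, it emits the numbers 0..n-1, and each emitted value is appended to the original tuple as the declared function field.

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class EmitUpTo extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // for an input field value n, emit n tuples: 0, 1, ..., n-1;
        // each emitted value is appended to the original tuple
        for (int i = 0; i < tuple.getInteger(0); i++) {
            collector.emit(new Values(i));
        }
    }
}

Wired in with mystream.each(new Fields("b"), new EmitUpTo(), new Fields("d")), an input tuple [1, 2, 3] with fields ["a", "b", "c"] would produce [1, 2, 3, 0] and [1, 2, 3, 1].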

Filter

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Filter.java

public interface Filter extends EachOperation {

    /**
     * Determines if a tuple should be filtered out of a stream
     *
     * @param tuple the tuple being evaluated
     * @return `false` to drop the tuple, `true` to keep the tuple
     */
    boolean isKeep(TridentTuple tuple);
}
  • Filter provides an isKeep method that decides whether the tuple is kept in the stream (a sketch follows)
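
A minimal Filter sketch along the same lines (class name hypothetical): isKeep returns true to keep the tuple and false to drop it.

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

public class KeepOneTwo extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        // keep the tuple only if the first input field is 1 and the second is 2
        return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
    }
}

Applied with mystream.each(new Fields("b", "a"), new KeepOneTwo()), fields "b" and "a" become the first and second input fields respectively.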

projection

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

/**
     * Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`.
     *
     * For example, if you had a Stream `mystream` containing the fields `["a", "b", "c", "d"]`, calling
     *
     * ```java
     * mystream.project(new Fields("b", "d"))
     * ```
     *
     * would produce a stream containing only the fields `["b", "d"]`.
     *
     *
     * @param keepFields The fields in the Stream to keep
     * @return
     */
    public Stream project(Fields keepFields) {
        projectionValidation(keepFields);
        return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields)));
    }
  • Here ProjectedProcessor is used to carry out the projection operation

repartitioning operations

partition

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

/**
     * ## Repartitioning Operation
     *
     * @param partitioner
     * @return
     */
    public Stream partition(CustomStreamGrouping partitioner) {
        return partition(Grouping.custom_serialized(Utils.javaSerialize(partitioner)));
    }
  • Here a CustomStreamGrouping is used

partitionBy

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

/**
     * ## Repartitioning Operation
     *
     * @param fields
     * @return
     */
    public Stream partitionBy(Fields fields) {
        projectionValidation(fields);
        return partition(Grouping.fields(fields.toList()));
    }
  • Here Grouping.fields is used

identityPartition

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

/**
     * ## Repartitioning Operation
     *
     * @return
     */
    public Stream identityPartition() {
        return partition(new IdentityGrouping());
    }
  • Here IdentityGrouping is used

shuffle

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

/**
     * ## Repartitioning Operation
     *
     * Use random round robin algorithm to evenly redistribute tuples across all target partitions
     *
     * @return
     */
    public Stream shuffle() {
        return partition(Grouping.shuffle(new NullStruct()));
    }
  • Here Grouping.shuffle is used

localOrShuffle

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

/**
     * ## Repartitioning Operation
     *
     * Use random round robin algorithm to evenly redistribute tuples across all target partitions, with a preference
     * for local tasks.
     *
     * @return
     */
    public Stream localOrShuffle() {
        return partition(Grouping.local_or_shuffle(new NullStruct()));
    }
  • Here Grouping.local_or_shuffle is used

global

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

/**
     * ## Repartitioning Operation
     *
     * All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
     * @return
     */
    public Stream global() {
        // use this instead of storm's built in one so that we can specify a singleemitbatchtopartition
        // without knowledge of storm's internals
        return partition(new GlobalGrouping());
    }
  • Here GlobalGrouping is used

batchGlobal

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

/**
     * ## Repartitioning Operation
     *
     *  All tuples in the batch are sent to the same partition. Different batches in the stream may go to different
     *  partitions.
     *
     * @return
     */
    public Stream batchGlobal() {
        // the first field is the batch id
        return partition(new IndexHashGrouping(0));
    }
  • Here IndexHashGrouping is used; the repartition works at the granularity of the whole batch

broadcast

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

/**
     * ## Repartitioning Operation
     *
     * Every tuple is replicated to all target partitions. This can be useful during DRPC – for example, if you need to do
     * a stateQuery on every partition of data.
     *
     * @return
     */
    public Stream broadcast() {
        return partition(Grouping.all(new NullStruct()));
    }
  • Here Grouping.all is used (a combined usage sketch of the repartitioning operations follows)
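
Taken together, each repartitioning operation above is a one-liner on a Stream and returns a new Stream wired with the corresponding grouping. A sketch, assuming an existing stream with a "userId" field (names hypothetical):

Stream byUser   = stream.partitionBy(new Fields("userId")); // Grouping.fields: same userId, same partition
Stream shuffled = stream.shuffle();                         // Grouping.shuffle: random round robin
Stream local    = stream.localOrShuffle();                  // Grouping.local_or_shuffle: prefer local tasks
Stream single   = stream.global();                          // GlobalGrouping: all tuples to one partition
Stream perBatch = stream.batchGlobal();                     // IndexHashGrouping: one partition per batch
Stream copies   = stream.broadcast();                       // Grouping.all: replicate to every partition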

groupBy

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

/**
     * ## Grouping Operation
     *
     * @param fields
     * @return
     */
    public GroupedStream groupBy(Fields fields) {
        projectionValidation(fields);
        return new GroupedStream(this, fields);
    }
  • This returns a GroupedStream (see the sketch below)
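
A GroupedStream runs subsequent aggregations per group. A minimal sketch, assuming a stream with a "word" field and using the builtin org.apache.storm.trident.operation.builtin.Count:

stream.groupBy(new Fields("word"))
      .aggregate(new Fields("word"), new Count(), new Fields("count"));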

aggregators

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

//partition aggregate
    public Stream partitionAggregate(Aggregator agg, Fields functionFields) {
        return partitionAggregate(null, agg, functionFields);
    }

    public Stream partitionAggregate(CombinerAggregator agg, Fields functionFields) {
        return partitionAggregate(null, agg, functionFields);
    }

    public Stream partitionAggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
               .partitionAggregate(inputFields, agg, functionFields)
               .chainEnd();
    }

    public Stream partitionAggregate(ReducerAggregator agg, Fields functionFields) {
        return partitionAggregate(null, agg, functionFields);
    }

    public Stream partitionAggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
               .partitionAggregate(inputFields, agg, functionFields)
               .chainEnd();
    }

    //aggregate
    public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
               .aggregate(inputFields, agg, functionFields)
               .chainEnd();
    }

    public Stream aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
               .aggregate(inputFields, agg, functionFields)
               .chainEnd();
    }

    public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
                .aggregate(inputFields, agg, functionFields)
                .chainEnd();
    }

    //persistent aggregate
    public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) {
        return persistentAggregate(spec, null, agg, functionFields);
    }

    public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        // replaces normal aggregation here with a global grouping because it needs to be consistent across batches 
        return new ChainedAggregatorDeclarer(this, new GlobalAggScheme())
                .aggregate(inputFields, agg, functionFields)
                .chainEnd()
               .partitionPersist(spec, functionFields, new CombinerAggStateUpdater(agg), functionFields);
    }

    public TridentState persistentAggregate(StateFactory stateFactory, ReducerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, ReducerAggregator agg, Fields functionFields) {
        return persistentAggregate(spec, null, agg, functionFields);
    }

    public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return global().partitionPersist(spec, inputFields, new ReducerAggStateUpdater(agg), functionFields);
    }
  • trident's aggregators fall into three categories: partitionAggregate, aggregate, and persistentAggregate; aggregator operations replace the stream's output rather than appending to it
  • partitionAggregate operates at the granularity of each partition, not the whole batch
  • aggregate operates at the granularity of a batch: for each batch it first uses a global operation to merge that batch's tuples from all partitions into a single partition, then aggregates the batch. It accepts three kinds of aggregators: Aggregator, CombinerAggregator, and ReducerAggregator. Calling stream.aggregate amounts to a global aggregation; with an Aggregator or ReducerAggregator, the stream first moves the tuples into a single partition and then aggregates, whereas with a CombinerAggregator, trident optimizes by doing a partial aggregate on each partition first and then merging into a single partition for the final aggregate, which saves network transfer compared to Aggregator or ReducerAggregator
  • persistentAggregate aggregates the tuples of all batches across the stream and stores the result in state (see the word-count sketch below)
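
The classic Trident word count is a minimal sketch of persistentAggregate (the spout and the Split function are assumed; MemoryMapState is storm's in-memory testing state and Count the builtin combiner):

TridentTopology topology = new TridentTopology();
TridentState wordCounts = topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        // keeps a running count per word across all batches, persisted in state
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));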

Aggregator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Aggregator.java

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T val, TridentTuple tuple, TridentCollector collector);
    void complete(T val, TridentCollector collector);
}
  • Aggregator first calls init for initialization; the returned value is then passed as a parameter to the aggregate and complete methods
  • aggregate is executed once for each tuple in the batch partition; after every tuple in the batch partition has been aggregated, complete is executed
  • suppose a custom Aggregator sums values: for the batch of tuples [4], [7], [8], init yields 0; for [4], val=0 and 0+4=4; for [7], val=4 and 4+7=11; for [8], val=11 and 11+8=19; when the batch ends, val=19 and complete runs, where the collector can be used to emit the result (a sketch follows)
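
A minimal sketch of that summing Aggregator (class name hypothetical, extending the convenience base class BaseAggregator):

import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class SumAggregator extends BaseAggregator<SumAggregator.State> {
    public static class State {
        long sum = 0; // the running val described above
    }

    @Override
    public State init(Object batchId, TridentCollector collector) {
        return new State(); // once per batch partition: val starts at 0
    }

    @Override
    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
        state.sum += tuple.getLong(0); // per tuple: 0+4=4, 4+7=11, 11+8=19
    }

    @Override
    public void complete(State state, TridentCollector collector) {
        collector.emit(new Values(state.sum)); // batch partition done: emit 19
    }
}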

CombinerAggregator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/CombinerAggregator.java

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}
  • for each tuple it receives, CombinerAggregator calls init to extract that tuple's value, then calls combine to combine the previous combine result (or the value of zero if there is none) with the init value; if the partition contains no tuples, the value of zero is returned
  • suppose combine sums and zero returns 0: for the batch [4], [7], [8], init yields 4, 7, 8 respectively; for [4] there is no previous combine result, so val1=0 and val2=4, combining to 4; for [7], val1=4 and val2=7, combining to 11; for [8], val1=11 and val2=8, combining to 19
  • CombinerAggregator has relatively low network overhead, so it performs better than the other two kinds of aggregators (a sketch follows)
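
The same summing behavior as a CombinerAggregator sketch (class name hypothetical; storm also ships a builtin org.apache.storm.trident.operation.builtin.Sum):

import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;

public class SumCombiner implements CombinerAggregator<Long> {
    @Override
    public Long init(TridentTuple tuple) {
        return tuple.getLong(0); // per-tuple value: 4, 7, 8
    }

    @Override
    public Long combine(Long val1, Long val2) {
        return val1 + val2; // 0+4=4, 4+7=11, 11+8=19
    }

    @Override
    public Long zero() {
        return 0L; // result for an empty partition, and the first val1
    }
}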

ReducerAggregator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/ReducerAggregator.java

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}
  • when computing over a batch of tuples, ReducerAggregator first calls init once to obtain the initial value, then runs reduce for each tuple; curr is the result of the previous reduce, or the init value if there is none (a sketch follows)
  • suppose reduce sums and init returns 0: for the batch [4], [7], [8], for [4] init is 0, so curr=0 and 0+4=4; for [7], curr=4, so 4+7=11; for [8], curr=11, so 11+8=19
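
And the same sum as a ReducerAggregator sketch (class name hypothetical):

import org.apache.storm.trident.operation.ReducerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;

public class SumReducer implements ReducerAggregator<Long> {
    @Override
    public Long init() {
        return 0L; // the initial curr
    }

    @Override
    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + tuple.getLong(0); // 0+4=4, 4+7=11, 11+8=19
    }
}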

topology stream operations

join

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields);        
    }
    
    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields) {
        return join(streams, joinFields, outFields, JoinType.INNER);        
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type);        
    }
    
    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type) {
        return join(streams, joinFields, outFields, repeat(streams.size(), type));
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed);
        
    }
    
    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed) {
        return join(streams, joinFields, outFields, mixed, JoinOutFieldsMode.COMPACT);
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinOutFieldsMode mode) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mode);
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinOutFieldsMode mode) {
        return join(streams, joinFields, outFields, JoinType.INNER, mode);
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type, mode);
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
        return join(streams, joinFields, outFields, repeat(streams.size(), type), mode);
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed, mode);

    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
        switch (mode) {
            case COMPACT:
                return multiReduce(strippedInputFields(streams, joinFields),
                        groupedStreams(streams, joinFields),
                        new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
                        outFields);

            case PRESERVE:
                return multiReduce(strippedInputFields(streams, joinFields),
                        groupedStreams(streams, joinFields),
                        new PreservingFieldsOrderJoinerMultiReducer(mixed, joinFields.get(0).size(),
                                getAllOutputFields(streams), joinFields, strippedInputFields(streams, joinFields)),
                        outFields);

            default:
                throw new IllegalArgumentException("Unsupported out-fields mode: " + mode);
        }
    }
  • As can be seen, join ultimately calls multiReduce: COMPACT mode uses JoinerMultiReducer as the GroupedMultiReducer, while PRESERVE mode uses PreservingFieldsOrderJoinerMultiReducer (a usage sketch follows)
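
Usage-wise, a minimal join sketch in the style of the Trident docs (streams and field names assumed): stream1 has fields ["key", "val1", "val2"] and stream2 has ["x", "val1"], and they are joined on key/x.

topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
        new Fields("key", "a", "b", "c"));
// output: "key" from the join field, "a"/"b" from stream1's val1/val2,
// "c" from stream2's val1; defaults are JoinType.INNER and COMPACT out-fields mode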

merge

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

public Stream merge(Fields outputFields, Stream... streams) {
        return merge(outputFields, Arrays.asList(streams));
    }
    
    public Stream merge(Stream... streams) {
        return merge(Arrays.asList(streams));
    }
    
    public Stream merge(List<Stream> streams) {
        return merge(streams.get(0).getOutputFields(), streams);
    } 

    public Stream merge(Fields outputFields, List<Stream> streams) {
        return multiReduce(streams, new IdentityMultiReducer(), outputFields);
    }
  • As can be seen, merge ultimately calls multiReduce, passing IdentityMultiReducer as the MultiReducer (a usage sketch follows)
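
A minimal merge sketch (streams assumed to have compatible output fields):

Stream merged = topology.merge(stream1, stream2, stream3); // output fields taken from stream1
// or rename the output fields while merging:
Stream renamed = topology.merge(new Fields("word", "count"), stream1, stream2);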

multiReduce

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

public Stream multiReduce(Stream s1, Stream s2, MultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(s1, s2), function, outputFields);        
    }

    public Stream multiReduce(Fields inputFields1, Stream s1, Fields inputFields2, Stream s2, MultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);        
    }    
    
    public Stream multiReduce(GroupedStream s1, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(s1, s2), function, outputFields);        
    }
    
    public Stream multiReduce(Fields inputFields1, GroupedStream s1, Fields inputFields2, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);        
    } 
    
    public Stream multiReduce(List<Stream> streams, MultiReducer function, Fields outputFields) {
        return multiReduce(getAllOutputFields(streams), streams, function, outputFields);
    }
        
    public Stream multiReduce(List<GroupedStream> streams, GroupedMultiReducer function, Fields outputFields) {
        return multiReduce(getAllOutputFields(streams), streams, function, outputFields);        
    }    
    
    public Stream multiReduce(List<Fields> inputFields, List<GroupedStream> groupedStreams, GroupedMultiReducer function, Fields outputFields) {
        List<Fields> fullInputFields = new ArrayList<>();
        List<Stream> streams = new ArrayList<>();
        List<Fields> fullGroupFields = new ArrayList<>();
        for(int i=0; i<groupedStreams.size(); i++) {
            GroupedStream gs = groupedStreams.get(i);
            Fields groupFields = gs.getGroupFields();
            fullGroupFields.add(groupFields);
            streams.add(gs.toStream().partitionBy(groupFields));
            fullInputFields.add(TridentUtils.fieldsUnion(groupFields, inputFields.get(i)));
            
        }
        return multiReduce(fullInputFields, streams, new GroupedMultiReducerExecutor(function, fullGroupFields, inputFields), outputFields);
    }

    public Stream multiReduce(List<Fields> inputFields, List<Stream> streams, MultiReducer function, Fields outputFields) {
        List<String> names = new ArrayList<>();
        for(Stream s: streams) {
            if(s._name!=null) {
                names.add(s._name);
            }
        }
        Node n = new ProcessorNode(getUniqueStreamId(), Utils.join(names, "-"), outputFields, outputFields, new MultiReducerProcessor(inputFields, function));
        return addSourcedNode(streams, n);
    }
  • the multiReduce method takes a MultiReducer parameter; join and merge both call multiReduce, but they pass different MultiReducer implementations

Summary

  • trident's operations fall into several categories: basic function, filter, and projection operations; repartitioning operations, which are essentially groupings; aggregate operations, comprising aggregate, partitionAggregate, and persistentAggregate; and the topology-level join and merge operations on streams
  • for function, any emitted fields are appended to the original tuple; filter is used to drop tuples; projection is used to extract fields
  • repartitioning operations include Grouping.local_or_shuffle, Grouping.shuffle, Grouping.all, GlobalGrouping, CustomStreamGrouping, IdentityGrouping, IndexHashGrouping, and so on; a partition operation can be understood as assigning the input tuples to tasks, or equivalently as grouping the stream
  • for aggregate operations, the plain aggregate comes with three aggregator interfaces: Aggregator, CombinerAggregator, and ReducerAggregator. Aggregator is the most general: it extends the Operation interface and its methods receive a collector, which CombinerAggregator and ReducerAggregator do not. Unlike with Aggregator or ReducerAggregator, when stream.aggregate is called with a CombinerAggregator, trident first does a partial aggregation on each partition before merging into a single partition for the final aggregation, which saves network transfer; chaining a CombinerAggregator with a non-CombinerAggregator forfeits this optimization. partitionAggregate operates at the partition level, while persistentAggregate aggregates the tuples of all batches across the whole stream and persists the result in state
  • the join and merge operations on streams are both built on multiReduce, differing only in the MultiReducer they pass; join matches on join fields (the field names may differ across streams) and a JoinType of INNER or OUTER can be chosen, though note that join works within a spout's small batch; merge simply pools the tuples of several streams into one
