This article takes a look at Storm Trident's operations.
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Function.java
```java
public interface Function extends EachOperation {
    /**
     * Performs the function logic on an individual tuple and emits 0 or more tuples.
     *
     * @param tuple The incoming tuple
     * @param collector A collector instance that can be used to emit tuples
     */
    void execute(TridentTuple tuple, TridentCollector collector);
}
```
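A Function is attached to a stream with `each`. A minimal sketch (the class and field names here are illustrative, not from the source): a Function that emits the sum of two input fields.

```java
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class AddFields extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // read the two input fields selected by each(...)
        int a = tuple.getInteger(0);
        int b = tuple.getInteger(1);
        // emit one new tuple; its fields are appended to the input tuple
        collector.emit(new Values(a + b));
    }
}
```

Wired up as `stream.each(new Fields("a", "b"), new AddFields(), new Fields("sum"))`, each output tuple then carries the fields `["a", "b", "sum"]`; emitting zero tuples effectively filters the input tuple out.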
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Filter.java
```java
public interface Filter extends EachOperation {
    /**
     * Determines if a tuple should be filtered out of a stream
     *
     * @param tuple the tuple being evaluated
     * @return `false` to drop the tuple, `true` to keep the tuple
     */
    boolean isKeep(TridentTuple tuple);
}
```
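Filters are also attached with `each`; only `isKeep` decides whether the input tuple survives. A minimal sketch (names are illustrative):

```java
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

public class PositiveFilter extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        // keep the tuple only when its first field is positive
        return tuple.getInteger(0) > 0;
    }
}
```

Applied as `stream.each(new Fields("a"), new PositiveFilter())`; unlike a Function, a Filter never changes the tuple's fields.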
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
````java
    /**
     * Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`.
     *
     * For example, if you had a Stream `mystream` containing the fields `["a", "b", "c", "d"]`, calling
     *
     * ```java
     * mystream.project(new Fields("b", "d"))
     * ```
     *
     * would produce a stream containing only the fields `["b", "d"]`.
     *
     * @param keepFields The fields in the Stream to keep
     * @return
     */
    public Stream project(Fields keepFields) {
        projectionValidation(keepFields);
        return _topology.addSourcedNode(this,
                new ProcessorNode(_topology.getUniqueStreamId(),
                                  _name,
                                  keepFields,
                                  new Fields(),
                                  new ProjectedProcessor(keepFields)));
    }
````
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
```java
    /**
     * ## Repartitioning Operation
     *
     * @param partitioner
     * @return
     */
    public Stream partition(CustomStreamGrouping partitioner) {
        return partition(Grouping.custom_serialized(Utils.javaSerialize(partitioner)));
    }
```
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
```java
    /**
     * ## Repartitioning Operation
     *
     * @param fields
     * @return
     */
    public Stream partitionBy(Fields fields) {
        projectionValidation(fields);
        return partition(Grouping.fields(fields.toList()));
    }
```
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
```java
    /**
     * ## Repartitioning Operation
     *
     * @return
     */
    public Stream identityPartition() {
        return partition(new IdentityGrouping());
    }
```
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
```java
    /**
     * ## Repartitioning Operation
     *
     * Use random round robin algorithm to evenly redistribute tuples across all target partitions
     *
     * @return
     */
    public Stream shuffle() {
        return partition(Grouping.shuffle(new NullStruct()));
    }
```
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
```java
    /**
     * ## Repartitioning Operation
     *
     * Use random round robin algorithm to evenly redistribute tuples across all target partitions, with a preference
     * for local tasks.
     *
     * @return
     */
    public Stream localOrShuffle() {
        return partition(Grouping.local_or_shuffle(new NullStruct()));
    }
```
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
```java
    /**
     * ## Repartitioning Operation
     *
     * All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
     *
     * @return
     */
    public Stream global() {
        // use this instead of storm's built in one so that we can specify a singleemitbatchtopartition
        // without knowledge of storm's internals
        return partition(new GlobalGrouping());
    }
```
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
```java
    /**
     * ## Repartitioning Operation
     *
     * All tuples in the batch are sent to the same partition. Different batches in the stream may go to different
     * partitions.
     *
     * @return
     */
    public Stream batchGlobal() {
        // the first field is the batch id
        return partition(new IndexHashGrouping(0));
    }
```
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
```java
    /**
     * ## Repartitioning Operation
     *
     * Every tuple is replicated to all target partitions. This can be useful during DRPC – for example, if you need
     * to do a stateQuery on every partition of data.
     *
     * @return
     */
    public Stream broadcast() {
        return partition(Grouping.all(new NullStruct()));
    }
```
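To make the differences concrete, here is a hedged sketch showing the repartitioning calls side by side (`words` is a hypothetical stream carrying a `word` field; each call simply returns a new Stream):

```java
Stream byWord   = words.partitionBy(new Fields("word")); // equal "word" values land on the same partition
Stream even     = words.shuffle();                       // random round robin across all partitions
Stream local    = words.localOrShuffle();                // like shuffle, but prefers tasks in the same worker
Stream single   = words.global();                        // every tuple of every batch to one partition
Stream perBatch = words.batchGlobal();                   // one partition per batch, chosen by batch id
Stream copies   = words.broadcast();                     // every tuple replicated to all partitions
```

These calls only move tuples around; no user logic runs, so they are typically followed by a partition-local operation such as `partitionAggregate` or `each`.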
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
```java
    /**
     * ## Grouping Operation
     *
     * @param fields
     * @return
     */
    public GroupedStream groupBy(Fields fields) {
        projectionValidation(fields);
        return new GroupedStream(this, fields);
    }
```
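groupBy first repartitions the stream by the grouping fields (a partitionBy under the hood) and then groups tuples within each partition. A minimal sketch using the built-in `Count` aggregator (`words` is again a hypothetical stream with a `word` field):

```java
// per-batch word counting: tuples are grouped by "word",
// then Count runs once per group within each batch;
// the resulting stream carries the fields ["word", "count"]
Stream counts = words.groupBy(new Fields("word"))
                     .aggregate(new Fields("word"), new Count(), new Fields("count"));
```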
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
```java
    //partition aggregate
    public Stream partitionAggregate(Aggregator agg, Fields functionFields) {
        return partitionAggregate(null, agg, functionFields);
    }

    public Stream partitionAggregate(CombinerAggregator agg, Fields functionFields) {
        return partitionAggregate(null, agg, functionFields);
    }

    public Stream partitionAggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
                .partitionAggregate(inputFields, agg, functionFields)
                .chainEnd();
    }

    public Stream partitionAggregate(ReducerAggregator agg, Fields functionFields) {
        return partitionAggregate(null, agg, functionFields);
    }

    public Stream partitionAggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
                .partitionAggregate(inputFields, agg, functionFields)
                .chainEnd();
    }

    //aggregate
    public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
                .aggregate(inputFields, agg, functionFields)
                .chainEnd();
    }

    public Stream aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
                .aggregate(inputFields, agg, functionFields)
                .chainEnd();
    }

    public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
                .aggregate(inputFields, agg, functionFields)
                .chainEnd();
    }

    //persistent aggregate
    public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) {
        return persistentAggregate(spec, null, agg, functionFields);
    }

    public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        // replaces normal aggregation here with a global grouping because it needs to be consistent across batches
        return new ChainedAggregatorDeclarer(this, new GlobalAggScheme())
                .aggregate(inputFields, agg, functionFields)
                .chainEnd()
                .partitionPersist(spec, functionFields, new CombinerAggStateUpdater(agg), functionFields);
    }

    public TridentState persistentAggregate(StateFactory stateFactory, ReducerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, ReducerAggregator agg, Fields functionFields) {
        return persistentAggregate(spec, null, agg, functionFields);
    }

    public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return global().partitionPersist(spec, inputFields, new ReducerAggStateUpdater(agg), functionFields);
    }
```
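The word-count example from the Trident tutorial shows `persistentAggregate` end to end (a hedged sketch; `spout` is assumed to emit a `word` field, and `MemoryMapState` is the in-memory state from Storm's testing utilities):

```java
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
        topology.newStream("spout1", spout)
                .groupBy(new Fields("word"))
                // aggregates each batch and folds the result into the state,
                // keeping the counts consistent across batches
                .persistentAggregate(new MemoryMapState.Factory(),
                        new Count(),
                        new Fields("count"));
```

With a CombinerAggregator, Trident can partially aggregate within each partition before the global repartition (the `GlobalAggScheme` branch above), which is why CombinerAggregator is generally preferred over ReducerAggregator for `aggregate` and `persistentAggregate`.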
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Aggregator.java
```java
public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T val, TridentTuple tuple, TridentCollector collector);
    void complete(T val, TridentCollector collector);
}
```
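Aggregator is the most general form: `init` is called when a batch starts and returns mutable state, `aggregate` is called for each input tuple, and `complete` once all tuples of the batch's partition have been processed. A minimal counting sketch (illustrative, not from the source):

```java
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class CountAgg extends BaseAggregator<CountAgg.State> {
    static class State {
        long count = 0;
    }

    @Override
    public State init(Object batchId, TridentCollector collector) {
        return new State(); // fresh state for every batch partition
    }

    @Override
    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
        state.count++;      // tuples may also be emitted here, mid-batch
    }

    @Override
    public void complete(State state, TridentCollector collector) {
        collector.emit(new Values(state.count)); // emit the final result once
    }
}
```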
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/CombinerAggregator.java
```java
public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}
```
A CombinerAggregator calls init on each input tuple, then runs a new combine of the previous combined result (or the value of zero() when there is no previous result yet) with the value obtained from init; if the partition contains no tuples at all, the value of the zero() method is returned.
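The built-in `org.apache.storm.trident.operation.builtin.Count` is the canonical CombinerAggregator; a sketch of the same logic:

```java
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;

public class MyCount implements CombinerAggregator<Long> {
    @Override
    public Long init(TridentTuple tuple) {
        return 1L;          // each tuple contributes 1
    }

    @Override
    public Long combine(Long val1, Long val2) {
        return val1 + val2; // fold two partial counts together
    }

    @Override
    public Long zero() {
        return 0L;          // identity value, used when a partition is empty
    }
}
```

Used as `stream.aggregate(new MyCount(), new Fields("count"))`.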
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/ReducerAggregator.java

```java
public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}
```
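Count can equally be written as a ReducerAggregator, though without the benefit of partial aggregation (a sketch):

```java
import org.apache.storm.trident.operation.ReducerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;

public class MyReduceCount implements ReducerAggregator<Long> {
    @Override
    public Long init() {
        return 0L;       // starting value for the fold
    }

    @Override
    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + 1; // fold one tuple into the running value
    }
}
```

A ReducerAggregator starts from init() and folds tuples in one at a time; since there is no combine step, all tuples must be brought together before the fold runs.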
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
```java
    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields);
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields) {
        return join(streams, joinFields, outFields, JoinType.INNER);
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type);
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type) {
        return join(streams, joinFields, outFields, repeat(streams.size(), type));
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed);
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed) {
        return join(streams, joinFields, outFields, mixed, JoinOutFieldsMode.COMPACT);
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinOutFieldsMode mode) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mode);
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinOutFieldsMode mode) {
        return join(streams, joinFields, outFields, JoinType.INNER, mode);
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type, mode);
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
        return join(streams, joinFields, outFields, repeat(streams.size(), type), mode);
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed, mode);
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
        switch (mode) {
            case COMPACT:
                return multiReduce(strippedInputFields(streams, joinFields),
                        groupedStreams(streams, joinFields),
                        new JoinerMultiReducer(mixed, joinFields.get(0).size(),
                                strippedInputFields(streams, joinFields)),
                        outFields);

            case PRESERVE:
                return multiReduce(strippedInputFields(streams, joinFields),
                        groupedStreams(streams, joinFields),
                        new PreservingFieldsOrderJoinerMultiReducer(mixed, joinFields.get(0).size(),
                                getAllOutputFields(streams), joinFields,
                                strippedInputFields(streams, joinFields)),
                        outFields);

            default:
                throw new IllegalArgumentException("Unsupported out-fields mode: " + mode);
        }
    }
```
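The two-stream overload, as used in the Trident DRPC docs, illustrates the default COMPACT output layout (a sketch; assume `stream1` has fields `["key", "val1", "val2"]` and `stream2` has fields `["x", "val1"]`):

```java
TridentTopology topology = new TridentTopology();
// inner join on stream1."key" == stream2."x"; the output names the join
// field once ("key"), then the remaining fields of each stream in order
Stream joined = topology.join(stream1, new Fields("key"),
                              stream2, new Fields("x"),
                              new Fields("key", "a", "b", "c"));
```

Here `"a"` and `"b"` are stream1's `val1` and `val2`, and `"c"` is stream2's `val1`.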
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
```java
    public Stream merge(Fields outputFields, Stream... streams) {
        return merge(outputFields, Arrays.asList(streams));
    }

    public Stream merge(Stream... streams) {
        return merge(Arrays.asList(streams));
    }

    public Stream merge(List<Stream> streams) {
        return merge(streams.get(0).getOutputFields(), streams);
    }

    public Stream merge(Fields outputFields, List<Stream> streams) {
        return multiReduce(streams, new IdentityMultiReducer(), outputFields);
    }
```
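merge, by contrast, needs no join fields (a sketch; the streams are assumed to have compatible output fields):

```java
// concatenates the tuples of the given streams; the output field names
// default to those of the first stream
Stream merged = topology.merge(stream1, stream2, stream3);
```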
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
```java
    public Stream multiReduce(Stream s1, Stream s2, MultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(s1, s2), function, outputFields);
    }

    public Stream multiReduce(Fields inputFields1, Stream s1, Fields inputFields2, Stream s2, MultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);
    }

    public Stream multiReduce(GroupedStream s1, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(s1, s2), function, outputFields);
    }

    public Stream multiReduce(Fields inputFields1, GroupedStream s1, Fields inputFields2, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);
    }

    public Stream multiReduce(List<Stream> streams, MultiReducer function, Fields outputFields) {
        return multiReduce(getAllOutputFields(streams), streams, function, outputFields);
    }

    public Stream multiReduce(List<GroupedStream> streams, GroupedMultiReducer function, Fields outputFields) {
        return multiReduce(getAllOutputFields(streams), streams, function, outputFields);
    }

    public Stream multiReduce(List<Fields> inputFields, List<GroupedStream> groupedStreams, GroupedMultiReducer function, Fields outputFields) {
        List<Fields> fullInputFields = new ArrayList<>();
        List<Stream> streams = new ArrayList<>();
        List<Fields> fullGroupFields = new ArrayList<>();
        for(int i=0; i<groupedStreams.size(); i++) {
            GroupedStream gs = groupedStreams.get(i);
            Fields groupFields = gs.getGroupFields();
            fullGroupFields.add(groupFields);
            streams.add(gs.toStream().partitionBy(groupFields));
            fullInputFields.add(TridentUtils.fieldsUnion(groupFields, inputFields.get(i)));
        }
        return multiReduce(fullInputFields, streams, new GroupedMultiReducerExecutor(function, fullGroupFields, inputFields), outputFields);
    }

    public Stream multiReduce(List<Fields> inputFields, List<Stream> streams, MultiReducer function, Fields outputFields) {
        List<String> names = new ArrayList<>();
        for(Stream s: streams) {
            if(s._name!=null) {
                names.add(s._name);
            }
        }
        Node n = new ProcessorNode(getUniqueStreamId(), Utils.join(names, "-"), outputFields, outputFields, new MultiReducerProcessor(inputFields, function));
        return addSourcedNode(streams, n);
    }
```
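Both join and merge bottom out in multiReduce; merge, for instance, is just multiReduce with the pass-through IdentityMultiReducer, so it can equivalently be called directly (a sketch, with hypothetical streams and fields):

```java
Stream merged = topology.multiReduce(Arrays.asList(stream1, stream2),
                                     new IdentityMultiReducer(),
                                     new Fields("key", "val"));
```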
join lets you specify the join fields of each stream (the field names may differ), and lets you choose the JoinType, INNER or OUTER; note, though, that the join is performed within each small batch coming off the spout, not across the whole stream. merge is simply gathering the tuples of several streams into one.