聊聊storm WindowTridentProcessor的FreshCollector

本文主要研究一下storm WindowTridentProcessor的FreshCollectorhtml

實例

TridentTopology topology = new TridentTopology();
        topology.newStream("spout1", spout)
                .partitionBy(new Fields("user"))
                .window(windowConfig,windowsStoreFactory,new Fields("user","score"),new UserCountAggregator(),new Fields("aggData"))
                .parallelismHint(1)
                .each(new Fields("aggData"), new PrintEachFunc(),new Fields());
  • 這個實例在window操做以後跟了一個each操做

WindowTridentProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.javajava

public class WindowTridentProcessor implements TridentProcessor {
	
	private FreshCollector collector;

	//......

    public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
        this.topologyContext = context;
        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
        if (parents.size() != 1) {
            throw new RuntimeException("Aggregation related operation can only have one parent");
        }

        Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);

        this.tridentContext = tridentContext;
        collector = new FreshCollector(tridentContext);
        projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);

        windowStore = windowStoreFactory.create(stormConf);
        windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
        windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);

        tridentWindowManager = storeTuplesInStore ?
                new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
                : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());

        tridentWindowManager.prepare();
    }

    public void finishBatch(ProcessorContext processorContext) {

        Object batchId = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(batchId);

        LOG.debug("Received finishBatch of : [{}] ", batchId);
        // get all the tuples in a batch and add it to trident-window-manager
        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
        tridentWindowManager.addTuplesBatch(batchId, tuples);

        List<Integer> pendingTriggerIds = null;
        List<String> triggerKeys = new ArrayList<>();
        Iterable<Object> triggerValues = null;

        if (retriedAttempt(batchId)) {
            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
            if (pendingTriggerIds != null) {
                for (Integer pendingTriggerId : pendingTriggerIds) {
                    triggerKeys.add(triggerKey(pendingTriggerId));
                }
                triggerValues = windowStore.get(triggerKeys);
            }
        }

        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
        if(triggerValues == null) {
            pendingTriggerIds = new ArrayList<>();
            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
            try {
                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                List<Object> values = new ArrayList<>();
                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
                while (pendingTriggersIter.hasNext()) {
                    triggerResult = pendingTriggersIter.next();
                    for (List<Object> aggregatedResult : triggerResult.result) {
                        String triggerKey = triggerKey(triggerResult.id);
                        triggerKeys.add(triggerKey);
                        values.add(aggregatedResult);
                        pendingTriggerIds.add(triggerResult.id);
                    }
                    pendingTriggersIter.remove();
                }
                triggerValues = values;
            } finally {
                // store inprocess triggers of a batch in store for batch retries for any failures
                if (!pendingTriggerIds.isEmpty()) {
                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
                }
            }
        }

        collector.setContext(processorContext);
        int i = 0;
        for (Object resultValue : triggerValues) {
            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
        }
        collector.setContext(null);
    }
}
  • WindowTridentProcessor在prepare的時候建立了FreshCollector
  • finishBatch的時候,調用FreshCollector.emit將窗口的aggregate的結果集傳遞過去
  • 傳遞的數據結構爲ConsList,實際上是個AbstractList的實現,由Object類型的first元素,以及List<Object>結構的_elems組成

FreshCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/FreshCollector.javaapache

public class FreshCollector implements TridentCollector {
    FreshOutputFactory _factory;
    TridentContext _triContext;
    ProcessorContext context;
    
    public FreshCollector(TridentContext context) {
        _triContext = context;
        _factory = new FreshOutputFactory(context.getSelfOutputFields());
    }
                
    public void setContext(ProcessorContext pc) {
        this.context = pc;
    }

    @Override
    public void emit(List<Object> values) {
        TridentTuple toEmit = _factory.create(values);
        for(TupleReceiver r: _triContext.getReceivers()) {
            r.execute(context, _triContext.getOutStreamId(), toEmit);
        }            
    }

    @Override
    public void reportError(Throwable t) {
        _triContext.getDelegateCollector().reportError(t);
    } 

    public Factory getOutputFactory() {
        return _factory;
    }    
}
  • FreshCollector在構造器裏頭根據context的selfOutputFields(第一個field固定爲_task_info,以後的幾個field爲用戶在window方法定義的functionFields)構造FreshOutputFactory
  • emit方法,首先使用FreshOutputFactory根據outputFields構造TridentTupleView,以後獲取TupleReceiver,調用TupleReceiver的execute方法把TridentTupleView傳遞過去
  • 這裏的TupleReceiver有ProjectedProcessor、PartitionPersistProcessor

TridentTupleView.FreshOutputFactory

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/TridentTupleView.javawindows

public static class FreshOutputFactory  implements Factory {
        Map<String, ValuePointer> _fieldIndex;
        ValuePointer[] _index;

        public FreshOutputFactory(Fields selfFields) {
            _fieldIndex = new HashMap<>();
            for(int i=0; i<selfFields.size(); i++) {
                String field = selfFields.get(i);
                _fieldIndex.put(field, new ValuePointer(0, i, field));
            }
            _index = ValuePointer.buildIndex(selfFields, _fieldIndex);
        }
        
        public TridentTuple create(List<Object> selfVals) {
            return new TridentTupleView(PersistentVector.EMPTY.cons(selfVals), _index, _fieldIndex);
        }

        @Override
        public Map<String, ValuePointer> getFieldIndex() {
            return _fieldIndex;
        }

        @Override
        public int numDelegates() {
            return 1;
        }
        
        @Override
        public List<String> getOutputFields() {
            return indexToFieldsList(_index);
        }        
    }
  • FreshOutputFactory是TridentTupleView的一個靜態類,其構造方法主要是計算_index以及_fieldIndex
  • _fieldIndex是一個map,key是field字段,value是ValuePointer,記錄其delegateIndex(這裏固定爲0)、index及field信息;第一個field爲_task_info,index爲0;以後的fields爲用戶在window方法定義的functionFields
  • 這裏的create方法主要是構造TridentTupleView,其構造器第一個值爲IPersistentVector,第二個值爲_index,第三個值爲_fieldIndex

ValuePointer

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/ValuePointer.java數組

public class ValuePointer {
    public static Map<String, ValuePointer> buildFieldIndex(ValuePointer[] pointers) {
        Map<String, ValuePointer> ret = new HashMap<String, ValuePointer>();
        for(ValuePointer ptr: pointers) {
            ret.put(ptr.field, ptr);
        }
        return ret;        
    }

    public static ValuePointer[] buildIndex(Fields fieldsOrder, Map<String, ValuePointer> pointers) {
        if(fieldsOrder.size()!=pointers.size()) {
            throw new IllegalArgumentException("Fields order must be same length as pointers map");
        }
        ValuePointer[] ret = new ValuePointer[pointers.size()];
        for(int i=0; i<fieldsOrder.size(); i++) {
            ret[i] = pointers.get(fieldsOrder.get(i));
        }
        return ret;
    }    
    
    public int delegateIndex;
    protected int index;
    protected String field;
    
    public ValuePointer(int delegateIndex, int index, String field) {
        this.delegateIndex = delegateIndex;
        this.index = index;
        this.field = field;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }    
}
  • 這裏的buildIndex,主要是根據selfOutputFields的順序返回ValuePointer數組

ProjectedProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/ProjectedProcessor.java數據結構

public class ProjectedProcessor implements TridentProcessor {
    Fields _projectFields;
    ProjectionFactory _factory;
    TridentContext _context;
    
    public ProjectedProcessor(Fields projectFields) {
        _projectFields = projectFields;
    }
    
    @Override
    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        if(tridentContext.getParentTupleFactories().size()!=1) {
            throw new RuntimeException("Projection processor can only have one parent");
        }
        _context = tridentContext;
        _factory = new ProjectionFactory(tridentContext.getParentTupleFactories().get(0), _projectFields);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void startBatch(ProcessorContext processorContext) {
    }

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        TridentTuple toEmit = _factory.create(tuple);
        for(TupleReceiver r: _context.getReceivers()) {
            r.execute(processorContext, _context.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void finishBatch(ProcessorContext processorContext) {
    }

    @Override
    public Factory getOutputFactory() {
        return _factory;
    }
}
  • ProjectedProcessor在prepare的時候,建立了ProjectionFactory,其_projectFields就是window方法定義的functionFields,這裏還使用tridentContext.getParentTupleFactories().get(0)提取了parent的第一個Factory,因爲是FreshCollector傳遞過來的,於是這裏是TridentTupleView.FreshOutputFactory
  • execute的時候,首先調用ProjectionFactory.create方法,對TridentTupleView進行字段提取操做,toEmit就是根據window方法定義的functionFields從新提取的TridentTupleView
  • execute方法以後對_context.getReceivers()挨個調用execute操做,將toEmit傳遞過去,這裏的receiver就是window操做以後的各類processor了,好比EachProcessor

TridentTupleView.ProjectionFactory

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/TridentTupleView.javaide

public static class ProjectionFactory implements Factory {
        Map<String, ValuePointer> _fieldIndex;
        ValuePointer[] _index;
        Factory _parent;

        public ProjectionFactory(Factory parent, Fields projectFields) {
            _parent = parent;
            if(projectFields==null) projectFields = new Fields();
            Map<String, ValuePointer> parentFieldIndex = parent.getFieldIndex();
            _fieldIndex = new HashMap<>();
            for(String f: projectFields) {
                _fieldIndex.put(f, parentFieldIndex.get(f));
            }            
            _index = ValuePointer.buildIndex(projectFields, _fieldIndex);
        }
        
        public TridentTuple create(TridentTuple parent) {
            if(_index.length==0) return EMPTY_TUPLE;
            else return new TridentTupleView(((TridentTupleView)parent)._delegates, _index, _fieldIndex);
        }

        @Override
        public Map<String, ValuePointer> getFieldIndex() {
            return _fieldIndex;
        }

        @Override
        public int numDelegates() {
            return _parent.numDelegates();
        }

        @Override
        public List<String> getOutputFields() {
            return indexToFieldsList(_index);
        }
    }
  • ProjectionFactory是TridentTupleView的靜態類,它在構造器裏頭根據projectFields構造_index及_fieldIndex,這樣create方法就能根據所需的字段建立TridentTupleView

EachProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/EachProcessor.javaui

public class EachProcessor implements TridentProcessor {
    Function _function;
    TridentContext _context;
    AppendCollector _collector;
    Fields _inputFields;
    ProjectionFactory _projection;
    
    public EachProcessor(Fields inputFields, Function function) {
        _function = function;
        _inputFields = inputFields;
    }
    
    @Override
    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        List<Factory> parents = tridentContext.getParentTupleFactories();
        if(parents.size()!=1) {
            throw new RuntimeException("Each operation can only have one parent");
        }
        _context = tridentContext;
        _collector = new AppendCollector(tridentContext);
        _projection = new ProjectionFactory(parents.get(0), _inputFields);
        _function.prepare(conf, new TridentOperationContext(context, _projection));
    }

    @Override
    public void cleanup() {
        _function.cleanup();
    }    

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        _collector.setContext(processorContext, tuple);
        _function.execute(_projection.create(tuple), _collector);
    }

    @Override
    public void startBatch(ProcessorContext processorContext) {
    }

    @Override
    public void finishBatch(ProcessorContext processorContext) {
    }

    @Override
    public Factory getOutputFactory() {
        return _collector.getOutputFactory();
    }    
}
  • EachProcessor的execute方法,首先設置_collector的context爲processorContext,而後調用_function.execute方法
  • 這裏調用了_projection.create(tuple)來提取字段,主要是根據_function定義的inputFields來提取
  • 這裏傳遞給_function的collector爲AppendCollector

AppendCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AppendCollector.javathis

public class AppendCollector implements TridentCollector {
    OperationOutputFactory _factory;
    TridentContext _triContext;
    TridentTuple tuple;
    ProcessorContext context;
    
    public AppendCollector(TridentContext context) {
        _triContext = context;
        _factory = new OperationOutputFactory(context.getParentTupleFactories().get(0), context.getSelfOutputFields());
    }
                
    public void setContext(ProcessorContext pc, TridentTuple t) {
        this.context = pc;
        this.tuple = t;
    }

    @Override
    public void emit(List<Object> values) {
        TridentTuple toEmit = _factory.create((TridentTupleView) tuple, values);
        for(TupleReceiver r: _triContext.getReceivers()) {
            r.execute(context, _triContext.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void reportError(Throwable t) {
        _triContext.getDelegateCollector().reportError(t);
    } 
    
    public Factory getOutputFactory() {
        return _factory;
    }
}
  • AppendCollector在構造器裏頭建立了OperationOutputFactory,其emit方法也是提取OperationOutputFields,而後挨個調用_triContext.getReceivers()的execute方法;若是each以後沒有其餘操做,那麼AppendCollector的_triContext.getReceivers()就爲空

小結

  • WindowTridentProcessor裏頭使用的是FreshCollector,WindowTridentProcessor在finishBatch的時候,會從TridentWindowManager提取window建立的pendingTriggers(提取以後會將其數據從pendingTriggers移除),裏頭包含了窗口累積的數據,而後使用FreshCollector發射這些數據,默認第一個value爲TriggerInfo,第二個value就是窗口累積發射的values
  • FreshCollector的emit方法首先使用TridentTupleView.FreshOutputFactory根據selfOutputFields(第一個field固定爲_task_info,以後的幾個field爲用戶在window方法定義的functionFields)構建TridentTupleView,而後挨個調用_triContext.getReceivers()的execute方法
  • 後續的receivers中有一個ProjectedProcessor,用於根據window方法定義的functionFields從新提取的TridentTupleView,它的execute方法也相似FreshCollector.emit方法,先提取所需字段構造TridentTupleView,而後挨個調用_triContext.getReceivers()的execute方法(好比EachProcessor.execute)
  • EachProcessor使用的collector爲AppendCollector,它的emit方法也相似FreshCollector的emit方法,先進行字段提取構造TridentTupleView,而後挨個調用_triContext.getReceivers()的execute方法
  • FreshCollector的emit方法與ProjectedProcessor的execute方法以及AppendCollector的emit方法都很是相似,首先是使用Factory提取所需字段構建TridentTupleView,而後挨個調用_triContext.getReceivers()的execute方法;當一個_triContext沒有receiver的時候,tuple的傳遞也就中止了

doc

相關文章
相關標籤/搜索