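A Look at Batch Splitting and Aggregation in Storm Trident

This article examines how a Storm Trident batch is split across tasks and then aggregated back together.

Example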

TridentTopology topology = new TridentTopology();
        topology.newStream("spout1", spout)
                .partitionBy(new Fields("user"))
                .partitionAggregate(new Fields("user","score","batchId"),new OriginUserCountAggregator(),new Fields("result","aggBatchId"))
                .parallelismHint(3)
                .global()
                .aggregate(new Fields("result","aggBatchId"),new AggAgg(),new Fields("agg"))
                .each(new Fields("agg"),new PrintEachFunc(),new Fields())
        ;
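  • The topology ends up with three bolts: b-0, b-1 and b-2.
  • b-0 mainly runs partitionAggregate; its parallelismHint is 3.
  • b-1 mainly handles the init step of the CombinerAggregator; its parallelismHint is 1. Because its upstream bolt has 3 tasks, tracked.condition.expectedTaskReports in its TridentBoltExecutor is 3, so it must wait until the aggregated data from all three tasks has arrived before it can finishBatch.
  • b-2 mainly handles the combine step of the CombinerAggregator and the each operation.
  • Overall, a batch starts at the spout, is split by partitionBy into 3 sub-batches at b-0, is finished at b-1 only after all 3 sub-batches have been aggregated, and at b-2 the result aggregated by b-1 goes through the final aggregation.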
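
The split-then-merge flow described above can be modelled in plain Java without Storm. The following is a simplified sketch, not Trident's implementation: the class `BatchSplitSketch`, its method names and the hash-modulo routing are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model (not Storm code) of one batch being split and re-aggregated. */
final class BatchSplitSketch {

    /** Routes each user of a batch to one of numTasks sub-batches (assumed hash-modulo routing). */
    static List<List<String>> partitionBy(List<String> users, int numTasks) {
        List<List<String>> subBatches = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            subBatches.add(new ArrayList<>());
        }
        for (String user : users) {
            subBatches.get(Math.floorMod(user.hashCode(), numTasks)).add(user);
        }
        return subBatches;
    }

    /** Per-task partial aggregation: counts the users inside one sub-batch. */
    static Map<String, Integer> partitionAggregate(List<String> subBatch) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String user : subBatch) {
            counts.merge(user, 1, Integer::sum);
        }
        return counts;
    }

    /** Final aggregation on a single task: merges the partial results of all upstream tasks. */
    static Map<String, Integer> globalAggregate(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((user, n) -> total.merge(user, n, Integer::sum));
        }
        return total;
    }

    /** Runs the whole pipeline for one batch. */
    static Map<String, Integer> run(List<String> batch, int numTasks) {
        List<Map<String, Integer>> partials = new ArrayList<>();
        for (List<String> subBatch : partitionBy(batch, numTasks)) {
            partials.add(partitionAggregate(subBatch));
        }
        return globalAggregate(partials);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("nickt1", "nickt2", "nickt3", "nickt1"), 3));
    }
}
```

Whatever the number of tasks, the merged result covers every tuple of the original batch exactly once, which is the property the three-bolt layout preserves.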
Log sample
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO  com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt1, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO  com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt2, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO  com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt3, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO  com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO  com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt2, 1, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO  com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt2=1}}
23:22:00.722 [Thread-22-b-0-executor[7 7]] INFO  com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO  com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO  com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt1, 1, 1]
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO  com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt3, 1, 1]
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO  com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt1=1}}
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO  com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt3=1}}
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - zero called
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - init tuple:[{1={nickt2=1}}, 1:0]
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt2=1}}
23:22:00.726 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - init tuple:[{1={nickt3=1}}, 1:0]
23:22:00.727 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - combine val1:{1={nickt2=1}},val2:{1={nickt3=1}}
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - init tuple:[{1={nickt1=1}}, 1:0]
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO  com.example.demo.trident.AggAgg - combine val1:{1={nickt3=1, nickt2=1}},val2:{1={nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO  com.example.demo.trident.AggAgg - zero called
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO  com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt3=1, nickt2=1, nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO  com.example.demo.trident.PrintEachFunc - null each tuple:[{1={nickt3=1, nickt2=1, nickt1=1}}]
  • Note that Storm's thread names already include the bolt names, for example b-0, b-1 and b-2.
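
The "zero called", "init tuple" and "combine" lines in the log follow the pattern of a fold: start from zero, turn each incoming tuple into a partial value with init, and merge it into the running value with combine. The source of AggAgg is not shown in this article, so the sketch below is only a guess at that pattern in plain Java; the `Combiner` interface and `UserCountCombiner` class are hypothetical stand-ins, not Storm classes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model (not Storm code) of the zero/init/combine sequence seen in the log. */
final class CombinerFoldSketch {

    /** Hypothetical stand-in for a combiner-style aggregator. */
    interface Combiner<I, T> {
        T zero();             // starting value for a batch
        T init(I input);      // converts one input tuple into a partial value
        T combine(T a, T b);  // merges two partial values
    }

    /** Merges per-user counts coming from several upstream tasks. */
    static final class UserCountCombiner
            implements Combiner<Map<String, Integer>, Map<String, Integer>> {
        public Map<String, Integer> zero() {
            return new HashMap<>();
        }

        public Map<String, Integer> init(Map<String, Integer> input) {
            return new HashMap<>(input);
        }

        public Map<String, Integer> combine(Map<String, Integer> a, Map<String, Integer> b) {
            Map<String, Integer> merged = new HashMap<>(a);
            b.forEach((user, n) -> merged.merge(user, n, Integer::sum));
            return merged;
        }
    }

    /** Folds a batch: zero(), then combine(acc, init(tuple)) for every tuple. */
    static <I, T> T fold(Combiner<I, T> combiner, List<I> batch) {
        T acc = combiner.zero();
        for (I input : batch) {
            acc = combiner.combine(acc, combiner.init(input));
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> partials = Arrays.asList(
                Collections.singletonMap("nickt2", 1),
                Collections.singletonMap("nickt3", 1),
                Collections.singletonMap("nickt1", 1));
        System.out.println(fold(new UserCountCombiner(), partials));
    }
}
```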

TridentBoltExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there
        
        
        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
//        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
//            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
//                    + " (" + _batches.size() + ")" +
//                    "\ntuple: " + tuple +
//                    "\nwith tracked " + tracked +
//                    "\nwith id " + id + 
//                    "\nwith group " + batchGroup
//                    + "\n");
//            
//        }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());
        
        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }
        
        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);
        
        //System.out.println("TRACKED: " + tracked + " " + tuple);
        
        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);                   
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }

    private void failBatch(TrackedBatch tracked, FailedException e) {
        if(e!=null && e instanceof ReportedFailedException) {
            _collector.reportError(e);
        }
        tracked.failed = true;
        if(tracked.delayedAck!=null) {
            _collector.fail(tracked.delayedAck);
            tracked.delayedAck = null;
        }
    }

    private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
        if(tracked.failed) {
            failBatch(tracked);
            _collector.fail(tuple);
            return;
        }
        CoordCondition cond = tracked.condition;
        boolean delayed = tracked.delayedAck==null &&
                              (cond.commitStream!=null && type==TupleType.COMMIT
                               || cond.commitStream==null);
        if(delayed) {
            tracked.delayedAck = tuple;
        }
        boolean failed = false;
        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
            if(tracked.receivedTuples == tracked.expectedTupleCount) {
                finishBatch(tracked, tuple);                
            } else {
                //TODO: add logging that not all tuples were received
                failBatch(tracked);
                _collector.fail(tuple);
                failed = true;
            }
        }
        
        if(!delayed && !failed) {
            _collector.ack(tuple);
        }
        
    }
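  • When no TrackedBatch exists for the batch, execute creates one, and creating it calls _bolt.initBatchState.
  • For a regular tuple, execute first calls _bolt.execute(tracked.info, tuple) and then acks through _collector. If _bolt.execute throws a FailedException, failBatch is called directly and sets tracked.failed to true; later, once all tuples of the batch have been sent and received, checkFinish runs, and when it finds tracked.failed set it calls _collector.fail.
  • _bolt is one of two kinds: TridentSpoutExecutor or SubtopologyBolt. For TridentSpoutExecutor, tracked.condition.expectedTaskReports is 0, so each received tuple (which is really an instruction to emit a batch) leads to finishBatch immediately after _bolt.execute. For SubtopologyBolt, tracked.condition.expectedTaskReports is non-zero, so it has to wait for the final [id,count] instruction and then run checkFinish.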
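
The completion rule in checkFinish can be reduced to two counters. The sketch below is a minimal model of that rule under stated assumptions: the class `BatchCoordinationSketch` is hypothetical, it leaves out the commit stream, acking and attempt ids, and it only tracks the counting logic visible in the excerpt above.

```java
/** Simplified model (not Storm code) of the batch-completion check in a downstream bolt. */
final class BatchCoordinationSketch {
    private final int expectedTaskReports; // number of upstream tasks that must report
    private int reportedTasks;             // [id,count] reports seen so far
    private int expectedTupleCount;        // sum of the reported counts
    private int receivedTuples;            // data tuples actually received
    private boolean finished;
    private boolean failed;

    BatchCoordinationSketch(int expectedTaskReports) {
        this.expectedTaskReports = expectedTaskReports;
    }

    /** A regular data tuple of the batch arrived. */
    void onTuple() {
        receivedTuples++;
    }

    /** An upstream task reported how many tuples it sent for this batch. */
    void onCoord(int count) {
        reportedTasks++;
        expectedTupleCount += count;
        if (reportedTasks == expectedTaskReports) {
            if (receivedTuples == expectedTupleCount) {
                finished = true; // every task reported and every tuple arrived
            } else {
                failed = true;   // some tuples were lost, so the batch fails
            }
        }
    }

    boolean isFinished() {
        return finished;
    }

    boolean isFailed() {
        return failed;
    }

    public static void main(String[] args) {
        BatchCoordinationSketch batch = new BatchCoordinationSketch(3);
        for (int task = 0; task < 3; task++) {
            batch.onTuple();
            batch.onCoord(1);
            System.out.println("reports=" + (task + 1) + " finished=" + batch.isFinished());
        }
    }
}
```

With three upstream tasks, the batch is only marked finished after the third report, which mirrors why b-1 in the example waits for all three sub-batches.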

TridentSpoutExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java

@Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector) {
        _emitter = _spout.getEmitter(_txStateId, conf, context);
        _collector = new AddIdCollector(_streamName, collector);
    }

    @Override
    public void execute(BatchInfo info, Tuple input) {
        // there won't be a BatchInfo for the success stream
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
        if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
            if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
                ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
                _activeBatches.remove(attempt.getTransactionId());
            } else {
                 throw new FailedException("Received commit for different transaction attempt");
            }
        } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            // valid to delete before what's been committed since 
            // those batches will never be accessed again
            _activeBatches.headMap(attempt.getTransactionId()).clear();
            _emitter.success(attempt);
        } else {            
            _collector.setBatch(info.batchId);
            _emitter.emitBatch(attempt, input.getValue(1), _collector);
            _activeBatches.put(attempt.getTransactionId(), attempt);
        }
    }

    @Override
    public void finishBatch(BatchInfo batchInfo) {
    }

    @Override
    public Object initBatchState(String batchGroup, Object batchId) {
        return null;
    }
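  • TridentSpoutExecutor uses an AddIdCollector; its initBatchState and finishBatch methods are both no-ops.
  • execute handles three cases: COMMIT_STREAM_ID, SUCCESS_STREAM_ID and the regular stream.
  • A tuple arriving on the regular stream is an instruction to emit a batch, so execute calls _emitter.emitBatch to emit the batch's tuples.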
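
The three-way dispatch and the bookkeeping of active batches can be illustrated in isolation. This is a sketch under assumptions, not the Storm class: `SpoutDispatchSketch`, its `Stream` enum and the recorded `actions` list are hypothetical, and an IllegalStateException stands in for the FailedException thrown by the real code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Simplified model (not Storm code) of how a spout executor reacts to its three input streams. */
final class SpoutDispatchSketch {
    enum Stream { BATCH, COMMIT, SUCCESS }

    // transaction id -> attempt id of the batch currently considered active
    private final TreeMap<Long, Integer> activeBatches = new TreeMap<>();
    // records what the emitter would have been asked to do
    final List<String> actions = new ArrayList<>();

    void execute(Stream stream, long txId, int attemptId) {
        switch (stream) {
            case COMMIT: {
                Integer active = activeBatches.get(txId);
                if (active == null || active != attemptId) {
                    throw new IllegalStateException("Received commit for different transaction attempt");
                }
                actions.add("commit " + txId);
                activeBatches.remove(txId);
                break;
            }
            case SUCCESS: {
                // batches older than a successful one will never be accessed again
                activeBatches.headMap(txId).clear();
                actions.add("success " + txId);
                break;
            }
            default: {
                // the regular stream carries the instruction to emit a batch
                actions.add("emitBatch " + txId);
                activeBatches.put(txId, attemptId);
            }
        }
    }

    int activeCount() {
        return activeBatches.size();
    }

    public static void main(String[] args) {
        SpoutDispatchSketch spout = new SpoutDispatchSketch();
        spout.execute(Stream.BATCH, 1L, 0);
        spout.execute(Stream.COMMIT, 1L, 0);
        spout.execute(Stream.SUCCESS, 1L, 0);
        System.out.println(spout.actions);
    }
}
```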

SubtopologyBolt

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java

@Override
    public Object initBatchState(String batchGroup, Object batchId) {
        ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
            p.startBatch(ret);
        }
        return ret;
    }

    @Override
    public void execute(BatchInfo batchInfo, Tuple tuple) {
        String sourceStream = tuple.getSourceStreamId();
        InitialReceiver ir = _roots.get(sourceStream);
        if(ir==null) {
            throw new RuntimeException("Received unexpected tuple " + tuple.toString());
        }
        ir.receive((ProcessorContext) batchInfo.state, tuple);
    }

    @Override
    public void finishBatch(BatchInfo batchInfo) {
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }

    protected static class InitialReceiver {
        List<TridentProcessor> _receivers = new ArrayList<>();
        RootFactory _factory;
        ProjectionFactory _project;
        String _stream;
        
        public InitialReceiver(String stream, Fields allFields) {
            // TODO: don't want to project for non-batch bolts...???
            // how to distinguish "batch" streams from non-batch streams?
            _stream = stream;
            _factory = new RootFactory(allFields);
            List<String> projected = new ArrayList<>(allFields.toList());
            projected.remove(0);
            _project = new ProjectionFactory(_factory, new Fields(projected));
        }
        
        public void receive(ProcessorContext context, Tuple tuple) {
            TridentTuple t = _project.create(_factory.create(tuple));
            for(TridentProcessor r: _receivers) {
                r.execute(context, _stream, t);
            }            
        }
        
        public void addReceiver(TridentProcessor p) {
            _receivers.add(p);
        }
        
        public Factory getOutputFactory() {
            return _project;
        }
    }
  • initBatchState creates a ProcessorContext and then calls startBatch on each TridentProcessor (for example AggregateProcessor or EachProcessor).
  • execute calls InitialReceiver.receive, which in turn calls execute on each TridentProcessor (for example AggregateProcessor).
  • finishBatch calls finishBatch on each TridentProcessor (for example AggregateProcessor or EachProcessor).
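
The startBatch / execute / finishBatch lifecycle, with one state slot per processor, can be sketched as follows. The names `BatchLifecycleSketch`, `Processor` and `CountingProcessor` are hypothetical and the state is reduced to a plain Object array, so treat this as an illustration of the call order rather than of SubtopologyBolt itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model (not Storm code) of a bolt driving processors through a batch lifecycle. */
final class BatchLifecycleSketch {

    /** Hypothetical stand-in for a processor with a per-batch lifecycle. */
    interface Processor {
        void startBatch(Object[] state);
        void execute(Object[] state, String tuple);
        void finishBatch(Object[] state);
    }

    /** Counts the tuples of a batch in its own state slot and reports the count at finish. */
    static final class CountingProcessor implements Processor {
        private final int slot;
        final List<Integer> emitted = new ArrayList<>();

        CountingProcessor(int slot) {
            this.slot = slot;
        }

        public void startBatch(Object[] state) {
            state[slot] = 0; // fresh state for every batch
        }

        public void execute(Object[] state, String tuple) {
            state[slot] = (Integer) state[slot] + 1;
        }

        public void finishBatch(Object[] state) {
            emitted.add((Integer) state[slot]);
        }
    }

    private final List<Processor> processors;

    BatchLifecycleSketch(Processor... processors) {
        this.processors = Arrays.asList(processors);
    }

    /** Creates the per-batch state and lets every processor initialise its slot. */
    Object[] initBatchState() {
        Object[] state = new Object[processors.size()];
        for (Processor p : processors) {
            p.startBatch(state);
        }
        return state;
    }

    void execute(Object[] state, String tuple) {
        for (Processor p : processors) {
            p.execute(state, tuple);
        }
    }

    void finishBatch(Object[] state) {
        for (Processor p : processors) {
            p.finishBatch(state);
        }
    }

    public static void main(String[] args) {
        CountingProcessor counter = new CountingProcessor(0);
        BatchLifecycleSketch bolt = new BatchLifecycleSketch(counter);
        Object[] batch = bolt.initBatchState();
        bolt.execute(batch, "a");
        bolt.execute(batch, "b");
        bolt.finishBatch(batch);
        System.out.println(counter.emitted);
    }
}
```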

WindowTridentProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

@Override
    public void startBatch(ProcessorContext processorContext) {
        // initialize state for batch
        processorContext.state[tridentContext.getStateIndex()] = new ArrayList<TridentTuple>();
    }

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        // add tuple to the batch state
        Object state = processorContext.state[tridentContext.getStateIndex()];
        ((List<TridentTuple>) state).add(projection.create(tuple));
    }

    @Override
    public void finishBatch(ProcessorContext processorContext) {

        Object batchId = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(batchId);

        LOG.debug("Received finishBatch of : [{}] ", batchId);
        // get all the tuples in a batch and add it to trident-window-manager
        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
        tridentWindowManager.addTuplesBatch(batchId, tuples);

        List<Integer> pendingTriggerIds = null;
        List<String> triggerKeys = new ArrayList<>();
        Iterable<Object> triggerValues = null;

        if (retriedAttempt(batchId)) {
            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
            if (pendingTriggerIds != null) {
                for (Integer pendingTriggerId : pendingTriggerIds) {
                    triggerKeys.add(triggerKey(pendingTriggerId));
                }
                triggerValues = windowStore.get(triggerKeys);
            }
        }

        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
        if(triggerValues == null) {
            pendingTriggerIds = new ArrayList<>();
            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
            try {
                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                List<Object> values = new ArrayList<>();
                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
                while (pendingTriggersIter.hasNext()) {
                    triggerResult = pendingTriggersIter.next();
                    for (List<Object> aggregatedResult : triggerResult.result) {
                        String triggerKey = triggerKey(triggerResult.id);
                        triggerKeys.add(triggerKey);
                        values.add(aggregatedResult);
                        pendingTriggerIds.add(triggerResult.id);
                    }
                    pendingTriggersIter.remove();
                }
                triggerValues = values;
            } finally {
                // store inprocess triggers of a batch in store for batch retries for any failures
                if (!pendingTriggerIds.isEmpty()) {
                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
                }
            }
        }

        collector.setContext(processorContext);
        int i = 0;
        for (Object resultValue : triggerValues) {
            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
        }
        collector.setContext(null);
    }
  • In startBatch, WindowTridentProcessor assigns a fresh list to processorContext.state[tridentContext.getStateIndex()].
  • In execute, it stores each received tuple in processorContext.state[tridentContext.getStateIndex()].
  • In finishBatch, it adds the data in processorContext.state[tridentContext.getStateIndex()] to the windowStore and to the windowManager's ConcurrentLinkedQueue.
  • The window trigger takes the window data out of the ConcurrentLinkedQueue and adds it to pendingTriggers; in finishBatch, WindowTridentProcessor removes the data from pendingTriggers and emits it through the FreshCollector.
  • Data emitted through the FreshCollector is received and handled by its TupleReceiver (for example ProjectedProcessor or PartitionPersistProcessor). PartitionPersistProcessor stores the data in state, while ProjectedProcessor extracts fields according to the window's outputFields and passes the data on to downstream processors such as EachProcessor.
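
The hand-off between batches and windows can be pictured with two queues. The sketch below is a deliberately simplified model: `WindowBatchSketch` is hypothetical, the window is assumed to consume everything queued when the trigger fires, and the window store and retry handling are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Simplified model (not Storm code) of how a window decouples output from the incoming batches. */
final class WindowBatchSketch {
    // tuples handed to the window, waiting for the trigger to fire
    private final Queue<String> windowQueue = new ArrayDeque<>();
    // window results waiting to be picked up by the next finishBatch
    private final Queue<List<String>> pendingTriggers = new ArrayDeque<>();
    private List<String> batchState = new ArrayList<>();

    /** Resets the per-batch state. */
    void startBatch() {
        batchState = new ArrayList<>();
    }

    /** Buffers one tuple of the current batch. */
    void execute(String tuple) {
        batchState.add(tuple);
    }

    /** Hands the batch to the window, then drains whatever window results are already pending. */
    List<List<String>> finishBatch() {
        windowQueue.addAll(batchState);
        List<List<String>> emitted = new ArrayList<>();
        while (!pendingTriggers.isEmpty()) {
            emitted.add(pendingTriggers.poll());
        }
        return emitted;
    }

    /** Models the window trigger firing: queued tuples become one pending window result. */
    void fireTrigger() {
        if (!windowQueue.isEmpty()) {
            pendingTriggers.add(new ArrayList<>(windowQueue));
            windowQueue.clear();
        }
    }

    public static void main(String[] args) {
        WindowBatchSketch window = new WindowBatchSketch();
        window.startBatch();
        window.execute("a");
        System.out.println("first finishBatch emits: " + window.finishBatch());
        window.fireTrigger();
        window.startBatch();
        System.out.println("after trigger emits: " + window.finishBatch());
    }
}
```

In this model a finishBatch that runs before the trigger has fired emits nothing, and a later finishBatch emits data that arrived in earlier batches.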

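Summary

  • A Trident spout emits one batch of data and then waits for downstream to finish processing it; finishBatch is carried out batch by batch. Between bolts, the interval between tuple acks depends on how long each tuple takes to process (TridentBoltExecutor acks automatically once a tuple has been processed). If overall processing takes too long, tuple processing times out for the whole topology and triggers the spout's fail operation, which replays that batchId. If the spout is transactional, the tuples belonging to the batchId are unchanged on replay.
  • A window operation breaks up the original batches of the Trident spout. The data of one batch first accumulates in the state of the ProcessorContext (WindowTridentProcessor resets that state on every startBatch). At finishBatch the data is copied to the windowStore and to the windowManager's ConcurrentLinkedQueue, where it waits for the window trigger to fire, compute the window data and put it into pendingTriggers. When the bolt runs finishBatch, the window data is removed from pendingTriggers and handed to the FreshCollector, which passes it to the downstream processors. The startBatch and finishBatch of those downstream processors follow the rhythm of the original spout and are not driven by the window.
  • Assuming data arrives continuously, the rate at which the spout emits batches depends on Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS (topology.trident.batch.emit.interval.millis, 500 by default in defaults.yaml). The window interval is usually larger than the default batch interval, so a window aggregates the data of several batches. Also, because data is only added to the windowManager's ConcurrentLinkedQueue at finishBatch, pendingTriggers has no data yet at that point, so the first few finishBatch calls usually get no data from the window and the downstream processors have nothing to process. Check for empty values to avoid null pointer exceptions.
  • When data goes through groupBy/partitionBy with a parallelism of 1, groupBy/partitionBy works batch by batch. With a parallelism greater than 1, a batch emitted by the original spout is distributed over several partitions/tasks, so the data stream of the original batch is split. Each task runs its own finishBatch once it has processed its data (tuples arrive in emit order and the last one is [id,count], which acts as the end-of-batch instruction used to detect and trigger batch completion), then sends the data of the new batch downstream, followed by [id,count] once the new batch has been sent, so that each downstream bolt performs its batch operations in turn. global sends all data to the same partition/task. batchGlobal behaves like global when parallelism is 1; when parallelism is greater than 1 it distributes data to different partitions/tasks by batchId.
  • aggregate is used to aggregate data and is generally combined with groupBy or partitionBy, which split the upstream batch again so that aggregation runs on the split batches. If parallelism is greater than 1, aggregation runs per task; to bring those results together afterwards, add global().aggregate(). As long as there is no window operation in between, the final aggregation still follows the original batch, because tracked.condition.expectedTaskReports in TridentBoltExecutor records how many tasks the bolt must wait for to report [id,count]. On receiving [id,count], it first checks whether tracked.reportedTasks equals cond.expectedTaskReports, and then whether tracked.receivedTuples equals tracked.expectedTupleCount; only when both hold can it finishBatch, completing the current batch and emitting [id,count] downstream. The expectedTaskReports check is what lets a batch that was split across several tasks be brought back together as the original batch. Keep in mind, though, that a window operation breaks up the spout's original batches at the window stage.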
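
The difference between the three repartitioning operations mentioned in the summary comes down to what decides the target task. The sketch below states that difference as three small functions. It is an assumption-laden illustration, not Storm's grouping code: `RoutingSketch` is hypothetical, and the hash-modulo formulas and the choice of task 0 for global are simplifications.

```java
/** Simplified model (not Storm code) of how three repartitioning operations pick a target task. */
final class RoutingSketch {

    /** partitionBy: the value of the partitioning field decides the task (assumed hash-modulo). */
    static int partitionBy(Object fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    /** global: every tuple of every batch goes to one and the same task (task 0 in this model). */
    static int global(int numTasks) {
        return 0;
    }

    /** batchGlobal: all tuples of one batch share a task, but different batches may differ. */
    static int batchGlobal(long batchId, int numTasks) {
        return (int) Math.floorMod(batchId, (long) numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3;
        for (long batchId = 1; batchId <= 3; batchId++) {
            System.out.println("batch " + batchId
                    + " partitionBy(nickt1)=" + partitionBy("nickt1", numTasks)
                    + " global=" + global(numTasks)
                    + " batchGlobal=" + batchGlobal(batchId, numTasks));
        }
    }
}
```

With a single task all three functions return 0, which matches the observation that batchGlobal and global behave the same when parallelism is 1.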
