This article takes a look at how a storm trident batch is split across partitions and then aggregated back together. Here is the example topology:
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
        .partitionBy(new Fields("user"))
        .partitionAggregate(new Fields("user","score","batchId"),new OriginUserCountAggregator(),new Fields("result","aggBatchId"))
        .parallelismHint(3)
        .global()
        .aggregate(new Fields("result","aggBatchId"),new AggAgg(),new Fields("agg"))
        .each(new Fields("agg"),new PrintEachFunc(),new Fields())
        ;
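The post does not show OriginUserCountAggregator, AggAgg, or PrintEachFunc. The sketch below is a guess at their shape, reconstructed to be consistent with the log output that follows; the method bodies and the nested State holder are assumptions, not the original code:

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical reconstructions of the user code above, nested so the sketch is one file.
public class TridentBatchDemoOps {

    // Runs once per partition per batch: init/aggregate/complete bracket the batch.
    public static class OriginUserCountAggregator
            extends BaseAggregator<OriginUserCountAggregator.State> {
        private static final Logger LOG = LoggerFactory.getLogger(OriginUserCountAggregator.class);

        // per-batch state; keeps the batch id so complete() can emit it as "aggBatchId"
        public static class State {
            final Object aggBatchId;   // e.g. "1:0", i.e. txid:attemptId
            final Map<String, Map<String, Integer>> counts = new HashMap<>();
            State(Object aggBatchId) { this.aggBatchId = aggBatchId; }
        }

        @Override
        public State init(Object batchId, TridentCollector collector) {
            LOG.info("init map, aggBatchId:{}", batchId);
            return new State(batchId);
        }

        @Override
        public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
            // input fields are ("user", "score", "batchId"); count scores per spout batch
            String spoutBatchId = String.valueOf(tuple.getValueByField("batchId"));
            LOG.info("aggregate batch:{},tuple:{}", spoutBatchId, tuple);
            state.counts.computeIfAbsent(spoutBatchId, k -> new HashMap<>())
                    .merge(tuple.getStringByField("user"), tuple.getIntegerByField("score"), Integer::sum);
        }

        @Override
        public void complete(State state, TridentCollector collector) {
            LOG.info("complete agg batch:{},val:{}", state.aggBatchId, state.counts);
            collector.emit(new Values(state.counts, String.valueOf(state.aggBatchId)));
        }
    }

    // Combiner-style aggregator, matching the zero/init/combine lines in the log.
    @SuppressWarnings("unchecked")
    public static class AggAgg implements CombinerAggregator<Map<String, Map<String, Integer>>> {
        private static final Logger LOG = LoggerFactory.getLogger(AggAgg.class);

        @Override
        public Map<String, Map<String, Integer>> init(TridentTuple tuple) {
            LOG.info("init tuple:{}", tuple);
            return (Map<String, Map<String, Integer>>) tuple.getValue(0);
        }

        @Override
        public Map<String, Map<String, Integer>> combine(Map<String, Map<String, Integer>> val1,
                                                         Map<String, Map<String, Integer>> val2) {
            LOG.info("combine val1:{},val2:{}", val1, val2);
            Map<String, Map<String, Integer>> merged = new HashMap<>();
            for (Map<String, Map<String, Integer>> side : java.util.Arrays.asList(val1, val2)) {
                side.forEach((batch, counts) -> counts.forEach((user, score) ->
                        merged.computeIfAbsent(batch, k -> new HashMap<>())
                                .merge(user, score, Integer::sum)));
            }
            return merged;
        }

        @Override
        public Map<String, Map<String, Integer>> zero() {
            LOG.info("zero called");
            return new HashMap<>();
        }
    }

    // Terminal each(): just prints whatever the upstream aggregation produced.
    public static class PrintEachFunc extends BaseFunction {
        private static final Logger LOG = LoggerFactory.getLogger(PrintEachFunc.class);

        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            LOG.info("each tuple:{}", tuple);
        }
    }
}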
Sample log output:
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt1, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt2, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt3, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt2, 1, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt2=1}}
23:22:00.722 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt1, 1, 1]
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt3, 1, 1]
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt1=1}}
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt3=1}}
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - zero called
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt2=1}}, 1:0]
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt2=1}}
23:22:00.726 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt3=1}}, 1:0]
23:22:00.727 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{1={nickt2=1}},val2:{1={nickt3=1}}
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt1=1}}, 1:0]
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{1={nickt3=1, nickt2=1}},val2:{1={nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.AggAgg - zero called
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt3=1, nickt2=1, nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.PrintEachFunc - null each tuple:[{1={nickt3=1, nickt2=1, nickt1=1}}]
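DebugFixedBatchSpout is not shown either. Judging from the log it behaves like storm's built-in org.apache.storm.trident.testing.FixedBatchSpout with per-tuple logging added, and it must also expose the batchId as an extra output field, since the aggregated tuples above carry three values. A possible sketch (the wrapping collector and the appended batchId are assumptions, and it relies on storm 1.2's two-method TridentCollector):

import java.util.ArrayList;
import java.util.List;

import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// FixedBatchSpout plus logging: every emitted tuple is logged with its batchId,
// and the batchId is appended as the last value so downstream fields can include it.
public class DebugFixedBatchSpout extends FixedBatchSpout {
    private static final Logger LOG = LoggerFactory.getLogger(DebugFixedBatchSpout.class);

    // declare fields including the trailing "batchId"; outputs carry only user/score,
    // the extra value is appended below
    public DebugFixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
        super(fields, maxBatchSize, outputs);
    }

    @Override
    public void emitBatch(final long batchId, final TridentCollector collector) {
        super.emitBatch(batchId, new TridentCollector() {
            @Override
            public void emit(List<Object> values) {
                LOG.info("batchId:{},emit:{}", batchId, values);
                List<Object> withBatchId = new ArrayList<>(values);
                withBatchId.add(batchId);
                collector.emit(withBatchId);
            }

            @Override
            public void reportError(Throwable t) {
                collector.reportError(t);
            }
        });
    }
}

Constructed as new DebugFixedBatchSpout(new Fields("user","score","batchId"), 3, ...), this would reproduce lines like batchId:1,emit:[nickt1, 1].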
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
public void execute(Tuple tuple) {
    if(TupleUtils.isTick(tuple)) {
        long now = System.currentTimeMillis();
        if(now - _lastRotate > _messageTimeoutMs) {
            _batches.rotate();
            _lastRotate = now;
        }
        return;
    }
    String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
    if(batchGroup==null) {
        // this is so we can do things like have simple DRPC that doesn't need to use batch processing
        _coordCollector.setCurrBatch(null);
        _bolt.execute(null, tuple);
        _collector.ack(tuple);
        return;
    }
    IBatchID id = (IBatchID) tuple.getValue(0);
    //get transaction id
    //if it already exists and attempt id is greater than the attempt there
    TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
//        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
//            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
//                    + " (" + _batches.size() + ")" +
//                    "\ntuple: " + tuple +
//                    "\nwith tracked " + tracked +
//                    "\nwith id " + id +
//                    "\nwith group " + batchGroup
//                    + "\n");
//
//        }
    //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());

    // this code here ensures that only one attempt is ever tracked for a batch, so when
    // failures happen you don't get an explosion in memory usage in the tasks
    if(tracked!=null) {
        if(id.getAttemptId() > tracked.attemptId) {
            _batches.remove(id.getId());
            tracked = null;
        } else if(id.getAttemptId() < tracked.attemptId) {
            // no reason to try to execute a previous attempt than we've already seen
            return;
        }
    }

    if(tracked==null) {
        tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
        _batches.put(id.getId(), tracked);
    }
    _coordCollector.setCurrBatch(tracked);

    //System.out.println("TRACKED: " + tracked + " " + tuple);

    TupleType t = getTupleType(tuple, tracked);
    if(t==TupleType.COMMIT) {
        tracked.receivedCommit = true;
        checkFinish(tracked, tuple, t);
    } else if(t==TupleType.COORD) {
        int count = tuple.getInteger(1);
        tracked.reportedTasks++;
        tracked.expectedTupleCount+=count;
        checkFinish(tracked, tuple, t);
    } else {
        tracked.receivedTuples++;
        boolean success = true;
        try {
            _bolt.execute(tracked.info, tuple);
            if(tracked.condition.expectedTaskReports==0) {
                success = finishBatch(tracked, tuple);
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
        }
        if(success) {
            _collector.ack(tuple);
        } else {
            _collector.fail(tuple);
        }
    }
    _coordCollector.setCurrBatch(null);
}

private void failBatch(TrackedBatch tracked, FailedException e) {
    if(e!=null && e instanceof ReportedFailedException) {
        _collector.reportError(e);
    }
    tracked.failed = true;
    if(tracked.delayedAck!=null) {
        _collector.fail(tracked.delayedAck);
        tracked.delayedAck = null;
    }
}

private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
    if(tracked.failed) {
        failBatch(tracked);
        _collector.fail(tuple);
        return;
    }
    CoordCondition cond = tracked.condition;
    boolean delayed = tracked.delayedAck==null &&
            (cond.commitStream!=null && type==TupleType.COMMIT
             || cond.commitStream==null);
    if(delayed) {
        tracked.delayedAck = tuple;
    }
    boolean failed = false;
    if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
        if(tracked.receivedTuples == tracked.expectedTupleCount) {
            finishBatch(tracked, tuple);
        } else {
            //TODO: add logging that not all tuples were received
            failBatch(tracked);
            _collector.fail(tuple);
            failed = true;
        }
    }
    if(!delayed && !failed) {
        _collector.ack(tuple);
    }
}
TridentBoltExecutor.execute first handles tick tuples, rotating out batches that have exceeded the message timeout. For a normal tuple it looks up (or creates) the TrackedBatch for the incoming batch id, keeping only the newest attempt. For the spout (TridentSpoutExecutor, where the tuple driving execute is actually an instruction to emit a batch), tracked.condition.expectedTaskReports is 0, so finishBatch is called immediately after _bolt.execute; for SubtopologyBolt, tracked.condition.expectedTaskReports is not 0, and the batch has to wait for the final [id,count] instruction before checkFinish can complete it.
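The bookkeeping in checkFinish is easier to see in isolation. Here is a toy model, in plain Java rather than storm code (the class and the numbers are illustrative, and commit-stream handling is omitted): a batch can only finish once every expected upstream task has reported its [id,count] and the received-tuple total matches the reported total.

// Toy model (not storm code) of TrackedBatch's completion accounting.
public class BatchAccounting {
    final int expectedTaskReports;     // upstream tasks that must send [id, count]
    int reportedTasks = 0;             // [id, count] instructions seen so far
    int expectedTupleCount = 0;        // sum of the reported counts
    int receivedTuples = 0;            // regular tuples seen so far

    BatchAccounting(int expectedTaskReports) {
        this.expectedTaskReports = expectedTaskReports;
    }

    void onTuple()          { receivedTuples++; }
    void onCoord(int count) { reportedTasks++; expectedTupleCount += count; }

    boolean batchFinished() {
        return reportedTasks == expectedTaskReports
                && receivedTuples == expectedTupleCount;
    }

    public static void main(String[] args) {
        // e.g. the aggregate bolt fed by the three partitionAggregate tasks above:
        // three partial-result tuples arrive, then an [id, 1] from each task
        BatchAccounting batch = new BatchAccounting(3);
        batch.onTuple(); batch.onTuple(); batch.onTuple();
        batch.onCoord(1); batch.onCoord(1); batch.onCoord(1);
        System.out.println(batch.batchFinished());   // true -> finishBatch may run
    }
}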
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector) {
    _emitter = _spout.getEmitter(_txStateId, conf, context);
    _collector = new AddIdCollector(_streamName, collector);
}

@Override
public void execute(BatchInfo info, Tuple input) {
    // there won't be a BatchInfo for the success stream
    TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
    if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
        if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
            ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
            _activeBatches.remove(attempt.getTransactionId());
        } else {
            throw new FailedException("Received commit for different transaction attempt");
        }
    } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
        // valid to delete before what's been committed since
        // those batches will never be accessed again
        _activeBatches.headMap(attempt.getTransactionId()).clear();
        _emitter.success(attempt);
    } else {
        _collector.setBatch(info.batchId);
        _emitter.emitBatch(attempt, input.getValue(1), _collector);
        _activeBatches.put(attempt.getTransactionId(), attempt);
    }
}

@Override
public void finishBatch(BatchInfo batchInfo) {
}

@Override
public Object initBatchState(String batchGroup, Object batchId) {
    return null;
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java
@Override
public Object initBatchState(String batchGroup, Object batchId) {
    ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
    for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
        p.startBatch(ret);
    }
    return ret;
}

@Override
public void execute(BatchInfo batchInfo, Tuple tuple) {
    String sourceStream = tuple.getSourceStreamId();
    InitialReceiver ir = _roots.get(sourceStream);
    if(ir==null) {
        throw new RuntimeException("Received unexpected tuple " + tuple.toString());
    }
    ir.receive((ProcessorContext) batchInfo.state, tuple);
}

@Override
public void finishBatch(BatchInfo batchInfo) {
    for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
        p.finishBatch((ProcessorContext) batchInfo.state);
    }
}

protected static class InitialReceiver {
    List<TridentProcessor> _receivers = new ArrayList<>();
    RootFactory _factory;
    ProjectionFactory _project;
    String _stream;

    public InitialReceiver(String stream, Fields allFields) {
        // TODO: don't want to project for non-batch bolts...???
        // how to distinguish "batch" streams from non-batch streams?
        _stream = stream;
        _factory = new RootFactory(allFields);
        List<String> projected = new ArrayList<>(allFields.toList());
        projected.remove(0);
        _project = new ProjectionFactory(_factory, new Fields(projected));
    }

    public void receive(ProcessorContext context, Tuple tuple) {
        TridentTuple t = _project.create(_factory.create(tuple));
        for(TridentProcessor r: _receivers) {
            r.execute(context, _stream, t);
        }
    }

    public void addReceiver(TridentProcessor p) {
        _receivers.add(p);
    }

    public Factory getOutputFactory() {
        return _project;
    }
}
SubtopologyBolt.initBatchState creates the ProcessorContext for the batch and calls the startBatch method of each TridentProcessor (for example AggregateProcessor and EachProcessor). Its execute method looks up the InitialReceiver for the tuple's source stream, whose receive method projects the tuple and passes it to the execute method of the registered processors (for example AggregateProcessor). Its finishBatch method calls the finishBatch method of each TridentProcessor (for example AggregateProcessor and EachProcessor); a simplified model of this delegation follows.
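A simplified, non-storm model of that delegation (all the Simple* types here are invented for illustration; the real AggregateProcessor additionally deals with projections, output factories, and collectors) might look like this:

import java.util.List;

// Invented, simplified types showing how a processor such as AggregateProcessor
// maps the batch lifecycle onto an Aggregator's init/aggregate/complete.
interface SimpleAggregator<S> {
    S init(Object batchId);
    void aggregate(S state, List<Object> tuple);
    void complete(S state);
}

// stands in for ProcessorContext: the per-batch id plus one state slot per processor
class SimpleProcessorContext {
    final Object batchId;
    final Object[] state;
    SimpleProcessorContext(Object batchId, int slots) {
        this.batchId = batchId;
        this.state = new Object[slots];
    }
}

class SimpleAggregateProcessor<S> {
    private final int stateIndex;            // this processor's slot in context.state
    private final SimpleAggregator<S> agg;

    SimpleAggregateProcessor(int stateIndex, SimpleAggregator<S> agg) {
        this.stateIndex = stateIndex;
        this.agg = agg;
    }

    // called from initBatchState: batch state lives in the context, not the processor,
    // because several batches can be in flight at once
    void startBatch(SimpleProcessorContext ctx) {
        ctx.state[stateIndex] = agg.init(ctx.batchId);
    }

    @SuppressWarnings("unchecked")
    void execute(SimpleProcessorContext ctx, List<Object> tuple) {   // one call per tuple
        agg.aggregate((S) ctx.state[stateIndex], tuple);
    }

    @SuppressWarnings("unchecked")
    void finishBatch(SimpleProcessorContext ctx) {                   // on the final [id, count]
        agg.complete((S) ctx.state[stateIndex]);
    }
}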
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@Override
public void startBatch(ProcessorContext processorContext) {
    // initialize state for batch
    processorContext.state[tridentContext.getStateIndex()] = new ArrayList<TridentTuple>();
}

@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
    // add tuple to the batch state
    Object state = processorContext.state[tridentContext.getStateIndex()];
    ((List<TridentTuple>) state).add(projection.create(tuple));
}

@Override
public void finishBatch(ProcessorContext processorContext) {

    Object batchId = processorContext.batchId;
    Object batchTxnId = getBatchTxnId(batchId);

    LOG.debug("Received finishBatch of : [{}] ", batchId);
    // get all the tuples in a batch and add it to trident-window-manager
    List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
    tridentWindowManager.addTuplesBatch(batchId, tuples);

    List<Integer> pendingTriggerIds = null;
    List<String> triggerKeys = new ArrayList<>();
    Iterable<Object> triggerValues = null;

    if (retriedAttempt(batchId)) {
        pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
        if (pendingTriggerIds != null) {
            for (Integer pendingTriggerId : pendingTriggerIds) {
                triggerKeys.add(triggerKey(pendingTriggerId));
            }
            triggerValues = windowStore.get(triggerKeys);
        }
    }

    // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
    if(triggerValues == null) {
        pendingTriggerIds = new ArrayList<>();
        Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
        LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
        try {
            Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
            List<Object> values = new ArrayList<>();
            StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
            while (pendingTriggersIter.hasNext()) {
                triggerResult = pendingTriggersIter.next();
                for (List<Object> aggregatedResult : triggerResult.result) {
                    String triggerKey = triggerKey(triggerResult.id);
                    triggerKeys.add(triggerKey);
                    values.add(aggregatedResult);
                    pendingTriggerIds.add(triggerResult.id);
                }
                pendingTriggersIter.remove();
            }
            triggerValues = values;
        } finally {
            // store inprocess triggers of a batch in store for batch retries for any failures
            if (!pendingTriggerIds.isEmpty()) {
                windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
            }
        }
    }

    collector.setContext(processorContext);
    int i = 0;
    for (Object resultValue : triggerValues) {
        collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
    }
    collector.setContext(null);
}
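For context, WindowTridentProcessor only enters a topology when a window operation is applied to a stream. A sketch of such a topology, reusing the example's spout and assuming storm 1.2's Trident windowing API (the 5-second window length and the in-memory windows store are illustrative choices; CountAsAggregator comes from storm's testing package):

import java.util.concurrent.TimeUnit;

import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.trident.testing.CountAsAggregator;
import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
import org.apache.storm.tuple.Fields;

// a 5-second tumbling window: far longer than the default 500ms batch-emit
// interval, so one window trigger covers the tuples of several batches
topology.newStream("spout1", spout)
        .tumblingWindow(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS),
                new InMemoryWindowsStoreFactory(),
                new Fields("user", "score"),
                new CountAsAggregator(),
                new Fields("count"))
        .each(new Fields("count"), new PrintEachFunc(), new Fields());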
The processors downstream of a window operation (for example ProjectedProcessor and PartitionPersistProcessor) pick up the emitted window results: PartitionPersistProcessor stores the data into state, while ProjectedProcessor extracts the fields named by the window's outputFields and then passes the data on to the various downstream processors, such as EachProcessor.

TridentBoltExecutor automatically acks each tuple once it has been processed. If the overall processing takes too long, tuple processing for the whole topology times out and the spout's fail is triggered; the batchId is then re-emitted, and if the spout is transactional, the tuples belonging to that batchId are the same on the retry.

WindowTridentProcessor resets its state slot on every startBatch, accumulating the batch's tuples there. On finishBatch it copies the data into the windowStore and into the windowManager's ConcurrentLinkedQueue, then waits for the window trigger to fire, compute the window data, and place it into pendingTriggers; the bolt's finishBatch removes window data from pendingTriggers and hands it to the FreshCollector, which feeds the downstream processors. Note that the downstream processors' startBatch and finishBatch follow the rhythm of the original spout rather than of the window.

Batches are emitted according to the topology.trident.batch.emit.interval.millis parameter, which defaults to 500 in defaults.yaml, while a window's interval is usually larger than the default batch interval, so one window aggregates the data of several batches. And because data only reaches the windowManager's ConcurrentLinkedQueue at finishBatch time, pendingTriggers is still empty at first, so the first few finishBatch calls usually fetch no window data and the downstream processors get nothing to process; remember to null-check to avoid a NullPointerException.
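The batch interval, like the message timeout that drives the fail/replay behaviour described above, is an ordinary topology config; for example (the values are illustrative):

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;

Config conf = new Config();
// stretch the batch cadence from the 500ms default in defaults.yaml
conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
// give long-running batches more headroom before tuples time out and the batch is replayed
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
StormSubmitter.submitTopology("trident-batch-demo", conf, topology.build());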
Tuples arrive in emit order, and the last one is [id,count], which serves as the end-of-batch instruction used to detect and trigger batch completion. The data of the next batch is then sent downstream, followed by its own [id,count], so the downstream bolts complete their batches one after another. A global operation sends all the data to the same partition/task; batchGlobal behaves the same as global when parallelism is 1, but when parallelism is greater than 1 it distributes data to different partitions/tasks by batchId.