This article takes a look at how Storm Trident's coordinator works, starting from a small example topology:
```java
@Test
public void testDebugTopologyBuild() {
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
            new Values("nickt1", 4),
            new Values("nickt2", 7),
            new Values("nickt3", 8),
            new Values("nickt4", 9),
            new Values("nickt5", 7),
            new Values("nickt6", 11),
            new Values("nickt7", 5)
    );
    spout.setCycle(false);
    TridentTopology topology = new TridentTopology();
    Stream stream1 = topology.newStream("spout1", spout)
            .each(new Fields("user", "score"), new BaseFunction() {
                @Override
                public void execute(TridentTuple tuple, TridentCollector collector) {
                    System.out.println("tuple:" + tuple);
                }
            }, new Fields());
    topology.build();
}
```
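The test above only builds the topology without submitting it. To actually watch the coordinator streams at work, a local-mode run can be appended at the end of the test — a minimal sketch, assuming the test method declares `throws Exception`; the topology name and sleep duration are arbitrary choices, not part of the original test:

```java
// Hypothetical continuation of the test above: run the topology locally.
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident-coordinator-demo", conf, topology.build());
org.apache.storm.utils.Utils.sleep(30_000); // let a few batches flow through $batch/$commit/$success
cluster.shutdown();
```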
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
```java
public class MasterBatchCoordinator extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(MasterBatchCoordinator.class);

    public static final long INIT_TXID = 1L;

    public static final String BATCH_STREAM_ID = "$batch";
    public static final String COMMIT_STREAM_ID = "$commit";
    public static final String SUCCESS_STREAM_ID = "$success";

    private static final String CURRENT_TX = "currtx";
    private static final String CURRENT_ATTEMPTS = "currattempts";

    private List<TransactionalState> _states = new ArrayList();

    TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<Long, TransactionStatus>();
    TreeMap<Long, Integer> _attemptIds;

    private SpoutOutputCollector _collector;
    Long _currTransaction;
    int _maxTransactionActive;

    List<ITridentSpout.BatchCoordinator> _coordinators = new ArrayList();

    List<String> _managedSpoutIds;
    List<ITridentSpout> _spouts;
    WindowedTimeThrottler _throttler;

    boolean _active = true;

    public MasterBatchCoordinator(List<String> spoutIds, List<ITridentSpout> spouts) {
        if(spoutIds.isEmpty()) {
            throw new IllegalArgumentException("Must manage at least one spout");
        }
        _managedSpoutIds = spoutIds;
        _spouts = spouts;
        LOG.debug("Created {}", this);
    }

    public List<String> getManagedSpoutIds(){
        return _managedSpoutIds;
    }

    @Override
    public void activate() {
        _active = true;
    }

    @Override
    public void deactivate() {
        _active = false;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
        for(String spoutId: _managedSpoutIds) {
            _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
        }
        _currTransaction = getStoredCurrTransaction();

        _collector = collector;
        Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        if(active==null) {
            _maxTransactionActive = 1;
        } else {
            _maxTransactionActive = active.intValue();
        }
        _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);

        for(int i=0; i<_spouts.size(); i++) {
            String txId = _managedSpoutIds.get(i);
            _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
        }
        LOG.debug("Opened {}", this);
    }

    @Override
    public void close() {
        for(TransactionalState state: _states) {
            state.close();
        }
        LOG.debug("Closed {}", this);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // in partitioned example, in case an emitter task receives a later transaction than it's emitted so far,
        // when it sees the earlier txid it should know to emit nothing
        declarer.declareStream(BATCH_STREAM_ID, new Fields("tx"));
        declarer.declareStream(COMMIT_STREAM_ID, new Fields("tx"));
        declarer.declareStream(SUCCESS_STREAM_ID, new Fields("tx"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config ret = new Config();
        ret.setMaxTaskParallelism(1);
        ret.registerSerialization(TransactionAttempt.class);
        return ret;
    }

    //......
}
```
MasterBatchCoordinator is the actual spout of the whole trident topology. Its open method reads the batch-trigger frequency from Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS (topology.trident.batch.emit.interval.millis, which defaults to 500 in defaults.yaml) and creates a WindowedTimeThrottler over that window with a maxAmt of 1. It also reads Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending, which defaults to null in defaults.yaml) to set _maxTransactionActive, falling back to 1 when the value is null.
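To make the throttling concrete: sync() (shown below) only starts a new TransactionAttempt when the throttler still has room in the current window, and marks an event right after emitting on $batch. Here is a minimal, hypothetical re-implementation of those semantics — the real class is org.apache.storm.utils.WindowedTimeThrottler; all names below are illustrative:

```java
// Minimal sketch of the throttling semantics, NOT the real Storm class.
public class MiniWindowedThrottler {
    private final long windowMillis;         // e.g. topology.trident.batch.emit.interval.millis
    private final int maxEventsPerWindow;    // maxAmt, 1 in MasterBatchCoordinator
    private long windowStart = System.currentTimeMillis();
    private int eventsInWindow = 0;

    public MiniWindowedThrottler(long windowMillis, int maxEventsPerWindow) {
        this.windowMillis = windowMillis;
        this.maxEventsPerWindow = maxEventsPerWindow;
    }

    // Mirrors isThrottled(): true once the current window already saw maxEvents.
    public boolean isThrottled() {
        rotateIfNeeded();
        return eventsInWindow >= maxEventsPerWindow;
    }

    // Mirrors markEvent(): called right after emitting on $batch.
    public void markEvent() {
        rotateIfNeeded();
        eventsInWindow++;
    }

    private void rotateIfNeeded() {
        long now = System.currentTimeMillis();
        if (now - windowStart >= windowMillis) {
            windowStart = now;
            eventsInWindow = 0;
        }
    }
}
```

With the defaults (a 500 ms window and maxAmt of 1), at most one new batch can be started per 500 ms window.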
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

```java
@Override
public void nextTuple() {
    sync();
}

private void sync() {
    // note that sometimes the tuples active may be less than max_spout_pending, e.g.
    // max_spout_pending = 3
    // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
    // and there won't be a batch for tx 4 because there's max_spout_pending tx active
    TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
    if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
        maybeCommit.status = AttemptStatus.COMMITTING;
        _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
        LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
    }

    if(_active) {
        if(_activeTx.size() < _maxTransactionActive) {
            Long curr = _currTransaction;
            for(int i=0; i<_maxTransactionActive; i++) {
                if(!_activeTx.containsKey(curr) && isReady(curr)) {
                    // by using a monotonically increasing attempt id, downstream tasks
                    // can be memory efficient by clearing out state for old attempts
                    // as soon as they see a higher attempt id for a transaction
                    Integer attemptId = _attemptIds.get(curr);
                    if(attemptId==null) {
                        attemptId = 0;
                    } else {
                        attemptId++;
                    }
                    _attemptIds.put(curr, attemptId);
                    for(TransactionalState state: _states) {
                        state.setData(CURRENT_ATTEMPTS, _attemptIds);
                    }

                    TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                    final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                    _activeTx.put(curr, newTransactionStatus);
                    _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                    LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                    _throttler.markEvent();
                }
                curr = nextTransactionId(curr);
            }
        }
    }
}
```
nextTuple simply calls sync(), which is also invoked from ack and fail. sync() first checks the current transaction's status: if it has been fully PROCESSED it is moved to COMMITTING and a tuple is emitted on MasterBatchCoordinator.COMMIT_STREAM_ID ($commit). After that, subject to the _maxTransactionActive and WindowedTimeThrottler limits, a new TransactionAttempt is started only when those checks pass; a tuple is then emitted on MasterBatchCoordinator.BATCH_STREAM_ID ($batch) and a windowEvent is marked on the WindowedTimeThrottler.
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

```java
@Override
public void ack(Object msgId) {
    TransactionAttempt tx = (TransactionAttempt) msgId;
    TransactionStatus status = _activeTx.get(tx.getTransactionId());
    LOG.debug("Ack. [tx_attempt = {}], [tx_status = {}], [{}]", tx, status, this);
    if(status!=null && tx.equals(status.attempt)) {
        if(status.status==AttemptStatus.PROCESSING) {
            status.status = AttemptStatus.PROCESSED;
            LOG.debug("Changed status. [tx_attempt = {}] [tx_status = {}]", tx, status);
        } else if(status.status==AttemptStatus.COMMITTING) {
            _activeTx.remove(tx.getTransactionId());
            _attemptIds.remove(tx.getTransactionId());
            _collector.emit(SUCCESS_STREAM_ID, new Values(tx));
            _currTransaction = nextTransactionId(tx.getTransactionId());
            for(TransactionalState state: _states) {
                state.setData(CURRENT_TX, _currTransaction);
            }
            LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", SUCCESS_STREAM_ID, tx, status, this);
        }
        sync();
    }
}
```
The ack method looks up the TransactionAttempt's status by msgId. If the status is AttemptStatus.PROCESSING it is changed to AttemptStatus.PROCESSED; if it is AttemptStatus.COMMITTING, the tx is removed from _activeTx and _attemptIds, a tuple is emitted on MasterBatchCoordinator.SUCCESS_STREAM_ID ($success), and _currTransaction is advanced to nextTransactionId. Finally sync is called again to trigger a new TransactionAttempt.
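A transaction therefore has to be acked twice before it succeeds: once for the batch tuple tree and once for the commit tuple tree. The following toy model distills that lifecycle from the ack/sync code above — AttemptStatus mirrors the real enum values, everything else is illustrative:

```java
// Toy model of the master's per-transaction status handling.
public class TxLifecycleDemo {
    enum AttemptStatus { PROCESSING, PROCESSED, COMMITTING }

    static AttemptStatus onAck(AttemptStatus status) {
        switch (status) {
            case PROCESSING: return AttemptStatus.PROCESSED; // batch tuple tree done
            case COMMITTING: return null; // commit done: tx removed, $success emitted
            default:         return status;
        }
    }

    static AttemptStatus onSync(AttemptStatus status) {
        // sync() emits on $commit only when the current tx is fully PROCESSED
        return status == AttemptStatus.PROCESSED ? AttemptStatus.COMMITTING : status;
    }

    public static void main(String[] args) {
        AttemptStatus s = AttemptStatus.PROCESSING; // emitted on $batch
        s = onAck(s);   // -> PROCESSED
        s = onSync(s);  // -> COMMITTING ($commit emitted)
        s = onAck(s);   // -> null: removed from _activeTx, $success emitted
        System.out.println("final state: " + s);    // prints: final state: null
    }
}
```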
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

```java
@Override
public void fail(Object msgId) {
    TransactionAttempt tx = (TransactionAttempt) msgId;
    TransactionStatus stored = _activeTx.remove(tx.getTransactionId());
    LOG.debug("Fail. [tx_attempt = {}], [tx_status = {}], [{}]", tx, stored, this);
    if(stored!=null && tx.equals(stored.attempt)) {
        _activeTx.tailMap(tx.getTransactionId()).clear();
        sync();
    }
}
```
注意這裏沒有變動_currTransaction,於是sync方法觸發新的TransactionAttempt的_txid仍是當前這個失敗的_currTransaction
)storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/spout/TridentSpoutCoordinator.javaapp
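The `tailMap(...).clear()` call is what discards every in-flight transaction at or after the failed one. A small stand-alone demonstration of that TreeMap behavior (the txIds and labels are invented for illustration):

```java
import java.util.TreeMap;

public class TailMapDemo {
    public static void main(String[] args) {
        TreeMap<Long, String> activeTx = new TreeMap<>();
        activeTx.put(1L, "tx1");
        activeTx.put(2L, "tx2");
        activeTx.put(3L, "tx3");

        long failedTxId = 2L;
        activeTx.remove(failedTxId);          // what fail() does first
        activeTx.tailMap(failedTxId).clear(); // drops every remaining txId >= 2

        System.out.println(activeTx);         // {1=tx1}: only earlier txs survive
    }
}
```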
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/spout/TridentSpoutCoordinator.java

```java
public class TridentSpoutCoordinator implements IBasicBolt {
    public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutCoordinator.class);
    private static final String META_DIR = "meta";

    ITridentSpout<Object> _spout;
    ITridentSpout.BatchCoordinator<Object> _coord;
    RotatingTransactionalState _state;
    TransactionalState _underlyingState;
    String _id;

    public TridentSpoutCoordinator(String id, ITridentSpout<Object> spout) {
        _spout = spout;
        _id = id;
    }

    @Override
    public void prepare(Map conf, TopologyContext context) {
        _coord = _spout.getCoordinator(_id, conf, context);
        _underlyingState = TransactionalState.newCoordinatorState(conf, _id);
        _state = new RotatingTransactionalState(_underlyingState, META_DIR);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);

        if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            _state.cleanupBefore(attempt.getTransactionId());
            _coord.success(attempt.getTransactionId());
        } else {
            long txid = attempt.getTransactionId();
            Object prevMeta = _state.getPreviousState(txid);
            Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
            _state.overrideState(txid, meta);
            collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
        }
    }

    @Override
    public void cleanup() {
        _coord.close();
        _underlyingState.close();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(MasterBatchCoordinator.BATCH_STREAM_ID, new Fields("tx", "metadata"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config ret = new Config();
        ret.setMaxTaskParallelism(1);
        return ret;
    }
}
```
TridentSpoutCoordinator receives the tuples sent by the MasterBatchCoordinator. If a tuple arrives on MasterBatchCoordinator.SUCCESS_STREAM_ID ($success), the master has received the ack and the transaction has succeeded, so the coordinator cleans up the state before that txId and calls back the success method of ITridentSpout.BatchCoordinator. If it arrives on $batch, a new TransactionAttempt is to be started, so the coordinator emits a tuple on MasterBatchCoordinator.BATCH_STREAM_ID ($batch); that tuple is received by the downstream bolt (in this example, the TridentBoltExecutor that wraps the user spout via a TridentSpoutExecutor).
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java

```java
public class TridentBoltExecutor implements IRichBolt {
    public static final String COORD_STREAM_PREFIX = "$coord-";

    public static String COORD_STREAM(String batch) {
        return COORD_STREAM_PREFIX + batch;
    }

    RotatingMap<Object, TrackedBatch> _batches;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _messageTimeoutMs = context.maxTopologyMessageTimeout() * 1000L;
        _lastRotate = System.currentTimeMillis();
        _batches = new RotatingMap<>(2);
        _context = context;
        _collector = collector;
        _coordCollector = new CoordinatedOutputCollector(collector);
        _coordOutputCollector = new BatchOutputCollectorImpl(new OutputCollector(_coordCollector));

        _coordConditions = (Map) context.getExecutorData("__coordConditions");
        if(_coordConditions==null) {
            _coordConditions = new HashMap<>();
            for(String batchGroup: _coordSpecs.keySet()) {
                CoordSpec spec = _coordSpecs.get(batchGroup);
                CoordCondition cond = new CoordCondition();
                cond.commitStream = spec.commitStream;
                cond.expectedTaskReports = 0;
                for(String comp: spec.coords.keySet()) {
                    CoordType ct = spec.coords.get(comp);
                    if(ct.equals(CoordType.single())) {
                        cond.expectedTaskReports+=1;
                    } else {
                        cond.expectedTaskReports+=context.getComponentTasks(comp).size();
                    }
                }
                cond.targetTasks = new HashSet<>();
                for(String component: Utils.get(context.getThisTargets(),
                        COORD_STREAM(batchGroup),
                        new HashMap<String, Grouping>()).keySet()) {
                    cond.targetTasks.addAll(context.getComponentTasks(component));
                }
                _coordConditions.put(batchGroup, cond);
            }
            context.setExecutorData("__coordConditions", _coordConditions);
        }
        _bolt.prepare(conf, context, _coordOutputCollector);
    }

    //......

    @Override
    public void cleanup() {
        _bolt.cleanup();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        _bolt.declareOutputFields(declarer);
        for(String batchGroup: _coordSpecs.keySet()) {
            declarer.declareStream(COORD_STREAM(batchGroup), true, new Fields("id", "count"));
        }
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> ret = _bolt.getComponentConfiguration();
        if(ret==null) ret = new HashMap<>();
        ret.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
        // TODO: Need to be able to set the tick tuple time to the message timeout, ideally without parameterization
        return ret;
    }
}
```
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
```java
@Override
public void execute(Tuple tuple) {
    if(TupleUtils.isTick(tuple)) {
        long now = System.currentTimeMillis();
        if(now - _lastRotate > _messageTimeoutMs) {
            _batches.rotate();
            _lastRotate = now;
        }
        return;
    }
    String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
    if(batchGroup==null) {
        // this is so we can do things like have simple DRPC that doesn't need to use batch processing
        _coordCollector.setCurrBatch(null);
        _bolt.execute(null, tuple);
        _collector.ack(tuple);
        return;
    }
    IBatchID id = (IBatchID) tuple.getValue(0);
    //get transaction id
    //if it already exists and attempt id is greater than the attempt there

    TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
//        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
//            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
//                    + " (" + _batches.size() + ")" +
//                    "\ntuple: " + tuple +
//                    "\nwith tracked " + tracked +
//                    "\nwith id " + id +
//                    "\nwith group " + batchGroup
//                    + "\n");
//
//        }
    //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());

    // this code here ensures that only one attempt is ever tracked for a batch, so when
    // failures happen you don't get an explosion in memory usage in the tasks
    if(tracked!=null) {
        if(id.getAttemptId() > tracked.attemptId) {
            _batches.remove(id.getId());
            tracked = null;
        } else if(id.getAttemptId() < tracked.attemptId) {
            // no reason to try to execute a previous attempt than we've already seen
            return;
        }
    }

    if(tracked==null) {
        tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
        _batches.put(id.getId(), tracked);
    }
    _coordCollector.setCurrBatch(tracked);

    //System.out.println("TRACKED: " + tracked + " " + tuple);

    TupleType t = getTupleType(tuple, tracked);
    if(t==TupleType.COMMIT) {
        tracked.receivedCommit = true;
        checkFinish(tracked, tuple, t);
    } else if(t==TupleType.COORD) {
        int count = tuple.getInteger(1);
        tracked.reportedTasks++;
        tracked.expectedTupleCount+=count;
        checkFinish(tracked, tuple, t);
    } else {
        tracked.receivedTuples++;
        boolean success = true;
        try {
            _bolt.execute(tracked.info, tuple);
            if(tracked.condition.expectedTaskReports==0) {
                success = finishBatch(tracked, tuple);
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
        }
        if(success) {
            _collector.ack(tuple);
        } else {
            _collector.fail(tuple);
        }
    }
    _coordCollector.setCurrBatch(null);
}

private TupleType getTupleType(Tuple tuple, TrackedBatch batch) {
    CoordCondition cond = batch.condition;
    if(cond.commitStream!=null && tuple.getSourceGlobalStreamId().equals(cond.commitStream)) {
        return TupleType.COMMIT;
    } else if(cond.expectedTaskReports > 0 && tuple.getSourceStreamId().startsWith(COORD_STREAM_PREFIX)) {
        return TupleType.COORD;
    } else {
        return TupleType.REGULAR;
    }
}

private void failBatch(TrackedBatch tracked, FailedException e) {
    if(e!=null && e instanceof ReportedFailedException) {
        _collector.reportError(e);
    }
    tracked.failed = true;
    if(tracked.delayedAck!=null) {
        _collector.fail(tracked.delayedAck);
        tracked.delayedAck = null;
    }
}
```
TridentBoltExecutor's execute method first checks whether the tuple is a tick tuple. If so, it checks whether the time since _lastRotate (initialized to the current time in prepare) exceeds _messageTimeoutMs, and if it does, it performs _batches.rotate(). Tick tuples are emitted at the frequency of Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS (topology.tick.tuple.freq.secs), which TridentBoltExecutor sets to 5 seconds; _messageTimeoutMs is context.maxTopologyMessageTimeout() * 1000L, i.e. the maximum Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (topology.message.timeout.secs, default 30 in defaults.yaml) across the topology's components, times 1000. For a regular tuple (batch data sent over by the coordinator), it increments the receivedTuples count and calls _bolt.execute (here _bolt is a TridentSpoutExecutor); when tracked.condition.expectedTaskReports == 0 it calls finishBatch immediately, removing the batch from _batches. If a FailedException is thrown, failBatch is invoked directly to report the error, after which the tuple is acked or failed. If the downstream is an each operation and only some tuples of a batch throw FailedException, the batch still has to wait until all of its tuples have been executed and a TupleType.COORD tuple triggers the checkFinish check before the failure can be reported to the master, so there is some lag: for example, if a batch has 3 tuples and the second one throws FailedException, the third tuple is still executed, and only after the whole batch has been processed does the TupleType.COORD tuple arrive and trigger checkFinish.
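The _batches structure is an org.apache.storm.utils.RotatingMap with 2 buckets. A minimal illustrative version of the rotating-bucket idea (not the real implementation) shows why rotate() amounts to expiring the oldest TrackedBatch entries:

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Illustrative mini RotatingMap: entries expire after `numBuckets` rotations.
public class MiniRotatingMap<K, V> {
    private final LinkedList<Map<K, V>> buckets = new LinkedList<>();

    public MiniRotatingMap(int numBuckets) {
        for (int i = 0; i < numBuckets; i++) {
            buckets.add(new HashMap<>());
        }
    }

    // Called from the tick-tuple path: drops the oldest bucket's entries.
    public Map<K, V> rotate() {
        Map<K, V> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        return expired;
    }

    public void put(K key, V value) {
        buckets.getFirst().put(key, value); // new batches land in the newest bucket
    }

    public V get(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.get(key);
            }
        }
        return null;
    }

    public V remove(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }
}
```

With 2 buckets, a TrackedBatch that makes no progress survives at most two rotations before being dropped.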
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java

```java
private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
    if(tracked.failed) {
        failBatch(tracked);
        _collector.fail(tuple);
        return;
    }
    CoordCondition cond = tracked.condition;
    boolean delayed = tracked.delayedAck==null &&
            (cond.commitStream!=null && type==TupleType.COMMIT
                    || cond.commitStream==null);
    if(delayed) {
        tracked.delayedAck = tuple;
    }
    boolean failed = false;
    if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
        if(tracked.receivedTuples == tracked.expectedTupleCount) {
            finishBatch(tracked, tuple);
        } else {
            //TODO: add logging that not all tuples were received
            failBatch(tracked);
            _collector.fail(tuple);
            failed = true;
        }
    }

    if(!delayed && !failed) {
        _collector.ack(tuple);
    }
}

private void failBatch(TrackedBatch tracked) {
    failBatch(tracked, null);
}

private void failBatch(TrackedBatch tracked, FailedException e) {
    if(e!=null && e instanceof ReportedFailedException) {
        _collector.reportError(e);
    }
    tracked.failed = true;
    if(tracked.delayedAck!=null) {
        _collector.fail(tracked.delayedAck);
        tracked.delayedAck = null;
    }
}
```
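To see how the counting in checkFinish adds up, consider a hypothetical batch group fed by two upstream tasks over the $coord stream. All numbers below are invented for illustration, and the real code additionally tracks receivedCommit when a commit stream is configured:

```java
// Hypothetical walk-through of the $coord bookkeeping behind checkFinish.
public class CoordCountDemo {
    public static void main(String[] args) {
        int expectedTaskReports = 2; // two upstream tasks report on $coord-...

        int reportedTasks = 0;
        int expectedTupleCount = 0;  // sum of the "count" fields from $coord tuples
        int receivedTuples = 5;      // regular tuples this task actually executed

        // $coord tuple from task A: it sent us 3 tuples for this batch.
        reportedTasks++; expectedTupleCount += 3;
        // $coord tuple from task B: it sent us 2 tuples for this batch.
        reportedTasks++; expectedTupleCount += 2;

        boolean allReported = reportedTasks == expectedTaskReports;
        boolean allReceived = receivedTuples == expectedTupleCount;
        // Both true -> finishBatch; all reported but counts mismatch -> failBatch.
        System.out.println("finish batch? " + (allReported && allReceived)); // true
    }
}
```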
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java
```java
public class TridentSpoutExecutor implements ITridentBatchBolt {
    public static final String ID_FIELD = "$tx";

    public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class);

    AddIdCollector _collector;
    ITridentSpout<Object> _spout;
    ITridentSpout.Emitter<Object> _emitter;
    String _streamName;
    String _txStateId;

    TreeMap<Long, TransactionAttempt> _activeBatches = new TreeMap<>();

    public TridentSpoutExecutor(String txStateId, String streamName, ITridentSpout<Object> spout) {
        _txStateId = txStateId;
        _spout = spout;
        _streamName = streamName;
    }

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector) {
        _emitter = _spout.getEmitter(_txStateId, conf, context);
        _collector = new AddIdCollector(_streamName, collector);
    }

    @Override
    public void execute(BatchInfo info, Tuple input) {
        // there won't be a BatchInfo for the success stream
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
        if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
            if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
                ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
                _activeBatches.remove(attempt.getTransactionId());
            } else {
                 throw new FailedException("Received commit for different transaction attempt");
            }
        } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            // valid to delete before what's been committed since
            // those batches will never be accessed again
            _activeBatches.headMap(attempt.getTransactionId()).clear();
            _emitter.success(attempt);
        } else {
            _collector.setBatch(info.batchId);
            _emitter.emitBatch(attempt, input.getValue(1), _collector);
            _activeBatches.put(attempt.getTransactionId(), attempt);
        }
    }

    @Override
    public void cleanup() {
        _emitter.close();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        List<String> fields = new ArrayList<>(_spout.getOutputFields().toList());
        fields.add(0, ID_FIELD);
        declarer.declareStream(_streamName, new Fields(fields));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return _spout.getComponentConfiguration();
    }

    @Override
    public void finishBatch(BatchInfo batchInfo) {
    }

    @Override
    public Object initBatchState(String batchGroup, Object batchId) {
        return null;
    }
}
```
TridentSpoutExecutor's execute method first takes the first value of the incoming tuple (i.e., the TransactionAttempt). Its _spout is the BatchSpoutExecutor wrapping the original spout (of IBatchSpout type; assuming the original spout is an IBatchSpout, it gets wrapped into an ITridentSpout by BatchSpoutExecutor). execute then handles the tuple differently depending on its stream. If it arrives on the master's MasterBatchCoordinator.COMMIT_STREAM_ID ($commit), the emitter's commit method is called to commit the current TransactionAttempt (the example in this article carries no commit information), and the tx is removed from _activeBatches. If it arrives on the master's MasterBatchCoordinator.SUCCESS_STREAM_ID ($success), the TransactionAttempts in _activeBatches whose txId is smaller than this txId are removed first, and then the emitter's success method is called to mark the TransactionAttempt as successful; that method calls back the ack method of the original spout (the IBatchSpout). Tuples arriving on neither MasterBatchCoordinator.COMMIT_STREAM_ID ($commit) nor MasterBatchCoordinator.SUCCESS_STREAM_ID ($success) are the messages that start a batch: the batchId is set on the collector, the emitter's emitBatch is called to emit the data (the batchId passed along is the TransactionAttempt's txId), and the TransactionAttempt is put into _activeBatches (a batch here corresponds to a TransactionAttempt).
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/testing/FixedBatchSpout.java

```java
public class FixedBatchSpout implements IBatchSpout {

    Fields fields;
    List<Object>[] outputs;
    int maxBatchSize;
    HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();

    public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
        this.fields = fields;
        this.outputs = outputs;
        this.maxBatchSize = maxBatchSize;
    }

    int index = 0;
    boolean cycle = false;

    public void setCycle(boolean cycle) {
        this.cycle = cycle;
    }

    @Override
    public void open(Map conf, TopologyContext context) {
        index = 0;
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        List<List<Object>> batch = this.batches.get(batchId);
        if(batch == null){
            batch = new ArrayList<List<Object>>();
            if(index>=outputs.length && cycle) {
                index = 0;
            }
            for(int i=0; index < outputs.length && i < maxBatchSize; index++, i++) {
                batch.add(outputs[index]);
            }
            this.batches.put(batchId, batch);
        }
        for(List<Object> list : batch){
            collector.emit(list);
        }
    }

    @Override
    public void ack(long batchId) {
        this.batches.remove(batchId);
    }

    @Override
    public void close() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.setMaxTaskParallelism(1);
        return conf;
    }

    @Override
    public Fields getOutputFields() {
        return fields;
    }
}
```
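Since FixedBatchSpout caches every batch it has emitted by batchId, a replayed TransactionAttempt with the same txId reproduces exactly the same tuples, and only ack(batchId) drops the cache. A stand-alone sketch of that replay property (the printing TridentCollector is ad hoc, assuming the storm-core 1.2.2 interface with just emit and reportError):

```java
import java.util.List;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class ReplayDemo {
    public static void main(String[] args) {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 2,
                new Values("nickt1", 4), new Values("nickt2", 7), new Values("nickt3", 8));

        TridentCollector printer = new TridentCollector() {
            @Override public void emit(List<Object> values) { System.out.println(values); }
            @Override public void reportError(Throwable t) { t.printStackTrace(); }
        };

        spout.emitBatch(1L, printer); // first attempt of tx 1: [nickt1, 4], [nickt2, 7]
        spout.emitBatch(1L, printer); // replayed attempt: identical tuples, served from the cache
        spout.ack(1L);                // success: the cached batch for tx 1 is dropped
        spout.emitBatch(2L, printer); // tx 2 continues from index 2: [nickt3, 8]
    }
}
```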
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
```java
public Stream newStream(String txId, IRichSpout spout) {
    return newStream(txId, new RichSpoutBatchExecutor(spout));
}

public Stream newStream(String txId, IBatchSpout spout) {
    Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
    return addNode(n);
}

public Stream newStream(String txId, ITridentSpout spout) {
    Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
    return addNode(n);
}

public Stream newStream(String txId, IPartitionedTridentSpout spout) {
    return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
}

public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
    return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
}

public Stream newStream(String txId, ITridentDataSource dataSource) {
    if (dataSource instanceof IBatchSpout) {
        return newStream(txId, (IBatchSpout) dataSource);
    } else if (dataSource instanceof ITridentSpout) {
        return newStream(txId, (ITridentSpout) dataSource);
    } else if (dataSource instanceof IPartitionedTridentSpout) {
        return newStream(txId, (IPartitionedTridentSpout) dataSource);
    } else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
        return newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
    } else {
        throw new UnsupportedOperationException("Unsupported stream");
    }
}
```
To summarize:

- TridentTopology, in its newStream and build methods, converts the various ITridentDataSource spout types into ITridentSpout (this spares users from implementing the ITridentSpout interfaces themselves and hides the trident-spout machinery, so users who have only ever written plain topologies can get started with trident topologies quickly). IBatchSpout (in the build method), IPartitionedTridentSpout (in newStream), and IOpaquePartitionedTridentSpout (in newStream) are adapted to ITridentSpout, using BatchSpoutExecutor, PartitionedTridentSpoutExecutor, and OpaquePartitionedTridentSpoutExecutor respectively. (When TridentTopologyBuilder runs buildTopology, an ITridentSpout is first wrapped in a TridentSpoutExecutor, then in a TridentBoltExecutor, and is finally turned into a bolt; the actual spout of the whole TridentTopology is the MasterBatchCoordinator. So an IBatchSpout is first wrapped into an ITridentSpout by BatchSpoutExecutor and then wrapped into a bolt by TridentSpoutExecutor and TridentBoltExecutor.)
- When a downstream bolt throws FailedException (the tuples of a batch are all executed before checkFinish is triggered), TridentBoltExecutor calls reportError and marks the TrackedBatch's failed flag as true. Later, when checkFinish runs and finds tracked.failed == true, it calls _collector.fail(tuple), which calls back MasterBatchCoordinator's fail method. Note that fail does not change _currTransaction, so retries keep starting from the failed txId; only the ack method advances _currTransaction to nextTransactionId.
- TridentBoltExecutor's execute method uses tick tuples to check whether the time since the last rotate exceeds _messageTimeoutMs (the maximum Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS among the components, * 1000 to convert seconds to milliseconds); if so, it rotates and the last bucket of _batches is removed. The tick frequency here is 5 seconds; with Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS at 30 seconds, _messageTimeoutMs is 30*1000, so roughly every 5 seconds the executor checks whether more than 30 seconds have passed since the last rotate, and if so it rotates and discards the data (TrackedBatch) in the last bucket, which effectively resets timed-out TrackedBatch state.
- If a batch's tuples are not fully processed within Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (topology.message.timeout.secs, default 30 in defaults.yaml), the acker triggers the master's fail method, so the TransactionAttempt fails and keeps being retried. There is currently no limit on the number of attempts, which deserves attention in production: once a single tuple of a batchId fails, all tuples of that batchId are re-emitted. If the downstream is not prepared for this, a batch whose earlier tuples succeeded while later ones failed will have the successful tuples re-processed over and over. (Avoiding the problem of a failed batch's tuples being partially processed successfully and partially failing requires using Trident's State.)
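To make that last point concrete, here is a minimal, hypothetical sketch of the idea behind Trident's transactional State: persist the last applied txId alongside the value and skip updates whose txId has already been applied, so the re-emitted tuples of a failed-and-retried batch are not double-counted. All names below are illustrative, not Storm API:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative txId-gated state, mimicking the core idea of Trident's
// transactional State: an update for an already-applied txId is a replay
// and must be skipped, so re-emitted batches stay idempotent.
public class TxGatedCounterState {
    private final Map<String, Long> counts = new HashMap<>();
    private final Map<String, Long> lastAppliedTxId = new HashMap<>();

    public void applyBatch(long txId, String key, long delta) {
        Long lastTx = lastAppliedTxId.get(key);
        if (lastTx != null && lastTx >= txId) {
            return; // replayed batch: already applied, skip it
        }
        counts.merge(key, delta, Long::sum);
        lastAppliedTxId.put(key, txId); // a real store persists this atomically with the value
    }

    public long get(String key) {
        return counts.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        TxGatedCounterState state = new TxGatedCounterState();
        state.applyBatch(1L, "nickt1", 4);
        state.applyBatch(1L, "nickt1", 4); // replay of tx 1: ignored
        System.out.println(state.get("nickt1")); // 4, not 8
    }
}
```

This only works because a transactional spout re-emits exactly the same tuples for the same txId, which is precisely the replay property of FixedBatchSpout shown earlier.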