This article takes a look at Storm's maxSpoutPending.
storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java
    /**
     * The maximum number of tuples that can be pending on a spout task at any given time. This config applies to individual tasks, not to
     * spouts or topologies as a whole.
     *
     * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet. Note that this config parameter has
     * no effect for unreliable spouts that don't tag their tuples with a message id.
     */
    @isInteger
    @isPositiveNumber
    public static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";
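For reference, here is a minimal sketch of setting this option on a topology configuration (the class name and the value 500 are made up for illustration); Config.setMaxSpoutPending is a typed helper that writes the same topology.max.spout.pending key:

import org.apache.storm.Config;

public class MaxSpoutPendingConfigExample {
    public static void main(String[] args) {
        Config conf = new Config();
        // both forms write the same key; 500 is an arbitrary example value
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 500);
        conf.setMaxSpoutPending(500);
        // conf would then be handed to StormSubmitter.submitTopology(name, conf, topology)
        System.out.println(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
    }
}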
This setting only takes effect for reliable spouts, that is, spouts that tag their emitted tuples with a msgId; unreliable spouts are never throttled by it. The enforcement happens in SpoutExecutor:

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
    public void init(final ArrayList<Task> idToTask, int idToTaskBase) {
        this.threadId = Thread.currentThread().getId();
        executorTransfer.initLocalRecvQueues();
        while (!stormActive.get()) {
            Utils.sleep(100);
        }
        LOG.info("Opening spout {}:{}", componentId, taskIds);
        this.idToTask = idToTask;
        this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
        //......
    }

    public Callable<Long> call() throws Exception {
        init(idToTask, idToTaskBase);
        return new Callable<Long>() {
            final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
            int recvqCheckSkips = 0;
            int swIdleCount = 0; // counter for spout wait strategy
            int bpIdleCount = 0; // counter for back pressure wait strategy
            int rmspCount = 0;

            @Override
            public Long call() throws Exception {
                int receiveCount = 0;
                if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
                    receiveCount = receiveQueue.consume(SpoutExecutor.this);
                    recvqCheckSkips = 0;
                }

                long currCount = emittedCount.get();
                boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
                boolean isActive = stormActive.get();

                if (!isActive) {
                    inactiveExecute();
                    return 0L;
                }

                if (!lastActive.get()) {
                    lastActive.set(true);
                    activateSpouts();
                }
                boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
                boolean noEmits = true;
                long emptyStretch = 0;

                if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
                    for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
                        spouts.get(j).nextTuple();
                    }
                    noEmits = (currCount == emittedCount.get());
                    if (noEmits) {
                        emptyEmitStreak.increment();
                    } else {
                        emptyStretch = emptyEmitStreak.get();
                        emptyEmitStreak.set(0);
                    }
                }
                if (reachedMaxSpoutPending) {
                    if (rmspCount == 0) {
                        LOG.debug("Reached max spout pending");
                    }
                    rmspCount++;
                } else {
                    if (rmspCount > 0) {
                        LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
                    }
                    rmspCount = 0;
                }

                if (receiveCount > 1) { // continue without idling
                    return 0L;
                }

                if (!pendingEmits.isEmpty()) { // then facing backpressure
                    backPressureWaitStrategy();
                    return 0L;
                }
                bpIdleCount = 0;

                if (noEmits) {
                    spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
                    return 0L;
                }
                swIdleCount = 0;
                return 0L;
            }

            private void backPressureWaitStrategy() throws InterruptedException {
                long start = Time.currentTimeMillis();
                if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
                    LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
                }
                bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
                spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
            }

            private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
                emptyEmitStreak.increment();
                long start = Time.currentTimeMillis();
                swIdleCount = spoutWaitStrategy.idle(swIdleCount);
                if (reachedMaxSpoutPending) {
                    spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
                } else {
                    if (emptyStretch > 0) {
                        LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
                    }
                }
            }

            // returns true if pendingEmits is empty
            private boolean tryFlushPendingEmits() {
                for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                    if (executorTransfer.tryTransfer(t, null)) {
                        pendingEmits.poll();
                    } else { // to avoid reordering of emits, stop at first failure
                        return false;
                    }
                }
                return true;
            }
        };
    }
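The piece of this loop that matters here is the reachedMaxSpoutPending gate. The following is not Storm code, just a self-contained restatement of the two conditions that decide whether nextTuple() is called: init() multiplies the configured per-task value by the number of tasks in the executor, an unset config is read as 0 (unlimited), and nextTuple() is skipped while pending.size() has reached the cap or while earlier emits are still queued:

// Simplified restatement of the throttle decision in the loop above; not the
// actual Storm implementation, only the conditions it checks.
final class SpoutThrottleSketch {
    static boolean mayCallNextTuple(long maxSpoutPendingPerTask,
                                    int tasksInExecutor,
                                    int pendingTupleCount,
                                    boolean pendingEmitsIsEmpty) {
        // init() scales the per-task config by the number of tasks in the executor
        long maxSpoutPending = maxSpoutPendingPerTask * tasksInExecutor;
        // 0 means the config was not set (null is read as 0), i.e. no limit
        boolean reachedMaxSpoutPending = maxSpoutPending != 0 && pendingTupleCount >= maxSpoutPending;
        // nextTuple() is only invoked when the cap is not hit and there is no
        // backlog of emits waiting to be transferred downstream
        return !reachedMaxSpoutPending && pendingEmitsIsEmpty;
    }

    public static void main(String[] args) {
        System.out.println(mayCallNextTuple(10, 2, 25, true)); // false: 25 >= 10 * 2
        System.out.println(mayCallNextTuple(0, 2, 25, true));  // true: unlimited when unset
    }
}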
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        try {
            return sendSpoutMsg(streamId, tuple, messageId, null);
        } catch (InterruptedException e) {
            LOG.warn("Spout thread interrupted during emit().");
            throw new RuntimeException(e);
        }
    }

    @Override
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        try {
            sendSpoutMsg(streamId, tuple, messageId, taskId);
        } catch (InterruptedException e) {
            LOG.warn("Spout thread interrupted during emitDirect().");
            throw new RuntimeException(e);
        }
    }

    private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws InterruptedException {
        emittedCount.increment();

        List<Integer> outTasks;
        if (outTaskId != null) {
            outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
        } else {
            outTasks = taskData.getOutgoingTasks(stream, values);
        }

        final boolean needAck = (messageId != null) && hasAckers;

        final List<Long> ackSeq = needAck ? new ArrayList<>() : null;

        final long rootId = needAck ? MessageId.generateId(random) : 0;

        for (int i = 0; i < outTasks.size(); i++) { // perf critical path. don't use iterators.
            Integer t = outTasks.get(i);
            MessageId msgId;
            if (needAck) {
                long as = MessageId.generateId(random);
                msgId = MessageId.makeRootId(rootId, as);
                ackSeq.add(as);
            } else {
                msgId = MessageId.makeUnanchored();
            }

            final TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(),
                                                  this.taskId, stream, msgId);
            AddressedTuple adrTuple = new AddressedTuple(t, tuple);
            executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
        }
        if (isEventLoggers) {
            taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
        }

        if (needAck) {
            boolean sample = executor.samplerCheck();
            TupleInfo info = new TupleInfo();
            info.setTaskId(this.taskId);
            info.setStream(stream);
            info.setMessageId(messageId);
            if (isDebug) {
                info.setValues(values);
            }
            if (sample) {
                info.setTimestamp(System.currentTimeMillis());
            }

            pending.put(rootId, info);
            List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
            taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
        } else if (messageId != null) {
            // Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
            if (isDebug) {
                if (spoutExecutorThdId != Thread.currentThread().getId()) {
                    throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
                                               "Spout Output Collector should only emit from the main spout executor thread.");
                }
            }
            globalTupleInfo.clear();
            globalTupleInfo.setStream(stream);
            globalTupleInfo.setValues(values);
            globalTupleInfo.setMessageId(messageId);
            globalTupleInfo.setTimestamp(0);
            globalTupleInfo.setId("0:");
            Long timeDelta = 0L;
            executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
        }
        return outTasks;
    }
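Note that sendSpoutMsg only puts an entry into the pending map when needAck is true, that is, when the emit carried a messageId and the topology has ackers. From the user's side this is simply the difference between the two emit overloads; the spout below is a hypothetical illustration, not part of Storm:

import java.util.Map;
import java.util.UUID;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical spout showing the two emit styles handled by sendSpoutMsg above.
public class EmitStyleDemoSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // reliable emit: messageId != null, so (with ackers enabled) the tuple is
        // registered in the pending map and counts toward maxSpoutPending
        collector.emit(new Values("reliable"), UUID.randomUUID().toString());

        // unreliable emit: no messageId, needAck is false and nothing is added to
        // pending, so maxSpoutPending never throttles these tuples
        collector.emit(new Values("unreliable"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}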
storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        _throttler = new WindowedTimeThrottler((Number) conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
        for (String spoutId : _managedSpoutIds) {
            _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
        }
        _currTransaction = getStoredCurrTransaction();

        _collector = collector;
        Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        if (active == null) {
            _maxTransactionActive = 1;
        } else {
            _maxTransactionActive = active.intValue();
        }
        _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);

        for (int i = 0; i < _spouts.size(); i++) {
            String txId = _managedSpoutIds.get(i);
            _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
        }
        LOG.debug("Opened {}", this);
    }

    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }

        if (_active) {
            if (_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for (int i = 0; i < _maxTransactionActive; i++) {
                    if (!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if (attemptId == null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for (TransactionalState state : _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }

                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt,
                                  newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }
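So for Trident, topology.max.spout.pending is not a tuple count: MasterBatchCoordinator reads it as _maxTransactionActive, the number of batches (transactions) allowed in flight at once, defaulting to 1 when unset. A hypothetical configuration sketch:

import org.apache.storm.Config;

public class TridentMaxSpoutPendingExample {
    public static void main(String[] args) {
        Config conf = new Config();
        // interpreted by MasterBatchCoordinator as _maxTransactionActive:
        // up to 3 Trident batches may be processed concurrently; unset means 1
        conf.setMaxSpoutPending(3);
        // optionally control how often new batches are emitted (milliseconds)
        conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 500);
        System.out.println(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
    }
}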
To sum up: maxSpoutPending (topology.max.spout.pending) defaults to null and only applies to spouts that emit reliable messages, i.e. tuples tagged with a msgId. For an ordinary spout with acking disabled, SpoutOutputCollectorImpl never adds entries to the pending map, so reachedMaxSpoutPending stays false and the maxSpoutPending mechanism is never triggered. For Trident spouts, the TransactionAttempt's transactionId is used as the batchId by default and tracking is done per transaction; MasterBatchCoordinator reads topology.max.spout.pending as _maxTransactionActive (default 1), the maximum number of batches that may be active at the same time.