本文主要研究一下storm的ack機制
public class AckSentenceSpout extends BaseRichSpout { private ConcurrentHashMap<UUID, Values> pending; private SpoutOutputCollector collector; private int index = 0; private String[] sentences = { "my dog has fleas", "i like cold beverages", "the dog ate my homework", "don't have a cow man", "i don't think i like fleas" }; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.pending = new ConcurrentHashMap<UUID, Values>(); } @Override public void nextTuple() { Values values = new Values(sentences[index]); UUID msgId = UUID.randomUUID(); this.pending.put(msgId, values); // this.collector.emit(values); //NOTE 這裏要傳入msgId this.collector.emit(values, msgId); index++; if (index >= sentences.length) { index = 0; } Utils.sleep(100); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } @Override public void ack(Object msgId) { this.pending.remove(msgId); } //NOTE 對於ack是失敗的,要從新發送 @Override public void fail(Object msgId) { this.collector.emit(this.pending.get(msgId), msgId); } }
public class AckWordCountBolt extends BaseRichBolt { private static final Logger LOGGER = LoggerFactory.getLogger(AckWordCountBolt.class); private OutputCollector collector; private HashMap<String, Long> counts = null; public void prepare(Map config, TopologyContext context, OutputCollector collector) { this.collector = collector; this.counts = new HashMap<String, Long>(); } public void execute(Tuple tuple) { try{ String word = tuple.getStringByField("word"); Long count = this.counts.get(word); if(count == null){ count = 0L; } count++; this.counts.put(word, count); //NOTE 傳入當前處理的tuple做爲anchor this.collector.emit(tuple, new Values(word, count)); //NOTE 這裏要本身ack this.collector.ack(tuple); }catch (Exception e){ LOGGER.error(e.getMessage(),e); //NOTE 處理異常要fail this.collector.fail(tuple); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
/**
 * Emits {@code tuple} on {@code streamId} by delegating to
 * {@link #sendSpoutMsg} without an explicit target task.
 *
 * @return the task ids returned by {@code sendSpoutMsg}
 * @throws RuntimeException wrapping an {@link InterruptedException} raised while sending
 */
@Override
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
    try {
        return sendSpoutMsg(streamId, tuple, messageId, null);
    } catch (InterruptedException interrupted) {
        LOG.warn("Spout thread interrupted during emit().");
        throw new RuntimeException(interrupted);
    }
}

/**
 * Sends {@code values} to every outgoing task of {@code stream} and, when
 * acking applies, registers the tuple tree with the acker.
 *
 * <p>Acking applies only when a message id was supplied and {@code hasAckers}
 * is set. In that case one root id is generated for the emit and one extra id
 * per destination task; the XOR of those per-task ids is sent on the acker
 * init stream and the tuple info is stored in {@code pending} under the root
 * id. When a message id was supplied but acking does not apply, the spout
 * message is acked right away through {@code executor.ackSpoutMsg}.
 *
 * @param outTaskId explicit target task, or {@code null} to resolve targets from the stream
 * @return the outgoing task ids the tuple was addressed to
 */
private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId)
    throws InterruptedException {
    emittedCount.increment();

    final List<Integer> outTasks = (outTaskId == null)
        ? taskData.getOutgoingTasks(stream, values)
        : taskData.getOutgoingTasks(outTaskId, stream, values);

    final boolean needAck = (messageId != null) && hasAckers;
    final List<Long> ackSeq = needAck ? new ArrayList<>() : null;
    final long rootId = needAck ? MessageId.generateId(random) : 0;

    // perf critical path. don't use iterators.
    for (int i = 0; i < outTasks.size(); i++) {
        Integer destTask = outTasks.get(i);

        final MessageId tupleMsgId;
        if (!needAck) {
            tupleMsgId = MessageId.makeUnanchored();
        } else {
            long edgeId = MessageId.generateId(random);
            tupleMsgId = MessageId.makeRootId(rootId, edgeId);
            ackSeq.add(edgeId);
        }

        final TupleImpl outTuple = new TupleImpl(
            executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, tupleMsgId);
        AddressedTuple addressed = new AddressedTuple(destTask, outTuple);
        executor.getExecutorTransfer().tryTransfer(addressed, executor.getPendingEmits());
    }

    if (isEventLoggers) {
        taskData.sendToEventLogger(
            executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
    }

    if (needAck) {
        boolean sample = executor.samplerCheck();

        TupleInfo trackedInfo = new TupleInfo();
        trackedInfo.setTaskId(this.taskId);
        trackedInfo.setStream(stream);
        trackedInfo.setMessageId(messageId);
        if (isDebug) {
            trackedInfo.setValues(values);
        }
        if (sample) {
            trackedInfo.setTimestamp(System.currentTimeMillis());
        }
        pending.put(rootId, trackedInfo);

        // Init message for the acker: root id, XOR of all per-task ids, and this spout task.
        List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
        taskData.sendUnanchored(
            Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
    } else if (messageId != null) {
        // Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
        if (isDebug && spoutExecutorThdId != Thread.currentThread().getId()) {
            throw new RuntimeException("Detected background thread emitting tuples for the spout. "
                + "Spout Output Collector should only emit from the main spout executor thread.");
        }
        globalTupleInfo.clear();
        globalTupleInfo.setStream(stream);
        globalTupleInfo.setValues(values);
        globalTupleInfo.setMessageId(messageId);
        globalTupleInfo.setTimestamp(0);
        globalTupleInfo.setId("0:");
        Long noTimeDelta = 0L;
        executor.ackSpoutMsg(executor, taskData, noTimeDelta, globalTupleInfo);
    }

    return outTasks;
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
/**
 * Acks {@code input}: for every anchor of the tuple, sends the anchor id and
 * the XOR of the anchor's id value with the tuple's ack value on the acker
 * ack stream, then runs hooks and records stats.
 *
 * <p>Does nothing when acking is disabled.
 */
@Override
public void ack(Tuple input) {
    if (!ackingEnabled) {
        return;
    }

    final TupleImpl tupleImpl = (TupleImpl) input;
    long ackValue = tupleImpl.getAckVal();
    Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
    for (Map.Entry<Long, Long> anchor : anchorsToIds.entrySet()) {
        Values ackMsg = new Values(anchor.getKey(), Utils.bitXor(anchor.getValue(), ackValue));
        task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID, ackMsg,
                            executor.getExecutorTransfer(), executor.getPendingEmits());
    }

    long delta = tupleTimeDelta(tupleImpl);
    if (isDebug) {
        LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
    }

    // Only build the hook info object when at least one hook is registered.
    if (!task.getUserContext().getHooks().isEmpty()) {
        BoltAckInfo ackInfo = new BoltAckInfo(input, taskId, delta);
        ackInfo.applyOn(task.getUserContext());
    }

    if (delta >= 0) {
        String sourceStream = input.getSourceStreamId();
        executor.getStats().boltAckedTuple(input.getSourceComponent(), sourceStream, delta,
                                           task.getTaskMetrics().getAcked(sourceStream));
    }
}

/**
 * Fails {@code input}: sends every anchor (root) id of the tuple on the acker
 * fail stream, then runs hooks and records stats.
 *
 * <p>Does nothing when acking is disabled.
 */
@Override
public void fail(Tuple input) {
    if (!ackingEnabled) {
        return;
    }

    Set<Long> roots = input.getMessageId().getAnchors();
    for (Long rootId : roots) {
        Values failMsg = new Values(rootId);
        task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID, failMsg,
                            executor.getExecutorTransfer(), executor.getPendingEmits());
    }

    long delta = tupleTimeDelta((TupleImpl) input);
    if (isDebug) {
        LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
    }

    BoltFailInfo failInfo = new BoltFailInfo(input, taskId, delta);
    failInfo.applyOn(task.getUserContext());

    if (delta >= 0) {
        String sourceStream = input.getSourceStreamId();
        executor.getStats().boltFailedTuple(input.getSourceComponent(), sourceStream, delta,
                                            task.getTaskMetrics().getFailed(sourceStream));
    }
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java
// Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) { Tuple tuple = getTuple(stream, values); List<Integer> tasks = getOutgoingTasks(stream, values); for (Integer t : tasks) { AddressedTuple addressedTuple = new AddressedTuple(t, tuple); transfer.tryTransfer(addressedTuple, pendingEmits); } }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
// adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null) public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) { if (isDebug) { LOG.info("TRANSFERRING tuple {}", addressedTuple); } JCQueue localQueue = getLocalQueue(addressedTuple); if (localQueue != null) { return tryTransferLocal(addressedTuple, localQueue, pendingEmits); } return workerData.tryTransferRemote(addressedTuple, pendingEmits, serializer); } /** * Adds tuple to localQueue (if overflow is empty). If localQueue is full adds to pendingEmits instead. pendingEmits can be null. * Returns false if unable to add to localQueue. */ public boolean tryTransferLocal(AddressedTuple tuple, JCQueue localQueue, Queue<AddressedTuple> pendingEmits) { workerData.checkSerialize(serializer, tuple); if (pendingEmits != null) { if (pendingEmits.isEmpty() && localQueue.tryPublish(tuple)) { queuesToFlush.set(tuple.dest - indexingBase, localQueue); return true; } else { pendingEmits.add(tuple); return false; } } else { return localQueue.tryPublish(tuple); } }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
/**
 * Static entry point: builds the system topology through the shared instance.
 */
public static StormTopology systemTopology(Map<String, Object> topoConf, StormTopology topology)
    throws InvalidTopologyException {
    return _instance.systemTopologyImpl(topoConf, topology);
}

/**
 * Validates {@code topology}, then returns a deep copy of it extended with the
 * acker, the optional event logger, metric components, system components and
 * the metric/system streams. The extended copy is validated before returning.
 */
protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology)
    throws InvalidTopologyException {
    validateBasic(topology);

    final StormTopology systemTopo = topology.deepCopy();
    addAcker(topoConf, systemTopo);
    if (hasEventLoggers(topoConf)) {
        addEventLogger(topoConf, systemTopo);
    }
    addMetricComponents(topoConf, systemTopo);
    addSystemComponents(topoConf, systemTopo);
    addMetricStreams(systemTopo);
    addSystemStreams(systemTopo);

    validateStructure(systemTopo);
    return systemTopo;
}

/**
 * Adds the acker bolt to {@code topology} and wires every bolt and spout to it.
 *
 * <ul>
 *   <li>The acker gets direct output streams for ack, fail and reset-timeout.</li>
 *   <li>Every bolt gets the ack, fail and reset-timeout output streams.</li>
 *   <li>Every spout gets the init output stream, direct-grouped inputs from the
 *       acker's three streams, and a tick frequency equal to the message timeout.</li>
 * </ul>
 *
 * The number of acker executors defaults to the configured number of workers.
 */
public static void addAcker(Map<String, Object> conf, StormTopology topology) {
    final String[] ackerOutStreams = {
        Acker.ACKER_ACK_STREAM_ID,
        Acker.ACKER_FAIL_STREAM_ID,
        Acker.ACKER_RESET_TIMEOUT_STREAM_ID
    };

    int ackerNum = ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS),
                                       ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
    Map<GlobalStreamId, Grouping> ackerIn = ackerInputs(topology);

    Map<String, StreamInfo> ackerOut = new HashMap<String, StreamInfo>();
    for (String ackerStream : ackerOutStreams) {
        ackerOut.put(ackerStream, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
    }

    Map<String, Object> ackerConf = new HashMap<>();
    ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
    ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
                  ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));

    Bolt acker = Thrift.prepareSerializedBoltDetails(ackerIn, makeAckerBolt(), ackerOut, ackerNum, ackerConf);

    for (Bolt userBolt : topology.get_bolts().values()) {
        ComponentCommon boltCommon = userBolt.get_common();
        boltCommon.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
        boltCommon.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
        boltCommon.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
    }

    for (SpoutSpec spoutSpec : topology.get_spouts().values()) {
        ComponentCommon spoutCommon = spoutSpec.get_common();

        Map<String, Object> spoutConf = componentConf(spoutSpec);
        spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
                      ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
        spoutCommon.set_json_conf(JSONValue.toJSONString(spoutConf));

        spoutCommon.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
                                   Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
        for (String ackerStream : ackerOutStreams) {
            spoutCommon.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, ackerStream),
                                      Thrift.prepareDirectGrouping());
        }
    }

    topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
}

/**
 * Builds the acker's input map: the init stream of every spout and the ack,
 * fail and reset-timeout streams of every bolt, each fields-grouped on
 * {@code "id"}.
 */
public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
    final String[] boltAckStreams = {
        Acker.ACKER_ACK_STREAM_ID,
        Acker.ACKER_FAIL_STREAM_ID,
        Acker.ACKER_RESET_TIMEOUT_STREAM_ID
    };

    Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
    Set<String> boltIds = topology.get_bolts().keySet();
    Set<String> spoutIds = topology.get_spouts().keySet();

    for (String spoutId : spoutIds) {
        inputs.put(Utils.getGlobalStreamId(spoutId, Acker.ACKER_INIT_STREAM_ID),
                   Thrift.prepareFieldsGrouping(Arrays.asList("id")));
    }
    for (String boltId : boltIds) {
        for (String ackStream : boltAckStreams) {
            inputs.put(Utils.getGlobalStreamId(boltId, ackStream),
                       Thrift.prepareFieldsGrouping(Arrays.asList("id")));
        }
    }
    return inputs;
}

/** Static entry point: creates the acker bolt through the shared instance. */
public static IBolt makeAckerBolt() {
    return _instance.makeAckerBoltImpl();
}

/** Creates a new {@link Acker}. */
public IBolt makeAckerBoltImpl() {
    return new Acker();
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
public class Acker implements IBolt { public static final String ACKER_COMPONENT_ID = "__acker"; public static final String ACKER_INIT_STREAM_ID = "__ack_init"; public static final String ACKER_ACK_STREAM_ID = "__ack_ack"; public static final String ACKER_FAIL_STREAM_ID = "__ack_fail"; public static final String ACKER_RESET_TIMEOUT_STREAM_ID = "__ack_reset_timeout"; public static final int TIMEOUT_BUCKET_NUM = 3; private static final Logger LOG = LoggerFactory.getLogger(Acker.class); private static final long serialVersionUID = 4430906880683183091L; private OutputCollector collector; private RotatingMap<Object, AckObject> pending; @Override public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM); } @Override public void execute(Tuple input) { if (TupleUtils.isTick(input)) { Map<Object, AckObject> tmp = pending.rotate(); LOG.debug("Number of timeout tuples:{}", tmp.size()); return; } boolean resetTimeout = false; String streamId = input.getSourceStreamId(); Object id = input.getValue(0); AckObject curr = pending.get(id); if (ACKER_INIT_STREAM_ID.equals(streamId)) { if (curr == null) { curr = new AckObject(); pending.put(id, curr); } curr.updateAck(input.getLong(1)); curr.spoutTask = input.getInteger(2); } else if (ACKER_ACK_STREAM_ID.equals(streamId)) { if (curr == null) { curr = new AckObject(); pending.put(id, curr); } curr.updateAck(input.getLong(1)); } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) { // For the case that ack_fail message arrives before ack_init if (curr == null) { curr = new AckObject(); } curr.failed = true; pending.put(id, curr); } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) { resetTimeout = true; if (curr != null) { pending.put(id, curr); } //else if it has not been added yet, there is no reason time it out later on } else if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) { collector.flush(); 
return; } else { LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask()); return; } int task = curr.spoutTask; if (task >= 0 && (curr.val == 0 || curr.failed || resetTimeout)) { Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime)); if (curr.val == 0) { pending.remove(id); collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple); } else if (curr.failed) { pending.remove(id); collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple); } else if (resetTimeout) { collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, tuple); } else { throw new IllegalStateException("The checks are inconsistent we reach what should be unreachable code."); } } collector.ack(input); } @Override public void cleanup() { LOG.info("Acker: cleanup successfully"); } private long getTimeDeltaMillis(long startTimeMillis) { return Time.currentTimeMillis() - startTimeMillis; } private static class AckObject { public long val = 0L; public long startTime = Time.currentTimeMillis(); public int spoutTask = -1; public boolean failed = false; // val xor value public void updateAck(Long value) { val = Utils.bitXor(val, value); } } }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
public class SpoutExecutor extends Executor { //...... @Override public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception { String streamId = tuple.getSourceStreamId(); if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) { spoutOutputCollector.flush(); } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) { pending.rotate(); } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) { metricsTick(idToTask.get(taskId - idToTaskBase), tuple); } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) { Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject(); if (spoutObj instanceof ICredentialsListener) { ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0)); } } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) { Long id = (Long) tuple.getValue(0); TupleInfo pendingForId = pending.get(id); if (pendingForId != null) { pending.put(id, pendingForId); } } else { Long id = (Long) tuple.getValue(0); Long timeDeltaMs = (Long) tuple.getValue(1); TupleInfo tupleInfo = pending.remove(id); if (tupleInfo != null && tupleInfo.getMessageId() != null) { if (taskId != tupleInfo.getTaskId()) { throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId()); } Long timeDelta = null; if (hasAckers) { long startTimeMs = tupleInfo.getTimestamp(); if (startTimeMs != 0) { timeDelta = timeDeltaMs; } } if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) { ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo); } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) { failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM"); } } } } public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) { try { ISpout spout = (ISpout) taskData.getTaskObject(); int taskId = taskData.getTaskId(); if (executor.getIsDebug()) { LOG.info("SPOUT Acking message {} {}", 
tupleInfo.getId(), tupleInfo.getMessageId()); } spout.ack(tupleInfo.getMessageId()); if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext()); } if (hasAckers && timeDelta != null) { executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta, taskData.getTaskMetrics().getAcked(tupleInfo.getStream())); } } catch (Exception e) { throw Utils.wrapInRuntime(e); } } public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) { try { ISpout spout = (ISpout) taskData.getTaskObject(); int taskId = taskData.getTaskId(); if (executor.getIsDebug()) { LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getId(), tupleInfo, reason); } spout.fail(tupleInfo.getMessageId()); new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext()); if (timeDelta != null) { executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta, taskData.getTaskMetrics().getFailed(tupleInfo.getStream())); } } catch (Exception e) { throw Utils.wrapInRuntime(e); } } }
如果是遠程node,則會進行遠程調用
發送到的stream爲Acker.ACKER_ACK_STREAM_ID或者Acker.ACKER_FAIL_STREAM_ID