本文主要研究一下storm的messageTimeout。
storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java
/**
 * Whether Storm times out messages at all. Defaults to true. Mainly meant for unit tests, where switching it off
 * keeps tuples from being timed out by accident while the test runs.
 */
@isBoolean
public static final String TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS = "topology.enable.message.timeouts";

/**
 * Upper bound on how long the topology may take to fully process a message emitted by a spout. A message that is not
 * acked within this window is failed on the spout; some spout implementations later replay such a message.
 */
@isInteger
@isPositiveNumber
@NotNull
public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";

/**
 * Interval at which tasks receive a tick tuple originating from the "__system" component on the "__tick" stream.
 * Intended to be set as a component-specific configuration.
 */
@isInteger
public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs";
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
/**
 * Adds the acker bolt to {@code topology} and wires every existing bolt and spout to it.
 *
 * <p>The acker count is read from TOPOLOGY_ACKER_EXECUTORS, with the TOPOLOGY_WORKERS value handed to
 * {@code ObjectReader.getInt} as the default. That count is used both as the acker's TOPOLOGY_TASKS and as the
 * count argument to {@code Thrift.prepareSerializedBoltDetails}. The acker and every spout get
 * TOPOLOGY_TICK_TUPLE_FREQ_SECS set to TOPOLOGY_MESSAGE_TIMEOUT_SECS, so their tick period equals the message timeout.
 *
 * @param conf     topology configuration supplying the acker count, worker count and message timeout
 * @param topology topology that is mutated in place
 */
public static void addAcker(Map<String, Object> conf, StormTopology topology) {
    final int ackerCount = ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS),
        ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
    final Map<GlobalStreamId, Grouping> ackerInputStreams = ackerInputs(topology);

    // Streams the acker sends back to spouts via direct grouping, in a fixed order.
    final String[] ackerToSpoutStreams = {
        Acker.ACKER_ACK_STREAM_ID, Acker.ACKER_FAIL_STREAM_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID
    };
    final Map<String, StreamInfo> ackerOutputs = new HashMap<>();
    for (String streamId : ackerToSpoutStreams) {
        ackerOutputs.put(streamId, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
    }

    // The same value drives the tick frequency of the acker and of every spout.
    final Integer messageTimeoutSecs = ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));

    final Map<String, Object> ackerComponentConf = new HashMap<>();
    ackerComponentConf.put(Config.TOPOLOGY_TASKS, ackerCount);
    ackerComponentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, messageTimeoutSecs);

    final Bolt ackerBolt = Thrift.prepareSerializedBoltDetails(ackerInputStreams, makeAckerBolt(), ackerOutputs,
        ackerCount, ackerComponentConf);

    // Every bolt may emit ack / fail / reset-timeout tuples towards the acker.
    for (Bolt bolt : topology.get_bolts().values()) {
        ComponentCommon boltCommon = bolt.get_common();
        boltCommon.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
        boltCommon.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
        boltCommon.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
    }

    // Every spout emits init tuples to the acker and subscribes to the acker's direct result streams.
    for (SpoutSpec spout : topology.get_spouts().values()) {
        ComponentCommon spoutCommon = spout.get_common();
        Map<String, Object> spoutConf = componentConf(spout);
        spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, messageTimeoutSecs);
        spoutCommon.set_json_conf(JSONValue.toJSONString(spoutConf));
        spoutCommon.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
            Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
        for (String streamId : ackerToSpoutStreams) {
            spoutCommon.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, streamId),
                Thrift.prepareDirectGrouping());
        }
    }

    topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, ackerBolt);
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
protected void setupTicks(boolean isSpout) { final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null); if (tickTimeSecs != null) { boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS); if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId)) || (!enableMessageTimeout && isSpout)) { LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId); } else { StormTimer timerTask = workerData.getUserTimer(); timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, () -> { TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs), Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID); AddressedTuple tickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple); try { receiveQueue.publish(tickTuple); receiveQueue.flush(); // avoid buffering } catch (InterruptedException e) { LOG.warn("Thread interrupted when emitting tick tuple. Setting interrupt flag."); Thread.currentThread().interrupt(); return; } } ); } } }
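setupTicks在配置了TOPOLOGY_TICK_TUPLE_FREQ_SECS時,通過workerData.getUserTimer()每隔tickTimeSecs秒往receiveQueue發送一個tick tuple(acker以外的系統組件,以及關閉了message timeout的spout不會註冊該定時任務)。該tick tuple的component設置爲Constants.SYSTEM_COMPONENT_ID(__system),taskId設置爲Constants.SYSTEM_TASK_ID(-1),streamId設置爲Constants.SYSTEM_TICK_STREAM_ID(__tick)。
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java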
/**
 * Handles one tuple delivered to this spout executor, dispatching on the tuple's source stream.
 *
 * <ul>
 *   <li>flush stream: flushes the spout output collector;</li>
 *   <li>tick stream: rotates the pending map;</li>
 *   <li>metrics tick stream: runs the metrics tick for the addressed task;</li>
 *   <li>credentials-changed stream: forwards the new credentials if the spout is an ICredentialsListener;</li>
 *   <li>acker reset-timeout stream: re-puts the message's pending entry, if it is still pending;</li>
 *   <li>any other stream: treated as an acker result (id, time-delta-ms). The pending entry is removed and, if it
 *       carries a message id, the spout is acked (ack stream) or failed with reason "FAIL-STREAM" (fail stream).</li>
 * </ul>
 *
 * @param taskId id of the task the tuple is addressed to
 * @param tuple  the received tuple
 * @throws RuntimeException if an acker result arrives for a task other than the one that emitted the message
 */
public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
    final String streamId = tuple.getSourceStreamId();

    if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
        spoutOutputCollector.flush();
        return;
    }
    if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
        // Entries left in the bucket that rotate() discards are handed to the pending map's expire callback.
        pending.rotate();
        return;
    }
    if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
        metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
        return;
    }
    if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
        Object spoutObject = idToTask.get(taskId - idToTaskBase).getTaskObject();
        if (spoutObject instanceof ICredentialsListener) {
            ((ICredentialsListener) spoutObject).setCredentials((Map<String, String>) tuple.getValue(0));
        }
        return;
    }
    if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
        Long resetId = (Long) tuple.getValue(0);
        TupleInfo stillPending = pending.get(resetId);
        if (stillPending != null) {
            // NOTE(review): presumably re-putting moves the entry to the newest bucket, restarting its
            // timeout — RotatingMap.put is not shown here.
            pending.put(resetId, stillPending);
        }
        return;
    }

    // Remaining streams carry acker results shaped (id, time-delta-ms).
    Long rootId = (Long) tuple.getValue(0);
    Long ackerDeltaMs = (Long) tuple.getValue(1);
    TupleInfo completed = pending.remove(rootId);
    if (completed == null || completed.getMessageId() == null) {
        return;
    }
    if (taskId != completed.getTaskId()) {
        throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + completed.getTaskId());
    }

    // The acker-reported latency is only kept when ackers exist and the tuple was timestamped.
    Long timeDelta = null;
    if (hasAckers && completed.getTimestamp() != 0) {
        timeDelta = ackerDeltaMs;
    }

    if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
        ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, completed);
    } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
        failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, completed, "FAIL-STREAM");
    }
}
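SpoutExecutor收到streamId爲__tick的tuple時會調用pending.rotate();收到ACKER_RESET_TIMEOUT_STREAM_ID的tuple時則把仍在pending中的該消息重新put回去。
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java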
/**
 * Drops the oldest bucket and pushes a fresh, empty bucket to the front.
 *
 * <p>When an expiration callback is registered, it is invoked once for every entry of the dropped bucket.
 *
 * @return the bucket that was dropped
 */
public Map<K, V> rotate() {
    final Map<K, V> expired = _buckets.removeLast();
    _buckets.addFirst(new HashMap<>());
    if (_callback == null) {
        return expired;
    }
    expired.forEach(_callback::expire);
    return expired;
}
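rotate會移除最舊的bucket、在頭部加入一個新的空bucket,並對被移除bucket中的每個entry調用ExpiredCallback.expire;pending的ExpiredCallback是在SpoutExecutor.init中創建的:
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java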
/**
 * Initializes this spout executor.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Records the executor thread id and initializes the local receive queues.</li>
 *   <li>Blocks, polling with Utils.sleep(100), until the topology is active.</li>
 *   <li>Computes maxSpoutPending as TOPOLOGY_MAX_SPOUT_PENDING times the number of tasks; 0 is the default passed
 *       to ObjectReader.getInt.</li>
 *   <li>Collects the ISpout objects of all non-null tasks.</li>
 * </ol>
 *
 * <p>It then creates the pending-message map as a RotatingMap constructed with 2. Its expire callback fails each
 * expired message on its task's spout with reason "TIMEOUT". The time delta passed along is null when the tuple's
 * timestamp is 0.
 *
 * <p>NOTE(review): this excerpt elides the remainder of the method at the "//......" marker.
 *
 * @param idToTask     tasks of this executor, indexed by (task id - idToTaskBase)
 * @param idToTaskBase offset subtracted from a task id to index into idToTask
 */
public void init(final ArrayList<Task> idToTask, int idToTaskBase) { this.threadId = Thread.currentThread().getId(); executorTransfer.initLocalRecvQueues(); while (!stormActive.get()) { Utils.sleep(100); } LOG.info("Opening spout {}:{}", componentId, taskIds); this.idToTask = idToTask; this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); this.spouts = new ArrayList<>(); for (Task task : idToTask) { if (task != null) { this.spouts.add((ISpout) task.getTaskObject()); } } this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() { @Override public void expire(Long key, TupleInfo tupleInfo) { Long timeDelta = null; if (tupleInfo.getTimestamp() != 0) { timeDelta = Time.deltaMs(tupleInfo.getTimestamp()); } failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId() - idToTaskBase), timeDelta, tupleInfo, "TIMEOUT"); } }); //...... }
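- 與消息超時相關的配置有Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS(topology.enable.message.timeouts,默認true)、Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS(topology.message.timeout.secs,默認30)。
- StormCommon.addAcker把TOPOLOGY_MESSAGE_TIMEOUT_SECS的值設爲acker以及每個spout的TOPOLOGY_TICK_TUPLE_FREQ_SECS;Executor.setupTicks據此定時發送tick tuple,SpoutExecutor收到tick tuple後調用pending.rotate(),被移除bucket中仍未ack的消息經由ExpiredCallback以TIMEOUT爲原因fail。
- 對於沒有開啓可靠(msgId)消息的spout,其pending隊列爲空,於是這裏的rotate以及ExpiredCallback就相當於沒有效果。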