This article takes a look at Storm's tickTuple.

The word-count bolt below uses tick tuples to emit and reset its counts every 10 seconds:
```java
public class TickWordCountBolt extends BaseBasicBolt {

    private static final Logger LOGGER = LoggerFactory.getLogger(TickWordCountBolt.class);

    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        if (TupleUtils.isTick(input)) {
            //execute tick logic
            LOGGER.info("execute tick tuple, emit and clear counts");
            counts.entrySet().stream()
                    .forEach(entry -> collector.emit(new Values(entry.getKey(), entry.getValue())));
            counts.clear();
        } else {
            String word = input.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```
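One caveat worth noting: BaseBasicBolt acks every input automatically, tick tuples included. If the same logic is written against BaseRichBolt, the ack is manual, so the tick tuple must be acked explicitly too. A minimal sketch of that variant (my own illustration, not from the article):

```java
public class TickWordCountRichBolt extends BaseRichBolt {

    private OutputCollector collector;
    private final Map<String, Integer> counts = new HashMap<>();

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // same per-component tick frequency as the BaseBasicBolt version
        return Collections.singletonMap(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        if (TupleUtils.isTick(input)) {
            counts.forEach((word, count) -> collector.emit(new Values(word, count)));
            counts.clear();
        } else {
            counts.merge(input.getString(0), 1, Integer::sum);
        }
        collector.ack(input); // explicit ack, unlike BaseBasicBolt
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```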
```java
    @Test
    public void testTickTuple() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        //parallelism: 10
        builder.setSpout("spout", new TestWordSpout(), 10);
        builder.setBolt("count", new TickWordCountBolt(), 5)
//                .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3)
                .fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("print", new PrintBolt(), 1)
                .shuffleGrouping("count");
        SubmitHelper.submitRemote("tickDemo", builder);
    }
```
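TestWordSpout ships with Storm (org.apache.storm.testing.TestWordSpout), while PrintBolt and SubmitHelper are the author's own helpers and are not shown. A PrintBolt could plausibly be as simple as the following (hypothetical sketch):

```java
public class PrintBolt extends BaseBasicBolt {

    private static final Logger LOGGER = LoggerFactory.getLogger(PrintBolt.class);

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // just log what the upstream count bolt emits on each tick
        LOGGER.info("word={}, count={}", input.getValue(0), input.getValue(1));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt, nothing to declare
    }
}
```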
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java
```java
    public static boolean isTick(Tuple tuple) {
        return tuple != null
               && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
               && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }
```
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
```java
    /**
     * Define a new bolt in this topology with the specified amount of parallelism.
     *
     * @param id the id of this component. This id is referenced by other components that want to consume this bolt's
     *     outputs.
     * @param bolt the bolt
     * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process
     *     somewhere around the cluster.
     * @return use the returned object to declare the inputs to this component
     *
     * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
     */
    public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
        validateUnusedId(id);
        initCommon(id, bolt, parallelism_hint);
        _bolts.put(id, bolt);
        return new BoltGetter(id);
    }

    private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
        ComponentCommon common = new ComponentCommon();
        common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
        if (parallelism != null) {
            int dop = parallelism.intValue();
            if (dop < 1) {
                throw new IllegalArgumentException("Parallelism must be positive.");
            }
            common.set_parallelism_hint(dop);
        }
        Map<String, Object> conf = component.getComponentConfiguration();
        if (conf != null) {
            common.set_json_conf(JSONValue.toJSONString(conf));
        }
        commons.put(id, common);
    }
```
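The interesting part for tick tuples is the last step of initCommon: whatever getComponentConfiguration returns is serialized and stored as the component's json_conf. A small sketch of that serialization (assuming the json-simple JSONValue that Storm bundles):

```java
Map<String, Object> conf = new HashMap<>();
// Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS is the string "topology.tick.tuple.freq.secs"
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
String jsonConf = JSONValue.toJSONString(conf);
// jsonConf -> {"topology.tick.tuple.freq.secs":10}
```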
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
```java
    protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {
        //......
    }
```
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java
```java
public abstract class BaseConfigurationDeclarer<T extends ComponentConfigurationDeclarer> implements ComponentConfigurationDeclarer<T> {
    @Override
    public T addConfiguration(String config, Object value) {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(config, value);
        return addConfigurations(configMap);
    }
    //......
}
```
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java

```java
    protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> {
        String id;

        public ConfigGetter(String id) {
            this.id = id;
        }

        @SuppressWarnings("unchecked")
        @Override
        public T addConfigurations(Map<String, Object> conf) {
            if (conf != null) {
                if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
                    throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
                }
                if (!conf.isEmpty()) {
                    String currConf = commons.get(id).get_json_conf();
                    commons.get(id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
                }
            }
            return (T) this;
        }
        //......
    }

    private static String mergeIntoJson(Map<String, Object> into, Map<String, Object> newMap) {
        Map<String, Object> res = new HashMap<>(into);
        res.putAll(newMap);
        return JSONValue.toJSONString(res);
    }
```
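mergeIntoJson is a plain map overlay, so a value set through the fluent addConfiguration(...) wins over the one returned by getComponentConfiguration() for the same key. A sketch of the overlay semantics:

```java
Map<String, Object> fromComponent = new HashMap<>();
fromComponent.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10); // via getComponentConfiguration()

Map<String, Object> fromFluentApi = new HashMap<>();
fromFluentApi.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3);  // via addConfiguration(...)

// same overlay as mergeIntoJson: newMap overrides into
Map<String, Object> merged = new HashMap<>(fromComponent);
merged.putAll(fromFluentApi);
// merged.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS) -> 3
```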
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
```java
    private Map<String, Object> normalizedComponentConf(
        Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) {
        List<String> keysToRemove = retrieveAllConfigKeys();
        keysToRemove.remove(Config.TOPOLOGY_DEBUG);
        keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        keysToRemove.remove(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
        keysToRemove.remove(Config.TOPOLOGY_TRANSACTIONAL_ID);
        keysToRemove.remove(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
        keysToRemove.remove(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS);
        keysToRemove.remove(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME);
        keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER);
        keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER_CONFIG);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);

        Map<String, Object> componentConf;
        String specJsonConf = topologyContext.getComponentCommon(componentId).get_json_conf();
        if (specJsonConf != null) {
            try {
                componentConf = (Map<String, Object>) JSONValue.parseWithException(specJsonConf);
            } catch (ParseException e) {
                throw new RuntimeException(e);
            }
            for (Object p : keysToRemove) {
                componentConf.remove(p);
            }
        } else {
            componentConf = new HashMap<>();
        }

        Map<String, Object> ret = new HashMap<>();
        ret.putAll(topoConf);
        ret.putAll(componentConf);
        return ret;
    }
```
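Note that TOPOLOGY_TICK_TUPLE_FREQ_SECS is removed from keysToRemove, i.e. it survives the pruning of the component's json_conf, and since ret.putAll(componentConf) runs after ret.putAll(topoConf), the per-component tick frequency overrides a topology-wide one. Setting it topology-wide also works; a sketch using the standard StormSubmitter API with the builder from the test above (but note every bolt in the topology then receives tick tuples at that frequency):

```java
Config conf = new Config();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10); // topology-wide default
StormSubmitter.submitTopology("tickDemo", conf, builder.createTopology());
```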
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
```java
    protected void setupTicks(boolean isSpout) {
        final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
        if (tickTimeSecs != null) {
            boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
            if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId))
                || (!enableMessageTimeout && isSpout)) {
                LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId);
            } else {
                StormTimer timerTask = workerData.getUserTimer();
                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs,
                    () -> {
                        TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
                                Constants.SYSTEM_COMPONENT_ID,
                                (int) Constants.SYSTEM_TASK_ID,
                                Constants.SYSTEM_TICK_STREAM_ID);
                        AddressedTuple tickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                        try {
                            receiveQueue.publish(tickTuple);
                            receiveQueue.flush(); // avoid buffering
                        } catch (InterruptedException e) {
                            LOG.warn("Thread interrupted when emitting tick tuple. Setting interrupt flag.");
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                );
            }
        }
    }
```
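Note that the tick tuple carries new Values(tickTimeSecs) as its only field, so a bolt could even read the configured frequency back from the tuple itself (a minimal sketch inside execute):

```java
if (TupleUtils.isTick(input)) {
    // per setupTicks above, field 0 carries the tick frequency in seconds
    int tickFreqSecs = input.getInteger(0);
    LOGGER.info("received tick tuple, frequency={}s", tickFreqSecs);
}
```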
To recap the flow: setupTicks reads Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS and, when it is set, schedules a recurring task on the worker's user timer. Each firing creates a TupleImpl whose sourceComponent is Constants.SYSTEM_COMPONENT_ID (__system), whose taskId is Constants.SYSTEM_TASK_ID (-1), and whose streamId is Constants.SYSTEM_TICK_STREAM_ID (__tick). The tuple is wrapped in an AddressedTuple with destination AddressedTuple.BROADCAST_DEST and handed to the executor's receiveQueue via publish(tickTuple).

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java

```java
    private final DirectInserter directInserter = new DirectInserter(this);

    /**
     * Blocking call. Retries till it can successfully publish the obj. Can be interrupted via Thread.interrupt().
     */
    public void publish(Object obj) throws InterruptedException {
        Inserter inserter = getInserter();
        inserter.publish(obj);
    }

    private Inserter getInserter() {
        Inserter inserter;
        if (producerBatchSz > 1) {
            inserter = thdLocalBatcher.get();
            if (inserter == null) {
                BatchInserter b = new BatchInserter(this, producerBatchSz);
                inserter = b;
                thdLocalBatcher.set(b);
            }
        } else {
            inserter = directInserter;
        }
        return inserter;
    }

    private static class DirectInserter implements Inserter {
        private JCQueue q;

        public DirectInserter(JCQueue q) {
            this.q = q;
        }

        /**
         * Blocking call, that can be interrupted via Thread.interrupt
         */
        @Override
        public void publish(Object obj) throws InterruptedException {
            boolean inserted = q.tryPublishInternal(obj);
            int idleCount = 0;
            while (!inserted) {
                q.metrics.notifyInsertFailure();
                if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
                    LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", q.getName());
                }
                idleCount = q.backPressureWaitStrategy.idle(idleCount);
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                inserted = q.tryPublishInternal(obj);
            }
        }
        //......
    }

    // Non Blocking. returns true/false indicating success/failure. Fails if full.
    private boolean tryPublishInternal(Object obj) {
        if (recvQueue.offer(obj)) {
            metrics.notifyArrivals(1);
            return true;
        }
        return false;
    }
```
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
```java
    /**
     * Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR exitCond.keepRunning() return false. Returns number of
     * elements consumed from Q
     */
    public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
        try {
            return consumeImpl(consumer, exitCond);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q
     *
     * @param consumer
     * @param exitCond
     */
    private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
        int drainCount = 0;
        while (exitCond.keepRunning()) {
            Object tuple = recvQueue.poll();
            if (tuple == null) {
                break;
            }
            consumer.accept(tuple);
            ++drainCount;
        }

        int overflowDrainCount = 0;
        int limit = overflowQ.size();
        while (exitCond.keepRunning() && (overflowDrainCount < limit)) { // 2nd cond prevents staying stuck with consuming overflow
            Object tuple = overflowQ.poll();
            ++overflowDrainCount;
            consumer.accept(tuple);
        }

        int total = drainCount + overflowDrainCount;
        if (total > 0) {
            consumer.flush();
        }
        return total;
    }
```
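To make the contract above concrete (my own illustration, not Storm code): a JCQueue.Consumer receives each drained element via accept() plus one flush() at the end of a non-empty drain, and the ExitCondition can stop the drain early. Assuming a JCQueue instance q, and that ExitCondition is the single-method interface its usage above suggests:

```java
JCQueue.Consumer printer = new JCQueue.Consumer() {
    @Override
    public void accept(Object tuple) {
        System.out.println("drained: " + tuple); // called once per element
    }

    @Override
    public void flush() {
        // called once after a drain that consumed at least one element
    }
};
int drained = q.consume(printer, () -> true); // keepRunning() always true: drain until empty
```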