本文主要研究一下storm的WindowedBolt
@Test public void testSlidingTupleTsTopology() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("integer", new RandomIntegerSpout(), 1); BaseWindowedBolt baseWindowedBolt = new SlidingWindowSumBolt() //windowLength , slidingInterval .withWindow(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS)) //經過withTimestampField指定tuple的某個字段做爲這個tuple的timestamp .withTimestampField("timestamp") //輸入流中最新的元組時間戳的最小值減去Lag值=watermark,用於指定觸發watermark的的interval,默認爲1秒 //當watermark被觸發的時候,tuple timestamp比watermark早的window將被計算 .withWatermarkInterval(new BaseWindowedBolt.Duration(1, TimeUnit.SECONDS)) //withLag用於處理亂序的數據,當接收到的tuple的timestamp小於等lastWaterMarkTs(`取這批watermark的最大值`),則會被丟棄 .withLag(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS)); builder.setBolt("slidingSum", baseWindowedBolt, 1).shuffleGrouping("integer"); builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingSum"); SubmitHelper.submitRemote("slideWindowTopology",builder.createTopology()); }
這裏主要設置了withWindow、withTimestampField、withWatermarkInterval、withLag
SlidingWindowSumBolt
public class SlidingWindowSumBolt extends BaseWindowedBolt { private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class); private int sum = 0; private OutputCollector collector; @Override public void prepare(Map topoConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(TupleWindow inputWindow) { /* * The inputWindow gives a view of * (a) all the events in the window * (b) events that expired since last activation of the window * (c) events that newly arrived since last activation of the window */ List<Tuple> tuplesInWindow = inputWindow.get(); List<Tuple> newTuples = inputWindow.getNew(); List<Tuple> expiredTuples = inputWindow.getExpired(); LOG.debug("Events in current window: " + tuplesInWindow.size()); /* * Instead of iterating over all the tuples in the window to compute * the sum, the values for the new events are added and old events are * subtracted. Similar optimizations might be possible in other * windowing computations. */ for (Tuple tuple : newTuples) { sum += (int) tuple.getValue(0); } for (Tuple tuple : expiredTuples) { sum -= (int) tuple.getValue(0); } collector.emit(new Values(sum)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sum")); } }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IWindowedBolt.java
/** * A bolt abstraction for supporting time and count based sliding & tumbling windows. */ public interface IWindowedBolt extends IComponent { /** * This is similar to the {@link org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)} except that while emitting, * the tuples are automatically anchored to the tuples in the inputWindow. */ void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector); /** * Process the tuple window and optionally emit new tuples based on the tuples in the input window. */ void execute(TupleWindow inputWindow); void cleanup(); /** * Return a {@link TimestampExtractor} for extracting timestamps from a tuple for event time based processing, or null for processing * time. * * @return the timestamp extractor */ TimestampExtractor getTimestampExtractor(); }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IStatefulWindowedBolt.java
/** * A windowed bolt abstraction for supporting windowing operation with state. */ public interface IStatefulWindowedBolt<T extends State> extends IStatefulComponent<T>, IWindowedBolt { /** * If the stateful windowed bolt should have its windows persisted in state and maintain a subset of events in memory. * <p> * The default is to keep all the window events in memory. * </p> * * @return true if the windows should be persisted */ default boolean isPersistent() { return false; } /** * The maximum number of window events to keep in memory. */ default long maxEventsInMemory() { return 1_000_000L; // default } }
HBaseKeyValueState、InMemoryKeyValueState、RedisKeyValueState
)中storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
/** * Tuple count based sliding window configuration. * * @param windowLength the number of tuples in the window * @param slidingInterval the number of tuples after which the window slides */ public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) { return withWindowLength(windowLength).withSlidingInterval(slidingInterval); } /** * Time duration based sliding window configuration. * * @param windowLength the time duration of the window * @param slidingInterval the time duration after which the window slides */ public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) { return withWindowLength(windowLength).withSlidingInterval(slidingInterval); } /** * A time duration based tumbling window. * * @param duration the time duration after which the window tumbles */ public BaseWindowedBolt withTumblingWindow(Duration duration) { return withWindowLength(duration).withSlidingInterval(duration); } /** * A count based tumbling window. * * @param count the number of tuples after which the window tumbles */ public BaseWindowedBolt withTumblingWindow(Count count) { return withWindowLength(count).withSlidingInterval(count); }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
public class WindowedBoltExecutor implements IRichBolt { public static final String LATE_TUPLE_FIELD = "late_tuple"; private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class); private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s private static final int DEFAULT_MAX_LAG_MS = 0; // no lag //...... private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf, TopologyContext context, Collection<Event<Tuple>> queue, boolean stateful) { WindowManager<Tuple> manager = stateful ? new StatefulWindowManager<>(lifecycleListener, queue) : new WindowManager<>(lifecycleListener, queue); Count windowLengthCount = null; Duration slidingIntervalDuration = null; Count slidingIntervalCount = null; // window length if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) { windowLengthCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue()); } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) { windowLengthDuration = new Duration( ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS); } // sliding interval if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) { slidingIntervalCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue()); } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) { slidingIntervalDuration = new Duration(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS); } else { // default is a sliding window of count 1 slidingIntervalCount = new Count(1); } // tuple ts if (timestampExtractor != null) { // late tuple stream lateTupleStream = (String) topoConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM); if (lateTupleStream != null) { if 
(!context.getThisStreams().contains(lateTupleStream)) { throw new IllegalArgumentException( "Stream for late tuples must be defined with the builder method withLateTupleStream"); } } // max lag if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) { maxLagMs = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue(); } else { maxLagMs = DEFAULT_MAX_LAG_MS; } // watermark interval int watermarkInterval; if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) { watermarkInterval = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue(); } else { watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS; } waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval, maxLagMs, getComponentStreams(context)); } else { if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) { throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field"); } } // validate validate(topoConf, windowLengthCount, windowLengthDuration, slidingIntervalCount, slidingIntervalDuration); evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration); triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration, manager, evictionPolicy); manager.setEvictionPolicy(evictionPolicy); manager.setTriggerPolicy(triggerPolicy); return manager; } @Override public void execute(Tuple input) { if (isTupleTs()) { long ts = timestampExtractor.extractTimestamp(input); if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) { windowManager.add(input, ts); } else { if (lateTupleStream != null) { windowedOutputCollector.emit(lateTupleStream, input, new Values(input)); } else { LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts); } windowedOutputCollector.ack(input); } } else { windowManager.add(input); } }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
public class WaterMarkEventGenerator<T> implements Runnable { /** * Creates a new WatermarkEventGenerator. * * @param windowManager The window manager this generator will submit watermark events to * @param intervalMs The generator will check if it should generate a watermark event with this interval * @param eventTsLagMs The max allowed lag behind the last watermark event before an event is considered late * @param inputStreams The input streams this generator is expected to handle */ public WaterMarkEventGenerator(WindowManager<T> windowManager, int intervalMs, int eventTsLagMs, Set<GlobalStreamId> inputStreams) { this.windowManager = windowManager; streamToTs = new ConcurrentHashMap<>(); ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("watermark-event-generator-%d") .setDaemon(true) .build(); executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); this.interval = intervalMs; this.eventTsLag = eventTsLagMs; this.inputStreams = inputStreams; } public void start() { this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS); } //...... /** * Tracks the timestamp of the event in the stream, returns true if the event can be considered for processing or false if its a late * event. */ public boolean track(GlobalStreamId stream, long ts) { Long currentVal = streamToTs.get(stream); if (currentVal == null || ts > currentVal) { streamToTs.put(stream, ts); } checkFailures(); return ts >= lastWaterMarkTs; } @Override public void run() { try { long waterMarkTs = computeWaterMarkTs(); if (waterMarkTs > lastWaterMarkTs) { this.windowManager.add(new WaterMarkEvent<>(waterMarkTs)); lastWaterMarkTs = waterMarkTs; } } catch (Throwable th) { LOG.error("Failed while processing watermark event ", th); throw th; } } /** * Computes the min ts across all streams. 
*/ private long computeWaterMarkTs() { long ts = 0; // only if some data has arrived on each input stream if (streamToTs.size() >= inputStreams.size()) { ts = Long.MAX_VALUE; for (Map.Entry<GlobalStreamId, Long> entry : streamToTs.entrySet()) { ts = Math.min(ts, entry.getValue()); } } return ts - eventTsLag; } }
輸入流中最新的元組時間戳的最小值減去Lag值
),而後若是比lastWaterMarkTs值大,則更新lastWaterMarkTs