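Windows are mainly used for aggregations, joins, pattern matching and similar computations. A window can be thought of as an in-memory table into which events are added and from which they are evicted according to some policies.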
Storm supports sliding windows and tumbling windows based on time duration and/or event count.
```java
package org.apache.storm.windowing;

import java.util.List;

/**
 * A view of events in a sliding window.
 *
 * @param <T> the type of event that this window contains. E.g. {@link org.apache.storm.tuple.Tuple}
 */
public interface Window<T> {
    /**
     * Gets the list of events in the window.
     *
     * @return the list of events in the window.
     */
    List<T> get();

    /**
     * Get the list of newly added events in the window since the last time the window was generated.
     *
     * @return the list of newly added events in the window.
     */
    List<T> getNew();

    /**
     * Get the list of events expired from the window since the last time the window was generated.
     *
     * @return the list of events expired from the window.
     */
    List<T> getExpired();
}
```
```java
package org.apache.storm.windowing;

import org.apache.storm.tuple.Tuple;

/**
 * A {@link Window} that contains {@link Tuple} objects.
 */
public interface TupleWindow extends Window<Tuple> {
}
```
```java
package org.apache.storm.windowing;

import org.apache.storm.tuple.Tuple;

import java.util.List;

/**
 * Holds the expired, new and current tuples in a window.
 */
public class TupleWindowImpl implements TupleWindow {
    private final List<Tuple> tuples;
    private final List<Tuple> newTuples;
    private final List<Tuple> expiredTuples;

    public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) {
        this.tuples = tuples;
        this.newTuples = newTuples;
        this.expiredTuples = expiredTuples;
    }

    @Override
    public List<Tuple> get() {
        return tuples;
    }

    @Override
    public List<Tuple> getNew() {
        return newTuples;
    }

    @Override
    public List<Tuple> getExpired() {
        return expiredTuples;
    }

    @Override
    public String toString() {
        return "TupleWindowImpl{" +
                "tuples=" + tuples +
                ", newTuples=" + newTuples +
                ", expiredTuples=" + expiredTuples +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        TupleWindowImpl that = (TupleWindowImpl) o;

        if (tuples != null ? !tuples.equals(that.tuples) : that.tuples != null) return false;
        if (newTuples != null ? !newTuples.equals(that.newTuples) : that.newTuples != null) return false;
        return expiredTuples != null ? expiredTuples.equals(that.expiredTuples) : that.expiredTuples == null;
    }

    @Override
    public int hashCode() {
        int result = tuples != null ? tuples.hashCode() : 0;
        result = 31 * result + (newTuples != null ? newTuples.hashCode() : 0);
        result = 31 * result + (expiredTuples != null ? expiredTuples.hashCode() : 0);
        return result;
    }
}
```
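For example, consider a time-based sliding window with a length of 10s and a sliding interval of 5s, and let w1, w2 and w3 be consecutive windows. The window is evaluated every 5s, so consecutive windows overlap and some tuples in w2 also belong to w3. When the window slides from w2 to w3, e1 and e2, which fall in the first 5s of w2 that w3 no longer covers, expire (they are dropped out of the event queue).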
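To make the overlap concrete, here is a plain-Java simulation (not Storm code) of the 10s/5s example. The event timestamps are hypothetical (e1 and e2 in the first 5s, e3 to e6 in the next 5s, e7 to e9 after that), and the sketch assumes that the window ending at time t holds the events with timestamps in (t - 10s, t]. For each evaluation it reports what `get()`, `getNew()` and `getExpired()` would contain.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation (not Storm code) of a time-based sliding window. */
class SlidingTimeWindowDemo {

    /** A hypothetical event: a name plus an event time in seconds. */
    static final class Event {
        final String name;
        final int ts;

        Event(String name, int ts) {
            this.name = name;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    /** Events whose timestamp falls in the half-open interval (end - length, end]. */
    static List<Event> window(List<Event> events, int end, int length) {
        List<Event> result = new ArrayList<>();
        for (Event e : events) {
            if (e.ts > end - length && e.ts <= end) {
                result.add(e);
            }
        }
        return result;
    }

    /** Evaluates the window every `slide` seconds up to `until`, describing current/new/expired events. */
    static List<String> evaluate(List<Event> events, int length, int slide, int until) {
        List<String> report = new ArrayList<>();
        List<Event> previous = new ArrayList<>();
        for (int end = slide; end <= until; end += slide) {
            List<Event> current = window(events, end, length);
            List<Event> added = new ArrayList<>(current);
            added.removeAll(previous);          // what Window.getNew() would return
            List<Event> expired = new ArrayList<>(previous);
            expired.removeAll(current);         // what Window.getExpired() would return
            report.add("t=" + end + " window=" + current + " new=" + added + " expired=" + expired);
            previous = current;
        }
        return report;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("e1", 1), new Event("e2", 3),
                new Event("e3", 6), new Event("e4", 7), new Event("e5", 8), new Event("e6", 9),
                new Event("e7", 11), new Event("e8", 12), new Event("e9", 14));
        evaluate(events, 10, 5, 15).forEach(System.out::println);
    }
}
```

The third evaluation (t=15) contains e3 to e9, reports e7, e8, e9 as new and e1, e2 as expired, which is the w2 to w3 transition described above.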
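In the topology, the bolt is registered with `topologyBuilder.setBolt("slidingwindowbolt", new SlidingWindowBolt().withWindow(new Count(30), new Count(10)), 1).shuffleGrouping("spout");`, which gives a count-based sliding window of 30 tuples that slides every 10 tuples.
Here `SlidingWindowBolt` extends `BaseWindowedBolt`. A time-based window such as the 10s/5s example above is defined with `BaseWindowedBolt.withWindow(Duration windowLength, Duration slidingInterval)`; the available `withWindow` overloads are: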
| Method | Window length | Sliding interval | Description |
|---|---|---|---|
| `withWindow(Count windowLength, Count slidingInterval)` | tuple count | tuple count | Tuple count based sliding window that slides after `slidingInterval` number of tuples. |
| `withWindow(Count windowLength)` | tuple count | every tuple | Tuple count based window that slides with every incoming tuple; each incoming tuple triggers one window computation in the bolt. |
| `withWindow(Count windowLength, Duration slidingInterval)` | tuple count | time duration | Tuple count based sliding window that slides after `slidingInterval` time duration. |
| `withWindow(Duration windowLength, Duration slidingInterval)` | time duration | time duration | Time duration based sliding window that slides after `slidingInterval` time duration. |
| `withWindow(Duration windowLength)` | time duration | every tuple | Time duration based window that slides with every incoming tuple. |
| `withWindow(Duration windowLength, Count slidingInterval)` | time duration | tuple count | Time duration based sliding window configuration that slides after `slidingInterval` number of tuples. |
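The count-based rows of the table can be illustrated with a small plain-Java simulation (not the Storm API; the class name is hypothetical). It assumes the window holds the most recent `length` tuples and is evaluated after every `slide` incoming tuples, which is how `withWindow(Count, Count)` is described; `slide = 1` corresponds to `withWindow(Count)`. Small numbers stand in for 30/10 to keep the output short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical simulation (not Storm code) of a count-based sliding window. */
class CountSlidingWindowDemo {

    /** Returns the window contents at each evaluation, one list per simulated execute() call. */
    static <T> List<List<T>> evaluate(List<T> stream, int length, int slide) {
        Deque<T> window = new ArrayDeque<>();
        List<List<T>> evaluations = new ArrayList<>();
        int sinceLastTrigger = 0;
        for (T tuple : stream) {
            window.addLast(tuple);
            if (window.size() > length) {
                window.removeFirst();                  // evict the oldest tuple
            }
            if (++sinceLastTrigger == slide) {
                evaluations.add(new ArrayList<>(window)); // snapshot passed to the bolt
                sinceLastTrigger = 0;
            }
        }
        return evaluations;
    }

    public static void main(String[] args) {
        List<Integer> stream = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(evaluate(stream, 3, 2)); // [[1, 2], [2, 3, 4], [4, 5, 6]]
        System.out.println(evaluate(stream, 3, 1)); // [[1], [1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
    }
}
```

With a sliding interval of 1, every incoming tuple produces an evaluation, which is why `withWindow(Count)` can be expensive for large windows.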
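```java
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.spout.RandomSentenceSpout; // spout from the storm-starter examples
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.topology.base.BaseWindowedBolt.Count;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

// The wrapper class name is illustrative.
public class SlidingWindowTopology {

    public static class SlidingWindowBolt extends BaseWindowedBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(TupleWindow inputWindow) {
            long computedValue = 0;
            for (Tuple tuple : inputWindow.get()) {
                // do the windowing computation; this example just counts the tuples in the window
                computedValue++;
            }
            // emit the result; it is automatically anchored to every tuple in inputWindow
            collector.emit(new Values(computedValue));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("count"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);
        builder.setBolt("slidingwindowbolt",
                new SlidingWindowBolt().withWindow(new Count(30), new Count(10)), 1).shuffleGrouping("spout");
        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(1);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
}
```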
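For example, consider a tumbling window with a length of 5s: tuples are grouped into consecutive, non-overlapping 5s windows, so each tuple belongs to exactly one window.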
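In the topology, it is configured with `topologyBuilder.setBolt("tumblingwindowbolt", new TumblingWindowBolt().withTumblingWindow(new Count(30)), 1).shuffleGrouping("spout");`, which sets up a count-based tumbling window of 30 tuples.
A time-based tumbling window is defined with `BaseWindowedBolt.withTumblingWindow(Duration duration)`. The two overloads are: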
| Method | Window length | Description |
|---|---|---|
| `withTumblingWindow(BaseWindowedBolt.Count count)` | tuple count | Count based tumbling window that tumbles after the specified count of tuples. |
| `withTumblingWindow(BaseWindowedBolt.Duration duration)` | time duration | Time duration based tumbling window that tumbles after the specified time duration. |
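A tumbling window behaves like a sliding window whose sliding interval equals its length. The plain-Java sketch below (not Storm code; event names and times are hypothetical) assigns each event to exactly one 5s window, assuming windows are half-open intervals (end - 5s, end] aligned to multiples of 5s.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simulation (not Storm code) of a time-based tumbling window. */
class TumblingWindowDemo {

    /** Smallest multiple of length that is >= ts, i.e. the end of the window holding ts. */
    static long windowEnd(long ts, long length) {
        return Math.floorDiv(ts + length - 1, length) * length;
    }

    /** Groups event names by the end of the tumbling window they fall into, ordered by window end. */
    static Map<Long, List<String>> assign(Map<String, Long> eventTimes, long length) {
        Map<Long, List<String>> windows = new TreeMap<>();
        eventTimes.forEach((name, ts) ->
                windows.computeIfAbsent(windowEnd(ts, length), end -> new ArrayList<>()).add(name));
        return windows;
    }

    public static void main(String[] args) {
        // Hypothetical event times in seconds; LinkedHashMap keeps arrival order.
        Map<String, Long> eventTimes = new LinkedHashMap<>();
        eventTimes.put("e1", 1L);
        eventTimes.put("e2", 3L);
        eventTimes.put("e3", 6L);
        eventTimes.put("e4", 7L);
        eventTimes.put("e5", 8L);
        eventTimes.put("e6", 9L);
        eventTimes.put("e7", 11L);
        eventTimes.put("e8", 12L);
        eventTimes.put("e9", 14L);
        System.out.println(assign(eventTimes, 5)); // {5=[e1, e2], 10=[e3, e4, e5, e6], 15=[e7, e8, e9]}
    }
}
```

Unlike the sliding case, no event appears in two windows, so nothing is reported twice.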
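By default, the timestamp tracked by the window is the time at which the bolt processes the tuple. To window on a timestamp carried in the tuple itself, specify the field that holds it:

```java
/**
 * Specify the tuple field that represents the timestamp as a long value. If this field
 * is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
 *
 * @param fieldName the name of the field that contains the timestamp
 */
public BaseWindowedBolt withTimestampField(String fieldName)
```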
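Along with the timestamp field, a maximum lag can be specified for tuples whose timestamps arrive out of order:

```java
/**
 * Specify the maximum time lag of the tuple timestamp in millis. The tuple timestamps
 * cannot be out of order by more than this amount.
 *
 * @param duration the max lag duration
 */
public BaseWindowedBolt withLag(Duration duration)
```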
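For example, if the lag is 5 seconds and a tuple t1 arrives with timestamp 06:00:05, no tuple may arrive with a timestamp earlier than 06:00:00. If a tuple with timestamp 05:59:59 arrives after t1 and the window has already moved past t1, it is treated as a late tuple and is not processed.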
Currently, late tuples are only logged, at INFO level, in the worker log files.
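The lag rule can be sketched in plain Java. `LateTupleDetector` is a hypothetical class, not Storm's implementation: it assumes a single input stream, takes the watermark to be the latest timestamp seen minus the lag, and treats a tuple as late when its timestamp is earlier than the last emitted watermark. Storm logs such tuples; here `accept` just returns `false`.

```java
import java.time.Duration;
import java.time.LocalTime;

/** Hypothetical sketch (not Storm code) of lag-based late-tuple detection for one input stream. */
class LateTupleDetector {
    private final Duration lag;
    private LocalTime latest;     // latest tuple timestamp seen so far
    private LocalTime watermark;  // last emitted watermark, null before the first one

    LateTupleDetector(Duration lag) {
        this.lag = lag;
    }

    /** Returns true if the tuple would be processed, false if it is late. */
    boolean accept(LocalTime ts) {
        if (watermark != null && ts.isBefore(watermark)) {
            return false;         // older than the watermark: late tuple, not processed
        }
        if (latest == null || ts.isAfter(latest)) {
            latest = ts;
        }
        return true;
    }

    /** Emits a watermark: from now on no tuple earlier than (latest - lag) is expected. */
    LocalTime emitWatermark() {
        if (latest != null) {
            watermark = latest.minus(lag);
        }
        return watermark;
    }

    public static void main(String[] args) {
        LateTupleDetector detector = new LateTupleDetector(Duration.ofSeconds(5));
        System.out.println(detector.accept(LocalTime.of(6, 0, 5)));    // true (t1)
        System.out.println(detector.emitWatermark());                 // 06:00 (LocalTime omits zero seconds)
        System.out.println(detector.accept(LocalTime.of(5, 59, 59))); // false (late tuple)
        System.out.println(detector.accept(LocalTime.of(6, 0, 1)));   // true (within the lag)
    }
}
```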
Periodically (by default every second), watermark timestamps are emitted; when tuple-based timestamps are in use, each watermark serves as the clock tick for the window calculation. Storm computes the watermark as the minimum, across all input streams, of the latest tuple timestamp minus the lag. The interval at which watermarks are emitted can be changed with the API below.
```java
/**
 * Specify the watermark event generation interval. Watermark events
 * are used to track the progress of time
 *
 * @param interval the interval at which watermark events are generated
 */
public BaseWindowedBolt withWatermarkInterval(Duration interval)
```
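The watermark rule itself fits in a few lines. This is a minimal sketch, not Storm code; the class name and stream names are hypothetical, and it assumes at least one input stream has produced a tuple (otherwise `Collections.min` throws).

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.Collections;
import java.util.Map;

/** Hypothetical sketch: watermark = min over input streams of the latest tuple timestamp, minus the lag. */
class WatermarkRule {

    static LocalTime watermark(Map<String, LocalTime> latestPerStream, Duration lag) {
        return Collections.min(latestPerStream.values()).minus(lag);
    }

    public static void main(String[] args) {
        Duration lag = Duration.ofSeconds(5);
        // A single stream whose latest tuple has timestamp 06:00:36.
        System.out.println(watermark(Map.of("spout", LocalTime.of(6, 0, 36)), lag)); // 06:00:31
        // Two streams: the slower one holds the watermark back.
        System.out.println(watermark(Map.of("a", LocalTime.of(6, 0, 36), "b", LocalTime.of(6, 0, 20)), lag)); // 06:00:15
    }
}
```

Taking the minimum means a slow input stream delays window evaluation for the whole bolt rather than causing its tuples to be treated as late.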
When a watermark is received, all windows up to that timestamp are evaluated. For example, consider tuple-timestamp-based processing with the following window parameters:
Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s
Current timestamp = 09:00:00
Tuples e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36) are received between 9:00:00 and 9:00:01.
At time t = 09:00:01, watermark w1 = 06:00:31 is emitted (the latest tuple timestamp, 06:00:36, minus the 5s lag), since no tuples earlier than 06:00:31 can arrive any more.
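Three windows are evaluated. The end timestamp of the first window (06:00:10) is computed by taking the earliest event timestamp (06:00:03) and rounding it up to a multiple of the sliding interval (10s):

1. 05:59:50 - 06:00:10 with tuples e1, e2, e3
2. 06:00:00 - 06:00:20 with tuples e1, e2, e3, e4
3. 06:00:10 - 06:00:30 with tuples e4, e5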
e6 is not evaluated because the watermark timestamp 06:00:31 is earlier than its timestamp 06:00:36.
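Tuples e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39) are received between 9:00:01 and 9:00:02.
At time t = 09:00:02, another watermark w2 = 08:00:34 is emitted (08:00:39 minus the 5s lag), since no tuples earlier than 08:00:34 can arrive now. Three windows are evaluated:

1. 06:00:20 - 06:00:40 with tuples e5, e6 (from the earlier batch)
2. 06:00:30 - 06:00:50 with tuple e6 (from the earlier batch)
3. 08:00:10 - 08:00:30 with tuples e7, e8, e9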
e10 is not evaluated since the tuple ts 8:00:39 is beyond the watermark time 8:00:34.
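The window calculation takes time gaps into account and computes the windows based on the tuple timestamps: the window 06:00:40 - 06:01:00 contains no tuples, so instead of stepping through the empty interval 10s at a time, the next window is aligned to the timestamp of the next tuple (e7 at 08:00:25), which gives 08:00:10 - 08:00:30.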
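The walk-through can be replayed with a plain-Java simulation. `WatermarkWindowDemo` is a hypothetical class, not Storm's trigger code. It assumes a single input stream (watermark = latest timestamp minus lag), windows of the form (end - length, end], a first window end equal to the earliest timestamp rounded up to a multiple of the sliding interval, and that an empty window makes evaluation jump to the window aligned with the next tuple. For simplicity it keeps every tuple instead of evicting expired ones and only handles timestamps within one day.

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation (not Storm code) of watermark-triggered event-time sliding windows. */
class WatermarkWindowDemo {
    private static final DateTimeFormatter HMS = DateTimeFormatter.ofPattern("HH:mm:ss");
    private static final long UNSET = Long.MIN_VALUE;

    private final long lengthSec, slideSec, lagSec;
    private final List<String> names = new ArrayList<>();
    private final List<Long> times = new ArrayList<>(); // seconds of day, parallel to names
    private long latestTs = UNSET;
    private long nextWindowEnd = UNSET;

    WatermarkWindowDemo(long lengthSec, long slideSec, long lagSec) {
        this.lengthSec = lengthSec;
        this.slideSec = slideSec;
        this.lagSec = lagSec;
    }

    /** Records a tuple whose timestamp is given as "HH:mm:ss". */
    void add(String name, String hms) {
        long ts = LocalTime.parse(hms).toSecondOfDay();
        names.add(name);
        times.add(ts);
        latestTs = Math.max(latestTs, ts);
    }

    /** Emits a watermark (latest ts minus lag) and evaluates every window ending at or before it. */
    List<String> onWatermark() {
        List<String> fired = new ArrayList<>();
        if (latestTs == UNSET) {
            return fired;                                   // no tuples yet
        }
        long watermark = latestTs - lagSec;
        long end = nextWindowEnd;
        if (end == UNSET) {
            end = nextAlignedEnd(UNSET, watermark);         // ceiling of the earliest timestamp
            if (end == Long.MAX_VALUE) {
                return fired;
            }
        }
        while (end <= watermark) {
            List<String> inWindow = tuplesIn(end - lengthSec, end);
            if (!inWindow.isEmpty()) {
                fired.add(fmt(end - lengthSec) + " - " + fmt(end) + " " + inWindow);
                end += slideSec;
            } else {
                // Empty window: skip the time gap and align to the next tuple's timestamp.
                long next = nextAlignedEnd(end, watermark);
                if (next == Long.MAX_VALUE) {
                    break;
                }
                end = next;
            }
        }
        nextWindowEnd = end;
        return fired;
    }

    /** Names of tuples with timestamps in (start, end]. */
    private List<String> tuplesIn(long start, long end) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < times.size(); i++) {
            if (times.get(i) > start && times.get(i) <= end) {
                result.add(names.get(i));
            }
        }
        return result;
    }

    /** Earliest timestamp in (after, watermark] rounded up to a multiple of the slide; MAX_VALUE if none. */
    private long nextAlignedEnd(long after, long watermark) {
        long earliest = Long.MAX_VALUE;
        for (long ts : times) {
            if (ts > after && ts <= watermark) {
                earliest = Math.min(earliest, ts);
            }
        }
        if (earliest == Long.MAX_VALUE || earliest % slideSec == 0) {
            return earliest;
        }
        return earliest + (slideSec - earliest % slideSec);
    }

    private static String fmt(long secondOfDay) {
        return LocalTime.ofSecondOfDay(secondOfDay).format(HMS);
    }

    public static void main(String[] args) {
        WatermarkWindowDemo demo = new WatermarkWindowDemo(20, 10, 5); // length 20s, slide 10s, lag 5s
        demo.add("e1", "06:00:03"); demo.add("e2", "06:00:05"); demo.add("e3", "06:00:07");
        demo.add("e4", "06:00:18"); demo.add("e5", "06:00:26"); demo.add("e6", "06:00:36");
        demo.onWatermark().forEach(System.out::println);              // watermark 06:00:31
        demo.add("e7", "08:00:25"); demo.add("e8", "08:00:26"); demo.add("e9", "08:00:27");
        demo.add("e10", "08:00:39");
        demo.onWatermark().forEach(System.out::println);              // watermark 08:00:34
    }
}
```

The first watermark fires 05:59:50 - 06:00:10 [e1, e2, e3], 06:00:00 - 06:00:20 [e1, e2, e3, e4] and 06:00:10 - 06:00:30 [e4, e5]. The second fires 06:00:20 - 06:00:40 [e5, e6], 06:00:30 - 06:00:50 [e6] and, after skipping the empty 06:00:40 - 06:01:00 window, 08:00:10 - 08:00:30 [e7, e8, e9]; e10 waits for a later watermark.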
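Storm's windowing currently provides at-least-once guarantees. The tuples emitted from the bolt's `execute(TupleWindow inputWindow)` method are automatically anchored to all the tuples in `inputWindow`. Downstream bolts are expected to ack the tuples they receive (that is, the tuples emitted by the windowed bolt) to complete the tuple tree. If they do not, the tuples are replayed and the window computation is re-evaluated.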