窗口計算是流處理中經常使用的狀況之一,在這種狀況下,無邊界數據流被按必定的標準(例如時間)分割成有限集合,並在每一組事件上應用計算。一個例子是在最近一小時內計算最熱門的Twitter話題。java
窗口主要用於聚合(aggregations)、鏈接(joins)、模式匹配(pattern matching)等。Windows能夠看做是一個內存表,其中基於一些策略添加和清除事件。
到目前爲止,Storm依靠開發人員來構建本身的窗口邏輯。對於拓撲結構中的標準方式,開發人員不能使用推薦的或高層次的抽象來定義窗口。apache
Storm has support for sliding and tumbling windows based on time duration and/or event count.windows
Storm支持sliding滑動窗口和tumbling滾動窗口,基於時間週期 and/or 事件計數。api
Storm中Window接口是時間窗口的抽象。有三個事件參數的泛型方法:get爲目前當前window的事件,getNew爲window新增事件,getExpired爲過時事件。get方法中的事件包含getNew中的事件。less
import java.util.List; /** * A view of events in a sliding window. * * @param <T> the type of event that this window contains. E.g. {@link org.apache.storm.tuple.Tuple} */ public interface Window<T> { /** * Gets the list of events in the window. * * @return the list of events in the window. */ List<T> get(); /** * Get the list of newly added events in the window since the last time the window was generated. * * @return the list of newly added events in the window. */ List<T> getNew(); /** * Get the list of events expired from the window since the last time the window was generated. * * @return the list of events expired from the window. */ List<T> getExpired(); } |
TupleWindow接口繼承了Window接口,限定了必須包含Tuple對象。dom
package org.apache.storm.windowing; import org.apache.storm.tuple.Tuple; /** * A {@link Window} that contains {@link Tuple} objects. */ public interface TupleWindow extends Window<Tuple> { } |
TupleWindowImpl類實現了TupleWindow接口。ide
package org.apache.storm.windowing; import org.apache.storm.tuple.Tuple; import java.util.List; /** * Holds the expired, new and current tuples in a window. */ public class TupleWindowImpl implements TupleWindow { private final List<Tuple> tuples; private final List<Tuple> newTuples; private final List<Tuple> expiredTuples; public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) { this.tuples = tuples; this.newTuples = newTuples; this.expiredTuples = expiredTuples; } @Override public List<Tuple> get() { return tuples; } @Override public List<Tuple> getNew() { return newTuples; } @Override public List<Tuple> getExpired() { return expiredTuples; } @Override public String toString() { return "TupleWindowImpl{" + "tuples=" + tuples + ", newTuples=" + newTuples + ", expiredTuples=" + expiredTuples + '}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; TupleWindowImpl that = (TupleWindowImpl) o; if (tuples != null ? !tuples.equals(that.tuples) : that.tuples != null) return false; if (newTuples != null ? !newTuples.equals(that.newTuples) : that.newTuples != null) return false; return expiredTuples != null ? expiredTuples.equals(that.expiredTuples) : that.expiredTuples == null; } @Override public int hashCode() { int result = tuples != null ? tuples.hashCode() : 0; result = 31 * result + (newTuples != null ? newTuples.hashCode() : 0); result = 31 * result + (expiredTuples != null ? expiredTuples.hashCode() : 0); return result; } } |
Tuples被分組在窗口,窗口每一個滑動間隔會滑動。一個tuple可能屬於多個時間窗口。這種叫作slidlingWindow(滑動窗口)。ui
在設置slidlingWindow時須要指定兩個時間間隔:this
按照固定的時間間隔或者Tuple數量滑動窗口:lua
例如,一個Sliding window持續時間爲10s,滑動間隔爲5s。每隔5s窗口會被估算,w2和w3的一些tuples會重複。在w2到w3進行滑動的時刻,e1和e2將過時expired (dropped out of the event queue),
時間窗會在t=5s的時候移動,會包含第一個5s的時候接收到的事件。
經過在拓撲中定義topologyBuilder.setBolt("slidingwindowbolt", new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),1).shuffleGrouping("spout");
其中SlidingWindowBolt繼承了BaseWindowedBolt,時間窗經過BaseWindowedBolt.withWindow(Duration windowLength, Duration slidingInterval)定義。
withWindow(Count windowLength, Count slidingInterval) Tuple count based sliding window that slides after `slidingInterval` number of tuples.滑窗 窗口長度:tuple數, 滑動間隔: tuple數 withWindow(Count windowLength) Tuple count based window that slides with every incoming tuple.滑窗 窗口長度:tuple數, 滑動間隔: 每一個tuple進來都滑,每一個tuple傳入都會觸發一次windowbolt計算。 withWindow(Count windowLength, Duration slidingInterval) Tuple count based sliding window that slides after `slidingInterval` time duration.滑窗 窗口長度:tuple數, 滑動間隔: 時間間隔 withWindow(Duration windowLength, Duration slidingInterval) Time duration based sliding window that slides after `slidingInterval` time duration.滑窗 窗口長度:時間間隔, 滑動間隔: 時間間隔 withWindow(Duration windowLength) Time duration based window that slides with every incoming tuple.滑窗 窗口長度:時間間隔, 滑動間隔: 每一個tuple進來都滑 withWindow(Duration windowLength, Count slidingInterval) Time duration based sliding window configuration that slides after `slidingInterval` number of tuples.滑窗 窗口長度:時間間隔, 滑動間隔: 時間間隔 |
public class SlidingWindowBolt extends BaseWindowedBolt { private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(TupleWindow inputWindow) { for(Tuple tuple: inputWindow.get()) { // do the windowing computation ... } // emit the results collector.emit(new Values(computedValue)); } } public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 1); builder.setBolt("slidingwindowbolt", new SlidingWindowBolt().withWindow(new Count(30), new Count(10)), 1).shuffleGrouping("spout"); Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } |
Tuples會根據時間和數量被分到一個單獨的窗口。任何一個tuple都只屬於一個窗口。這種叫作TumblingWindow(滾動窗口)。指定窗口長度和滑動間隔做爲元組數或時間間隔的計數。
例如,一個Tumbling window的持續時間爲5s。
經過在拓撲中定義topologyBuilder.setBolt("slidingwindowbolt", new TumblingWindowBolt().withTumblingWindow(new Count(30)),1).shuffleGrouping("spout");
其中TumblingWindowBolt繼承了BaseWindowedBolt,它有用於指定窗口長度和滑動間隔的api。
時間窗經過BaseWindowedBolt.withTumblingWindow(Duration windowLength)定義。
withTumblingWindow(BaseWindowedBolt.Count count) Count based tumbling window that tumbles after the specified count of tuples.滾窗 窗口長度:Tuple數 withTumblingWindow(BaseWindowedBolt.Duration duration) Time duration based tumbling window that tumbles after the specified time duration.滾窗 窗口長度:時間間隔 |
默認狀況下,在窗口中跟蹤的時間戳是由bolt處理元組的時間。窗口計算是基於處理時間戳執行的。Storm支持追蹤源數據的時間戳。
默認狀況下,窗口中跟蹤的時間戳是由bolt處理元組的時間。窗口的計算是基於時間戳進行處理。Storm支持基於源生成的時間戳來跟蹤窗口。這對處理基於時間發生時的事件頗有用,例如處理含有時間戳的日誌條目。
/** * Specify the tuple field that represents the timestamp as a long value. If this field * is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown. * * @param fieldName the name of the field that contains the timestamp */ public BaseWindowedBolt withTimestampField(String fieldName) |
withTimestampField方法中參數fieldName,將從用於窗口計算的輸入tuple進行查找。若是指定了這個選項,全部的元組都須要包含時間戳字段。若是該字段不存在於元組中,則會拋出異常IllegalArgumentException並終止拓撲。要解決這個問題,必須從spout源(例如kafka)中手動刪除有問題的元組,而且必須從新啓動拓撲。
/** * Specify the maximum time lag of the tuple timestamp in millis. The tuple timestamps * cannot be out of order by more than this amount. * * @param duration the max lag duration */ public BaseWindowedBolt withLag(Duration duration) |
除了時間戳字段名以外,還可使用withLag方法指定一個時間延遲參數,該參數表示時間戳以外的tuple的最大時間限制。
例如,若是延遲是5秒,而一個tuple t1到達的時間戳是06:00:05。沒有tuple的到達時間戳比06:00:00更早。若是一個tuple在t1以後的到達時間戳是05:59:59,窗口已經移動到t1,它將被視爲一個late tuple,不會處理。
目前,the late tuples只在INFO級別的工做日誌文件中登陸。
使用時間戳字段處理tuple,Storm內部計算基於傳入的tuple時間戳的watermask。在全部輸入流中,watermask是最新的tuple時間戳(減去延遲)的最小值。
Periodically (default every sec), the watermark timestamps are emitted and this is considered as the clock tick for the window calculation if tuple based timestamps are in use. The interval at which watermarks are emitted can be changed with the below api.
若是Tuple基於timestamps,watermark的時間戳週期性地(默認每秒)被髮射,能夠看作爲窗口計算的時間週期。watermark的發射間隔能夠用下面的API來改變。
/** * Specify the watermark event generation interval. Watermark events * are used to track the progress of time * * @param interval the interval at which watermark events are generated */ public BaseWindowedBolt withWatermarkInterval(Duration interval) |
示例:
When a watermark is received, all windows up to that timestamp will be evaluated. For example, consider tuple timestamp based processing with following window parameters,
Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s
Current timestamp = 09:00:00
Tuples e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36) are received between 9:00:00 and 9:00:01.
At time t = 09:00:01, watermark w1 = 6:00:31 is emitted since no tuples earlier than 6:00:31 can arrive.
Three windows will be evaluated. The first window end ts (06:00:10) is computed by taking the earliest event timestamp (06:00:03) and computing the ceiling based on the sliding interval (10s).
e6 is not evaluated since watermark timestamp 6:00:31 is lesser than the tuple ts 6:00:36.
Tuples e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39) are received between 9:00:01 and 9:00:02
At time t = 09:00:02 another watermark w2 = 08:00:34 is emitted since no tuples earlier than 8:00:34 can arrive now. Three windows will be evaluated,
e10 is not evaluated since the tuple ts 8:00:39 is beyond the watermark time 8:00:34.
The window calculation considers the time gaps and computes the windows based on the tuple timestamp.
Storm的窗口功能目前提供了at-least once保證。經過Bolt的execute方法發射的tuples被自動地錨定到inputWindow中的全部tuple中。下游的bolt會對接收到的bolt進行ack以完成tuple tree。若是不是,tuple將被replay,而且窗口計算會從新進行。
窗口中的tuple在到期的時候自動被ac,也就是當tuple在超過窗口長度+滑動間隔的時候。注意配置topology.message.timeout.secs,對於基於時間的窗口,應該足夠大於窗口長度+滑動間隔。不然,tuple將超時並replay,並可能致使重複的評估。對於基於計數的窗口,應該對配置進行調整,以便在超時期間能夠接收到窗口長度+滑動間隔的tuples。