本文參考Apache Beam官方編程手冊
能夠結合官方的Mobile Game 代碼閱讀本文。apache
在默認狀況下,Apache Beam是不分窗的,也就是採用GlobalWindow,而若是同時也不設置自定義的觸發器,那麼Beam會在全部數據都收集到以後纔開始對數據進行處理。這一般只能適用於有限數據且對實時性要求不高的狀況。當輸入爲無限流數據,咱們能夠
1)設置合適的窗口大小(根據時間戳),在窗口末端進行數據處理;
2)設置觸發器,當條件知足時觸發,進行數據處理;
3)同時設置窗口和觸發器。
時間戳說明:Beam的數據都是保存在PCollection中。當讀入數據時,PCollection爲每一個元素都自動生成一個內置的時間戳,對於無限輸入,數據的時間戳不一樣。而對於有限輸入,因爲是同時讀入,全部的元素的時間戳都是同樣的,這時候分窗是沒有意義的(都在一個窗口)。而咱們能夠手動爲每一個元素設置時間戳,一般採用數據中已有的時間屬性(好比日誌中通常都會帶有事件時間)。能夠在DoFn中爲數據帶上時間戳,如:編程
@ProcessElement public void processElement(ProcessContext c) { c.outputWithTimestamp(c.element(), new Instant(XXX)); }
1)全局窗口
就是默認不分窗的狀況。
apply(Windows.<TYPE>into(new GlobalWindows()));
2)固定時間大小窗口
最多見的分窗方式,按照時間戳把數據處理窗口分爲固定長度。
apply(Windows.<TYPE>into(FixedWindows.of(Duration.standardMinutes(XX))))app
3)滑動窗口
須要設置2個參數,窗口大小和窗口產生週期。窗口之間有重疊,一般用於計算平均數的狀況(暫沒用過)ide
4)會話窗口
通常用於相同key數據聚合,同一個key的數據之間時間間隔較大的會被分到不一樣的窗口。ui
**spa
**
當使用用戶自定義的時間戳時,先處理的數據並不老是時間戳較小的,有可能出現時間戳小的數據在後面才產生的狀況。Beam一般會給窗口設定一個處理期限時間(圖中縱軸),當超過這個時間的數據被視爲超時數據,而這些期限時間的連線即水位線。日誌
系統會根據實際狀況進行預測生成水位線,在默認狀況下不對超時數據進行處理,而咱們能夠經過設置觸發器對超時數據進行額外處理。code
1)時間時間觸發器
根據時間戳進行觸發。blog
.triggering(AfterWatermark.pastEndOfWindow()//水位線到達時觸發一次 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(FIVE_MINUTES))//水位線以前,每次觸發後第一個數據來到以後的5分鐘時再觸發 .withLateFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(TEN_MINUTES)))//水位線以後,每次觸發後第一個數據來到以後的10分鐘時再觸發
以上分別對水位線上中下的3種數據進行不一樣的處理。須要注意的是withEarlyFirings和withLateFirings方法生成的觸發器是連續的而不是一次性的。
2)處理時間觸發器three
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)
如方法的字面意思,僅在第一個數據到達後的5分鐘時觸發一次。
3)數據驅動型觸發器
AfterPane.elementCountAtleast(XX)
當處理到XX個時觸發一次。須要注意的是當數據個數小於XX時永遠不會觸發數據處理。
4)混合觸發器
將多個觸發器混合起來,好比1)中的代碼就是3個觸發器混合。其餘的還有
①Repeatedly.forever(一次性觸發器)
將一次性觸發器變爲連續型觸發器,觸發後再次等待觸發。例如與AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)一塊兒用能夠實現每一個數據到達後的5分鐘進行處理,常常用於全局窗口,能夠用orFinally(觸發器)來設置中止條件。
②AfterEach.inOrder(觸發器1,觸發器2...)
當觸發器1知足後等待觸發器2...知道全部觸發器知足後開始數據處理。
③AfterFirst(觸發器1,觸發器2..)和AfterAll(觸發器1,觸發器2..)
這2個分別爲或,與的邏輯。
④orFinally
見①
Accumulating Mode:
If our trigger is set to .accumulatingFiredPanes, the trigger emits the following values each time it fires. Keep in mind that the trigger fires every time three elements arrive:
First trigger firing: [5, 8, 3] Second trigger firing: [5, 8, 3, 15, 19, 23] Third trigger firing: [5, 8, 3, 15, 19, 23, 9, 13, 10]
Discarding Mode:
If our trigger is set to .discardingFiredPanes, the trigger emits the following values on each firing:
First trigger firing: [5, 8, 3] Second trigger firing: [15, 19, 23] Third trigger firing: [9, 13, 10]
.withAllowedLateness(Duration.XXXX(XXX))可設置容許超時多長時間的數據。