Flink 的 API 大致上能夠劃分爲三個層次:處於最底層的 ProcessFunction、中間一層的 DataStream API 和最上層的 SQL/Table API,這三層中的每一層都很是依賴於時間屬性。時間屬性是流處理中最重要的一個方面,是流處理系統的基石之一,貫穿這三層 API。在 DataStream API 這一層中由於封裝方面的緣由,咱們可以接觸到時間的地方不是不少,因此咱們將重點放在底層的 ProcessFunction 和最上層的 SQL/Table API。數組
在不一樣的應用場景中時間語義是各不相同的,Flink 做爲一個先進的分佈式流處理引擎,它自己支持不一樣的時間語義。其核心是 Processing Time 和 Event Time(Row Time),這兩類時間主要的不一樣點以下表所示:緩存
Processing Time 是來模擬咱們真實世界的時間,其實就算是處理數據的節點本地時間也不必定就是完徹底全的咱們真實世界的時間,因此說它是用來模擬真實世界的時間。而 Event Time 是數據世界的時間,就是咱們要處理的數據流世界裏面的時間。關於他們的獲取方式,Process Time 是經過直接去調用本地機器的時間,而 Event Time 則是根據每一條處理記錄所攜帶的時間戳來斷定。網絡
這兩種時間在 Flink 內部的處理以及仍是用戶的實際使用方面,難易程度都是不一樣的。相對而言的 Processing Time 處理起來更加的簡單,而 Event Time 要更麻煩一些。而在使用 Processing Time 的時候,咱們獲得的處理結果(或者說流處理應用的內部狀態)是不肯定的。而由於在 Flink 內部對 Event Time 作了各類保障,使用 Event Time 的狀況下,不管重放數據多少次,都能獲得一個相對肯定可重現的結果。分佈式
所以在判斷應該使用 Processing Time 仍是 Event Time 的時候,能夠遵循一個原則:當你的應用遇到某些問題要從上一個 checkpoint 或者 savepoint 進行重放,是否是但願結果徹底相同。若是但願結果徹底相同,就只能用 Event Time;若是接受結果不一樣,則能夠用 Processing Time。Processing Time 的一個常見的用途是,咱們要根據現實時間來統計整個系統的吞吐,好比要計算現實時間一個小時處理了多少條數據,這種狀況只能使用 Processing Time。性能
時間的一個重要特性是:時間只能遞增,不會來回穿越。 在使用時間的時候咱們要充分利用這個特性。假設咱們有這麼一些記錄,而後咱們來分別看一下 Processing Time 還有 Event Time 對於時間的處理。優化
若是單條數據之間是亂序,咱們就考慮對於整個序列進行更大程度的離散化。簡單地講,就是把數據按照必定的條數組成一些小批次,但這裏的小批次並非攢夠多少條就要去處理,而是爲了對他們進行時間上的劃分。通過這種更高層次的離散化以後,咱們會發現最右邊方框裏的時間就是必定會小於中間方框裏的時間,中間框裏的時間也必定會小於最左邊方框裏的時間。url
這個時候咱們在整個時間序列裏插入一些相似於標誌位的一些特殊的處理數據,這些特殊的處理數據叫作 watermark。一個 watermark 本質上就表明了這個 watermark 所包含的 timestamp 數值,表示之後到來的數據已經再也沒有小於或等於這個時間的了。spa
接下來咱們重點看一下 Event Time 裏的 Record Timestamp(簡寫成 timestamp)和 watermark 的一些基本信息。絕大多數的分佈式流計算引擎對於數據都是進行了 DAG 圖的抽象,它有本身的數據源,有處理算子,還有一些數據匯。數據在不一樣的邏輯算子之間進行流動。watermark 和 timestamp 有本身的生命週期,接下來我會從 watermark 和 timestamp 的產生、他們在不一樣的節點之間的傳播、以及在每個節點上的處理,這三個方面來展開介紹。設計
Flink 支持兩種 watermark 生成方式。第一種是在 SourceFunction 中產生,至關於把整個的 timestamp 分配和 watermark 生成的邏輯放在流處理應用的源頭。咱們能夠在 SourceFunction 裏面經過這兩個方法產生 watermark:3d
整體上而言生成器能夠分爲兩類:第一類是按期生成器;第二類是根據一些在流處理數據流中遇到的一些特殊記錄生成的。
二者的區別主要有三個方面,首先按期生成是現實時間驅動的,這裏的「按期生成」主要是指 watermark(由於 timestamp 是每一條數據都須要有的),即按期會調用生成邏輯去產生一個 watermark。而根據特殊記錄生成是數據驅動的,便是否生成 watermark 不是由現實時間來決定,而是當看到一些特殊的記錄就表示接下來可能不會有符合條件的數據再發過來了,這個時候至關於每一次分配 Timestamp 以後都會調用用戶實現的 watermark 生成方法,用戶須要在生成方法中去實現 watermark 的生成邏輯。
你們要注意的是就是咱們在分配 timestamp 和生成 watermark 的過程,雖然在 SourceFunction 和 DataStream 中均可以指定,可是仍是建議生成的工做越靠近 DataSource 越好。這樣會方便讓程序邏輯裏面更多的 operator 去判斷某些數據是否亂序。Flink 內部提供了很好的機制去保證這些 timestamp 和 watermark 被正確地傳遞到下游的節點。
具體的傳播策略基本上遵循這三點。
舉個例子,假設這邊藍色的塊表明一個算子的一個任務,而後它有三個輸入,分別是 W一、W二、W3,這三個輸入能夠理解成任何的輸入,這三個輸入多是屬於同一個流,也多是屬於不一樣的流。而後在計算 watermark 的時候,對於單個輸入而言是取他們的最大值,由於咱們都知道 watermark 應該遵循一個單調遞增的一個原則。對於多輸入,它要統計整個算子任務的 watermark 時,就會取這三個計算出來的 watermark 的最小值。即一個多個輸入的任務,它的 watermark 受制於最慢的那條輸入流。這一點相似於木桶效應,整個木桶中裝的水會就是受制於最矮的那塊板。
watermark 在傳播的時候有一個特色是,它的傳播是冪等的。屢次收到相同的 watermark,甚至收到以前的 watermark 都不會對最後的數值產生影響,由於對於單個輸入永遠是取最大的,而對於整個任務永遠是取一個最小的。
同時咱們能夠注意到這種設計其實有一個侷限,具體體如今它沒有區分你這個輸入是一條流多個 partition 仍是來自於不一樣的邏輯上的流的 JOIN。對於同一個流的不一樣 partition,咱們對他作這種強制的時鐘同步是沒有問題的,由於一開始就是把一條流拆散成不一樣的部分,但每個部分之間共享相同的時鐘。可是若是算子的任務是在作相似於 JOIN 操做,那麼要求你兩個輸入的時鐘強制同步其實沒有什麼道理的,由於徹底有多是把一條離如今時間很近的數據流和一個離當前時間很遠的數據流進行 JOIN,這個時候對於快的那條流,由於它要等慢的那條流,因此說它可能就要在狀態中去緩存很是多的數據,這對於整個集羣來講是一個很大的性能開銷。
在正式介紹 watermark 的處理以前,先簡單介紹 ProcessFunction,由於 watermark 在任務裏的處理邏輯分爲內部邏輯和外部邏輯。外部邏輯其實就是經過 ProcessFunction 來體現的,若是你須要使用 Flink 提供的時間相關的 API 的話就只能寫在 ProcessFunction 裏。
ProcessFunction 和時間相關的功能主要有三點:
一個簡單的應用是,咱們在作一些時間相關的處理的時候,可能須要緩存一部分數據,但這些數據不能一直去緩存下去,因此須要有一些過時的機制,咱們能夠經過 timer 去設定這麼一個時間,指定某一些數據可能在未來的某一個時間點過時,從而把它從狀態裏刪除掉。全部的這些和時間相關的邏輯在 Flink 內部都是由本身的 Time Service(時間服務)完成的。
一個算子的實例在收到 watermark 的時候,首先要更新當前的算子時間,這樣的話在 ProcessFunction 裏方法查詢這個算子時間的時候,就能獲取到最新的時間。第二步它會遍歷計時器隊列,這個計時器隊列就是咱們剛剛說到的 timer,你能夠同時註冊不少 timer,Flink 會把這些 Timer 按照觸發時間放到一個優先隊列中。第三步 Flink 獲得一個時間以後就會遍歷計時器的隊列,而後逐一觸發用戶的回調邏輯。 經過這種方式,Flink 的某一個任務就會將當前的 watermark 發送到下游的其餘任務實例上,從而完成整個 watermark 的傳播,從而造成一個閉環。
下面咱們來看一看 Table/SQL API 中的時間。爲了讓時間參與到 Table/SQL 這一層的運算中,咱們須要提早把時間屬性放到表的 schema 中,這樣的話咱們纔可以在 SQL 語句或者 Table 的一些邏輯表達式裏面去使用這些時間去完成需求。
其實以前社區就怎麼在 Table/SQL 中去使用時間這個問題作過必定的討論,是把獲取當前 Processing Time 的方法是做爲一個特殊的 UDF,仍是把這一個列物化到整個的 schema 裏面,最終採用了後者。咱們這裏就分開來說一講 Processing Time 和 Event Time 在使用的時候怎麼在 Table 中指定。
對於 Processing Time,咱們知道要獲得一個 Table 對象(或者註冊一個 Table)有兩種手段:
(1)能夠從一個 DataStream 轉化成一個 Table;
(2)直接經過 TableSource 去生成這麼一個 Table;
對於第一種方法而言,咱們只須要在你已有的這些列中(例子中 f1 和 f2 就是兩個已有的列),在最後用「列名.proctime」這種寫法就能夠把最後的這一列註冊爲一個 Processing Time,之後在寫查詢的時候就能夠去直接使用這一列。若是 Table 是經過 TableSource 生成的,就能夠經過實現這一個 DefinedRowtimeAttributes 接口,而後就會自動根據你提供的邏輯去生成對應的 Processing Time。
相對而言,在使用 Event Time 時則有一個限制,由於 Event Time 不像 Processing Time 那樣是隨拿隨用。若是你要從 DataStream 去轉化獲得一個 Table,必需要提早保證原始的 DataStream 裏面已經存在了 Record Timestamp 和 watermark。若是你想經過 TableSource 生成的,也必定要保證你要接入的一個數據裏面存在一個類型爲 long 或者 timestamp 的這麼一個時間字段。
具體來講,若是你要從 DataStream 去註冊一個表,和 proctime 相似,你只須要加上「列名.rowtime」就能夠。須要注意的是,若是你要用 Processing Time,必須保證你要新加的字段是整個 schema 中的最後一個字段,而 Event Time 的時候你其實能夠去替換某一個已有的列,而後 Flink 會自動的把這一列轉化成須要的 rowtime 這個類型。 若是是經過 TableSource 生成的,只須要實現 DefinedRowtimeAttributes 接口就能夠了。須要說明的一點是,在 DataStream API 這一側其實不支持同時存在多個 Event Time(rowtime),可是在 Table 這一層理論上能夠同時存在多個 rowtime。由於 DefinedRowtimeAttributes 接口的返回值是一個對於 rowtime 描述的 List,即其實能夠同時存在多個 rowtime 列,在未來可能會進行一些其餘的改進,或者基於去作一些相應的優化。
指定完了時間列以後,當咱們要真正去查詢時就會涉及到一些具體的操做。這裏我列舉的這些操做都是和時間列緊密相關,或者說必須在這個時間列上才能進行的。好比說「Over 窗口聚合」和「Group by 窗口聚合」這兩種窗口聚合,在寫 SQL 提供參數的時候只能容許你在這個時間列上進行這種聚合。第三個就是時間窗口聚合,你在寫條件的時候只支持對應的時間列。最後就是排序,咱們知道在一個無盡的數據流上對數據作排序幾乎是不可能的事情,但由於這個數據自己到來的順序已是按照時間屬性來進行排序,因此說咱們若是要對一個 DataStream 轉化成 Table 進行排序的話,你只能是按照時間列進行排序,固然同時你也能夠指定一些其餘的列,可是時間列這個是必須的,而且必須放在第一位。
爲何說這些操做只能在時間列上進行?由於咱們有的時候能夠把到來的數據流就當作是一張按照時間排列好的一張表,而咱們任何對於表的操做,其實都是必須在對它進行一次順序掃描的前提下完成的。由於你們都知道數據流的特性之一就是一過性,某一條數據處理過去以後,未來其實不太好去訪問它。固然由於 Flink 中內部提供了一些狀態機制,咱們能夠在必定程度上去弱化這個特性,可是最終仍是不能超越的限制狀態不能太大。全部這些操做爲何只能在時間列上進行,由於這個時間列可以保證咱們內部產生的狀態不會無限的增加下去,這是一個最終的前提。
本文視頻回顧講解內容更生動易理解,查看視頻請點擊:https://ververica.cn/developers/flink-training-course2/
本文爲雲棲社區原創內容,未經容許不得轉載。