本系列每篇文章都是從一些實際的 case 出發,分析一些生產環境中常常會遇到的問題,拋磚引玉,以幫助小夥伴們解決一些實際問題。本文介紹 Flink 時間以及時區問題,分析了在天級別的窗口時會遇到的時區問題,若是對小夥伴有幫助的話,歡迎點贊 + 再看~
本文主要分爲兩部分:
第一部分(第 1 - 3 節)的分析主要針對 flink,分析了 flink 天級別窗口的中存在的時區問題以及解決方案。
第二部分(第 4 節)的分析能夠做爲全部時區問題的分析思路,主要以解決方案中的時區偏移量爲何是加 8 小時爲案例作了通用的深度解析。
爲了讓讀者能對本文探討的問題有一個大體瞭解,本文先給出問題 sql,以及解決方案。後文給出詳細的分析~
sql 很簡單,用來統計當天累計 uv。java
--------------- 僞代碼 --------------- INSERT INTO kafka_sink_tableSELECT -- 窗口開始時間 CAST( TUMBLE_START(proctime, INTERVAL '1' DAY) AS bigint ) AS window_start, -- 當前記錄處理的時間 cast(max(proctime) AS BIGINT) AS current_ts, -- 每一個桶內的 uv count(DISTINCT id) AS part_daily_full_uv FROM kafka_source_tableGROUP BY mod(id, bucket_number), -- bucket_number 爲常數,根據具體場景指定具體數值 TUMBLE(proctime, INTERVAL '1' DAY)--------------- 僞代碼 ---------------
你是否能一眼看出這個 sql 所存在的問題?(PS:數據源以及數據匯時區都爲東八區)
沒錯,天級別窗口所存在的時區問題,即這段代碼統計的不是樓主所在東八區一成天數據的 uv,這段代碼統計的一成天的範圍在東八區是第一天早 8 點至次日早 8 點。sql
樓主目前所處時區爲東八區,解決方案以下:框架
--------------- 僞代碼 --------------- CREATE VIEW view_table AS SELECT id, -- 經過注入時間解決 -- 加上東八區的時間偏移量,設置注入時間爲時間戳列 CAST(CURRENT_TIMESTAMP AS BIGINT) * 1000 + 8 * 60 * 60 * 1000 as ingest_time FROM source_table; INSERT INTO target_tableSELECT CAST( TUMBLE_START(ingest_time, INTERVAL '1' DAY) AS bigint ) AS window_start, cast(max(ingest_time) AS BIGINT) - 8 * 3600 * 1000 AS current_ts, count(DISTINCT id) AS part_daily_full_uv FROM view_tableGROUP BY mod(id, 1024), -- 根據注入時間劃分天級別窗口 TUMBLE(ingest_time, INTERVAL '1' DAY) --------------- 僞代碼 ---------------
經過上述方案,就能夠將統計的數據時間範圍調整爲東八區的今日 0 點至明日 0 點。下文詳細說明整個需求場景以及解決方案的實現和分析過程。學習
coming,需求場景比較簡單,就是消費上游的一個埋點日誌數據源,根據埋點中的 id 統計當天 0 點至當前時刻的累計 uv,按照分鐘級別產出到下游 OLAP 引擎中進行簡單的聚合,最後在 BI 看板進行展現,沒有任何維度字段(感動到哭😭)。ui
客戶端用戶行爲埋點日誌 -> logServer -> kafka -> flink(sql) -> kafka -> druid -> BI 看板。
實現方案以及具體的實現方式不少,此次使用的是 sql API。spa
source 和 sink 表 schema 以下(只保留關鍵字段):操作系統
--------------- 僞代碼 --------------- CREATE TABLE kafka_sink_table ( -- 天級別窗口開始時間 window_start BIGINT, -- 當前記錄處理的時間 current_ts BIGINT, -- 每一個桶內的 uv(處理過程對 id 進行了分桶) part_daily_full_uv BIGINT ) WITH ( -- ... ); CREATE TABLE kafka_source_table ( -- ... -- 須要進行 uv 計算的 id id BIGINT, -- 處理時間 proctime AS PROCTIME()) WITH ( -- ... ); --------------- 僞代碼 ---------------
--------------- 僞代碼 --------------- INSERT INTO kafka_sink_tableSELECT -- 窗口開始時間 CAST( TUMBLE_START(proctime, INTERVAL '1' DAY) AS bigint ) AS window_start, -- 當前記錄處理的時間 cast(max(proctime) AS BIGINT) AS current_ts, -- 每一個桶內的 uv count(DISTINCT id) AS part_daily_full_uv FROM kafka_source_tableGROUP BY mod(id, bucket_number), -- bucket_number 爲常數,根據具體場景指定具體數值 TUMBLE(proctime, INTERVAL '1' DAY)--------------- 僞代碼 ---------------
使用 early-fire 機制(同 DataStream API 中的 ContinuousProcessingTimeTrigger),並設定觸發間隔爲 60 s。
在上述實現 sql 中,咱們對 id 進行了分桶,那麼每分鐘輸出的數據條數即爲 bucket_number 條,最終在 druid 中按照分鐘粒度將全部桶的數據進行 sum 聚合,便可獲得從當天 0 點累計到當前分鐘的全量 uv。3d
激情場景還原:頭文字 ∩ 技術小哥哥:使用 sql,easy game,閒坐摸魚...unix
頭文字 ∩ 技術小哥哥:等到 00:00 時,發現指標還在不停地往上漲,難道是 sql 邏輯錯了,不該該啊,試過度鍾,小時級別窗口都木有這個問題日誌
頭文字 ∩ 技術小哥哥:摳頭ing,算了,稍後再分析這個問題吧,如今還有正事要幹😏
頭文字 ∩ 技術小哥哥:到了早上,瞅了一眼配置的時間序列報表,發如今 08:00 點的時候指標歸零,從新開始累計。想法一閃而過,東八區?(當時爲啥沒 format 下 sink 數據中的 window_start...)
flink 在使用時間的這個概念的時候是基於 java 時間紀元(即格林威治 1970/01/01 00:00:00,也即 Unix 時間戳爲 0)概念的,窗口對齊以及觸發也是基於 java 時間紀元。
能夠經過直接查看 sink 數據的 window_start 得出上述結論。
但爲了還原整個過程,咱們按照以下 source 和 sink 數據進行整個問題的復現:
source 數據以下:
id | proctime | proctime UTC + 0(格林威治) 格式化時間 | proctime UTC + 8(北京) 格式化時間 |
---|---|---|---|
1 | 1599091140000 | 2020/09/02 23:59:00 | 2020/09/03 07:59:00 |
2 | 1599091140000 | 2020/09/02 23:59:00 | 2020/09/03 07:59:00 |
3 | 1599091140000 | 2020/09/02 23:59:00 | 2020/09/03 07:59:00 |
1 | 1599091200000 | 2020/09/03 00:00:00 | 2020/09/03 08:00:00 |
2 | 1599091200000 | 2020/09/03 00:00:00 | 2020/09/03 08:00:00 |
3 | 1599091260000 | 2020/09/03 00:01:00 | 2020/09/03 08:01:00 |
sink 數據(爲了方便理解,直接按照 druid 聚合以後的數據展現):
window_start | current_ts | part_daily_full_uv | window_start UTC + 8(北京) 格式化時間 | current_ts UTC + 8(北京) 格式化時間 |
---|---|---|---|---|
1599004800000 | 1599091140000 | 3 | 2020/09/02 08:00:00 | 2020/09/03 07:59:00 |
1599091200000 | 1599091200000 | 2 | 2020/09/03 08:00:00 | 2020/09/03 08:00:00 |
1599091200000 | 1599091260000 | 3 | 2020/09/03 08:00:00 | 2020/09/03 08:01:00 |
從上述數據能夠發現,天級別窗口開始時間在 UTC + 8(北京)的時區是天天早上 8 點,即 UTC + 0(格林威治)的凌晨 0 點。
下文先給出解決方案,而後詳細解析各個時間以及時區概念~
--------------- 僞代碼 --------------- CREATE VIEW view_table AS SELECT id, -- 經過注入時間解決 -- 加上東八區的時間偏移量,設置注入時間爲時間戳列 CAST(CURRENT_TIMESTAMP AS BIGINT) * 1000 + 8 * 60 * 60 * 1000 as ingest_time FROM source_table; INSERT INTO target_tableSELECT CAST( TUMBLE_START(ingest_time, INTERVAL '1' DAY) AS bigint ) AS window_start, cast(max(ingest_time) AS BIGINT) - 8 * 3600 * 1000 AS current_ts, count(DISTINCT id) AS part_daily_full_uv FROM view_tableGROUP BY mod(id, 1024), -- 根據注入時間劃分天級別窗口 TUMBLE(ingest_time, INTERVAL '1' DAY) --------------- 僞代碼 ---------------
我目前所屬的時區是東八區(北京時間),經過上述 sql,設置注入時間,並對注入時間加上 8 小時的偏移量進行天級別窗口的劃分,就能夠對此問題進行解決(也能夠在 create table 時,在 schema 中根據計算列添加對應的注入時間戳進行解決)。若是你在 sql 層面有更好的解決方案,歡迎討論~
Notes:
- 東 n 區的解決方案就是時間戳 +n 3600 秒的偏移量,西 n 區的解決方案就是時間戳 -n 3600 秒的偏移量
- DataStream API 存在相同的天級別窗口時區問題
這裏提出一個問題,爲何東八區是須要在時間戳上加 8 小時偏移量進行天級別窗口計算,而不是減 8 小時或是加上 32(24 + 8) 小時,小夥伴們有詳細分析過嘛~
根據上述問題,引出本文的第二大部分,即深度解析時區偏移量問題,這部分能夠做爲全部時區問題的分析思路。
時區:因爲世界各國家與地區經度不一樣,地方時也有所不一樣,所以會劃分爲不一樣的時區。
Unix 時間戳(Unix timestamp)"): Unix 時間戳(Unix timestamp),或稱 Unix 時間(Unix time)、POSIX 時間(POSIX time),是一種時間表示方式,定義爲從格林威治時間 1970 年 01 月 01 日 00 時 00 分 00 秒(UTC/GMT的午夜)起至如今的總秒數。
Unix 時間戳不只被使用在 Unix 系統、類 Unix 系統中,也在許多其餘操做系統中被普遍採用。
GMT:Greenwich Mean Time 格林威治標準時間。這是以英國格林威治天文臺觀測結果得出的時間,這是英國格林威治當地時間,這個地方的當地時間過去被當成世界標準的時間。
UT:Universal Time 世界時。根據原子鐘計算出來的時間。
UTC:Coordinated Universal Time 協調世界時。由於地球自轉愈來愈慢,每一年都會比前一年多出零點幾秒,每隔幾年協調世界時組織都會給世界時 +1 秒,讓基於原子鐘的世界時和基於天文學(人類感知)的格林威治標準時間相差不至於太大。並將獲得的時間稱爲 UTC,這是如今使用的世界標準時間。
協調世界時不與任何地區位置相關,也不表明此刻某地的時間,因此在說明某地時間時要加上時區也就是說 GMT 並不等於 UTC,而是等於 UTC + 0,只是格林威治恰好在 0 時區上。
當時看完這一系列的時間以及時區說明以後我大腦實際上是一片空白。...ojbk...,我用本身如今的一些理解,嘗試將上述全部涉及到時間的概念解釋一下。
可是因爲沒有時區劃分,這兩個同窗看到的時間都是 0 點,所以這是不符合人類對感知到的時間和本身看到的時間的理解的。因此劃分時區以後,能夠知足北京(東八區 UTC + 8)同窗看到的時間是上午 8 點,加拿大(西四區 UTC - 4)同窗看到的時間是下午 8 點。注意時區的劃分是和 UTC 綁定的。東八區即 UTC + 8。
概念關係如圖所示:
下述表格只對一些重要的時間進行了標註:
拿第一條數據解釋下,其表明在北京時間 1970/01/01 00:00:00 時,生成的一條數據所攜帶的 Unix 時間戳爲 -8 * 3600。
根據需求和上圖和上述表格內容,咱們能夠獲得以下推導過程:
Notes:
- 能夠加 32 小時嗎?答案是能夠。在東八區,對於天級別窗口的劃分,加 8 小時和加 8 + n * 24(其中 n 爲整數)小時後進行的天級別窗口劃分和計算的效果是同樣的,flink 都會將東八區的整一天內的數據劃分到一個天級別窗口內。因此加 32(8 + 24),56(8 + 48),-16(8 - 24)小時效果都相同,上述例子只是選擇了時間軸平移最小的距離,即 8 小時。注意某些系統的 Unix 時間戳爲負值時會出現異常。
- 此推理過程適用於全部遇到時區問題的場景,若是你也有其餘應用場景有這個問題,也能夠按照上述方式解決
求輸入 Unix 時間戳對應的東八區天天 0 點的 Unix 時間戳。
public static final long ONE_DAY_MILLS = 24 * 60 * 60 * 1000L; public static long transform(long timestamp) { return timestamp - (timestamp + 8 * 60 * 60 * 1000) % ONE_DAY_MILLS;}
本文首先介紹了直接給出了咱們的問題 sql 和解決方案。
第二節從需求場景以及整個數據鏈路的實現方案出發,解釋了咱們怎樣使用 flink sql 進行了需求實現,並進而引出了 sql 中天級別窗口存在的時區問題。
第三節確認了天級別窗口時區問題緣由,引出了 flink 使用了 java 時間紀元,並針對此問題給出了引擎層面和 sql 層面的解決方案。也進而提出了一個問題:爲何咱們的解決方案是加 8 小時偏移量?
第四節針對加 8 小時偏移量的緣由進行了分析,並詳細闡述了時區,UTC,GMT,Unix 時間戳之間的關係。
最後一節對本文進行了總結。
若是你有更方便的時區偏移量理解方式,歡迎留言~