應用案例 | Blink 有何特別之處?菜鳥供應鏈場景最佳實踐

本文受權轉自阿里技術官方公衆號(ali_tech):菜鳥供應鏈業務鏈路長、節點多、實體多,使得技術團隊在建設供應鏈實時數倉的過程當中,面臨着諸多挑戰,如:如何實現實時變Key統計?如何實現實時超時統計?如何進行有效地資源優化?如何提高多實時流關聯效率?如何提高實時做業的開發效率? 而 Blink 可否解決這些問題?下面一塊兒來深刻了解。git

背景

菜鳥從2017年4月開始探索 Blink(即 Apache Flink 的阿里內部版本),2017年7月開始在線上環境使用 Blink,做爲咱們的主流實時計算引擎。github

爲何短短几個月的探索以後,咱們就選擇Blink做爲咱們主要的實時計算引擎呢?算法

在效率上,Blink 提供 DataStreamTableAPI、SQL 三種開發模式,強大的 SQL 模式已經知足大部分業務場景,配合半智能資源優化、智能傾斜優化、智能做業壓測等功能,能夠極大地提高實時做業的開發效率;在性能上,諸如 MiniBatch&MicroBatch、維表 Async&Cache、利用 Niagara 進行本地狀態管理等內部優化方案,能夠極大地提高實時做業的性能;在保障上,Blink 自帶的 Failover 恢復機制,可以實現線程級的恢復,能夠作到分鐘級恢復,配合 Kmonitor 監控平臺、烽火臺預警平臺,能夠有效地實現實時做業的數據保障。數據庫

接下來,我將結合供應鏈業務的一些業務場景,簡要說明,Blink 如何解決咱們遇到的一些實際問題。緩存

回撤機制

訂單履行是供應鏈業務中最多見的物流場景。什麼是訂單履行呢?當商家 ERP 推單給菜鳥以後,菜鳥履行系統會實時計算出每筆訂單的出庫、攬收、簽收等節點的預計時間,配送公司須要按照各節點的預計時間進行訂單的配送。爲了保證訂單的準點履約,咱們常常須要統計每家配送公司天天各個節點的預計單量,便於配送公司提早準備產能。bash

看似很簡單的實時統計加工,咱們在開發過程當中遇到了什麼問題呢?履行重算!當物流訂單的上游某個節點延遲時,履行系統會自動重算該筆訂單下游全部節點的預計時間。好比某個物流訂單出庫晚點後,其後的預計攬收時間、預計簽收時間都會重算。而對於大部分的實時計算引擎來講,並不能很友好的支持這種變 Key 統計的問題。之前,數據量沒那麼大的時候,還能夠經過 OLAP 數據庫來解決這類場景,當量上來後, OLAP 方案的成本、性能都是很大的問題。網絡

除了 OLAP 方案,咱們提倡採用 Blink 已經內置的 Retraction 機制,來解決這類變 Key 統計的問題,這也是咱們在2017年初就開始嘗試 Blink 的重要緣由。Blink 的 Retraction 機制,使用 State 在內存或者外部存儲設備中對數據進行統計處理,當上遊數據源對某些彙總 Key 的數據作更新時,Blink 會主動給下游下發一個刪除消息從而「撤回」以前的那條消息,並用最新下發的消息對錶作更新操做。併發

下面是一個簡化後的案例,供瞭解 Blink Retraction 的內部計算過程:框架

對於上述案例,能夠經過 Blink 提供的強大的、靈活的、簡易的 SQL 開發模式來實現,只須要幾行 SQL 便可完成。異步

select   plan_tms_sign_time
       ,sum(1) as plan_tms_sign_lgtord_cnt
from
       (select   lg_order_code
                ,last_value(plan_tms_sign_time) as plan_tms_sign_time
        from     dwd_csn_whc_lgt_fl_ord_ri
        group by lg_order_code
        ) ss
group by plan_tms_sign_time
;
複製代碼

維表關聯

供應鏈業務的實體角色很是多(倉、配、分撥、站點、小件員、貨主、行業、地區等),實體繁多,這意味着咱們在建設實時明細中間層的時候,會使用大量的維表關聯,這對 Blink 在維表關聯的性能上提出了更高的要求——如何提高大量的大小維表的關聯性能?Blink 歷來沒讓用戶失望,Blink SQL 模式在維表關聯的性能上,也作了大量的優化:

優化1:Async IO,有一些實時計算引擎,維表關聯是採用同步訪問的方式,即來一條數據,去數據庫查詢一次,等待返回後輸出關聯結果。這種方式,能夠發現網絡等待時間極大地阻礙了吞吐和延遲。而 Blink 採用了異步訪問的模式,能夠併發地處理多個請求和回覆,從而連續地請求之間不須要阻塞等待,吞吐量大大提高。

優化2:緩存,維表關聯涉及到大量的維表查詢請求,其中可能存在大量相同 Key 的重複請求。Blink SQL 模式提供了緩存的機制,並提供 LRU 和 ALLCache 兩種緩存方案。

用戶能夠經過配置 Cache='LRU' 參數,開啓 LRU 緩存優化。開啓後,Blink 會爲每一個 JoinTable 節點建立一個 LRU 本地緩存。當每一個查詢進來的時候,先去緩存中查詢,若是存在則直接關聯輸出,減小了一次 IO 請求。若是不存在,再發起數據庫查詢請求,請求返回的結果會先存入緩存中以備下次查詢。

若是維表數據不大,用戶能夠經過配置 Cache='ALL' 參數,對維表進行全量緩存。這樣,全部對該維表的查詢操做,都會直接走本地緩存模式,幾乎沒有 IO,關聯的性能很是好。

優化3:緩存無效 Key,若是維表很大,沒法採用 ALLCache 的方案,而在使用 LRU 緩存時,會存在很多維表中不存在的 Key 。因爲命中不了緩存,致使緩存的收益較低,仍然會有大量請求發送到數據庫,而且 LRU 模式下緩存裏的 key 不會永久保留,能夠經過調整參數,設置保留時間。

優化4:Distribute By 提升緩存命中率,默認狀況下,維表關聯的節點與上游節點之間是 Chain 在一塊兒,不通過網絡。這在緩存大小有限、Key 總量大、熱點不明顯的狀況下, 緩存的收益可能較低。這種狀況下能夠將上游節點與維表關聯節點的數據傳輸改爲按 Key 分區。這樣一般能夠縮小單個節點的 Key 個數,提升緩存的命中率。

除了上述幾點優化,Blink SQL 模式還在嘗試引入 SideInput、Partitioned ALL Cache 等優化方案,相信在隨後開源的 Blink 版本中,維表關聯的性能會愈來愈好。

下面是一張來自 Flink Committer 雲邪 異步查詢的流程圖,供理解與同步請求的差別。

數據傾斜

無數據不傾斜,咱們在實時數倉建設過程當中,也固然會遇到數據傾斜問題。在統計賣家的單量時,有些賣家單量大,有些賣家單量小,單量超大的賣家,就會產生數據傾斜;在統計行業的單量時,有些行業單量大,有些行業單量小,單量超大的行業,就會產生數據傾斜;在統計貨品的庫存流水狀況時,有些貨品庫存流水頻繁,一些貨品庫存流水較少,庫存流水超頻繁的貨品就會產生數據傾斜……

咱們應該如何處理數據傾斜問題呢?以統計賣家的單量爲例,之前咱們會先把訂單這個 Key 做 Hash,先針對 Hash 以後的值作一次去重的聚合操做,再在此基礎上,再作一次針對原 Key 去重的聚合操做。兩次相似的聚合操做,致使代碼寫起來比較複雜,體力勞動比較多。

2017年,咱們的實時數據開始全面切換到 Blink 上,Blink 在數據傾斜這塊,又給咱們提供了什麼的方案呢?Blink 給出的答案是:MiniBatch/MicroBatch+LocalGlobal+PartialFinal。

MiniBatch/MicroBatch,能夠實現微批處理,進而減小對 State 的訪問,提高吞吐。由於微批處理會致使必定的延遲,最好結合 Blink 提供的容許延遲的相關參數來使用。

LocalGlobal,分爲 Local 和 Global 兩個階段,有點相似 MapReduce 中的 Combine 和 Reduce 兩個階段。LocalGlobal 能夠很好地處理非去重類的聚合操做,但對 Count Distinct 的優化效果通常,由於在 Local 階段,可能 Distinct Key 的去重率並不會很高,進而致使後續的 Global 階段,仍然會有熱點。

PartialFinal,能夠很好地解決 Count Distinct 帶來的數據傾斜問題。PartialFinal 能夠將 Distinct Key 自動打散,先聚合一次,在此基礎上,再聚合一次,從而實現打散熱點的做用。PartialFinal 跟手動 Hash 再聚合兩次的效果一致,經過 Blink 提供的 PartialFinal 參數,能夠自動實現,再也不須要人爲手工編寫 Hash 再聚合兩次的代碼。

由上能夠看出,Blink 在數據傾斜的處理上,已經實現了自動化,之前人爲編寫的打散熱點方案,如今幾個參數就能所有搞定,大大提高了代碼的編寫效率。

下面是相關參數,用戶能夠直接在 Blink 的做業參數中進行配置。

# miniBatch/microBatch攢批的間隔時間
blink.miniBatch.allowLatencyMs=5000
blink.microBatch.allowLatencyMs=5000
# 防止OOM,每一個批次最多緩存多少條數據
blink.miniBatch.size=20000

# 開啓LocalGlobal
blink.localAgg.enabled=true
# 開啓PartialFinal
blink.partialAgg.enabled=true
複製代碼

超時統計

上架是倉儲業務的重要組成部分。上架,顧名思義,就是要把到倉的貨品,上到倉庫的存儲貨架上。上架通常分爲採購上架、銷退上架、調撥上架等。及時上架是對倉庫的重要考覈項之一,不管哪種類型的上架,咱們常常須要針對到貨後超過 x 小時未上架的訂單進行預警。

可是,Blink 的計算是消息機制,須要上游發送消息才能觸發下游計算,而上述的場景中,未上架就說明不會有上架的消息流入 Blink,進而沒法完成下游的計算。

對於這種實時超時統計的問題,應該如何來解呢?咱們嘗試了幾種方案,供參考:

方案1:針對部分 Source Connector,Blink 提供了"延時下發"的功能,用戶能夠經過指定 DataDeliveryDelayMs 參數,實現消息延遲下發。正常的消息正常流入,正常消息也能夠經過配置該參數,使其按照本身的需求延時流入。這樣,經過正常流入的消息關聯延時流入的消息,能夠觸發 Blink 在消息正常流入時計算一次,在延時消息流入時再觸發計算一次。這種方案,能夠實現咱們的業務需求,可是這種方案會把全部消息從新發送一遍,而不只僅是到貨後超過x小時未上架的消息,這樣會形成計算資源的浪費,咱們不建議在數據量很大的場景下使用該方案。

方案2:若是有第三方的消息中間件,而這個消息中間件又能支持配置超時下發的規則,這將是一個比較好的方案。據瞭解,Kafka 的最新版本已經可以根據業務需求,配置消息超時下發的規則。咱們只須要在 Blink 中,經過正常流入的消息流關聯關鍵Kafka 超時下發的消息流,就能夠觸發 Blink 進行超時消息的統計。這樣,除了Blink,咱們須要同時保障 Kafka 的穩定性。Kafka 的超時消息訂閱,能夠參見:[1]。

方案3:咱們可以很天然的想到 CEP,而 Blink 也已經提供了 CEP 的功能,且已經 SQL 化。用戶能夠經過 Blink CEP 完成上述業務需求的統計。在實操過程當中,咱們發現,經過 Blink CEP 統計的結果,每每與真實結果(明細彙總統計)有必定的出入。什麼緣由呢?原來到貨時間,被回傳了屢次,有可能開始回傳的是9點,可是後面發現回傳錯了,改爲了8點,而 CEP 的 Watermark 是全局地向前走的,對於這種場景,沒法很好的適配。

方案4Flink 的 ProcessFunction,是一個 Low-Level 的流處理操做。經過改寫其中的 ProcessElement 方法,能夠告訴 Blink 的 State 裏面存什麼,以及如何更新 State;經過改寫 OnTimer 方法,能夠告訴 State 什麼時候下發超時消息。經過對上述幾種方案的原理對比及性能壓測,咱們最終選擇的也是這套方案。因爲超時場景,在供應鏈業務中很是常見,咱們已經將該方案沉澱下來,一樣的場景,經過 1min 配置下相關參數,便可完成相似場景超時消息的下發。

下面是方案4簡化後的實現框架圖,供瞭解相關實現及優點。

零點起跳

每次大促,大屏上零點時刻雙十一的零點時刻一直是你們關注的焦點,爲了在零點一過就讓各項指標儘快在大屏上展示出來,咱們進行了一些端到端的優化,供參考。

優化1:合理調整 Blink 讀取上游消息源的 FlushInterval 。咱們知道 Blink 是以 Block 的形式傳輸數據,若是 Block 一直積攢不滿,Block 可能一直等待沒法下發。這種狀況,咱們能夠經過調整 FlushInterval 參數,直接控制多長時間往下游 sink 一次。這樣,Block 積滿或間隔達到知足其中一個條件,Block 就會往下流。

優化2:合理調整 MiniBatch/MicroBatch的size 和 AllowLatency 參數。前文提到,MiniBatch/MicroBatch 是微批處理模式,都會帶來必定的延遲,能夠經過合理控制 Size 和 AllowLatency 參數,來控制該模式帶來的延遲。與優化1同樣,二者知足其一,就會往下繼續執行。

優化3:合理控制寫 Checkpoint 的方式以及 Checkpoint 的大小。利用 Checkpoint 實現 Exactly Once 的容錯方式一直是 Flink 做爲流引擎的一個亮點。可是過於複雜的運算和網絡環境有可能致使 Checkpoint 的對齊時間過長,從而致使整個 Job 的延遲變長。同時,Exactly Once 模式下作 Checkpoint 的時間間隔與整個任務中數據流的延遲也是一個 Trade Off。所以咱們在處理特別複雜的 Job 時也將這個因素考慮了進去,並無使用默認的 Exactly Once 方式,而是依舊實際需求採用了 At Least Once 。同時,將 Checkpoint 的週期設置爲了60s,儘量的保證了任務在延遲較小的狀況下,在 Failover 的情形下仍然能作到快速恢復。

優化4:除了 Blink 端,在數據服務端,大屏上的實時數據,咱們建議採用查詢性能優異的 Hbase 做爲存儲引擎,能夠保證零點一過,三秒內便能實現大屏數據的跳動。

……

將來展望

Blink 在不斷快速地發展,不只僅是流處理,當前也開始支持批處理,用戶只須要寫一套代碼就能夠同時實現批和流的數據開發,當前在日誌型的數據場景上,咱們也正在探索利用 Blink 直接實現批流混合模式;不只僅是半智能資源調優,當前開始內測智能資源調優,Blink 能夠根據吞吐量、算子複雜度等因素,對線上做業的資源配置進行全智能自適應調優,不再用在大促前手動更改資源配置;不只僅是 Java,更指望有 Python 等多語言生態,來描述計算邏輯,相信開發效率又會上一個新的臺階;不只僅是 ETL,更指望有更廣闊的大數據算法集成,能夠實現複雜的大數據AI場景……將來已來,咱們相信,Blink 已經作好了迎接將來的準備。

參考資料: [1]ketao1989.github.io/2016/01/02/…

相關文章
相關標籤/搜索