Flink SQL 功能解密系列 —— 流計算「撤回(Retraction)」案例分析

摘要: 通俗講retract就是傳統數據裏面的更新操做,也就是說retract是流式計算場景下對數據更新的處理方式。ui

什麼是retraction(撤回)

通俗講retract就是傳統數據裏面的更新操做,也就是說retract是流式計算場景下對數據更新的處理
方式。
首先來看下流場景下的一個詞頻統計列子。google

image.png

沒有retract會致使最終結果不正確↑:spa

image.png
retract發揮的做用code

下面再分享兩個雙十一期間retract保證數據正確性的業務case:blog

case1: 菜鳥物流訂單統計

同一個訂單的商品在運輸過程當中,由於各類緣由,物流公司是有可能從A變成B的。爲了統計物流公司承擔的訂單數目,菜鳥團隊使用blink計算的retraction機制進行變key彙總操做。排序

-- TT source_table 數據以下:
order_id      tms_company
0001           中通
0002           中通
0003           圓通

-- SQL代碼
create view dwd_table as 
select
    order_id,
    StringLast(tms_company)
from source_table
group by order_id;

create view dws_table as 
select 
    tms_company,
    count(distinct order_id) as order_cnt
from dwd_table 
group by tms_company


此時結果爲:
tms_company  order_cnt
中通          2
圓通          1

-----------------------
以後又來了一條新數據 0001的訂單 配送公司改爲 圓通了。這時,第一層group by的會先向下游發送一條 (0001,中通)的撤回消息,第二層group by節點收到撤回消息後,會將這個節點 中通對應的 value減小1,並更新到結果表中;而後第一層的分桶統計邏輯向下遊正常發送(0001,圓通)的正向消息,更新了圓通物流對應的訂單數目,達到了最初的彙總目的。

order_id      tms_company
0001           中通
0002           中通
0003           圓通
0001           圓通

寫入ADS結果會是(知足需求)
tms_company  order_cnt
中通          1
圓通          2

case2: 天貓雙十一購物車加購統計:

雙11爆款清單與知名綜藝IP「火星情報局」跨界合做,汪涵、撒貝寧、陶晶瑩等大咖主持加盟,杭州、長沙兩地聯播,成功打造爲「雙11子IP」與「雙11購物風向標」,樹立電商內容綜藝化、娛樂化創新典範,爲長線模式探索打下基礎。ip

首次深度聯動線下場景,在銀泰門店落地爆款清單超級大屏,商場人流截停率28%,用戶互動時間佔營業時間的40%。get

選品模式創新,打造最全維度爆款清單:TOP2000性價比爆款+TOP100小黑盒推薦(新品清單)+TOP200買手天團推薦(人羣/場景/地域 清單)it

核心業務指標

  • 加購金額
  • 加購件數
  • 加購UV

業務計算邏輯

  • 來自TT的數據要進行去重;
  • 以投放場景和購物車維度進行分組,獲取每一個分組的最後一條(最新)數據;
  • 以投放場景和小時爲維度進行分組,統計 加購金額,加購件數和加購UV 業務指標;

業務BlinkSQL代碼

--Blink SQL
--********************************************************************--
--Comment: 天貓雙11官方爆款清單統計計算
--********************************************************************--
CREATE TABLE dwd_mkt_membercart_ri(
    cart_id      BIGINT, -- '購物車id',
    sku_id       BIGINT, -- '存放商品的skuId,無sku時,爲0',
    item_id      BIGINT, -- '外部id:商品id或者skuid',
    quantity     BIGINT, -- '購買數量',
    user_id      BIGINT, -- '用戶id',
    status       BIGINT, -- '狀態1:正常-1:刪除',
    gmt_create   VARCHAR, -- '屬性建立時間',
    gmt_modified VARCHAR, -- '屬性修改時間',
    biz_id VARCHAR, -- 投放場景,
    start_time VARCHAR, -- 投放開始時間
    end_time VARCHAR, -- 投放結束時間
    activity_price_time VARCHAR, -- 活動開始時間
    price VARCHAR, -- 商品價格
    dbsync_operation BIGINT -- 時間自動用於排序
) 
WITH 
(
    type='tt'
    -- 其餘信息省略
);

--groub by 方式重複,防止TT重發
CREATE VIEW distinct_dwd_mkt_membercart_ri AS 
SELECT
    cart_id,
    sku_id,
    item_id,
    quantity,
    user_id,
    status,
    gmt_create,
    gmt_modified,
    biz_id, 
    start_time, 
    end_time, 
    activity_price_time,
    price, 
    dbsync_operation
FROM
    dwd_mkt_membercart_ri
GROUP BY    
    cart_id,
    sku_id,
    item_id,
    quantity,
    user_id,
    status,
    gmt_create,
    gmt_modified,
    biz_id, 
    start_time, 
    end_time, 
    activity_price_time,
    price, 
    dbsync_operation;

-- 每一個投放和購物車數據的最後一條
CREATE VIEW tmp_dwd_mkt_membercart_ri AS 
SELECT 
    biz_id as biz_id,
    LAST_VALUE(user_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as user_id,
    LAST_VALUE(item_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as item_id, 
    LAST_VALUE(sku_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as sku_id,
    LAST_VALUE(start_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as start_time,
    LAST_VALUE(end_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as end_time,
    LAST_VALUE(activity_price_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as activity_price_time,
    LAST_VALUE(price,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as price,
    LAST_VALUE(quantity,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as quantity,
    LAST_VALUE(status,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as status,
    LAST_VALUE(gmt_modified,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as gmt_modified,
    LAST_VALUE(gmt_create,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as gmt_create,
    LAST_VALUE(dbsync_operation,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as dbsync_operation
FROM distinct_dwd_mkt_membercart_ri
WHERE DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMdd')=DATE_FORMAT(gmt_modified , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMdd')
GROUP BY 
cart_id,biz_id;

--存儲小時維度的計算結果
CREATE TABLE result_mkt_membercart_ri_eh(
    id VARCHAR, 
    data_time VARCHAR,  
    all_preheating_cart_cnt BIGINT, -- 預熱期間的 加購件數
    all_preheating_cart_alipay BIGINT,-- 預熱期間的 加購金額
    eh_all_preheating_cart_uv BIGINT,-- 預熱期間的 加購UV
    all_cart_cnt BIGINT, -- 投放期間的 加購件數
    all_cart_alipay BIGINT, -- 投放期間的 加購金額
    eh_all_cart_uv BIGINT, -- 投放期間的 加購UV
    primary key(id,data_time)
) WITH (
    type = 'custom',
     -- 其餘信息省略
    timeDiv='hour'
) ;
--統計小時維度的 xx xx xx 業務指標
INSERT INTO result_mkt_membercart_ri_eh 
SELECT 
    biz_id,
    DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMddHH') data_time, 
    `sum`(case when gmt_modified<=COALESCE(activity_price_time,end_time) then quantity else 0 end) as all_preheating_cart_cnt,
    `sum`(case when gmt_modified<=COALESCE(activity_price_time,end_time) then quantity*CAST(price AS BIGINT) else 0 end) as all_preheating_cart_alipay,
    `sum`((case when gmt_modified<=COALESCE(activity_price_time,end_time) then user_id end)) eh_all_preheating_cart_uv,
    `sum`(quantity) as all_cart_cnt,
    `sum`(quantity*CAST(price AS BIGINT)) as all_cart_alipay,
    `count`(distinct user_id) eh_all_cart_uv
FROM tmp_dwd_mkt_membercart_ri 
WHERE 
   status>0 
GROUP BY  biz_id ,DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMddHH') ;

上面case2天貓業務場景裏面的加購金額統計來講,當每一個投放場景的購物車的數據發生變化時候,就意味着上面【CREATE VIEW tmp_dwd_mkt_membercart_ri 】中的LAST_VALUE發生變化,最外層的sum統計【INSERT INTO result_mkt_membercart_ri_eh 】就要將前一條的LAST_VALUE【VALUE-1】撤回,用update的新LAST_VALUE【VALUE-2】進行求和統計,這樣blink就須要有一種機制將VALUE-1進行撤回,利用【VALUE-2】進行計算,這種機制咱們稱爲retract。io

retract 實現原理參考

https://docs.google.com/document/d/18XlGPcfsGbnPSApRipJDLPg5IFNGTQjnz7emkVpZlkw/edit#heading=h.cjkoun4w44l4

相關文章
相關標籤/搜索