利用blink+MQ實現流計算中的超時統計問題

案例與解決方案彙總頁:
阿里雲實時計算產品案例&解決方案彙總

一. 背景介紹

菜鳥的物流數據自己就有鏈路複雜、實操節點多、彙總維度多、考覈邏輯複雜的特色,對於實時數據的計算存在很大挑戰。通過倉配ETL團隊的努力,目前倉配實時數據已覆蓋了絕大多數場景,可是有這樣一類特殊指標:「晚點超時指標」(例如:出庫超6小時未攬收的訂單量),仍存在實時彙總計算困難。緣由在於:流計算是基於消息觸發計算的,若沒有消息到達到則沒法計算,這類指標剛好是要求在指定的超時時間計算出有多少未達到的消息。然而,這類指標對於指導實操有着重要意義,能夠告知運營小二當前多少訂單積壓在哪些做業節點,應該督促哪些實操人員加快做業,這對於物流的時效KPI達成相當重要。前端

以前的方案是:由產品前端根據用戶的請求查詢OLAP數據庫,由OLAP從明細表出結果。大促期間,用戶請求量大,加之數據量大,故對OLAP的明細查詢形成了比較大的壓力。node

二. 解決方案

2.1   問題定義算法

「超時晚點指標」 是指,一筆訂單的兩個相鄰的實操節點node_n-1 、node_n 的完成時間 time_n-一、time_n,
當知足 : time_n is null  && current_time - time_n-1 > kpi_length 時,time_flag_n 爲 true , 該筆訂單計入 超時晚點指標的計數。
以下圖,有一筆訂單其 node_1 爲出庫節點,時間爲time_1 = '2018-06-18 00:00:00' ,運營對出庫與攬收之間考覈的時長 kpi_length = 6h, 那麼當前天然時間 current_time > '2018-06-18 06:00:00' 時,且node_2攬收節點的time_2 爲null,則該筆訂單的 timeout_flag_2 = true , 「出庫超6小時未攬收訂單量」 加1。因爲要求time_2 爲null,即要求沒有攬收消息下發的狀況下讓流計算作彙總值更新,這違背了流計算基於消息觸發的基本原理,故流計算沒法直接算出這種「超時晚點指標」。數據庫

決問題的基本思路是:在考覈時刻(即 kpi_time = time_n-1+kpi_length )「製造」出一條消息下發給流計算,觸發彙總計算。繼續上面的例子:在考覈時刻「2018-06-18 06:00:00」利用MetaQ定時消息功能「製造」出一條消息下發給流計算彙總任務,觸發對該筆訂單的 time_out_flag_2 的判斷,增長彙總計數。同時,還利用 Blink 的Retraction 機制,當time_2 由null變成有值的時候,Blink 能夠對 time_out_flag_2 更新,從新計數。apache

2.2 方案架構網絡

如上圖所示:
Step1:  Blink job1 接收來自上游系統的訂單數據,作清洗加工,生成訂單明細表:dwd_ord_ri,利用TT下發給Blink job2 和 Blink job3。
Step2:Blink job2 收到 dwd_ord_ri後,對每筆訂單算出考覈時刻 kpi_time = time_n-1+kpi_length,做爲MetaQ消息的「TIMER_DELIVER_MS」 屬性,寫入MetaQ。MetaQ的定時消息功能,能夠根據用戶寫入的TIMER_DELIVER_MS 在指定時刻下發給消費者,即上圖中的Blink job3。
Step3:Blink job3 接收 TT、MetaQ 兩個消息源,先作Join,再對time_flag判斷,最後作Aggregate計算。同一筆訂單,dwd_ord_ri、timing_msg任意一個消息到來,都會觸發join,time_flag判斷,aggregate從新計算一遍,Blink的Retraction可對結果進行實時更新。架構

2.3 實現細節ide

本方案根據物流場景中多種實操節點、多種考覈時長的特色,從Blink SQL代碼 和 自定義Sink兩方面作了特殊設計,從而實現了靈活配置、高效開發。函數

(1) Blink job2 --- 生成定時消息性能

關鍵Blink SQL 代碼以下。約定每條record的第一個字段爲投遞時間列表,即MetaQ向消費者下發消息的時刻List,也就是上面所說的多個考覈時刻。第二個字段爲保序字段,好比在物流場景中常常以訂單code、運單號做爲保序主鍵。該代碼實現了對每一個出庫的物流訂單,根據其出庫時間,向後延遲6小時(21600000毫秒)、12小時(43200000毫秒)、24小時(86400000毫秒)由MetaQ向消費者下發三個定時消息。

create table metaq_timing_msg
(
deliver_time_list       varchar comment '投遞時間列表', -- 約定第一個字段爲投遞時間list
lg_code           varchar comment '物流訂單code', -- 約定第二字段爲保序主鍵
node_name               varchar comment '節點名稱',
node_time               varchar comment '節點時間',
)
WITH
(
type = 'custom',
class = 'com.alibaba.xxx.xxx.udf.MetaQTimingMsgSink',
tag = 'store',
topic = 'blink_metaq_delay_msg_test',
producergroup = 'blinktest',
retrytimes = '5',
sleeptime = '1000'
);
insert into metaq_timing_msg
select
concat_ws(',',
cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 21600000) as varchar),  --6小時
cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 43200000) as varchar),  --12小時
cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 86400000) as varchar)   --24小時
)   as deliver_time_list,
lg_code,
'wms'  as node_name,
store_out_time  as node_time
from
(
select
lg_code,
FIRST_VALUE(store_out_time) as store_out_time
from srctable
group by lg_code
)b
where store_out_time is not null  ;

(2) Blink 自定義Sink --- MetaQTimingMsg Sink

Blink的當前版本還不支持 MetaQ的定時消息功能的Sink,故利用 Blink的自定義Sink功能,並結合菜鳥物流數據的特色開發了MetaQTimingMsg Sink。關鍵代碼以下(實現 writeAddRecord 方法)。

@Override
public void writeAddRecord(Row row) throws IOException {
Object deliverTime = row.getField(0);
String[] deliverTimeList = deliverTime.toString().split(",");
for(String dTime:deliverTimeList){
        String orderCode = row.getField(1).toString();
        String key = orderCode + "_" + dTime;
        Message message = newMessage(row, dTime, key);
        boolean result = sendMessage(message,orderCode);
        if(!result){
            LOG.error(orderCode + " : " + dTime + " send failed");
        }
    }
}

private  Message newMessage(Row row,String deliverMillisec,String key){

    //Support Varbinary Type Insert Into MetaQ
    Message message = new Message();
    message.setKeys(key);
    message.putUserProperty("TIMER_DELIVER_MS",deliverMillisec);
    int arity = row.getArity();
    Object[] values = new Object[arity];
    for(int i=0;i<arity;i++){
        values[i]=row.getField(i);
    }
    String lineStr=org.apache.commons.lang3.StringUtils.join(values, FIELD_DELIMITER);
    try {
        byte[] bytes = lineStr.getBytes(ENCODING);
        message.setBody(bytes);
        message.setWaitStoreMsgOK(true);
    } catch (UnsupportedEncodingException e) {
        LOG.error("create new message error",e);
    }
    return message;
}

private boolean sendMessage(Message message,String orderCode){
    long retryTime = 0;
    boolean isSendSuccess = true;
    if(message != null){
        message.setTopic(topicName);
        message.setTags(tagName);
    }
    SendResult result = producer.send(message, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                 .... // 針對物流訂單code的hash算法
                return list.get(index.intValue());
         }                    
    },orderCode);
    if(!result.getSendStatus().equals(SendStatus.SEND_OK)){
        LOG.error("" + orderCode +" write to metaq result is " + result.getSendStatus().toString());
        isSendSuccess = false;
    }
    return  isSendSuccess;
}
}

(3)Blink job3 --- 彙總計算

關鍵Blink SQL 代碼以下,統計了每一個倉庫的「出庫超6小時未攬收物理訂單」、「出庫超12小時未攬收物理訂單」、「出庫超24小時未攬收物理訂單」的彙總值。代碼中使用了「stringLast()」函數處理來自dwd_ord_ri的每條消息,以取得每一個物流訂單的最新出庫攬收狀況,利用Blink Retraction機制,更新彙總值。

create view dws_store_view as  
select 
    t1.store_code,
    max(t1.store_name)       as store_name,
    count(case 
            when length(trim(t1.store_out_time)) = 19
            and t1.tms_collect_time is null 
            and NOW()-UNIX_TIMESTAMP(t1.store_out_time,'yyyy-MM-dd HH:mm:ss') >= 21600
          then t2.lg_code end 
        ) as tms_not_collect_6h_ord_cnt, ---出庫超6小時未攬收物流訂單量
    count(case 
            when length(trim(t1.store_out_time)) = 19
            and t1.tms_collect_time is null 
            and  NOW()-UNIX_TIMESTAMP(t1.store_out_time,'yyyy-MM-dd HH:mm:ss') >= 43200
          then t2.lg_code end 
        ) as tms_not_collect_12h_ord_cnt,---出庫超6小時未攬收物流訂單量
    count(case 
            when length(trim(t1.store_out_time)) = 19
            and t1.tms_collect_time is null 
            and  NOW()-UNIX_TIMESTAMP(t1.store_out_time,'yyyy-MM-dd HH:mm:ss') >= 86400
          then t2.lg_code end 
        ) as tms_not_collect_24h_ord_cnt ---出庫超6小時未攬收物流訂單量
from 
(
    select 
        lg_code,
        coalesce(store_code,'-1')   as store_code,
        store_name,
        store_out_time,
        tms_collect_time
    from 
    (
        select 
            lg_code,
            max(store_code)          as store_code,
            max(store_name)          as store_name,
            stringLast(store_out_time)  as store_out_time,
            stringLast(tms_collect_time)as tms_collect_time, 
        from dwd_ord_ri
        group by lg_code
    ) a     
) t1
left outer join 
(
    select 
        lg_code,
    from timing_msg  
    where node_name = 'wms'
    group by lg_code
) t2
on t1.lg_code = t2.lg_code
group by
    t1.store_code
  ;

三.  方案優點

3.1 配置靈活

咱們從「Blink SQL 代碼」 和「自定義MetaQ」 兩個方面設計,用戶能夠根據具體的業務場景,在Blink SQL的一個view裏就能實現多種節點多種考覈時間的定時消息生成,而不是針對每個實操節點的每一種定時指標都要寫一個view,這樣大大節省了代碼量,提高了開發效率。例如對於倉庫節點的出庫超6小時未攬收、超12小時未攬收、超24小時未攬收,這三個指標利用上述方案,僅需在Blink job2的中metaq_timing_msg的第一個字段deliver_time_list中拼接三個kpi_length,即6小時、12小時、24小時爲一個字符串便可,由MetaQTimingMsg Sink自動拆分紅三條消息下發給MetaQ。對於不一樣的節點的考覈,僅需在node_name,node_time填寫不一樣的節點名稱和節點實操時間便可。

3.2 主鍵保序

如2.3節所述,自定義的Sink中 實現了MetaQ的 MessageQueueSelector 接口的 select() 方法,同時在Blink SQL 生成的MetaQ消息默認第二個字段爲保序主鍵字段。從而,能夠根據用戶自定義的主鍵,保證同一主鍵的全部消息放在同一個通道內處理,從而保證按主鍵保序,這對於流計算很是關鍵,可以實現數據的實時準確性。

3.3 性能優良

讓專業的團隊作專業的事。我的認爲,這種大規模的消息存儲、消息下發的任務本就應該交給「消息中間件」來處理,這樣既能夠作到計算與消息存儲分離,也能夠方便消息的管理,好比針對不一樣的實操節點,咱們還能夠定義不一樣的MetaQ的tag。
另外,正如2.2節所述,咱們對定時消息量作了優化。考慮到一筆訂單的屬性字段或其餘節點更新會下發多條消息,咱們利用了Blink的FIRST_VALUE函數,在Blink job2中同一筆訂單的的一種考覈指標只下發一條定時消息,大大減小了消息量,減輕了Blink的寫壓力,和MetaQ的存儲。

四. 自我介紹

馬汶園    阿里巴巴 -菜鳥網絡—數據部      數據工程師
菜鳥倉配實時研發核心成員,主導屢次倉配大促實時數據研發,對利用Blink的原理與特性解決物流場景問題有深刻思考與理解。



本文做者:付空

閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索