案例與解決方案彙總頁:
阿里雲實時計算產品案例&解決方案彙總
菜鳥的物流數據自己就有鏈路複雜、實操節點多、彙總維度多、考覈邏輯複雜的特色,對於實時數據的計算存在很大挑戰。通過倉配ETL團隊的努力,目前倉配實時數據已覆蓋了絕大多數場景,可是有這樣一類特殊指標:「晚點超時指標」(例如:出庫超6小時未攬收的訂單量),仍存在實時彙總計算困難。緣由在於:流計算是基於消息觸發計算的,若沒有消息到達到則沒法計算,這類指標剛好是要求在指定的超時時間計算出有多少未達到的消息。然而,這類指標對於指導實操有着重要意義,能夠告知運營小二當前多少訂單積壓在哪些做業節點,應該督促哪些實操人員加快做業,這對於物流的時效KPI達成相當重要。前端
以前的方案是:由產品前端根據用戶的請求查詢OLAP數據庫,由OLAP從明細表出結果。大促期間,用戶請求量大,加之數據量大,故對OLAP的明細查詢形成了比較大的壓力。node
2.1 問題定義算法
「超時晚點指標」 是指,一筆訂單的兩個相鄰的實操節點node_n-1 、node_n 的完成時間 time_n-一、time_n,
當知足 : time_n is null && current_time - time_n-1 > kpi_length 時,time_flag_n 爲 true , 該筆訂單計入 超時晚點指標的計數。
以下圖,有一筆訂單其 node_1 爲出庫節點,時間爲time_1 = '2018-06-18 00:00:00' ,運營對出庫與攬收之間考覈的時長 kpi_length = 6h, 那麼當前天然時間 current_time > '2018-06-18 06:00:00' 時,且node_2攬收節點的time_2 爲null,則該筆訂單的 timeout_flag_2 = true , 「出庫超6小時未攬收訂單量」 加1。因爲要求time_2 爲null,即要求沒有攬收消息下發的狀況下讓流計算作彙總值更新,這違背了流計算基於消息觸發的基本原理,故流計算沒法直接算出這種「超時晚點指標」。數據庫
決問題的基本思路是:在考覈時刻(即 kpi_time = time_n-1+kpi_length )「製造」出一條消息下發給流計算,觸發彙總計算。繼續上面的例子:在考覈時刻「2018-06-18 06:00:00」利用MetaQ定時消息功能「製造」出一條消息下發給流計算彙總任務,觸發對該筆訂單的 time_out_flag_2 的判斷,增長彙總計數。同時,還利用 Blink 的Retraction 機制,當time_2 由null變成有值的時候,Blink 能夠對 time_out_flag_2 更新,從新計數。apache
2.2 方案架構網絡
如上圖所示:
Step1: Blink job1 接收來自上游系統的訂單數據,作清洗加工,生成訂單明細表:dwd_ord_ri,利用TT下發給Blink job2 和 Blink job3。
Step2:Blink job2 收到 dwd_ord_ri後,對每筆訂單算出考覈時刻 kpi_time = time_n-1+kpi_length,做爲MetaQ消息的「TIMER_DELIVER_MS」 屬性,寫入MetaQ。MetaQ的定時消息功能,能夠根據用戶寫入的TIMER_DELIVER_MS 在指定時刻下發給消費者,即上圖中的Blink job3。
Step3:Blink job3 接收 TT、MetaQ 兩個消息源,先作Join,再對time_flag判斷,最後作Aggregate計算。同一筆訂單,dwd_ord_ri、timing_msg任意一個消息到來,都會觸發join,time_flag判斷,aggregate從新計算一遍,Blink的Retraction可對結果進行實時更新。架構
2.3 實現細節ide
本方案根據物流場景中多種實操節點、多種考覈時長的特色,從Blink SQL代碼 和 自定義Sink兩方面作了特殊設計,從而實現了靈活配置、高效開發。函數
(1) Blink job2 --- 生成定時消息性能
關鍵Blink SQL 代碼以下。約定每條record的第一個字段爲投遞時間列表,即MetaQ向消費者下發消息的時刻List,也就是上面所說的多個考覈時刻。第二個字段爲保序字段,好比在物流場景中常常以訂單code、運單號做爲保序主鍵。該代碼實現了對每一個出庫的物流訂單,根據其出庫時間,向後延遲6小時(21600000毫秒)、12小時(43200000毫秒)、24小時(86400000毫秒)由MetaQ向消費者下發三個定時消息。
create table metaq_timing_msg ( deliver_time_list varchar comment '投遞時間列表', -- 約定第一個字段爲投遞時間list lg_code varchar comment '物流訂單code', -- 約定第二字段爲保序主鍵 node_name varchar comment '節點名稱', node_time varchar comment '節點時間', ) WITH ( type = 'custom', class = 'com.alibaba.xxx.xxx.udf.MetaQTimingMsgSink', tag = 'store', topic = 'blink_metaq_delay_msg_test', producergroup = 'blinktest', retrytimes = '5', sleeptime = '1000' ); insert into metaq_timing_msg select concat_ws(',', cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 21600000) as varchar), --6小時 cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 43200000) as varchar), --12小時 cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 86400000) as varchar) --24小時 ) as deliver_time_list, lg_code, 'wms' as node_name, store_out_time as node_time from ( select lg_code, FIRST_VALUE(store_out_time) as store_out_time from srctable group by lg_code )b where store_out_time is not null ;
(2) Blink 自定義Sink --- MetaQTimingMsg Sink
Blink的當前版本還不支持 MetaQ的定時消息功能的Sink,故利用 Blink的自定義Sink功能,並結合菜鳥物流數據的特色開發了MetaQTimingMsg Sink。關鍵代碼以下(實現 writeAddRecord 方法)。
@Override public void writeAddRecord(Row row) throws IOException { Object deliverTime = row.getField(0); String[] deliverTimeList = deliverTime.toString().split(","); for(String dTime:deliverTimeList){ String orderCode = row.getField(1).toString(); String key = orderCode + "_" + dTime; Message message = newMessage(row, dTime, key); boolean result = sendMessage(message,orderCode); if(!result){ LOG.error(orderCode + " : " + dTime + " send failed"); } } } private Message newMessage(Row row,String deliverMillisec,String key){ //Support Varbinary Type Insert Into MetaQ Message message = new Message(); message.setKeys(key); message.putUserProperty("TIMER_DELIVER_MS",deliverMillisec); int arity = row.getArity(); Object[] values = new Object[arity]; for(int i=0;i<arity;i++){ values[i]=row.getField(i); } String lineStr=org.apache.commons.lang3.StringUtils.join(values, FIELD_DELIMITER); try { byte[] bytes = lineStr.getBytes(ENCODING); message.setBody(bytes); message.setWaitStoreMsgOK(true); } catch (UnsupportedEncodingException e) { LOG.error("create new message error",e); } return message; } private boolean sendMessage(Message message,String orderCode){ long retryTime = 0; boolean isSendSuccess = true; if(message != null){ message.setTopic(topicName); message.setTags(tagName); } SendResult result = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { .... // 針對物流訂單code的hash算法 return list.get(index.intValue()); } },orderCode); if(!result.getSendStatus().equals(SendStatus.SEND_OK)){ LOG.error("" + orderCode +" write to metaq result is " + result.getSendStatus().toString()); isSendSuccess = false; } return isSendSuccess; } }
(3)Blink job3 --- 彙總計算
關鍵Blink SQL 代碼以下,統計了每一個倉庫的「出庫超6小時未攬收物理訂單」、「出庫超12小時未攬收物理訂單」、「出庫超24小時未攬收物理訂單」的彙總值。代碼中使用了「stringLast()」函數處理來自dwd_ord_ri的每條消息,以取得每一個物流訂單的最新出庫攬收狀況,利用Blink Retraction機制,更新彙總值。
create view dws_store_view as select t1.store_code, max(t1.store_name) as store_name, count(case when length(trim(t1.store_out_time)) = 19 and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,'yyyy-MM-dd HH:mm:ss') >= 21600 then t2.lg_code end ) as tms_not_collect_6h_ord_cnt, ---出庫超6小時未攬收物流訂單量 count(case when length(trim(t1.store_out_time)) = 19 and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,'yyyy-MM-dd HH:mm:ss') >= 43200 then t2.lg_code end ) as tms_not_collect_12h_ord_cnt,---出庫超6小時未攬收物流訂單量 count(case when length(trim(t1.store_out_time)) = 19 and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,'yyyy-MM-dd HH:mm:ss') >= 86400 then t2.lg_code end ) as tms_not_collect_24h_ord_cnt ---出庫超6小時未攬收物流訂單量 from ( select lg_code, coalesce(store_code,'-1') as store_code, store_name, store_out_time, tms_collect_time from ( select lg_code, max(store_code) as store_code, max(store_name) as store_name, stringLast(store_out_time) as store_out_time, stringLast(tms_collect_time)as tms_collect_time, from dwd_ord_ri group by lg_code ) a ) t1 left outer join ( select lg_code, from timing_msg where node_name = 'wms' group by lg_code ) t2 on t1.lg_code = t2.lg_code group by t1.store_code ;
3.1 配置靈活
咱們從「Blink SQL 代碼」 和「自定義MetaQ」 兩個方面設計,用戶能夠根據具體的業務場景,在Blink SQL的一個view裏就能實現多種節點多種考覈時間的定時消息生成,而不是針對每個實操節點的每一種定時指標都要寫一個view,這樣大大節省了代碼量,提高了開發效率。例如對於倉庫節點的出庫超6小時未攬收、超12小時未攬收、超24小時未攬收,這三個指標利用上述方案,僅需在Blink job2的中metaq_timing_msg的第一個字段deliver_time_list中拼接三個kpi_length,即6小時、12小時、24小時爲一個字符串便可,由MetaQTimingMsg Sink自動拆分紅三條消息下發給MetaQ。對於不一樣的節點的考覈,僅需在node_name,node_time填寫不一樣的節點名稱和節點實操時間便可。
3.2 主鍵保序
如2.3節所述,自定義的Sink中 實現了MetaQ的 MessageQueueSelector 接口的 select() 方法,同時在Blink SQL 生成的MetaQ消息默認第二個字段爲保序主鍵字段。從而,能夠根據用戶自定義的主鍵,保證同一主鍵的全部消息放在同一個通道內處理,從而保證按主鍵保序,這對於流計算很是關鍵,可以實現數據的實時準確性。
3.3 性能優良
讓專業的團隊作專業的事。我的認爲,這種大規模的消息存儲、消息下發的任務本就應該交給「消息中間件」來處理,這樣既能夠作到計算與消息存儲分離,也能夠方便消息的管理,好比針對不一樣的實操節點,咱們還能夠定義不一樣的MetaQ的tag。
另外,正如2.2節所述,咱們對定時消息量作了優化。考慮到一筆訂單的屬性字段或其餘節點更新會下發多條消息,咱們利用了Blink的FIRST_VALUE函數,在Blink job2中同一筆訂單的的一種考覈指標只下發一條定時消息,大大減小了消息量,減輕了Blink的寫壓力,和MetaQ的存儲。
馬汶園 阿里巴巴 -菜鳥網絡—數據部 數據工程師
菜鳥倉配實時研發核心成員,主導屢次倉配大促實時數據研發,對利用Blink的原理與特性解決物流場景問題有深刻思考與理解。
本文爲雲棲社區原創內容,未經容許不得轉載。