交易數據的實時統計是電商網站一個核心功能,能夠幫助用戶實時統計網站的總體銷售狀況,快速驗證「新銷售策略」的效果。咱們今天介紹一個基於表格存儲(Tablestore)實現交易數據的實時計算,給你們提供一個新使用方式。html
Tablestore做爲在線的結構化數據庫,提供了毫秒級的訪問延時和豐富的查詢方式,能高效的支撐交易數據的存儲和查詢,同時Tablestore已經原生支持阿里雲的流計算框架Flink/Blink,能夠實現數據的實時計算。web
注意:示例是模擬一個電商網站的交易數據的存儲和實時計算,目的是爲了展現Tablestore + Blink的使用流程。數據庫
用戶經過SDK將網站交易數據實時的存儲(PutRow/BatchWrite/TableStoreWriter)到Tablestore的source_order表中,Tablestore經過Tunnel服務,將實時增量的數據流入到Flink/Blink中,每5秒進行一次聚合計算,並將計算的結果從新寫回Tablestore的sink_order表中。最後提供給「大屏」實時讀取(GetRange)展現。json
source表是原始數據表,存儲了全部交易記錄。vim
字段 | 類型 | 註釋 |
---|---|---|
metering(PrimaryKey) | string | 計量類型,樣例中默認是web |
orderid(PrimaryKey) | string | 訂單號ID |
ts | integer | 交易時間(Unix時間戳,毫秒精度) |
price | double | 交易金額 |
buyerid | integer | 買家ID |
sellerid | integer | 賣家ID |
productid | integer | 商品ID |
字段 | 類型 | 註釋 |
---|---|---|
metering(PrimaryKey) | string | 計量類型,樣例中默認是web |
ts(PrimaryKey) | integer | 交易時間(Unix時間戳,毫秒精度) |
price | double | 交易金額 |
ordercount | integer | 交易次數 |
DDL參考架構
注意:當前Blink在支持Tablestore source上還有些限制,只能運行ProcessingTime的方式,將來會支持EventTime模式,按照用戶數據參數的時間進行計算。框架
-- Source 源表建立 CREATE TABLE ots_input ( metering VARCHAR, orderid VARCHAR, price DOUBLE, byerid BIGINT, sellerid BIGINT, productid BIGINT, primary key(metering,orderid), ts AS PROCTIME() ) WITH ( type = 'ots', instanceName = 'ordertest', tableName = 'source_order', accessId = '******************', accessKey = '******************', endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com', tunnelName = 'blink_agg' ); -- Sink 結果表建立 CREATE TABLE ots_output ( metering VARCHAR, ts BIGINT, price DOUBLE, ordercount BIGINT, primary key(metering,ts) ) WITH ( type = 'ots', instanceName = 'ordertest', tableName = 'sink_order', accessId = '******************', accessKey = '******************', endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com', valueColumns = 'price,ordercount' ); -- 計算 INSERT INTO ots_output SELECT DISTINCT metering as metering, CAST(TUMBLE_START(ots_input.ts, INTERVAL '5' SECOND) AS BIGINT) AS ts, SUM(price) as price, COUNT(orderid) as ordercount FROM ots_input GROUP BY TUMBLE(ts, INTERVAL '5' SECOND),metering;
準備表格存儲實例,能夠參考《表格存儲實例建立》運維
準備Flink/Blink項目,能夠參考《Blink如何購買》工具
表格存儲控制檯入口,建立表格存儲實例ordertest (用戶自定義便可,下面對於參數位置更換爲自定義的實例名),並記錄實例的VPC地址網站
同時在控制檯建立Source表和Sink表, 併爲Source表(source_order)開啓一個Tunnel服務blink_agg
圖三 Source表(source_order)
圖四 Sink表(sink_order)
圖五 源表和目標表
圖六 建立通道
Blink控制檯入口,建立一個Blink項目(獨享模式要建立集羣以後才能建立項目),分別建立一個做業,agg_order,並將上面的Flink SQL粘貼到窗口中,上線服務
在運維窗口中啓動該任務
程序默認會從'~/tablestoreConf.json'獲取配置
vim ~/tablestoreConf.json # 內容 { "endpoint":"http://ordertest.cn-zhangjiakou.ots.aliyuncs.com", "accessId":"************", "accessKey":"************", "instanceName":"ordertest" }
mvn install cd target tar xzvf stream-compute-1.0-SNAPSHOT-release.tar.gz
能夠直接下載工具包:stream-compute-1.0-SNAPSHOT-release.tar.gz
# 窗口1 ./bin/mock_order_generator # 窗口2 ./bin/data_show_screen
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。