Tablestore + Blink實戰:交易數據的實時統計

背景

交易數據的實時統計是電商網站一個核心功能,能夠幫助用戶實時統計網站的總體銷售狀況,快速驗證「新銷售策略」的效果。咱們今天介紹一個基於表格存儲(Tablestore)實現交易數據的實時計算,給你們提供一個新使用方式。html

Tablestore做爲在線的結構化數據庫,提供了毫秒級的訪問延時和豐富的查詢方式,能高效的支撐交易數據的存儲和查詢,同時Tablestore已經原生支持阿里雲的流計算框架Flink/Blink,能夠實現數據的實時計算。web

架構

示例設計

基本場景

注意:示例是模擬一個電商網站的交易數據的存儲和實時計算,目的是爲了展現Tablestore + Blink的使用流程。數據庫

用戶經過SDK將網站交易數據實時的存儲(PutRow/BatchWrite/TableStoreWriter)到Tablestore的source_order表中,Tablestore經過Tunnel服務,將實時增量的數據流入到Flink/Blink中,每5秒進行一次聚合計算,並將計算的結果從新寫回Tablestore的sink_order表中。最後提供給「大屏」實時讀取(GetRange)展現。json

Source表(源表)- source_order

source表是原始數據表,存儲了全部交易記錄。vim

字段 類型 註釋
metering(PrimaryKey) string 計量類型,樣例中默認是web
orderid(PrimaryKey) string 訂單號ID
ts integer 交易時間(Unix時間戳,毫秒精度)
price double 交易金額
buyerid integer 買家ID
sellerid integer 賣家ID
productid integer 商品ID

Sink表(結果表)- sink_order

字段 類型 註釋
metering(PrimaryKey) string 計量類型,樣例中默認是web
ts(PrimaryKey) integer 交易時間(Unix時間戳,毫秒精度)
price double 交易金額
ordercount integer 交易次數

Flink SQL

DDL參考架構

注意:當前Blink在支持Tablestore source上還有些限制,只能運行ProcessingTime的方式,將來會支持EventTime模式,按照用戶數據參數的時間進行計算。框架

-- Source 源表建立
CREATE TABLE ots_input (
    metering VARCHAR,
    orderid VARCHAR,
    price DOUBLE,
    byerid BIGINT,
    sellerid BIGINT,
    productid BIGINT,
    primary key(metering,orderid),
    ts AS PROCTIME()
) WITH (
    type = 'ots',
    instanceName = 'ordertest',
    tableName = 'source_order',
    accessId = '******************',
    accessKey = '******************',
    endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com',
    tunnelName = 'blink_agg'
);
-- Sink 結果表建立
CREATE TABLE ots_output (
    metering VARCHAR,
    ts BIGINT,
    price DOUBLE,
    ordercount BIGINT,
    primary key(metering,ts)
) WITH (
    type = 'ots',
    instanceName = 'ordertest',
    tableName = 'sink_order',
    accessId = '******************',
    accessKey = '******************',
    endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com',
    valueColumns = 'price,ordercount'
);

-- 計算
INSERT INTO ots_output
SELECT 
    DISTINCT metering as metering,
    CAST(TUMBLE_START(ots_input.ts, INTERVAL '5' SECOND) AS BIGINT) AS ts,
    SUM(price) as price,
    COUNT(orderid) as ordercount
FROM ots_input
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND),metering;

實戰

第一步:準備帳戶與開服

準備表格存儲實例,能夠參考《表格存儲實例建立》運維

準備Flink/Blink項目,能夠參考《Blink如何購買》工具

第二步:建立資源

表格存儲資源

表格存儲控制檯入口,建立表格存儲實例ordertest (用戶自定義便可,下面對於參數位置更換爲自定義的實例名),並記錄實例的VPC地址網站

同時在控制檯建立Source表和Sink表, 併爲Source表(source_order)開啓一個Tunnel服務blink_agg

圖三 Source表(source_order)

圖四 Sink表(sink_order)

圖五 源表和目標表

圖六 建立通道

Blink資源

Blink控制檯入口,建立一個Blink項目(獨享模式要建立集羣以後才能建立項目),分別建立一個做業,agg_order,並將上面的Flink SQL粘貼到窗口中,上線服務

在運維窗口中啓動該任務

第三步:壓入數據 並 實時獲取結算結果

1 準備配置文件

程序默認會從'~/tablestoreConf.json'獲取配置

vim ~/tablestoreConf.json

# 內容
{
    "endpoint":"http://ordertest.cn-zhangjiakou.ots.aliyuncs.com",
    "accessId":"************",
    "accessKey":"************",
    "instanceName":"ordertest"
}

2 構建源碼

mvn install
cd target
tar xzvf stream-compute-1.0-SNAPSHOT-release.tar.gz

3 啓動壓力器和模擬大屏

能夠直接下載工具包:stream-compute-1.0-SNAPSHOT-release.tar.gz

# 窗口1
./bin/mock_order_generator
# 窗口2
./bin/data_show_screen

4 效果


原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索