A real-time data warehouse primarily addresses the poor data freshness of a traditional warehouse. Real-time warehouses are typically used for real-time OLAP analysis, live data dashboards, and real-time monitoring of business metrics. Although the architecture and technology choices differ from those of a traditional offline warehouse, the underlying methodology of warehouse construction is the same. This article shares a demo that builds a real-time data warehouse from scratch with Flink SQL, covering the whole pipeline: data collection, storage, computation, and visualization. By the end, you should have a clear picture of how the pieces fit together.
"The ancients spared no effort in their learning; what is begun in youth is perfected only in old age.
What comes from books always feels shallow; to truly know a thing, you must practice it yourself." (Lu You)
This article uses an e-commerce business as the example to demonstrate the data processing flow of a real-time warehouse. Since the goal is to explain the construction process, the computations involved are deliberately simple. To keep the case study reproducible and complete, detailed steps are given, and for ease of demonstration all operations are performed in the Flink SQL CLI.
The overall architecture is shown in the figure: first, canal parses the MySQL binlog and stores the change data in Kafka. Flink SQL then cleanses and joins the raw data and writes the resulting detail-level wide table back to Kafka. Dimension data is stored in MySQL; Flink SQL joins the detail wide table against the dimension tables, writes the aggregated results to MySQL, and finally FineBI visualizes them.
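Before diving into the DDL, it helps to see what a canal change message looks like to a downstream consumer. The sketch below is not part of the demo itself: it uses a simplified, assumed canal-json envelope and hypothetical sample data to show how one binlog message maps to changelog rows, roughly the way Flink's canal-json format interprets them.

```python
import json

# A simplified canal-json envelope (assumed layout): "data" carries the row
# images, "type" the binlog operation, "old" the pre-update column values.
message = json.dumps({
    "data": [{"id": "1", "name": "河北", "region_id": "1", "area_code": "130000"}],
    "old": None,
    "type": "INSERT",
    "database": "mydw",
    "table": "base_province",
})

def to_changelog_rows(raw):
    """Map one canal-json message to (op, row) pairs, roughly the way a
    changelog source exposes them (+I insert, +U update-after, -D delete)."""
    msg = json.loads(raw)
    op = {"INSERT": "+I", "UPDATE": "+U", "DELETE": "-D"}[msg["type"]]
    return [(op, row) for row in msg["data"]]

rows = to_changelog_rows(message)
```

One INSERT message with a single row image yields a single `+I` row; an UPDATE message would instead yield update rows, which is what lets Flink treat the Kafka topic as a changelog rather than an append-only stream.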
```sql
CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `consignee` varchar(100) DEFAULT NULL COMMENT 'consignee',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT 'consignee phone number',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT 'total amount',
  `order_status` varchar(20) DEFAULT NULL COMMENT 'order status',
  `user_id` bigint(20) DEFAULT NULL COMMENT 'user id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT 'payment method',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT 'delivery address',
  `order_comment` varchar(200) DEFAULT NULL COMMENT 'order remarks',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT 'trade number (for third-party payment)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT 'order description (for third-party payment)',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  `operate_time` datetime DEFAULT NULL COMMENT 'operation time',
  `expire_time` datetime DEFAULT NULL COMMENT 'expiration time',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT 'logistics tracking number',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT 'parent order id',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'image path',
  `province_id` int(20) DEFAULT NULL COMMENT 'region',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='order table';

CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `order_id` bigint(20) DEFAULT NULL COMMENT 'order id',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku name (denormalized)',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'image name (denormalized)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT 'purchase price (sku price at order time)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT 'purchase quantity',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='order detail table';

CREATE TABLE `sku_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid (itemID)',
  `spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',
  `price` decimal(10,0) DEFAULT NULL COMMENT 'price',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku name',
  `sku_desc` varchar(2000) DEFAULT NULL COMMENT 'product specification description',
  `weight` decimal(10,2) DEFAULT NULL COMMENT 'weight',
  `tm_id` bigint(20) DEFAULT NULL COMMENT 'brand (denormalized)',
  `category3_id` bigint(20) DEFAULT NULL COMMENT 'level-3 category id (denormalized)',
  `sku_default_img` varchar(200) DEFAULT NULL COMMENT 'default display image (denormalized)',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='product table';

CREATE TABLE `base_category1` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(10) NOT NULL COMMENT 'category name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-1 category table';

CREATE TABLE `base_category2` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(200) NOT NULL COMMENT 'level-2 category name',
  `category1_id` bigint(20) DEFAULT NULL COMMENT 'level-1 category id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-2 category table';

CREATE TABLE `base_category3` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(200) NOT NULL COMMENT 'level-3 category name',
  `category2_id` bigint(20) DEFAULT NULL COMMENT 'level-2 category id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-3 category table';

CREATE TABLE `base_province` (
  `id` int(20) DEFAULT NULL COMMENT 'id',
  `name` varchar(20) DEFAULT NULL COMMENT 'province name',
  `region_id` int(20) DEFAULT NULL COMMENT 'region id',
  `area_code` varchar(20) DEFAULT NULL COMMENT 'administrative area code'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `base_region` (
  `id` int(20) NOT NULL COMMENT 'region id',
  `region_name` varchar(20) DEFAULT NULL COMMENT 'region name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
Note: the DDL above is executed in MySQL. The complete table-creation and mock-data-generation scripts are available at:
Link: https://pan.baidu.com/s/1fcMgDHGKedOpzqLbSRUGwA  Extraction code: zuqw
For ODS-layer data synchronization, see my other article, "Real-time incremental data synchronization with Canal and Flink (Part 1)". In short, canal parses the MySQL binlog and writes the changes to the corresponding Kafka topics; the details are omitted here for brevity. The result after synchronization is shown below:
In this demo the dimension tables are stored in MySQL; in production, HBase is commonly used for dimension data. We need two dimension tables: a region dimension table and a product dimension table. They are built as follows:
First, extract the data from the mydw.base_province and mydw.base_region topics into MySQL, using Flink SQL's Kafka source with the canal-json format. Note: before loading, the target tables must already be created in MySQL. In this article, the MySQL database holding the dimension data is named dim. The statements are as follows:
```sql
-- -------------------------
-- Province
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT,
  `area_code` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_province',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Province
-- MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT,
  `area_code` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_province', -- target table in MySQL
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
-- Province
-- load data into the MySQL sink
-- -------------------------
INSERT INTO base_province
SELECT * FROM ods_base_province;

-- -------------------------
-- Region
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
  `id` INT,
  `region_name` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_region',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Region
-- MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
  `id` INT,
  `region_name` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_region', -- target table in MySQL
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
-- Region
-- load data into the MySQL sink
-- -------------------------
INSERT INTO base_region
SELECT * FROM ods_base_region;
```
With the steps above, the raw data needed for the dimension table is now stored in MySQL. Next, create the dimension table itself: using the two tables above, we build a view named dim_province to serve as the dimension table:
```sql
-- ---------------------------------
-- DIM layer: region dimension table,
-- created as a view in MySQL
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
  bp.id          AS province_id,
  bp.name        AS province_name,
  br.id          AS region_id,
  br.region_name AS region_name,
  bp.area_code   AS area_code
FROM base_region br
JOIN base_province bp ON br.id = bp.region_id;
```
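To make the join behind the view concrete, here is a minimal Python sketch (not part of the demo) that replicates the view's logic on hypothetical sample rows mirroring base_region and base_province:

```python
# Hypothetical sample rows mirroring base_region and base_province in MySQL.
base_region = [{"id": 1, "region_name": "華北"}]
base_province = [{"id": 1, "name": "北京", "region_id": 1, "area_code": "110000"}]

def dim_province(regions, provinces):
    """Inner-join each province to its region, mirroring the view's SELECT list."""
    region_by_id = {r["id"]: r for r in regions}
    return [
        {
            "province_id": p["id"],
            "province_name": p["name"],
            "region_id": p["region_id"],
            "region_name": region_by_id[p["region_id"]]["region_name"],
            "area_code": p["area_code"],
        }
        for p in provinces
        if p["region_id"] in region_by_id
    ]

dims = dim_province(base_region, base_province)
```

Each dimension row carries both the province attributes and the denormalized region name, which is exactly what the fact table will later look up by province_id.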
The dim_province dimension table is now ready; to use it in a dimension-table join, we only need to declare a JDBC source for it in Flink SQL. The product dimension table is built the same way:
```sql
-- -------------------------
-- Level-1 category
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
  `id` BIGINT,
  `name` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category1',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Level-1 category
-- MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
  `id` BIGINT,
  `name` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_category1', -- target table in MySQL
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
-- Level-1 category
-- load data into the MySQL sink
-- -------------------------
INSERT INTO base_category1
SELECT * FROM ods_base_category1;

-- -------------------------
-- Level-2 category
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category2',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Level-2 category
-- MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_category2', -- target table in MySQL
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
-- Level-2 category
-- load data into the MySQL sink
-- -------------------------
INSERT INTO base_category2
SELECT * FROM ods_base_category2;

-- -------------------------
-- Level-3 category
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category3',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Level-3 category
-- MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_category3', -- target table in MySQL
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
-- Level-3 category
-- load data into the MySQL sink
-- -------------------------
INSERT INTO base_category3
SELECT * FROM ods_base_category3;

-- -------------------------
-- Product table
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.sku_info',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Product table
-- MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0),
  PRIMARY KEY (id) NOT ENFORCED -- upsert key; must match the MySQL table's primary key
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'sku_info', -- target table in MySQL
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
-- Product table
-- load data into the MySQL sink
-- -------------------------
INSERT INTO sku_info
SELECT * FROM ods_sku_info;
```
With the steps above, the base tables behind the product dimension are synchronized into MySQL (again, the target tables must be created in advance). Next, using these base tables, we create a view named dim_sku_info in MySQL's dim database to serve as the dimension table for later use.
```sql
-- ---------------------------------
-- DIM layer: product dimension table,
-- created as a view in MySQL
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
  si.id           AS id,
  si.sku_name     AS sku_name,
  si.category3_id AS c3_id,
  si.weight       AS weight,
  si.tm_id        AS tm_id,
  si.price        AS price,
  si.spu_id       AS spu_id,
  c3.name         AS c3_name,
  c2.id           AS c2_id,
  c2.name         AS c2_name,
  c1.id           AS c1_id,
  c1.name         AS c1_name
FROM sku_info si
JOIN base_category3 c3 ON si.category3_id = c3.id
JOIN base_category2 c2 ON c3.category2_id = c2.id
JOIN base_category1 c1 ON c2.category1_id = c1.id;
```
At this point, all the dimension data we need is ready. Next, we process the DWD-layer data.
With the dimension tables in place, we now process the raw ODS data into a DWD-layer detail wide table. The steps are as follows:
```sql
-- -------------------------
-- Order detail
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail` (
  `id` BIGINT,
  `order_id` BIGINT,
  `sku_id` BIGINT,
  `sku_name` STRING,
  `img_url` STRING,
  `order_price` DECIMAL(10,2),
  `sku_num` INT,
  `create_time` TIMESTAMP(0)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.order_detail',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Order info
-- Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
  `id` BIGINT,
  `consignee` STRING,
  `consignee_tel` STRING,
  `total_amount` DECIMAL(10,2),
  `order_status` STRING,
  `user_id` BIGINT,
  `payment_way` STRING,
  `delivery_address` STRING,
  `order_comment` STRING,
  `out_trade_no` STRING,
  `trade_body` STRING,
  `create_time` TIMESTAMP(0),
  `operate_time` TIMESTAMP(0),
  `expire_time` TIMESTAMP(0),
  `tracking_no` STRING,
  `parent_order_id` BIGINT,
  `img_url` STRING,
  `province_id` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.order_info',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- ---------------------------------
-- DWD layer: paid order detail table dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail (
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,0),
  create_time TIMESTAMP(0),
  pay_time TIMESTAMP(0)
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_paid_order_detail',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);

-- ---------------------------------
-- DWD layer: paid order detail table
-- load data into dwd_paid_order_detail
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM (
  SELECT *
  FROM ods_order_info
  WHERE order_status = '2' -- paid
) oi
JOIN (
  SELECT *
  FROM ods_order_detail
) od ON oi.id = od.order_id;
```
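The filter-then-join that builds the wide table can be sketched in plain Python (not part of the demo; the sample orders and details below are hypothetical). It keeps only paid orders (order_status == '2') and enriches each order-detail row with the order's user, province, and payment time:

```python
def paid_order_detail(order_info, order_detail):
    """Filter paid orders (order_status == '2') and join details on order_id,
    mirroring the INSERT's SELECT list (pay_time taken from operate_time)."""
    paid = {o["id"]: o for o in order_info if o["order_status"] == "2"}
    out = []
    for d in order_detail:
        o = paid.get(d["order_id"])
        if o is None:  # detail of an unpaid order is dropped
            continue
        out.append({
            "detail_id": d["id"],
            "order_id": o["id"],
            "user_id": o["user_id"],
            "province_id": o["province_id"],
            "sku_id": d["sku_id"],
            "sku_name": d["sku_name"],
            "sku_num": d["sku_num"],
            "order_price": d["order_price"],
            "create_time": o["create_time"],
            "pay_time": o["operate_time"],
        })
    return out

# Hypothetical sample data: order 1001 is paid, order 1002 is not.
orders = [
    {"id": 1001, "user_id": 12, "province_id": 1, "order_status": "2",
     "create_time": "2020-10-01 12:00:00", "operate_time": "2020-10-01 12:05:00"},
    {"id": 1002, "user_id": 13, "province_id": 2, "order_status": "1",
     "create_time": "2020-10-01 12:10:00", "operate_time": None},
]
details = [
    {"id": 1, "order_id": 1001, "sku_id": 7, "sku_name": "sku-7", "sku_num": 2, "order_price": "49.90"},
    {"id": 2, "order_id": 1002, "sku_id": 8, "sku_name": "sku-8", "sku_num": 1, "order_price": "9.90"},
]
wide = paid_order_detail(orders, details)
```

Only the detail of the paid order survives; its grain is one row per order-detail line, which is what makes per-province and per-SKU aggregation straightforward later.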
With the steps above, we have created the dwd_paid_order_detail wide table and stored it in Kafka. Next, we join this wide table with the dimension tables to produce the ADS application-layer data.
First, create the corresponding ADS target table in MySQL: ads_province_index
```sql
CREATE TABLE ads.ads_province_index (
  province_id   INT(10),
  area_code     VARCHAR(100),
  province_name VARCHAR(100),
  region_id     INT(10),
  region_name   VARCHAR(100),
  order_amount  DECIMAL(10,2),
  order_count   BIGINT(10),
  dt            VARCHAR(100),
  PRIMARY KEY (province_id, dt)
);
```
Load data into the MySQL ADS-layer target table:
-- Flink SQL Cli操做 -- --------------------------------- -- 使用 DDL建立MySQL中的ADS層表 -- 指標:1.天天每一個省份的訂單數 -- 2.天天每一個省份的訂單金額 -- --------------------------------- CREATE TABLE ads_province_index( province_id INT, area_code STRING, province_name STRING, region_id INT, region_name STRING, order_amount DECIMAL(10,2), order_count BIGINT, dt STRING, PRIMARY KEY (province_id, dt) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://kms-1:3306/ads', 'table-name' = 'ads_province_index', 'driver' = 'com.mysql.jdbc.Driver', 'username' = 'root', 'password' = '123qwe' ); -- --------------------------------- -- dwd_paid_order_detail已支付訂單明細寬表 -- --------------------------------- CREATE TABLE dwd_paid_order_detail ( detail_id BIGINT, order_id BIGINT, user_id BIGINT, province_id INT, sku_id BIGINT, sku_name STRING, sku_num INT, order_price DECIMAL(10,2), create_time STRING, pay_time STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'dwd_paid_order_detail', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'kms-3:9092', 'format' = 'changelog-json' ); -- --------------------------------- -- tmp_province_index -- 訂單彙總臨時表 -- --------------------------------- CREATE TABLE tmp_province_index( province_id INT, order_count BIGINT,-- 訂單數 order_amount DECIMAL(10,2), -- 訂單金額 pay_date DATE )WITH ( 'connector' = 'kafka', 'topic' = 'tmp_province_index', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'kms-3:9092', 'format' = 'changelog-json' ); -- --------------------------------- -- tmp_province_index -- 訂單彙總臨時表數據裝載 -- --------------------------------- INSERT INTO tmp_province_index SELECT province_id, count(distinct order_id) order_count,-- 訂單數 sum(order_price * sku_num) order_amount, -- 訂單金額 TO_DATE(pay_time,'yyyy-MM-dd') pay_date FROM dwd_paid_order_detail GROUP BY province_id,TO_DATE(pay_time,'yyyy-MM-dd') ; -- --------------------------------- -- tmp_province_index_source -- 使用該臨時彙總表,做爲數據源 -- --------------------------------- CREATE 
TABLE tmp_province_index_source( province_id INT, order_count BIGINT,-- 訂單數 order_amount DECIMAL(10,2), -- 訂單金額 pay_date DATE, proctime as PROCTIME() -- 經過計算列產生一個處理時間列 ) WITH ( 'connector' = 'kafka', 'topic' = 'tmp_province_index', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'kms-3:9092', 'format' = 'changelog-json' ); -- --------------------------------- -- DIM層,區域維表, -- 建立區域維表數據源 -- --------------------------------- DROP TABLE IF EXISTS `dim_province`; CREATE TABLE dim_province ( province_id INT, province_name STRING, area_code STRING, region_id INT, region_name STRING , PRIMARY KEY (province_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://kms-1:3306/dim', 'table-name' = 'dim_province', 'driver' = 'com.mysql.jdbc.Driver', 'username' = 'root', 'password' = '123qwe', 'scan.fetch-size' = '100' ); -- --------------------------------- -- 向ads_province_index裝載數據 -- 維表JOIN -- --------------------------------- INSERT INTO ads_province_index SELECT pc.province_id, dp.area_code, dp.province_name, dp.region_id, dp.region_name, pc.order_amount, pc.order_count, cast(pc.pay_date as VARCHAR) FROM tmp_province_index_source pc JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp ON dp.province_id = pc.province_id;
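The per-province aggregation above can be sketched in Python (not part of the demo; the sample rows are hypothetical): group the paid-order details by (province_id, pay date), count distinct orders, and sum order_price * sku_num:

```python
from collections import defaultdict
from decimal import Decimal

def province_index(details):
    """Per (province_id, pay_date): distinct order count and order amount,
    mirroring the tmp_province_index query."""
    order_ids = defaultdict(set)
    amounts = defaultdict(lambda: Decimal("0"))
    for d in details:
        # TO_DATE(pay_time, 'yyyy-MM-dd') -> take the date part of the timestamp
        key = (d["province_id"], d["pay_time"][:10])
        order_ids[key].add(d["order_id"])
        amounts[key] += Decimal(d["order_price"]) * d["sku_num"]
    return {k: {"order_count": len(v), "order_amount": amounts[k]}
            for k, v in order_ids.items()}

# Two detail lines of the same order in the same province on the same day:
rows = [
    {"province_id": 1, "order_id": 1001, "order_price": "49.90", "sku_num": 2,
     "pay_time": "2020-10-01 12:05:00"},
    {"province_id": 1, "order_id": 1001, "order_price": "9.90", "sku_num": 1,
     "pay_time": "2020-10-01 12:05:00"},
]
result = province_index(rows)
```

Because the wide table's grain is one row per order-detail line, the order count must use a distinct count while the amount is a plain sum over the lines.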
After submitting the job, observe the Flink Web UI:
Check the data in the ADS-layer ads_province_index table:
First, create the corresponding ADS target table in MySQL: ads_sku_index
```sql
CREATE TABLE ads_sku_index (
  sku_id       BIGINT(10),
  sku_name     VARCHAR(100),
  weight       DOUBLE,
  tm_id        BIGINT(10),
  price        DOUBLE,
  spu_id       BIGINT(10),
  c3_id        BIGINT(10),
  c3_name      VARCHAR(100),
  c2_id        BIGINT(10),
  c2_name      VARCHAR(100),
  c1_id        BIGINT(10),
  c1_name      VARCHAR(100),
  order_amount DOUBLE,
  order_count  BIGINT(10),
  sku_count    BIGINT(10),
  dt           varchar(100),
  PRIMARY KEY (sku_id, dt)
);
```
Load data into the MySQL ADS-layer target table:
```sql
-- ---------------------------------
-- Declare the MySQL ADS table via DDL
-- Metrics: 1. daily order count per product
--          2. daily order amount per product
--          3. daily quantity sold per product
-- ---------------------------------
CREATE TABLE ads_sku_index (
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id, dt) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/ads',
  'table-name' = 'ads_sku_index',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe'
);

-- ---------------------------------
-- dwd_paid_order_detail: paid order detail wide table
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail (
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_paid_order_detail',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_sku_index
-- product metric summary
-- ---------------------------------
CREATE TABLE tmp_sku_index (
  sku_id BIGINT,
  order_count BIGINT,          -- order count
  order_amount DECIMAL(10,2),  -- order amount
  order_sku_num BIGINT,
  pay_date DATE
) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_sku_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_sku_index
-- load data
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
  sku_id,
  count(DISTINCT order_id) order_count,     -- order count
  sum(order_price * sku_num) order_amount,  -- order amount
  sum(sku_num) order_sku_num,
  TO_DATE(pay_time, 'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id, TO_DATE(pay_time, 'yyyy-MM-dd');

-- ---------------------------------
-- tmp_sku_index_source
-- read the temporary summary table back as a source
-- ---------------------------------
CREATE TABLE tmp_sku_index_source (
  sku_id BIGINT,
  order_count BIGINT,          -- order count
  order_amount DECIMAL(10,2),  -- order amount
  order_sku_num BIGINT,
  pay_date DATE,
  proctime AS PROCTIME()       -- computed column producing a processing-time attribute
) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_sku_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);

-- ---------------------------------
-- DIM layer: product dimension table
-- declare the dimension table source
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'dim_sku_info',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'scan.fetch-size' = '100'
);

-- ---------------------------------
-- load ads_sku_index
-- dimension-table join
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
  sku_id,
  sku_name,
  weight,
  tm_id,
  price,
  spu_id,
  c3_id,
  c3_name,
  c2_id,
  c2_name,
  c1_id,
  c1_name,
  sc.order_amount,
  sc.order_count,
  sc.order_sku_num,
  cast(sc.pay_date AS VARCHAR)
FROM tmp_sku_index_source sc
JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime AS ds
ON ds.id = sc.sku_id;
```
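Because the summary topics use the changelog-json format, every revision of an aggregate arrives as a retraction of the old value followed by the new value, and the JDBC sink applies them as upserts keyed by the primary key (sku_id, dt). The sketch below (not part of the demo; ops and sample rows are illustrative, loosely following Flink's RowKind notation) simulates that behavior against an in-memory keyed store:

```python
def apply_changelog(sink, changes):
    """Apply changelog-style (op, row) events to a keyed store, the way an
    upsert sink applies retract/update pairs keyed by (sku_id, dt)."""
    for op, row in changes:
        key = (row["sku_id"], row["dt"])
        if op in ("+I", "+U"):      # insert / update-after: write the new value
            sink[key] = row
        elif op in ("-U", "-D"):    # update-before / delete: retract the old value
            sink.pop(key, None)
    return sink

# A first result for sku 7 on 2020-10-01, later revised by a -U/+U pair:
store = apply_changelog({}, [
    ("+I", {"sku_id": 7, "dt": "2020-10-01", "order_count": 1, "order_amount": "99.80"}),
    ("-U", {"sku_id": 7, "dt": "2020-10-01", "order_count": 1, "order_amount": "99.80"}),
    ("+U", {"sku_id": 7, "dt": "2020-10-01", "order_count": 2, "order_amount": "149.70"}),
])
```

After the pair is applied, the store holds exactly one row per key with the latest aggregate, which is why the MySQL target always reflects the current daily totals rather than a history of revisions.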
After submitting the job, observe the Flink Web UI:
Check the data in the ADS-layer ads_sku_index table:
With Flink 1.11.0, inserting a changelog source into an upsert sink raises the following exception:
```
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException:
Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER].
This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
```
This bug has since been fixed; the fix is available in Flink 1.11.1.
This article shared a demo of building a real-time data warehouse. It should give you a clear picture of the data processing flow and, along the way, a deeper understanding of Flink SQL CDC. The case study is detailed enough to follow hands-on, so you can explore the construction of a real-time warehouse in practice.