使用Flink SQL結合Kafka、Elasticsearch、Kibana實時分析電商用戶行爲 (Use flink sql to combine kafka, elasticsearch and kibana, real-time analysis of e-commerce user behavior.)javascript
Flink與其它實時計算工具區別之一是向用戶提供了更多抽象易用的API,好比讀寫各種程序的connector接口、Table API和SQL,從數據加載、計算、一直到輸出,全部操做均可以使用SQL完成,大大減小了開發量和維護成本,本文將經過實時分析電商用戶行爲數據介紹flink sql的使用,分析的內容以下:php
數據採集存儲到kafka,經過flink消費kafka數據,實時計算,結果存儲到es,最後經過kibana展示。css
版本信息
flink 1.12.一、kafka_2.13-2.7.0、elasticsearch 7.10.一、kibana 7.10.1html
電商用戶行爲分析共涉及3個表,商品類目信息表、商品類目信息表、用戶行爲信息表,其中用戶行爲信息表共5個列:用戶ID、商品ID、商品類目ID、行爲類型、時間戳;java
./kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9092 --from-beginning --max-messages 5 1,2268318,2520377,pv,1511544070 1,2333346,2520771,pv,1511561733
數據來源於淘寶開放的用戶行爲數據UserBehavior,數據格式爲csv,以逗號分隔;python
如今數據已經存儲在kafka,進入flink sql client,mysql
建立消費kafka數據表;nginx
CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, app_time BIGINT, ts AS TO_TIMESTAMP(FROM_UNIXTIME(app_time, 'yyyy-MM-dd HH:mm:ss')), proctime AS PROCTIME(), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', --使用kafka connector 'topic' = 'user_behavior', --kafka topic 'scan.startup.mode' = 'earliest-offset', --從topic最開始處開始消費 'properties.bootstrap.servers'='localhost:9092', --kafka broker地址 'properties.group.id' = 'test-group03', 'format' = 'csv', --存儲在kafka的數據格式爲csv 'csv.field-delimiter'=',' --數據分隔符 );
最終的分析結果數據會寫入es,首先建立es index和寫入es的表;web
CREATE TABLE cumulative_uv ( date_str STRING, time_str STRING, uv BIGINT, PRIMARY KEY (date_str, time_str) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://localhost:9200', 'index' = 'cumulative_uv' );
分析每10分鐘在線用戶數只須要知道日期(date_str)、時間(time_str)、數量(uv)便可;上面已經定義了消費kafka數據的表 user_behavior,如今查詢該表,並將數據寫入es;sql
INSERT INTO cumulative_uv SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv FROM ( SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str, SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0' as time_str, user_id FROM user_behavior) GROUP BY date_str;
因爲分析跨度爲每10分鐘,在sql 內層查詢中使用 SUBSTR 截取事件小時和分鐘字符,拼湊成每10分鐘的數據,好比: 12:10,12:20。提交sql後,flink會將sql以流做業方式按照設定的WATERMARK和窗口提交到集羣運行;
如今查詢kibina能夠看到數據已經實時寫入.
建立es index和寫入es的表;
CREATE TABLE buy_cnt_per_hour ( hour_of_day BIGINT, buy_cnt BIGINT ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://localhost:9200', 'index' = 'buy_cnt_per_hour' );
查詢 user_behavior 表,將數據寫入es;
INSERT INTO buy_cnt_per_hour SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*) FROM user_behavior WHERE behavior='buy' GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
首先經過(behavior='buy') 過濾出購買數據,再經過窗口函數(TUMBLE)按一小時切窗,統計出每小時共有多少"buy"的用戶行爲。
因爲kafka數據存儲的是商品id,商品id對應的商品類目名稱存儲在mysql數據庫,需先建立鏈接mysql的數據表;
CREATE TABLE category ( category_id BIGINT, category_name STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'category', 'username' = 'sywu', 'password' = 'sywu', 'lookup.cache.max-rows' = '5000', 'lookup.cache.ttl' = '10min' );
爲了後續查詢方便,建立kafka數據表和mysql數據表關聯視圖;
CREATE VIEW rich_user_behavior AS SELECT U.user_id, U.item_id, U.behavior, case when C.category_name is null then 'other' else C.category_name end as category_name FROM user_behavior AS U LEFT JOIN category FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.category_id;
如今 kafka 數據表和 mysql數據表經過視圖表 rich_user_behavior 關聯在一塊兒;分析top瀏覽商品類目只須要知道商品類目名和瀏覽數便可,因此在此建立一張包含商品類目名和瀏覽數的表;
CREATE TABLE top_category ( category_name STRING PRIMARY KEY NOT ENFORCED, buy_cnt BIGINT ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://localhost:9200', 'index' = 'top_category' );
查詢視圖表 rich_user_behavior表,過濾分組統計數據;
INSERT INTO top_category SELECT category_name, COUNT(*) buy_cnt FROM rich_user_behavior WHERE behavior='buy' GROUP BY category_name;
到此3個分析需求實現,做業正常實時運行。
經過Flink 提供的Table API和SQL,以及處理數據的窗口、讀寫各種程序的connector接口和函數,使用上面的SQL DML操做,flink即實現了用戶行爲數據的實時分析需求;從開發角度看,代碼量和開發難度大大下降;從維護角度看,維護成本也大大下降。