Flink SQL結合Kafka、Elasticsearch、Kibana實時分析電商用戶行爲

使用Flink SQL結合Kafka、Elasticsearch、Kibana實時分析電商用戶行爲 (Use flink sql to combine kafka, elasticsearch and kibana, real-time analysis of e-commerce user behavior.)javascript

Flink與其它實時計算工具區別之一是向用戶提供了更多抽象易用的API,好比讀寫各種程序的connector接口、Table API和SQL,從數據加載、計算、一直到輸出,全部操做均可以使用SQL完成,大大減小了開發量和維護成本,本文將經過實時分析電商用戶行爲數據介紹flink sql的使用,分析的內容以下:php

  1. 分析每10分鐘累計在線用戶數;
  2. 分析每小時購買量;
  3. 分析top瀏覽商品類目(瀏覽的商品歸屬於那個類目);

1 最終實時分析kibana展示效果

2 流程和版本信息

  • kafka --> flink --> es -->kibana

數據採集存儲到kafka,經過flink消費kafka數據,實時計算,結果存儲到es,最後經過kibana展示。css

版本信息
flink 1.12.一、kafka_2.13-2.7.0、elasticsearch 7.10.一、kibana 7.10.1html

3 數據結構

電商用戶行爲分析共涉及3個表,商品類目信息表、商品類目信息表、用戶行爲信息表,其中用戶行爲信息表共5個列:用戶ID、商品ID、商品類目ID、行爲類型、時間戳;java

4 kafka數據

./kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9092 --from-beginning --max-messages 5

1,2268318,2520377,pv,1511544070
1,2333346,2520771,pv,1511561733

數據來源於淘寶開放的用戶行爲數據UserBehavior,數據格式爲csv,以逗號分隔;python

2 使用Flink SQL建表讀取kafka數據

如今數據已經存儲在kafka,進入flink sql client,mysql

建立消費kafka數據表;nginx

CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, app_time BIGINT, ts AS TO_TIMESTAMP(FROM_UNIXTIME(app_time, 'yyyy-MM-dd HH:mm:ss')), proctime AS PROCTIME(), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', --使用kafka connector 'topic' = 'user_behavior', --kafka topic 'scan.startup.mode' = 'earliest-offset', --從topic最開始處開始消費 'properties.bootstrap.servers'='localhost:9092', --kafka broker地址 'properties.group.id' = 'test-group03', 'format' = 'csv', --存儲在kafka的數據格式爲csv 'csv.field-delimiter'=',' --數據分隔符 ); 
  • WATERMARK 定義處理混亂次序的事件時間屬性,每5秒觸發一次window
  • PROCTIME 是內置函數,產生一個虛擬的Processing Time列,偶爾會用到
  • WITH 裏定義kafka鏈接信息和屬性
  • 因爲事件時間格式爲bigint,在sql中將其轉爲timestamp

3 分析場景

3.1 場景1:分析每10分鐘累計在線用戶數

最終的分析結果數據會寫入es,首先建立es index和寫入es的表;web

CREATE TABLE cumulative_uv (
    date_str STRING,
    time_str STRING,
    uv BIGINT,
    PRIMARY KEY (date_str, time_str) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'cumulative_uv'
);
  • WITH 裏面定義es鏈接信息和屬性

分析每10分鐘在線用戶數只須要知道日期(date_str)、時間(time_str)、數量(uv)便可;上面已經定義了消費kafka數據的表 user_behavior,如今查詢該表,並將數據寫入es;sql

INSERT INTO cumulative_uv SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv FROM ( SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str, SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0' as time_str, user_id FROM user_behavior) GROUP BY date_str;

因爲分析跨度爲每10分鐘,在sql 內層查詢中使用 SUBSTR 截取事件小時和分鐘字符,拼湊成每10分鐘的數據,好比: 12:10,12:20。提交sql後,flink會將sql以流做業方式按照設定的WATERMARK和窗口提交到集羣運行;

如今查詢kibina能夠看到數據已經實時寫入.

3.2 場景2:分析每小時購買量

建立es index和寫入es的表;

CREATE TABLE buy_cnt_per_hour ( hour_of_day BIGINT, buy_cnt BIGINT ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://localhost:9200', 'index' = 'buy_cnt_per_hour' );

查詢 user_behavior 表,將數據寫入es;

INSERT INTO buy_cnt_per_hour SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*) FROM user_behavior WHERE behavior='buy' GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
  • HOUR 爲內置函數,從一個 TIMESTAMP 列中提取出一天中第幾個小時的值
  • TUMBLE 爲窗口函數,按設定的時間切窗

首先經過(behavior='buy') 過濾出購買數據,再經過窗口函數(TUMBLE)按一小時切窗,統計出每小時共有多少"buy"的用戶行爲。

3.3 場景3:分析top瀏覽商品類目

因爲kafka數據存儲的是商品id,商品id對應的商品類目名稱存儲在mysql數據庫,需先建立鏈接mysql的數據表;

CREATE TABLE category ( category_id BIGINT, category_name STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'category', 'username' = 'sywu', 'password' = 'sywu', 'lookup.cache.max-rows' = '5000', 'lookup.cache.ttl' = '10min' );

爲了後續查詢方便,建立kafka數據表和mysql數據表關聯視圖;

CREATE VIEW rich_user_behavior AS SELECT U.user_id, U.item_id, U.behavior, case when C.category_name is null then 'other' else C.category_name end as category_name FROM user_behavior AS U LEFT JOIN category FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.category_id;

如今 kafka 數據表和 mysql數據表經過視圖表 rich_user_behavior 關聯在一塊兒;分析top瀏覽商品類目只須要知道商品類目名和瀏覽數便可,因此在此建立一張包含商品類目名和瀏覽數的表;

CREATE TABLE top_category ( category_name STRING PRIMARY KEY NOT ENFORCED, buy_cnt BIGINT ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://localhost:9200', 'index' = 'top_category' );

查詢視圖表 rich_user_behavior表,過濾分組統計數據;

INSERT INTO top_category SELECT category_name, COUNT(*) buy_cnt FROM rich_user_behavior WHERE behavior='buy' GROUP BY category_name;

到此3個分析需求實現,做業正常實時運行。 

4 總結

經過Flink 提供的Table API和SQL,以及處理數據的窗口、讀寫各種程序的connector接口和函數,使用上面的SQL DML操做,flink即實現了用戶行爲數據的實時分析需求;從開發角度看,代碼量和開發難度大大下降;從維護角度看,維護成本也大大下降。

參考文獻

相關文章
相關標籤/搜索