Flink + Hologres: Best Practices for Exact Real-Time UV Deduplication of Hundreds of Millions of Users
UV and PV calculation generally falls into two scenarios, depending on business requirements:
For offline computation scenarios, Hologres provides ultra-high-cardinality UV computation based on RoaringBitmap: a single pre-aggregation pass at the finest granularity, producing a single pre-aggregated result table at that granularity, is enough to achieve sub-second queries. For details, see the earlier article: How Hologres Supports Ultra-High-Cardinality UV Computation (Based on RoaringBitmap).
For real-time computation scenarios, Flink + Hologres can be used, with RoaringBitmap deduplicating user tags in real time. This approach produces fine-grained, real-time UV and PV figures and makes it easy to adjust the minimum statistics window (for example, UV over the last 5 minutes) to achieve a real-time-monitoring effect, suitable for display on dashboards and other BI tools. Compared with deduplication at day, week, or month granularity, it is better suited to fine-grained statistics during campaign days, and results for larger time units can still be obtained through simple aggregation of the fine-grained ones.
1) Create the table uid_mapping as the uid mapping table, which maps each uid to a 32-bit int.
```sql
BEGIN;
CREATE TABLE public.uid_mapping (
    uid text NOT NULL,
    uid_int32 serial,
    PRIMARY KEY (uid)
);
-- Set uid as clustering_key and distribution_key for fast lookup of its int32 value
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
CALL set_table_property('public.uid_mapping', 'orientation', 'row');
COMMIT;
```
2) Create the table dws_app as the base aggregation table, which stores results aggregated on the base dimensions.
```sql
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
```
```sql
-- Create a Table Group with a shard count of 16.
-- The test data volume is in the millions and the backend compute resources are 100 cores,
-- so the shard count is set to 16
BEGIN;
CREATE TABLE tg16 (a int);  -- sentinel table for the Table Group
CALL set_table_property('tg16', 'shard_count', '16');
COMMIT;
```
```sql
BEGIN;
CREATE TABLE dws_app (
    country text,
    prov text,
    city text,
    ymd text NOT NULL,          -- date column
    timetz TIMESTAMPTZ,         -- statistics timestamp; enables statistics per Flink window
    uid32_bitmap roaringbitmap, -- RoaringBitmap recording the UVs
    PRIMARY KEY (country, prov, city, ymd, timetz) -- query dimensions plus time as primary key, preventing duplicate inserts
);
CALL set_table_property('public.dws_app', 'orientation', 'column');
-- Set the date column as clustering_key and event_time_column for filtering
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');
-- Equivalent to placing the table in the Table Group with 16 shards
CALL set_table_property('public.dws_app', 'colocate_with', 'tg16');
-- Set the GROUP BY columns as distribution_key
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
COMMIT;
```
For the complete example source code, see the examples in alibabacloud-hologres-connectors.
1) Flink reads the data source as a stream (DataStream) and converts it into a source table (Table).
```java
// A CSV file is used as the data source here; it could also be Kafka, etc.
DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// Joining with a dimension table requires a proctime field,
// see https://help.aliyun.com/document_detail/62506.html
Table odsTable =
        tableEnv.fromDataStream(
                odsStream,
                $("uid"),
                $("country"),
                $("prov"),
                $("city"),
                $("ymd"),
                $("proctime").proctime());
// Register the table in the catalog environment
tableEnv.createTemporaryView("odsTable", odsTable);
```
2) Join the source table with the Hologres dimension table (uid_mapping).
The dimension table uses the insertIfNotExists parameter: when a lookup misses, the row is inserted automatically, and the uid_int32 column is then auto-generated by Hologres's serial type.
```java
// Create the Hologres dimension table; insertIfNotExists means rows are
// inserted automatically when a lookup misses
String createUidMappingTable =
        String.format(
                "create table uid_mapping_dim("
                        + "  uid string,"
                        + "  uid_int32 INT"
                        + ") with ("
                        + "  'connector'='hologres',"
                        + "  'dbname' = '%s',"    // Hologres database name
                        + "  'tablename' = '%s'," // Hologres table name
                        + "  'username' = '%s',"  // access id of the current account
                        + "  'password' = '%s',"  // access key of the current account
                        + "  'endpoint' = '%s',"  // Hologres endpoint
                        + "  'insertifnotexists'='true'"
                        + ")",
                database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);

// Join the source table with the dimension table
String odsJoinDim =
        "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
                + " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
                + " ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);
```
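The insertIfNotExists lookup behaves like a get-or-assign operation: a miss allocates the next auto-increment int32, so every uid ends up with one stable, dense integer id. A minimal stand-alone sketch of that semantics in plain Java (the map and counter here are illustrative stand-ins for the Hologres uid_mapping table, not part of the connector API):

```java
import java.util.HashMap;
import java.util.Map;

public class UidMappingSketch {
    private final Map<String, Integer> mapping = new HashMap<>();
    private int nextId = 1; // mimics the Hologres serial column (starts at 1)

    /** Returns the int32 id for uid, assigning a new one on first sight. */
    public int getOrAssign(String uid) {
        return mapping.computeIfAbsent(uid, k -> nextId++);
    }

    public static void main(String[] args) {
        UidMappingSketch dim = new UidMappingSketch();
        int a = dim.getOrAssign("user-a");  // miss: "inserted", assigned id 1
        int b = dim.getOrAssign("user-b");  // miss: "inserted", assigned id 2
        int a2 = dim.getOrAssign("user-a"); // hit: the same id 1 is returned
        System.out.println(a + " " + b + " " + a2); // prints "1 2 1"
    }
}
```

The dense int32 ids are what make the RoaringBitmap in the next step compact and exact.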
3) Convert the join result into a DataStream, process it with Flink time windows, and aggregate using RoaringBitmap.
```java
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
        source
                // Key by the dimensions to be counted (country, prov, city, ymd)
                .keyBy(0, 1, 2, 3)
                // Tumbling time window; ProcessingTime is used here because a CSV file
                // simulates the input stream; in practice EventTime can be used
                .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
                // Trigger that emits aggregation results before the window closes
                .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
                .aggregate(
                        // Aggregate function: aggregates over the keyBy dimensions
                        new AggregateFunction<
                                Tuple5<String, String, String, String, Integer>,
                                RoaringBitmap,
                                RoaringBitmap>() {

                            @Override
                            public RoaringBitmap createAccumulator() {
                                return new RoaringBitmap();
                            }

                            @Override
                            public RoaringBitmap add(
                                    Tuple5<String, String, String, String, Integer> in,
                                    RoaringBitmap acc) {
                                // Add the 32-bit uid to the RoaringBitmap for deduplication
                                acc.add(in.f4);
                                return acc;
                            }

                            @Override
                            public RoaringBitmap getResult(RoaringBitmap acc) {
                                return acc;
                            }

                            @Override
                            public RoaringBitmap merge(
                                    RoaringBitmap acc1, RoaringBitmap acc2) {
                                return RoaringBitmap.or(acc1, acc2);
                            }
                        },
                        // Window function: emits the aggregation result
                        new WindowFunction<
                                RoaringBitmap,
                                Tuple6<String, String, String, String, Timestamp, byte[]>,
                                Tuple,
                                TimeWindow>() {

                            @Override
                            public void apply(
                                    Tuple keys,
                                    TimeWindow timeWindow,
                                    Iterable<RoaringBitmap> iterable,
                                    Collector<Tuple6<String, String, String, String, Timestamp, byte[]>> out)
                                    throws Exception {
                                RoaringBitmap result = iterable.iterator().next();
                                // Optimize the RoaringBitmap's internal representation
                                result.runOptimize();
                                // Serialize the RoaringBitmap into a byte array for storage in Hologres
                                byte[] byteArray = new byte[result.serializedSizeInBytes()];
                                result.serialize(ByteBuffer.wrap(byteArray));
                                // The Tuple6.f4 (Timestamp) field marks the statistics period,
                                // i.e. the window length, truncated to seconds
                                out.collect(
                                        new Tuple6<>(
                                                keys.getField(0),
                                                keys.getField(1),
                                                keys.getField(2),
                                                keys.getField(3),
                                                new Timestamp(timeWindow.getEnd() / 1000 * 1000),
                                                byteArray));
                            }
                        });
```
4) Write to the result table.
Note that the Hologres RoaringBitmap type corresponds to the byte array type in Flink.
```java
// Convert the computation result into a Table
Table resTable =
        tableEnv.fromDataStream(
                processedSource,
                $("country"),
                $("prov"),
                $("city"),
                $("ymd"),
                $("timest"),
                $("uid32_bitmap"));

// Create the Hologres result table; the Hologres RoaringBitmap type is written as a byte array
String createHologresTable =
        String.format(
                "create table sink("
                        + "  country string,"
                        + "  prov string,"
                        + "  city string,"
                        + "  ymd string,"
                        + "  timetz timestamp,"
                        + "  uid32_bitmap BYTES"
                        + ") with ("
                        + "  'connector'='hologres',"
                        + "  'dbname' = '%s',"
                        + "  'tablename' = '%s',"
                        + "  'username' = '%s',"
                        + "  'password' = '%s',"
                        + "  'endpoint' = '%s',"
                        + "  'connectionSize' = '%s',"
                        + "  'mutatetype' = 'insertOrReplace'"
                        + ")",
                database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);

// Write the computation result into the dws table
tableEnv.executeSql("insert into sink select * from " + resTable);
```
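The byte-array boundary noted above is just a serialize/deserialize round trip: the window function serializes the bitmap, Hologres stores the bytes, and the query side reads them back as a bitmap. A rough stdlib-only sketch of that round trip, using java.util.BitSet as an uncompressed stand-in for RoaringBitmap purely so the example runs without extra dependencies:

```java
import java.util.BitSet;

public class BitmapRoundTrip {
    public static void main(String[] args) {
        // Deduplicate some int32 uids in a bitmap (duplicates collapse to one bit)
        BitSet uv = new BitSet();
        for (int uid : new int[] {3, 7, 7, 42, 3}) {
            uv.set(uid);
        }

        // Serialize to byte[] for the sink
        // (RoaringBitmap instead uses serializedSizeInBytes() and serialize(ByteBuffer))
        byte[] bytes = uv.toByteArray();

        // Deserialize on the query side and count the cardinality (= UV)
        BitSet restored = BitSet.valueOf(bytes);
        System.out.println(restored.cardinality()); // prints 3
        System.out.println(restored.equals(uv));    // prints true
    }
}
```

In the real pipeline the deserialization happens inside Hologres, where the stored bytes are interpreted directly as the roaringbitmap column type.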
When querying, aggregate over the query dimensions from the base aggregation table (dws_app) and compute the bitmap cardinality to obtain the user count under the GROUP BY conditions.
```sql
-- Before running the RB_AGG query below, the session parameter that forces
-- three-stage aggregation can be turned off (off by default) for better performance
SET hg_experimental_enable_force_three_stage_agg = off;

SELECT country,
       prov,
       city,
       RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM dws_app
WHERE ymd = '20210329'
GROUP BY country, prov, city;
```
```sql
-- Before running the RB_AGG query below, the session parameter that forces
-- three-stage aggregation can be turned off (off by default) for better performance
SET hg_experimental_enable_force_three_stage_agg = off;

SELECT country,
       prov,
       RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM dws_app
WHERE timetz > '2021-04-19 18:00:00+08'
  AND timetz < '2021-04-19 19:00:00+08'
GROUP BY country, prov;
```
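The reason RB_OR_AGG followed by RB_CARDINALITY is exact rather than approximate is that OR-ing window bitmaps merges their user sets, so a user active in several windows is still counted once. A stdlib-only illustration, with java.util.BitSet standing in for RoaringBitmap and BitSet.or playing the role of RB_OR_AGG:

```java
import java.util.BitSet;

public class OrAggSketch {
    public static void main(String[] args) {
        // Two 5-minute window bitmaps for the same (country, prov, city) key
        BitSet window1 = new BitSet();
        for (int uid : new int[] {1, 2, 3}) window1.set(uid);
        BitSet window2 = new BitSet();
        for (int uid : new int[] {2, 3, 4}) window2.set(uid);

        // Naively summing per-window UVs over-counts users seen in both windows
        int naiveSum = window1.cardinality() + window2.cardinality(); // 6

        // OR-merge (the RB_OR_AGG analogue) deduplicates across windows
        BitSet merged = (BitSet) window1.clone();
        merged.or(window2);
        int uv = merged.cardinality(); // 4: users {1, 2, 3, 4}

        System.out.println(naiveSum + " vs " + uv); // prints "6 vs 4"
    }
}
```

This is also why fine-grained window results can be rolled up into hourly or daily UV with a simple aggregation, as in the queries above.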