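Case description
Functionality
Read data from a socket source and count the occurrences of each word.
Implementation flow
Transformation steps:
1) Split each line on spaces
2) Assign a count of 1 to each word
3) Group by word
4) Sum the counts
5) Print the results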
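The five steps above can be traced without a cluster. The following is a minimal plain-Java sketch, not Flink code. It assumes the input is an in-memory list of lines rather than a socket. Unlike the flatMap in the Flink programs, it also skips the empty tokens that consecutive spaces produce. The class name WordCountSteps is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of steps 1-5 (no Flink involved).
class WordCountSteps {

    static Map<String, Long> count(List<String> lines) {
        // 1) split each line on spaces and 2) pair every word with a count of 1
        List<Map.Entry<String, Long>> ones = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                if (!word.isEmpty()) { // skip empty tokens left by repeated spaces
                    ones.add(Map.entry(word, 1L));
                }
            }
        }
        // 3) group by word and 4) sum the 1s; TreeMap keeps the result sorted by word
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, Long> e : ones) {
            totals.merge(e.getKey(), e.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // 5) print the result: {flink=1, hello=2, table=1}
        System.out.println(count(List.of("hello flink", "hello table")));
    }
}
```

In the streaming job the same grouping never finishes. Each new word updates a running total instead of producing one final map.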
Implementation with the Flink Table API
Code (StreamTableApiApplication):
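```java
import java.util.Arrays;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

public class StreamTableApiApplication {

    public static void main(String[] args) throws Exception {
        // Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();
        // Get the Table environment
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);

        // Connect the data source: a socket text stream
        DataStreamSource<String> lines = env.socketTextStream("10.10.20.15", 9922);

        // Split each line on spaces and flatten it into single words
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                Arrays.stream(line.split(" ")).forEach(out::collect);
            }
        });

        // Convert the DataStream into a Table; the default field name is f0, renamed here to word
        Table table = tabEnv.fromDataStream(words, "word");
        // Group by word and aggregate
        Table resultTable = table.groupBy("word").select("word, sum(1L) as counts");

        // In stream processing data keeps arriving and each word's count keeps changing,
        // so the result must be converted with toRetractStream. toAppendStream only accepts
        // insert-only results and fails on this updating table:
        // DataStream<WordCount> wordCountDataStream = tabEnv.toAppendStream(resultTable, WordCount.class);
        // wordCountDataStream.printToErr("toAppendStream>>>");

        // WordCount is the author's POJO (not shown), presumably with fields word (String) and counts (Long)
        DataStream<Tuple2<Boolean, WordCount>> wordCountDataStream =
                tabEnv.toRetractStream(resultTable, WordCount.class);
        wordCountDataStream.printToErr("toRetractStream>>>");

        env.execute();
    }
}
```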
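The comment about toRetractStream can be illustrated without Flink. The sketch below is a plain-Java simulation that assumes words arrive one at a time. It produces the kind of changelog an updating GROUP BY emits. A (true, row) message adds a row. When a word's count changes, a (false, row) message first withdraws the row emitted earlier. The message format is simplified, and Flink prints the actual Tuple2<Boolean, WordCount> values. RetractSimulation is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink) of the retract changelog produced by an
// updating "GROUP BY word" count query. The message format is simplified.
class RetractSimulation {

    // One changelog message: true = add this row, false = retract this row
    static String message(boolean add, String word, long counts) {
        return "(" + add + "," + word + "," + counts + ")";
    }

    static List<String> process(List<String> words) {
        Map<String, Long> state = new HashMap<>(); // current count per word
        List<String> changelog = new ArrayList<>();
        for (String word : words) {
            Long previous = state.get(word);
            if (previous != null) {
                // the row emitted earlier for this word is now outdated: retract it first
                changelog.add(message(false, word, previous));
            }
            long updated = previous == null ? 1L : previous + 1;
            state.put(word, updated);
            changelog.add(message(true, word, updated));
        }
        return changelog;
    }

    public static void main(String[] args) {
        // prints (true,hello,1), (true,flink,1), (false,hello,1), (true,hello,2) on separate lines
        process(List.of("hello", "flink", "hello")).forEach(System.out::println);
    }
}
```

The third message is why toAppendStream is rejected for this query. It changes a row that has already been emitted, which an insert-only stream cannot express.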
Testing:
Start a socket server with nc and type some strings:
[root@flink1 flink-1.11.2]# nc -lk 9922
Implementation with Flink SQL
Code (StreamTableSqlApplication):
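```java
import java.util.Arrays;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

public class StreamTableSqlApplication {

    public static void main(String[] args) throws Exception {
        // Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();
        // Get the Table environment
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);

        // Connect the data source: a socket text stream
        DataStreamSource<String> lines = env.socketTextStream("10.10.20.15", 9922);

        // Split each line on spaces and flatten it into single words
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                Arrays.stream(line.split(" ")).forEach(out::collect);
            }
        });

        // Register the DataStream as table t_wordcount; the default field name f0 is renamed to word.
        // registerDataStream is deprecated in newer Flink versions; createTemporaryView replaces it.
        tabEnv.registerDataStream("t_wordcount", words, "word");

        // Group by word and count with SQL
        Table resultTable = tabEnv.sqlQuery("select word, count(1) as counts from t_wordcount group by word");

        // The counts keep changing, so the result is converted with toRetractStream.
        // WordCount is the author's POJO (not shown), presumably with fields word (String) and counts (Long).
        DataStream<Tuple2<Boolean, WordCount>> wordCountDataStream =
                tabEnv.toRetractStream(resultTable, WordCount.class);
        wordCountDataStream.printToErr("toRetractStream>>>");

        env.execute();
    }
}
```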
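Flink SQL windows
Flink SQL supports two main kinds of windowed aggregation: Window aggregation and Over aggregation. This section covers Window aggregation. A window can be defined on either of two time attributes, Event Time or Processing Time. For each time attribute, three window types are available: tumbling windows (TUMBLE), hopping or sliding windows (HOP), and session windows (SESSION).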
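All three window types follow the same SQL pattern. A group-window function goes in GROUP BY. The _START and _END auxiliary functions go in SELECT and must be called with exactly the same arguments.

The sketch below is only a string builder that assembles the per-user click-count query for each type; it does not call Flink. It assumes the user_clicks table and the ts, username and click_url columns used in the examples that follow. It also checks how many intervals each function takes. HOP takes the slide first, then the window size. GroupWindowQueries is a hypothetical name.

```java
// Illustrative string builder for Flink SQL group-window queries (Flink is not called).
// Table and column names are the ones used by the click examples in this article.
class GroupWindowQueries {

    enum WindowType {
        TUMBLE(1),   // TUMBLE(time_attr, size)
        HOP(2),      // HOP(time_attr, slide, size): the slide comes first
        SESSION(1);  // SESSION(time_attr, gap)

        final int intervalCount;

        WindowType(int intervalCount) {
            this.intervalCount = intervalCount;
        }
    }

    static String query(WindowType type, String... intervals) {
        if (intervals.length != type.intervalCount) {
            throw new IllegalArgumentException(type + " takes " + type.intervalCount + " interval(s)");
        }
        String args = "ts, " + String.join(", ", intervals);
        // The _START/_END auxiliary functions repeat exactly the arguments of the GROUP BY window
        return "SELECT " + type + "_START(" + args + "), " + type + "_END(" + args + "), "
                + "username, COUNT(click_url) FROM user_clicks "
                + "GROUP BY " + type + "(" + args + "), username";
    }

    public static void main(String[] args) {
        System.out.println(query(WindowType.TUMBLE, "INTERVAL '1' MINUTE"));
        System.out.println(query(WindowType.HOP, "INTERVAL '30' SECOND", "INTERVAL '1' MINUTE"));
        System.out.println(query(WindowType.SESSION, "INTERVAL '30' SECOND"));
    }
}
```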
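Example
Count how many times each user clicked a web page within the past minute. To do this, define a window that collects one minute of data and aggregate the data inside that window.
Test data: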
| Username | URL visited | Visit time |
| ------ | --------------------- | -------------------- |
| 張三 | http://taobao.com/xxx | 2021-05-10 10:00:00 |
| 張三 | http://taobao.com/xxx | 2021-05-10 10:00:10 |
| 張三 | http://taobao.com/xxx | 2021-05-10 10:00:49 |
| 張三 | http://taobao.com/xxx | 2021-05-10 10:01:05 |
| 張三 | http://taobao.com/xxx | 2021-05-10 10:01:58 |
| 張三 | http://taobao.com/xxx | 2021-05-10 10:02:10 |
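Using tumbling windows
In the Table API, tumbling windows are defined with the Tumble class, which provides three methods:
- over: the window length
- on: the time attribute to group on
- as: an alias for the window

In SQL, the same window is expressed with the TUMBLE(time_attr, interval) group function in the GROUP BY clause. TUMBLE_START and TUMBLE_END return the window bounds.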
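To illustrate how a tumbling window assigns rows, here is a plain-Java sketch of TUMBLE(ts, size) followed by GROUP BY username over the test rows. It is not Flink code, and it rests on these assumptions:
- Rows are CSV strings of the form username,url,yyyy-MM-dd HH:mm:ss.
- Timestamps are treated as UTC, and windows are aligned to the epoch.
- Watermarks are ignored, because all rows are already in memory.
- The window size is a whole number of seconds.
- The user name is written as zhangsan to keep the source ASCII.

TumbleSketch is a hypothetical name.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of TUMBLE(ts, size) + GROUP BY username; watermarks are ignored.
class TumbleSketch {

    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Window start = t - (t mod size), aligned to the epoch (timestamps read as UTC)
    static LocalDateTime windowStart(LocalDateTime ts, Duration size) {
        long t = ts.toEpochSecond(ZoneOffset.UTC);
        long start = t - Math.floorMod(t, size.getSeconds());
        return LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC);
    }

    // Returns "windowStart,windowEnd,username" -> click count, in first-seen order
    static Map<String, Long> count(List<String> rows, Duration size) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String row : rows) {
            String[] f = row.split(",");
            LocalDateTime start = windowStart(LocalDateTime.parse(f[2], FMT), size);
            String key = start + "," + start.plus(size) + "," + f[0];
            result.merge(key, 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> rows = List.of(
                "zhangsan,http://taobao.com/xxx,2021-05-10 10:00:00",
                "zhangsan,http://taobao.com/xxx,2021-05-10 10:00:10",
                "zhangsan,http://taobao.com/xxx,2021-05-10 10:00:49",
                "zhangsan,http://taobao.com/xxx,2021-05-10 10:01:05",
                "zhangsan,http://taobao.com/xxx,2021-05-10 10:01:58",
                "zhangsan,http://taobao.com/xxx,2021-05-10 10:02:10");
        count(rows, Duration.ofMinutes(1)).forEach((k, v) -> System.out.println(k + "," + v));
    }
}
```

Each row lands in exactly one window, so the six test rows give the counts 3, 2 and 1 for the windows starting at 10:00, 10:01 and 10:02.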
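Implementation steps:
1) Create the stream execution environment and a StreamTableEnvironment that uses the Blink planner.
2) Write the test data to a temporary CSV file.
3) Create the user_clicks table with DDL, declaring ts as the event-time attribute with a 2-second watermark delay.
4) Query the table grouped by TUMBLE(ts, INTERVAL '1' MINUTE) and username.
5) Convert the result to an append stream and print it.
Code (TumbleUserClickApplication):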
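```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TumbleUserClickApplication {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);

        // Write the source data to a temporary file and get its absolute path.
        // FileUtil.createTempFile is the author's own helper; its code is not shown.
        String contents =
                "張三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
        String path = FileUtil.createTempFile(contents);

        // ts is the event-time attribute; the watermark trails it by 2 seconds
        String ddl = "CREATE TABLE user_clicks (\n" +
                "  username varchar,\n" +
                "  click_url varchar,\n" +
                "  ts TIMESTAMP(3),\n" +
                "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
                ") WITH (\n" +
                "  'connector.type' = 'filesystem',\n" +
                "  'connector.path' = '" + path + "',\n" +
                "  'format.type' = 'csv'\n" +
                ")";
        // sqlUpdate is deprecated since Flink 1.11; executeSql is the replacement
        tabEnv.sqlUpdate(ddl);

        // Query the table (the result is a new Table): clicks per user in each 1-minute tumbling window
        String query = "SELECT\n" +
                "  TUMBLE_START(ts, INTERVAL '1' MINUTE),\n" +
                "  TUMBLE_END(ts, INTERVAL '1' MINUTE),\n" +
                "  username,\n" +
                "  COUNT(click_url)\n" +
                "FROM user_clicks\n" +
                "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username";
        Table result = tabEnv.sqlQuery(query);
        // Each window result is emitted once, so an append stream is sufficient here
        tabEnv.toAppendStream(result, Row.class).print();

        env.execute();
    }
}
```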
The window is a 1-minute tumbling window on event time, and the watermark trails the event timestamps by 2 seconds.
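The watermark decides when a window's result is emitted. The sketch below is a simplified, single-threaded simulation, not Flink's operator. It models the behavior as follows:
- The watermark is the largest timestamp seen so far minus the delay.
- A window fires once the watermark reaches its last millisecond (end - 1 ms).
- Events for windows that have already fired are dropped.
- The end of the bounded input closes all remaining windows. Flink emits a final maximum watermark when a bounded source finishes.

WatermarkSketch is a hypothetical name.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified simulation of event-time tumbling windows driven by a watermark of
// "largest timestamp seen - delay". Not Flink's operator; late events are dropped.
class WatermarkSketch {

    static LocalDateTime windowStart(LocalDateTime ts, Duration size) {
        long t = ts.toEpochSecond(ZoneOffset.UTC);
        return LocalDateTime.ofEpochSecond(t - Math.floorMod(t, size.getSeconds()), 0, ZoneOffset.UTC);
    }

    // Last millisecond that still belongs to [start, start + size)
    static LocalDateTime lastMillis(LocalDateTime start, Duration size) {
        return start.plus(size).minusNanos(1_000_000);
    }

    static List<String> run(List<LocalDateTime> events, Duration size, Duration delay) {
        TreeMap<LocalDateTime, Long> open = new TreeMap<>(); // window start -> count
        List<String> fired = new ArrayList<>();
        LocalDateTime watermark = LocalDateTime.MIN;
        for (LocalDateTime ts : events) {
            LocalDateTime start = windowStart(ts, size);
            if (!watermark.isBefore(lastMillis(start, size))) {
                continue; // this window already fired: the event is late and dropped
            }
            open.merge(start, 1L, Long::sum);
            LocalDateTime candidate = ts.minus(delay);
            if (candidate.isAfter(watermark)) {
                watermark = candidate; // the watermark never moves backwards
            }
            fire(open, size, watermark, "wm=" + watermark.toLocalTime(), fired);
        }
        // End of bounded input: a final maximum watermark closes every remaining window
        fire(open, size, LocalDateTime.MAX, "end-of-input", fired);
        return fired;
    }

    static void fire(TreeMap<LocalDateTime, Long> open, Duration size,
                     LocalDateTime watermark, String reason, List<String> fired) {
        while (!open.isEmpty()) {
            Map.Entry<LocalDateTime, Long> first = open.firstEntry();
            if (watermark.isBefore(lastMillis(first.getKey(), size))) {
                break; // the earliest open window is not complete yet
            }
            LocalDateTime start = first.getKey();
            fired.add(start.toLocalTime() + "-" + start.plus(size).toLocalTime()
                    + "," + first.getValue() + " (" + reason + ")");
            open.pollFirstEntry();
        }
    }

    public static void main(String[] args) {
        LocalDateTime d = LocalDateTime.of(2021, 5, 10, 10, 0, 0);
        List<LocalDateTime> clicks = List.of(d, d.plusSeconds(10), d.plusSeconds(49),
                d.plusSeconds(65), d.plusSeconds(118), d.plusSeconds(130));
        run(clicks, Duration.ofMinutes(1), Duration.ofSeconds(2)).forEach(System.out::println);
    }
}
```

With the six test timestamps, the windows fire at three different points:
- The 10:00-10:01 window fires when the 10:01:05 click moves the watermark to 10:01:03.
- The 10:01-10:02 window fires when the watermark reaches 10:02:08.
- The last window fires only when the input ends.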
Output:
```text
4> 2021-05-10T10:00,2021-05-10T10:01,張三,3
4> 2021-05-10T10:01,2021-05-10T10:02,張三,2
4> 2021-05-10T10:02,2021-05-10T10:03,張三,1
```
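Using sliding (hopping) windows
Implementation steps: the same as for the tumbling window, except that the window function changes to HOP(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE). Its first interval is the slide and its second is the window size.
Code (HopUserClickApplication):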
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class HopUserClickApplication {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);

        // Write the source data to a temporary file and get its absolute path
        // (FileUtil.createTempFile is the author's own helper, not shown)
        String contents =
                "張三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
        String path = FileUtil.createTempFile(contents);

        // ts is the event-time attribute; the watermark trails it by 2 seconds
        String ddl = "CREATE TABLE user_clicks (\n" +
                "  username varchar,\n" +
                "  click_url varchar,\n" +
                "  ts TIMESTAMP(3),\n" +
                "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
                ") WITH (\n" +
                "  'connector.type' = 'filesystem',\n" +
                "  'connector.path' = '" + path + "',\n" +
                "  'format.type' = 'csv'\n" +
                ")";
        tabEnv.sqlUpdate(ddl);

        // Query the table (the result is a new Table): every 30 seconds, count the clicks of the past 1 minute
        String query = "SELECT\n" +
                "  HOP_START(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),\n" +
                "  HOP_END(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),\n" +
                "  username,\n" +
                "  COUNT(click_url)\n" +
                "FROM user_clicks\n" +
                "GROUP BY HOP(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username";
        Table result = tabEnv.sqlQuery(query);
        tabEnv.toAppendStream(result, Row.class).print();

        env.execute();
    }
}
```
Every 30 seconds, count each user's clicks over the past 1 minute.
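A hopping window with a 30 s slide and a 1 min size places each click in size / slide = 2 overlapping windows. The plain-Java sketch below enumerates them with the usual sliding-window rule. The latest window start is t - (t mod slide), and earlier starts are taken one slide apart while they remain greater than t - size. It is not Flink code and rests on these assumptions:
- Timestamps are treated as UTC, and windows are aligned to the epoch.
- Intervals are whole seconds.
- Watermarks are ignored.

HopSketch is a hypothetical name.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of HOP(ts, slide, size) counting; not Flink code.
class HopSketch {

    // Starts (epoch seconds) of every window [start, start + size) that contains t
    static List<Long> windowStarts(long t, long slide, long size) {
        List<Long> starts = new ArrayList<>();
        long lastStart = t - Math.floorMod(t, slide); // latest window start at or before t
        for (long start = lastStart; start > t - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Returns "windowStart,windowEnd" -> number of events, ordered by window start
    static Map<String, Long> count(List<LocalDateTime> events, Duration slide, Duration size) {
        TreeMap<LocalDateTime, Long> perWindow = new TreeMap<>();
        for (LocalDateTime ts : events) {
            long t = ts.toEpochSecond(ZoneOffset.UTC);
            for (long start : windowStarts(t, slide.getSeconds(), size.getSeconds())) {
                perWindow.merge(LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC), 1L, Long::sum);
            }
        }
        Map<String, Long> result = new LinkedHashMap<>();
        perWindow.forEach((start, n) -> result.put(start + "," + start.plus(size), n));
        return result;
    }

    public static void main(String[] args) {
        LocalDateTime d = LocalDateTime.of(2021, 5, 10, 10, 0, 0);
        List<LocalDateTime> clicks = List.of(d, d.plusSeconds(10), d.plusSeconds(49),
                d.plusSeconds(65), d.plusSeconds(118), d.plusSeconds(130));
        count(clicks, Duration.ofSeconds(30), Duration.ofMinutes(1))
                .forEach((window, n) -> System.out.println(window + "," + n));
    }
}
```

On the six test timestamps, the sketch produces the same six windows and counts (2, 3, 2, 2, 2, 1) as HopUserClickApplication.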
Output:
```text
4> 2021-05-10T09:59:30,2021-05-10T10:00:30,張三,2
4> 2021-05-10T10:00,2021-05-10T10:01,張三,3
4> 2021-05-10T10:00:30,2021-05-10T10:01:30,張三,2
4> 2021-05-10T10:01,2021-05-10T10:02,張三,2
4> 2021-05-10T10:01:30,2021-05-10T10:02:30,張三,2
4> 2021-05-10T10:02,2021-05-10T10:03,張三,1
```
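Using session windows
Implementation steps: the same as for the tumbling window, except that the window function changes to SESSION(ts, INTERVAL '30' SECOND). Its interval is the session gap.
Code (SessionUserClickApplication):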
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SessionUserClickApplication {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);

        // Write the source data to a temporary file and get its absolute path
        // (FileUtil.createTempFile is the author's own helper, not shown)
        String contents =
                "張三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
                "張三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
        String path = FileUtil.createTempFile(contents);

        // ts is the event-time attribute; the watermark trails it by 2 seconds
        String ddl = "CREATE TABLE user_clicks (\n" +
                "  username varchar,\n" +
                "  click_url varchar,\n" +
                "  ts TIMESTAMP(3),\n" +
                "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
                ") WITH (\n" +
                "  'connector.type' = 'filesystem',\n" +
                "  'connector.path' = '" + path + "',\n" +
                "  'format.type' = 'csv'\n" +
                ")";
        tabEnv.sqlUpdate(ddl);

        // Query the table (the result is a new Table): count clicks per user session,
        // where a session closes after 30 seconds without clicks
        String query = "SELECT\n" +
                "  SESSION_START(ts, INTERVAL '30' SECOND),\n" +
                "  SESSION_END(ts, INTERVAL '30' SECOND),\n" +
                "  username,\n" +
                "  COUNT(click_url)\n" +
                "FROM user_clicks\n" +
                "GROUP BY SESSION(ts, INTERVAL '30' SECOND), username";
        Table result = tabEnv.sqlQuery(query);
        tabEnv.toAppendStream(result, Row.class).print();

        env.execute();
    }
}
```
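This example uses a session window with a 30-second gap. A user's session ends after 30 seconds without clicks, and the clicks in each session are counted.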
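A session window has no fixed length. Each click opens or extends a session, and the session closes once no click arrives within the gap. SESSION_START is the first click, and SESSION_END is the last click plus the gap. The plain-Java sketch below groups sorted timestamps this way. It is not Flink code and it ignores watermarks. It treats a gap of exactly 30 s as still the same session, which is a boundary assumption. SessionSketch is a hypothetical name.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java sketch of SESSION(ts, gap) counting for a single user; not Flink code.
class SessionSketch {

    // Returns "sessionStart,sessionEnd,count" per session; end = last event + gap
    static List<String> sessions(List<LocalDateTime> events, Duration gap) {
        List<LocalDateTime> sorted = new ArrayList<>(events);
        Collections.sort(sorted); // sort instead of modelling watermarks
        List<String> result = new ArrayList<>();
        LocalDateTime start = null;
        LocalDateTime last = null;
        long count = 0;
        for (LocalDateTime ts : sorted) {
            // more than `gap` after the previous click: close the current session
            // (a gap of exactly `gap` stays in the same session in this sketch)
            if (start != null && ts.isAfter(last.plus(gap))) {
                result.add(start + "," + last.plus(gap) + "," + count);
                start = null;
            }
            if (start == null) {
                start = ts;
                count = 0;
            }
            count++;
            last = ts;
        }
        if (start != null) {
            result.add(start + "," + last.plus(gap) + "," + count);
        }
        return result;
    }

    public static void main(String[] args) {
        LocalDateTime d = LocalDateTime.of(2021, 5, 10, 10, 0, 0);
        List<LocalDateTime> clicks = List.of(d, d.plusSeconds(10), d.plusSeconds(49),
                d.plusSeconds(65), d.plusSeconds(118), d.plusSeconds(130));
        sessions(clicks, Duration.ofSeconds(30)).forEach(System.out::println);
    }
}
```

On the test data, the sketch yields three sessions, each with 2 clicks:
- [10:00:00, 10:00:40)
- [10:00:49, 10:01:35)
- [10:01:58, 10:02:40)

These match the output of SessionUserClickApplication.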
Output:
```text
4> 2021-05-10T10:00,2021-05-10T10:00:40,張三,2
4> 2021-05-10T10:00:49,2021-05-10T10:01:35,張三,2
4> 2021-05-10T10:01:58,2021-05-10T10:02:40,張三,2
```
This article was written and shared by mirson. For further discussion, join QQ group 19310171 or visit www.softart.cn.