Using game settlement (scoring) logs as an example, this post walks through statistical analysis of logs with KSQL.
cd ~/Documents/install/confluent-5.0.1/
bin/confluent start
bin/kafka-topics --list --zookeeper localhost:2181
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic score-normalized
bin/kafka-console-producer --broker-list localhost:9092 --topic score-normalized
> {"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1區_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic score-normalized --from-beginning
You should see:
{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1區_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
bin/ksql http://localhost:8088
You should see the KSQL logo displayed at startup, followed by the interactive prompt.
ksql> show topics;
PRINT 'score-normalized';
The output:
Format:STRING
19-1-5 下午11時59分31秒 , NULL , {"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_\xE9\xAB\x98\xE6\x89\x8B1\xE5\x8C\xBA_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
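PRINT displays the non-ASCII part of gameId as escaped bytes (\xE9\xAB\x98\xE6\x89\x8B1\xE5\x8C\xBA). These are simply the UTF-8 bytes of 高手1区. The minimal Python sketch below decodes such escapes back into text; the helper name decode_escaped_utf8 is our own and not part of KSQL or Kafka.

```python
def decode_escaped_utf8(text: str) -> str:
    """Turn literal \\xNN escape sequences into raw bytes and decode them as UTF-8."""
    # unicode_escape maps each literal \xNN sequence to the single character U+00NN ...
    chars = text.encode("ascii").decode("unicode_escape")
    # ... and latin-1 maps U+00NN back to the byte NN, recovering the original UTF-8 bytes.
    return chars.encode("latin-1").decode("utf-8")


# gameId exactly as PRINT displayed it (the backslashes are literal characters).
printed_game_id = r"2017-12-04_07:09:28_\xE9\xAB\x98\xE6\x89\x8B1\xE5\x8C\xBA_200_015_185175"
print(decode_escaped_utf8(printed_game_id))
```

This only helps when reading PRINT output by eye; the message stored in the topic is already valid UTF-8, as the console consumer output shows.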
In the PRINT output, 19-1-5 下午11時59分31秒 (2019-01-05 11:59:31 PM, printed in a Chinese locale) is the message timestamp, and NULL is the message key, which is NULL by default because the message was sent from kafka-console-producer.

Create a stream over the topic:
CREATE STREAM SCORE_EVENT \
  (epoch BIGINT, \
   gameType VARCHAR, \
   cost INTEGER, \
   gamers ARRAY< \
     STRUCT< \
       username VARCHAR, \
       balance BIGINT, \
       delta BIGINT \
     > \
   >, \
   gameId VARCHAR, \
   tax BIGINT, \
   reason VARCHAR) \
  WITH (KAFKA_TOPIC='score-normalized', \
        VALUE_FORMAT='JSON', \
        TIMESTAMP='epoch');
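Here TIMESTAMP='epoch' tells KSQL to use the value of the epoch field (milliseconds since the Unix epoch) as the event timestamp, instead of the Kafka message timestamp.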
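To make the schema concrete, here is a minimal Python sketch that parses the sample message into the same columns as SCORE_EVENT and derives the event time the way TIMESTAMP='epoch' does (epoch read as milliseconds). The class and function names are illustrative only; KSQL itself does this mapping internally.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class Gamer:
    username: str
    balance: int
    delta: int


@dataclass
class ScoreEvent:
    epoch: int            # milliseconds since the Unix epoch
    gameType: str
    cost: int
    gamers: List[Gamer]
    gameId: str
    tax: Optional[int]    # missing from the sample message, so None (null in KSQL)
    reason: str


def parse_score_event(line: str) -> ScoreEvent:
    """Parse one JSON message from score-normalized into the SCORE_EVENT columns."""
    d = json.loads(line)
    return ScoreEvent(
        epoch=d["epoch"],
        gameType=d["gameType"],
        cost=d["cost"],
        gamers=[Gamer(g["username"], g["balance"], g["delta"]) for g in d["gamers"]],
        gameId=d["gameId"],
        tax=d.get("tax"),  # absent field -> None, like the null tax column in KSQL
        reason=d["reason"],
    )


def event_time(e: ScoreEvent) -> datetime:
    """Event time as TIMESTAMP='epoch' defines it: epoch interpreted as milliseconds."""
    return datetime.fromtimestamp(e.epoch / 1000, tz=timezone.utc)


SAMPLE = ('{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1區_200_015_185175",'
          '"gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, '
          '{"balance":69532,"delta":-60,"username":"70837999"}, '
          '{"balance":972120,"delta":-60,"username":"abc6378303"}, '
          '{"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}')

event = parse_score_event(SAMPLE)
print(event.tax, event_time(event))
```

The event time works out to 2017-12-03 23:09:28 UTC, which is 07:09:28 in UTC+8 and matches the time embedded in gameId.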
To delete a stream:
DROP STREAM stream_name;
If queries are still reading from or writing to the stream, the drop fails with an error such as:
Cannot drop USER_SCORE_EVENT. The following queries read from this source: []. The following queries write into this source: [CSAS_USER_SCORE_EVENT_2, InsertQuery_4, InsertQuery_5, InsertQuery_3]. You need to terminate them before dropping USER_SCORE_EVENT.
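Stop every listed query with TERMINATE first, then drop the stream:
TERMINATE CSAS_USER_SCORE_EVENT_2;
TERMINATE InsertQuery_3;
TERMINATE InsertQuery_4;
TERMINATE InsertQuery_5;
DROP STREAM USER_SCORE_EVENT;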
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT * FROM SCORE_EVENT;
The output:
1546702389664 | null | 1512342568296 | situan | 7 | [{USERNAME=0791754000, BALANCE=4405682, DELTA=-60}, {USERNAME=70837999, BALANCE=69532, DELTA=-60}, {USERNAME=abc6378303, BALANCE=972120, DELTA=-60}, {USERNAME=a137671268, BALANCE=23129, DELTA=180}] | 2017-12-04_07:09:28_高手1區_200_015_185175 | null | xiayu
The tax column is null because the message has no tax field.

Number of games on 2017-12-04

First add a game_date field to group on, then count:
CREATE STREAM SCORE_EVENT_WITH_DATE AS \
  SELECT SUBSTRING(gameId, 0, 10) AS game_date, * \
  FROM SCORE_EVENT;

SELECT game_date, COUNT(*) \
  FROM SCORE_EVENT_WITH_DATE \
  WHERE game_date = '2017-12-04' AND reason = 'game' \
  GROUP BY game_date;
At the time of writing, KSQL does not support queries like the following:
SELECT COUNT(*) \
  FROM SCORE_EVENT \
  WHERE gameId LIKE '2017-12-04_%';
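Both the game_date workaround and the unsupported LIKE query boil down to matching the first ten characters of gameId. The Python sketch below shows that logic, assuming the KSQL 5.0 SUBSTRING semantics the query relies on (zero-based start, exclusive end). The sample gameIds other than the first are made up for illustration, and unlike KSQL, which keeps the count continuously updated, this is a one-off batch count.

```python
from collections import Counter
from typing import Dict, Iterable


def game_date(game_id: str) -> str:
    """Like SUBSTRING(gameId, 0, 10) in KSQL 5.0: characters 0..9, i.e. YYYY-MM-DD."""
    return game_id[0:10]


def games_on(events: Iterable[dict], date: str, reason: str = "game") -> int:
    """Count events whose gameId starts with `date` and whose reason matches."""
    return sum(1 for e in events
               if game_date(e["gameId"]) == date and e.get("reason") == reason)


def games_per_date(events: Iterable[dict], reason: str = "game") -> Dict[str, int]:
    """Same filter, grouped by date (the GROUP BY game_date version)."""
    return dict(Counter(game_date(e["gameId"]) for e in events if e.get("reason") == reason))


# Hypothetical events; only the first gameId comes from the sample message.
events = [
    {"gameId": "2017-12-04_07:09:28_高手1區_200_015_185175", "reason": "game"},
    {"gameId": "2017-12-04_08:00:00_made_up", "reason": "xiayu"},
    {"gameId": "2017-12-05_09:30:00_made_up", "reason": "game"},
]
print(games_on(events, "2017-12-04"), games_per_date(events))
```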
Create USER_SCORE_EVENT, which splits each game into one row per player (one statement per array index):
CREATE STREAM USER_SCORE_EVENT AS \
  SELECT epoch, gameType, cost, gameId, tax, reason, \
         gamers[0]->username AS username, \
         gamers[0]->balance AS balance, \
         gamers[0]->delta AS delta \
  FROM SCORE_EVENT;

INSERT INTO USER_SCORE_EVENT \
  SELECT epoch, gameType, cost, gameId, tax, reason, \
         gamers[1]->username AS username, \
         gamers[1]->balance AS balance, \
         gamers[1]->delta AS delta \
  FROM SCORE_EVENT;

INSERT INTO USER_SCORE_EVENT \
  SELECT epoch, gameType, cost, gameId, tax, reason, \
         gamers[2]->username AS username, \
         gamers[2]->balance AS balance, \
         gamers[2]->delta AS delta \
  FROM SCORE_EVENT;

INSERT INTO USER_SCORE_EVENT \
  SELECT epoch, gameType, cost, gameId, tax, reason, \
         gamers[3]->username AS username, \
         gamers[3]->balance AS balance, \
         gamers[3]->delta AS delta \
  FROM SCORE_EVENT;
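What the four statements above compute together can be sketched in Python as an "explode" of the gamers array: the game-level columns are copied onto every player row. This is an illustration of the result, not how KSQL executes it; the function name is hypothetical.

```python
import json
from typing import Dict, List

# Game-level columns copied onto each per-player row, as in the SELECT lists above.
COPIED = ("epoch", "gameType", "cost", "gameId", "tax", "reason")


def flatten_gamers(event: dict) -> List[Dict]:
    """One output row per entry of event["gamers"]."""
    rows = []
    for g in event.get("gamers", []):
        row = {col: event.get(col) for col in COPIED}  # a missing tax becomes None
        row.update(username=g["username"], balance=g["balance"], delta=g["delta"])
        rows.append(row)
    return rows


sample = json.loads(
    '{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1區_200_015_185175",'
    '"gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, '
    '{"balance":69532,"delta":-60,"username":"70837999"}, '
    '{"balance":972120,"delta":-60,"username":"abc6378303"}, '
    '{"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}')

for row in flatten_gamers(sample):
    print(row["username"], row["balance"], row["delta"])
```

The KSQL version needs one statement per array index, so it implicitly assumes four players per game; the Python loop handles any number of entries.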
For JOIN queries, the key has to be reset, here to username:
CREATE STREAM USER_SCORE_EVENT_REKEY AS \
  SELECT * FROM USER_SCORE_EVENT \
  PARTITION BY username;
Output:
ksql> SELECT * FROM USER_SCORE_EVENT_REKEY;
4000 | lzc | 4000 | situan | 7 | 2017-12-04_07:09:28_高手2區_500_015_185175 | null | game | lzc | 972120 | -60
4000 | lzb | 4000 | situan | 7 | 2017-12-04_07:09:28_高手2區_500_015_185175 | null | game | lzb | 69532 | -60
Note: in practice, PARTITION BY only took effect when applied to a field of the STREAM.
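The point of PARTITION BY username is that the new stream's records are keyed by username, so every record for a given player is routed to the same partition, which key-based joins and per-user aggregation depend on. The Python sketch below illustrates only that routing idea. It assumes four partitions (like score-normalized) and uses CRC32 as a stand-in hash; Kafka's default partitioner uses murmur2, so the actual partition numbers will differ.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

NUM_PARTITIONS = 4  # assumption: same partition count as score-normalized


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Stand-in hash (CRC32), NOT Kafka's murmur2: only the "same key -> same partition" idea carries over.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def repartition_by(rows: List[dict], field: str) -> Dict[int, List[Tuple[str, dict]]]:
    """Re-key each row by row[field] and route it to a partition chosen from that key."""
    partitions: Dict[int, List[Tuple[str, dict]]] = defaultdict(list)
    for row in rows:
        key = row[field]
        partitions[partition_for(key)].append((key, row))
    return dict(partitions)


# Hypothetical per-player rows: the sample players, repeated over three games.
players = [("0791754000", -60), ("70837999", -60), ("abc6378303", -60), ("a137671268", 180)]
rows = [{"username": u, "delta": d} for _ in range(3) for u, d in players]
for p, records in sorted(repartition_by(rows, "username").items()):
    print(p, sorted({key for key, _ in records}))
```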
Create USER_SCORE_TABLE with per-user statistics:
CREATE TABLE USER_SCORE_TABLE AS \
  SELECT username, COUNT(*) AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum \
  FROM USER_SCORE_EVENT_REKEY \
  WHERE reason = 'game' \
  GROUP BY username;
View all rows of USER_SCORE_TABLE:
ksql> SELECT * FROM USER_SCORE_TABLE;
1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
1546709352758 | 0791754000 | 0791754000 | 4 | -240 | 0
1546709338711 | a137671268 | a137671268 | 4 | 720 | 0
1546709352758 | abc6378303 | abc6378303 | 4 | -240 | 0
ksql> SELECT * FROM USER_SCORE_TABLE WHERE username = '70837999';
Output:
1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
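The per-user aggregation behind USER_SCORE_TABLE can be sketched in Python as a group-by over the flattened rows. The input below is hypothetical: it assumes the four players of the sample game were replayed four times with reason "game" (the sample message itself has reason "xiayu"). Under that assumption the sketch reproduces the game_count, delta_sum and tax_sum values in the table above, but it is a guess at the input, not the author's data.

```python
from typing import Dict, Iterable


def user_score_table(rows: Iterable[dict]) -> Dict[str, dict]:
    """Per-user COUNT(*), SUM(delta), SUM(tax) over rows with reason = 'game'."""
    table: Dict[str, dict] = {}
    for r in rows:
        if r["reason"] != "game":  # WHERE reason = 'game'
            continue
        stats = table.setdefault(r["username"], {"game_count": 0, "delta_sum": 0, "tax_sum": 0})
        stats["game_count"] += 1
        stats["delta_sum"] += r["delta"]
        if r["tax"] is not None:  # null tax adds nothing; the KSQL output shows tax_sum 0
            stats["tax_sum"] += r["tax"]
    return table


# Hypothetical input: the sample game's four players, replayed four times as "game".
PLAYERS = [("0791754000", -60), ("70837999", -60), ("abc6378303", -60), ("a137671268", 180)]
rows = [{"username": u, "delta": d, "tax": None, "reason": "game"}
        for _ in range(4) for u, d in PLAYERS]
table = user_score_table(rows)
print(table["70837999"])
```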
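Counting with a dummy column: aggregations here need a GROUP BY column, so add a constant tag column and group the whole table on it:
CREATE TABLE USER_SCORE_WITH_TAG AS \
  SELECT 1 AS tag, * FROM USER_SCORE_TABLE;

SELECT tag, COUNT(username) \
  FROM USER_SCORE_WITH_TAG \
  GROUP BY tag;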
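The dummy-column trick works because every row gets the same tag, so GROUP BY tag produces a single group whose count is the number of rows. A minimal Python sketch of the idea (the function name is hypothetical; KSQL keeps this count updated as the table changes, while this is a one-off batch count):

```python
from collections import Counter
from typing import Dict, Iterable


def count_users_via_tag(usernames: Iterable[str]) -> Dict[int, int]:
    """SELECT 1 AS tag, ... then GROUP BY tag: every row falls into the single group 1."""
    tagged = ((1, u) for u in usernames)            # like CREATE TABLE ... SELECT 1 AS tag, *
    return dict(Counter(tag for tag, _ in tagged))  # like SELECT tag, COUNT(username) ... GROUP BY tag


# The four usernames shown in USER_SCORE_TABLE above.
print(count_users_via_tag(["70837999", "0791754000", "a137671268", "abc6378303"]))
```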
KSQL WINDOW functionality.