【kafka KSQL】Game Log Statistical Analysis (1)

Using game settlement logs as an example, this post walks through analyzing logs statistically with KSQL.

Start Confluent

cd ~/Documents/install/confluent-5.0.1/

bin/confluent start

List the Kafka topics

bin/kafka-topics --list --zookeeper localhost:2181

Create a topic to receive the game settlement logs

bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic score-normalized

Write a log record to the topic with the console producer

bin/kafka-console-producer --broker-list localhost:9092 --topic score-normalized

> 

{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1区_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
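The console producer treats every input line as one message, so each JSON record has to stay on a single line. If you generate test records from a script instead of typing them, a minimal Python sketch (an illustration only, not part of the original setup) could serialize them like this:

```python
import json

# The sample settlement record from this post, as a Python dict.
record = {
    "cost": 7,
    "epoch": 1512342568296,
    "gameId": "2017-12-04_07:09:28_高手1区_200_015_185175",
    "gameType": "situan",
    "gamers": [
        {"balance": 4405682, "delta": -60, "username": "0791754000"},
        {"balance": 69532, "delta": -60, "username": "70837999"},
        {"balance": 972120, "delta": -60, "username": "abc6378303"},
        {"balance": 23129, "delta": 180, "username": "a137671268"},
    ],
    "reason": "xiayu",
}


def to_producer_line(rec: dict) -> str:
    """Serialize one record as a single line of compact JSON.

    json.dumps without indent never emits raw newlines (newlines inside
    strings are escaped as \\n), so one record stays one message.
    ensure_ascii=False keeps the Chinese characters instead of \\uXXXX escapes.
    """
    return json.dumps(rec, ensure_ascii=False, separators=(",", ":"))


line = to_producer_line(record)
print(line)
```

Assuming the script is saved under a hypothetical name such as gen.py, its output can be piped straight in: `python gen.py | bin/kafka-console-producer --broker-list localhost:9092 --topic score-normalized`.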

Check with the console consumer that the record was written correctly

bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic score-normalized --from-beginning

You should see:

{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1区_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}

Start the KSQL CLI

bin/ksql http://localhost:8088

You should see the KSQL logo followed by the interactive prompt.

List the Kafka topics from the KSQL prompt

ksql> show topics;

Print the messages in the topic

PRINT 'score-normalized';

You should see:

Format:STRING
19-1-5 下午11時59分31秒 , NULL , {"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_\xE9\xAB\x98\xE6\x89\x8B1\xE5\x8C\xBA_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}

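Where:

  • The first field, 19-1-5 下午11時59分31秒 (2019-01-05, 11:59:31 PM in the author's locale), is the message timestamp.
  • The second field, NULL, is the message key. It is NULL because the message was sent with kafka-console-producer, which does not set a key by default.
  • Everything after that is the message value, exactly as it was produced.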
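In the PRINT output above, the Chinese characters of gameId show up as \xNN escapes of their UTF-8 bytes rather than as text. To convince yourself the data is intact, a small Python sketch can turn the escapes back into bytes and decode them; it assumes only what the output shows, namely that each non-ASCII byte was printed as \x followed by two hex digits:

```python
# A fragment of the PRINT output above; the \xNN sequences are literal text.
printed = r"2017-12-04_07:09:28_\xE9\xAB\x98\xE6\x89\x8B1\xE5\x8C\xBA_200_015_185175"


def decode_print_escapes(text: str) -> str:
    """Recover the original string from text whose non-ASCII bytes were printed as \\xNN."""
    # unicode_escape turns each \xNN into the single code point U+00NN ...
    as_code_points = text.encode("ascii").decode("unicode_escape")
    # ... latin-1 maps U+0000..U+00FF one-to-one back to byte values ...
    raw_bytes = as_code_points.encode("latin-1")
    # ... and those bytes are the UTF-8 encoding of the original value.
    return raw_bytes.decode("utf-8")


print(decode_print_escapes(printed))  # → 2017-12-04_07:09:28_高手1区_200_015_185175
```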

Create a stream from the topic score-normalized

CREATE STREAM SCORE_EVENT \
 (epoch BIGINT, \
  gameType VARCHAR, \
  cost INTEGER, \
  gamers ARRAY< \
              STRUCT< \
                      username VARCHAR, \
                      balance BIGINT, \
                      delta BIGINT \
                      > \
               >, \
  gameId VARCHAR, \
  tax BIGINT, \
  reason VARCHAR) \
  WITH ( KAFKA_TOPIC='score-normalized', \
         VALUE_FORMAT='JSON', \
         TIMESTAMP='epoch');

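Here TIMESTAMP='epoch' tells KSQL to use the value of the epoch field (milliseconds since the Unix epoch) as the event timestamp (ROWTIME) instead of the Kafka record's own timestamp.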
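As a sanity check on what epoch means, a short Python sketch converts the sample value into a date and time. The time zone is an assumption: the gameId prefix appears to be China Standard Time (UTC+8), and with that offset the epoch value lands on exactly the time embedded in the gameId:

```python
from datetime import datetime, timedelta, timezone

# Assumption: gameId is written in China Standard Time (UTC+8).
CHINA_STANDARD_TIME = timezone(timedelta(hours=8))


def epoch_ms_to_datetime(epoch_ms: int, tz: timezone = CHINA_STANDARD_TIME) -> datetime:
    """Convert a millisecond Unix timestamp (like the epoch field) to an aware datetime."""
    seconds, millis = divmod(epoch_ms, 1000)
    # Integer arithmetic avoids float rounding in the millisecond part.
    return datetime.fromtimestamp(seconds, tz=tz) + timedelta(milliseconds=millis)


event_time = epoch_ms_to_datetime(1512342568296)
print(event_time.strftime("%Y-%m-%d_%H:%M:%S"))  # → 2017-12-04_07:09:28
```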

Drop a stream

DROP STREAM stream_name;

If any query still reads from or writes into the stream, the drop fails with an error like this:

Cannot drop USER_SCORE_EVENT. 
The following queries read from this source: []. 
The following queries write into this source: [CSAS_USER_SCORE_EVENT_2, InsertQuery_4, InsertQuery_5, InsertQuery_3]. 
You need to terminate them before dropping USER_SCORE_EVENT.

Stop every query listed in the error with TERMINATE, then run DROP STREAM again:

TERMINATE CSAS_USER_SCORE_EVENT_2;
TERMINATE InsertQuery_3;
TERMINATE InsertQuery_4;
TERMINATE InsertQuery_5;
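When several queries depend on a stream, copying the IDs by hand is easy to get wrong. A small helper script (hypothetical, not a KSQL feature) can turn the error text into TERMINATE statements. It assumes the message format shown above, which may differ between KSQL versions:

```python
import re

# The error text returned by DROP STREAM, copied from above.
error = (
    "Cannot drop USER_SCORE_EVENT. \n"
    "The following queries read from this source: []. \n"
    "The following queries write into this source: "
    "[CSAS_USER_SCORE_EVENT_2, InsertQuery_4, InsertQuery_5, InsertQuery_3]. \n"
    "You need to terminate them before dropping USER_SCORE_EVENT."
)


def terminate_statements(message: str) -> list[str]:
    """Build one TERMINATE statement per query ID found in the [...] lists."""
    ids = []
    for group in re.findall(r"\[([^\]]*)\]", message):
        # An empty list "[]" yields no IDs because blank parts are skipped.
        ids.extend(part.strip() for part in group.split(",") if part.strip())
    # dict.fromkeys removes duplicates while keeping first-seen order.
    return [f"TERMINATE {qid};" for qid in dict.fromkeys(ids)]


for statement in terminate_statements(error):
    print(statement)
```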

Make queries start from the earliest offset, so that existing records are read and not only new ones

ksql> SET 'auto.offset.reset' = 'earliest';

Query all data in the stream

ksql> SELECT * FROM SCORE_EVENT;

You should see:

1546702389664 | null | 1512342568296 | situan | 7 | [{USERNAME=0791754000, BALANCE=4405682, DELTA=-60}, {USERNAME=70837999, BALANCE=69532, DELTA=-60}, {USERNAME=abc6378303, BALANCE=972120, DELTA=-60}, {USERNAME=a137671268, BALANCE=23129, DELTA=180}] | 2017-12-04_07:09:28_高手1区_200_015_185175 | null | xiayu

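Where:

  • Column 1 is the record timestamp (ROWTIME).
  • Column 2 is the record key (ROWKEY).
  • From column 3 on are the values of the message fields, in the order they were declared when the stream was created.
  • The null in the second-to-last column appears because the message has no tax field.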
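To make the column mapping concrete, here is a plain-Python sketch of how the value part of a SELECT * row lines up with the declared schema (ROWTIME and ROWKEY left out). It only mimics the layout described above; it is not how KSQL deserializes JSON internally, and unlike KSQL it matches field names exactly as written:

```python
import json

# Value columns in the order declared in CREATE STREAM SCORE_EVENT.
DECLARED_COLUMNS = ["epoch", "gameType", "cost", "gamers", "gameId", "tax", "reason"]

# The sample message, trimmed to one player for brevity.
message = (
    '{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1区_200_015_185175",'
    '"gameType":"situan","gamers": [{"balance":23129,"delta":180,"username":"a137671268"}],'
    '"reason":"xiayu"}'
)


def project(value_json: str) -> list:
    """Lay out the value columns like SELECT * does: declared order, with
    None standing in for KSQL's null when a field is absent from the message."""
    value = json.loads(value_json)
    return [value.get(column) for column in DECLARED_COLUMNS]


row = project(message)
print(row)
```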

Count the games played on 2017-12-04

-- Add a game_date column to group on
CREATE STREAM SCORE_EVENT_WITH_DATE AS \
    SELECT SUBSTRING(gameId, 0, 10) AS game_date, * \
    FROM SCORE_EVENT;
    
SELECT game_date, COUNT(*) \
    FROM SCORE_EVENT_WITH_DATE \
    WHERE game_date = '2017-12-04' AND reason = 'game' \
    GROUP BY game_date;
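The same computation in plain Python, as a sketch over a hypothetical list of events. In KSQL 5.0 (the version used in this post), SUBSTRING(gameId, 0, 10) takes a zero-based start and an exclusive end, which is exactly the slice [0:10]. As far as I know, KSQL 5.1 switched SUBSTRING to a 1-based position plus a length, so on newer versions the equivalent would likely be SUBSTRING(gameId, 1, 10); check the function reference for your version. Note also that the KSQL query keeps emitting an updated count as new records arrive, while the function below counts a fixed list once:

```python
def game_date(game_id: str) -> str:
    """First 10 characters of gameId, i.e. the yyyy-MM-dd prefix."""
    # Zero-based start 0, exclusive end 10, matching SUBSTRING(gameId, 0, 10)
    # under KSQL 5.0's (start, end) arguments.
    return game_id[0:10]


def count_games_on(events: list[dict], date: str) -> int:
    """Equivalent of WHERE game_date = date AND reason = 'game', then COUNT(*)."""
    return sum(1 for e in events if game_date(e["gameId"]) == date and e["reason"] == "game")


# Hypothetical events, reduced to the fields the query looks at.
events = [
    {"gameId": "2017-12-04_07:09:28_高手1区_200_015_185175", "reason": "game"},
    {"gameId": "2017-12-04_07:15:02_高手1区_200_015_185176", "reason": "game"},
    {"gameId": "2017-12-04_07:20:40_高手1区_200_015_185177", "reason": "xiayu"},
    {"gameId": "2017-12-05_00:01:10_高手1区_200_015_185178", "reason": "game"},
]
print(count_games_on(events, "2017-12-04"))  # → 2
```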

At the time of writing, KSQL does not support queries like the following:

SELECT COUNT(*) \
  FROM SCORE_EVENT \
  WHERE gameId LIKE '2017-12-04_%';

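Count the distinct players who took part in games

Because a single log record contains the results of several players, the idea is to split each record into one event per player.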
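This split is a flat-map: one input event fans out into one output event per element of gamers. A Python sketch of the same transformation, for illustration only (in KSQL it is done with the CREATE STREAM and INSERT INTO statements that follow). Those statements hard-code indexes 0 to 3, one per player in the four-player sample, whereas the loop here handles any number of players:

```python
GAME_FIELDS = ("epoch", "gameType", "cost", "gameId", "tax", "reason")


def explode_gamers(event: dict) -> list[dict]:
    """Split one settlement event into one event per player.

    Game-level fields are copied onto every row; username, balance and delta
    come from the player's entry. A missing field (such as tax) becomes None.
    """
    base = {field: event.get(field) for field in GAME_FIELDS}
    return [{**base, **gamer} for gamer in event["gamers"]]


sample = {
    "cost": 7, "epoch": 1512342568296, "gameType": "situan", "reason": "xiayu",
    "gameId": "2017-12-04_07:09:28_高手1区_200_015_185175",
    "gamers": [
        {"balance": 4405682, "delta": -60, "username": "0791754000"},
        {"balance": 69532, "delta": -60, "username": "70837999"},
        {"balance": 972120, "delta": -60, "username": "abc6378303"},
        {"balance": 23129, "delta": 180, "username": "a137671268"},
    ],
}
for row in explode_gamers(sample):
    print(row["username"], row["delta"])
```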

  • Merge the per-player events into a single stream, USER_SCORE_EVENT:
CREATE STREAM USER_SCORE_EVENT AS \
    SELECT epoch, gameType, cost, gameId, tax, reason, gamers[0]->username AS username, gamers[0]->balance AS balance, gamers[0]->delta AS delta \
    FROM SCORE_EVENT;
    
INSERT INTO USER_SCORE_EVENT \
    SELECT epoch, gameType, cost, gameId, tax, reason, gamers[1]->username AS username, gamers[1]->balance AS balance, gamers[1]->delta AS delta \
    FROM SCORE_EVENT;
    
INSERT INTO USER_SCORE_EVENT \
    SELECT epoch, gameType, cost, gameId, tax, reason, gamers[2]->username AS username, gamers[2]->balance AS balance, gamers[2]->delta AS delta \
    FROM SCORE_EVENT;
    
INSERT INTO USER_SCORE_EVENT \
    SELECT epoch, gameType, cost, gameId, tax, reason, gamers[3]->username AS username, gamers[3]->balance AS balance, gamers[3]->delta AS delta \
    FROM SCORE_EVENT;
  • To join on the player name (username) later, the stream has to be re-keyed by username:
CREATE STREAM USER_SCORE_EVENT_REKEY AS \
SELECT * FROM USER_SCORE_EVENT \
PARTITION BY username;

Output:

ksql> SELECT * FROM USER_SCORE_EVENT_REKEY;


4000 | lzc | 4000 | situan | 7 | 2017-12-04_07:09:28_高手2区_500_015_185175 | null | game | lzc | 972120 | -60
4000 | lzb | 4000 | situan | 7 | 2017-12-04_07:09:28_高手2区_500_015_185175 | null | game | lzb | 69532 | -60

Note:

In practice, PARTITION BY only took effect when it was applied to a field of the STREAM.
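Why re-key at all: the console producer wrote the records with a NULL key, so a player's events are not placed by username. After PARTITION BY username, the username becomes the record key, and key-based partitioning sends every event of the same player to the same partition, which is what a join on username relies on. The sketch below illustrates only that property. It uses zlib.crc32 as a clearly labeled stand-in for Kafka's default partitioner (which hashes the key bytes with murmur2), so the actual partition numbers will differ, and the partition count of 4 is an assumption:

```python
import zlib

NUM_PARTITIONS = 4  # assumption: same partition count as score-normalized


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Pick a partition from the record key.

    Stand-in only: Kafka's default partitioner uses murmur2, not crc32.
    The property both share is that the same key always maps to the same
    partition.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


user_events = ["70837999", "a137671268", "70837999", "0791754000", "70837999"]
for username in user_events:
    print(username, "->", partition_for(username))
```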

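  • Compute each player's total number of games, total win/loss (sum of delta) and total tax contributed, and create a table USER_SCORE_TABLE from the results: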
CREATE TABLE USER_SCORE_TABLE AS \
    SELECT username, COUNT(*) AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum \
    FROM USER_SCORE_EVENT_REKEY \
    WHERE reason = 'game' \
    GROUP BY username;

Query all rows of USER_SCORE_TABLE:

ksql> SELECT * FROM USER_SCORE_TABLE;
1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
1546709352758 | 0791754000 | 0791754000 | 4 | -240 | 0
1546709338711 | a137671268 | a137671268 | 4 | 720 | 0
1546709352758 | abc6378303 | abc6378303 | 4 | -240 | 0
  • Query a single player's game count, total win/loss and total tax contributed:
ksql> SELECT * FROM USER_SCORE_TABLE WHERE username = '70837999';

Output:

1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
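For readers who want to check the numbers, a plain-Python sketch of the same GROUP BY username aggregation. The input is hypothetical: the four players of the sample game replayed four times with reason = 'game', which reproduces the counts shown above but is an assumption about how that data was produced. A missing tax is treated as 0 to match the TAX_SUM of 0 in the output:

```python
from collections import defaultdict

# Hypothetical input: the sample game's four players, four games each.
SEATS = [("0791754000", -60), ("70837999", -60), ("abc6378303", -60), ("a137671268", 180)]
user_events = [
    {"username": name, "delta": delta, "reason": "game"}
    for _ in range(4)
    for name, delta in SEATS
]


def user_score_table(events: list[dict]) -> dict:
    """GROUP BY username over reason == 'game': COUNT(*), SUM(delta), SUM(tax)."""
    table = defaultdict(lambda: {"game_count": 0, "delta_sum": 0, "tax_sum": 0})
    for e in events:
        if e["reason"] != "game":
            continue  # the WHERE reason = 'game' filter
        row = table[e["username"]]
        row["game_count"] += 1
        row["delta_sum"] += e["delta"]
        row["tax_sum"] += e.get("tax") or 0  # missing tax counted as 0
    return dict(table)


for name, stats in user_score_table(user_events).items():
    print(name, stats)
```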

Count the total number of players (distinct)

  • Add a dummy column to group on:
CREATE TABLE USER_SCORE_WITH_TAG AS \
    SELECT 1 AS tag, * FROM USER_SCORE_TABLE;
  • Count the distinct players:
SELECT tag, COUNT(username) \
FROM USER_SCORE_WITH_TAG \
GROUP BY tag;
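The dummy column works because USER_SCORE_TABLE already holds one row per username, so the de-duplication comes from the table; the constant tag only puts every row into a single group for COUNT. A plain-Python sketch of those two steps, with hypothetical input:

```python
# Hypothetical per-player events: four players, each appearing in four games.
usernames = ["0791754000", "70837999", "abc6378303", "a137671268"] * 4


def count_players_via_tag(names: list[str]) -> dict:
    """Mimic USER_SCORE_WITH_TAG followed by GROUP BY tag."""
    # USER_SCORE_TABLE has one row per username, so repeated names collapse.
    table_rows = set(names)
    # Every row gets the constant tag 1, so GROUP BY tag forms a single group
    # and COUNT(username) over it is the number of distinct players.
    counts: dict[int, int] = {}
    for _ in table_rows:
        tag = 1
        counts[tag] = counts.get(tag, 0) + 1
    return counts


print(count_players_via_tag(usernames))  # → {1: 4}
```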

KSQL WINDOW features.
