在生產環境中,各個業務服務產生的事件
都會被push到Kafka
消息中間件中。如:充值中心的 充值事件 會被push到kafka的recharge
topic中,玩家 結算事件 會被push到kafka的game_score
topic中。html
平臺但願經過處理,實時分析這些事件,篩選出知足條件的一些玩家,對其獎勵相應道具。如,想作一個針對不一樣充值金額的玩家獎勵不一樣道具的活動:sql
0 < 充值金額 < 100
時, 獎勵一個10萬金幣卡
道具(道具ID:"10w")100 <= 充值金額
時,獎勵一個100萬金幣卡
道具(道具ID:"100w")將充值的事件(原始日誌數據,JSON格式),推送到Kafka的recharge
topic中。充值事件數據格式:數據結構
{"event_type" : "cash_order", "username" : "foo", "channel" : "wx_scan", "cash" : 100 }
kafka中新建一個PROPREWARD
的topic,專門接收道具獎勵
的事件,該主題事件消息格式爲:ide
{"user/name" : "foo" "prop/id" : "道具ID" "reward/reason" : "獎勵的緣由"}
KSQL
建立兩個派生流(Stream),分別從recharge
topic中過濾出0 < 充值金額 < 100
和 100 <= 充值金額
的事件,過濾出符合條件的用戶名,並組裝成約定的道具獎勵事件
,將其推送到Kafka的PROPREWARD
topic中。新建一個kafka topic : PROPREWARD (大寫),用於接收和存儲道具獎勵事件
。大數據
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic PROPREWARD
從kafka topic PROPREWARD
建立一個流(PROPREWARD
),用於輸出道具獎勵事件,注意:流的名字與kafka topic的名字相同。ui
CREATE STREAM PROPREWARD (`user/name` varchar, `seed/id` varchar, `reward/reason` varchar) \ WITH (kafka_topic='PROPREWARD', value_format='JSON');
根據業務須要,新建一個查詢規則爲0 < 充值金額 < 100
的派生流並插入到PROPREWARD
流中。設計
INSERT INTO PROPREWARD \ SELECT username AS `user/name` , '10w' AS `prop/id`, '充值100元之內獎勵10萬金幣卡道具' AS `reward/reason` \ FROM recharge \ WHERE EVENT_TYPE = 'cash_order' AND CASH > 0 AND CASH <= 100;
根據業務須要,新建一個查詢規則爲 100 <= 充值金額
的派生流並插入到PROPREWARD
流中。日誌
INSERT INTO PROPREWARD \ SELECT username AS `user/name` , '100w' AS `prop/id`, '充值超過100元獎勵100萬金幣卡道具' AS `reward/reason` \ FROM recharge \ WHERE EVENT_TYPE = 'cash_order' AND CASH >= 100;
PROPREWARD
流中。往kafka的recharge
topic中寫入數據:code
{"event_type" : "cash_order", "username" : "foo", "channel" : "wx_scan", "cash" : 9 }
能夠在topic PROPREWARD
中接收到事件:orm
{"user/name" : "foo" , "prop/id" : "10w" "reward/reason" : "充值100元之內獎勵10萬金幣卡道具"}
往kafka的 recharge
topic中寫入數據:
{"event_type" : "cash_order", "username" : "foo", "channel" : "wx_scan", "cash" : 11 }
能夠在topic PROPREWARD
中接收到事件:
{"user/name" : "foo" , "prop/id" : "100w", "reward/reason" : "充值100元以上獎勵100萬金幣卡道具"}
要想將上述兩個派生流插入(INSERT INTO)到輸出結果的PROPREWARD
流中,須要確保:
PROPREWARD
的名字與輸出結果的Kafka topic名字相同,不然會拋出異常。這應該是KSQL 5.0.0 的一個BUG。user/name
, '100w' AS prop/id
, '充值超過100元獎勵100萬金幣卡道具' AS reward/reason
) 與 流 PROPREWARD
定義的結構相同。活動
——根據各類不一樣的事件和規則,獎勵不一樣的道具(或者其餘類型的東西),而不須要額外的代碼開發!!