在以前的文章【大數據實踐】遊戲事件處理系統(1)——事件收集-filebeat中,對本系統的背景、目標及技術方案進行了概述,並利用filebeat收集到日誌,發送到logstash。所以,本文章將對logstash如何接收、處理、輸出事件進行介紹。html
logstash根據配置文件****.conf
(如game-score.conf)對每一個事件執行輸入、處理、輸出過程,logstash爲每個過程提供了豐富的插件。配置文件的大致結構以下:web
input {} filter {} output {}
假設日誌文件存在目錄地址爲:/Users/admin/Documents/workspace/elk/logstash-6.2.3/gameScore.log
。json
其內容爲:segmentfault
2015-11-02 14:26:53,355 DEBUG [IScoreService] service.score.IScoreService.recordScore Arguments:[SSZ game result. gameId : 2015-11-02_14:26:37_新手入門_1_002_512 tax : 0, [Lservice.score.GameResultBean;@1a4347b9] Returns: [snailiu,999979438,15]cost : 26
logstash的配置文件中,輸入模塊的配置內容以下:api
input { file { path => "/Users/admin/Documents/workspace/elk/logstash-6.2.3/gameScore.log" type => "game_score" start_position => "beginning" codec => plain { charset => "GBK" } } ## 來自於filebeat的事件做爲輸入 beats { ## 端口與filebeat中filebeat.yml文件output中配置的端口一致。 port => 5044 } }
日誌文件做爲輸入,須要使用的file
插件(input插件列表)。input中能夠有多個file
,用於同時收集多個日誌文件(參考官方文檔)。其中:數組
path
表示文件路徑。type
表示該輸入事件的類型,可自由定義。start_position
表示開始位置,設爲beginning表示需從文件頭導入舊的事件。codec
表示在事件進入input前要使用的編碼處理。若日誌文件使用GBK編碼,那麼就須要在codec
指定,以便後續處理過程當中,再也不須要處理編碼問題。接收filebeat
發送過來的數據時,須要使用到beats插件。ruby
每個事件都會通過filter中的處理過程(從上到下處理)。可利用多個filter 插件來完成處理過程。filter示例以下:ide
filter { ## 將非結構化數據轉爲結構化數據 grok { ## 自定義匹配模式文件地址。 patterns_dir => "/Users/admin/Documents/workspace/elk/logstash-6.2.3/game_score_patterns" ## 根據自定義的匹配模式,匹配數據。數組表示匹配多個模式。 match => [ "message","%{DC_SCORE_NORMAL}", "message","%{DC_SCORE_MODE}" ] ## 刪掉一些不要的字段。 remove_field => [host,path,message,svr_type,gm_gr,user1_balance,user2_balance,user3_balance,user4_balance,cost,svr_method,type,money] ## 添加字段:將非結構化數據`game_id`中的內容提取出來,轉爲結構化數據,並kv對加入到map結構中。 add_field => {"game_id" => "%{game_date}_%{game_time}_%{bet_name}_%{bet_count}_%{room_id}_%{desk_id}"} } ## 若是解析出錯,則拋棄該事件。 if "_grokparsefailure" in [tags] { drop { } } mutate { ## 對map中數據進行類型轉換。 convert => [ "tax" , "integer" , "user1_delta" , "integer" , "user2_delta" , "integer" , "user3_delta" , "integer" , "user4_delta" , "integer" ] ## 字符串替換 gsub => [ "user1_name", "[\\]", "", "user2_name", "[\\]", "", "user3_name", "[\\]", "", "user4_name", "[\\]", "" ] } ## ruby插件,支持ruby腳本 ruby { ## ruby腳本文件路徑 path => "/Users/admin/Documents/workspace/elk/logstash-6.2.3/drop_percentage.rb" ## ruby腳本程序的輸入參數 script_params => { "PDK" => ",20,30,1,"} } ## 若出錯,則丟棄該事件 if "_jsonparsefailure" in [tags] { drop { } } ## 日期處理插件 date { match => [ "date_stamp", "YY-MM-dd HH:mm:ss,SSS" ] timezone => "Asia/Shanghai" remove_field => ["@version","@timestamp","server_tag","date_stamp","tmp_users","user1_name","user2_name","user3_name","user4_name","user1_delta","user2_delta","user3_delta","user4_delta"] } }
其中,用到的filter插件
有:post
grok:將非結構化日誌數據解析爲結構化數據的插件,很是經常使用。支持自定義一些解析模式(patterns),如game_score_patterns
文件:大數據
DC_SCORE_USER_1 %{DATA:user1_name},%{INT:user1_balance},%{INT:user1_delta} DC_SCORE_USER_2 %{DC_SCORE_USER_1}, %{DATA:user2_name},%{INT:user2_balance},%{INT:user2_delta} DC_SCORE_USER_3 %{DC_SCORE_USER_2}, %{DATA:user3_name},%{INT:user3_balance},%{INT:user3_delta} DC_SCORE_USER_4 %{DC_SCORE_USER_3}, %{DATA:user4_name},%{INT:user4_balance},%{INT:user4_delta} DC_SCORE_USERS %{DC_SCORE_USER_4}|%{DC_SCORE_USER_3}|%{DC_SCORE_USER_2}|%{DC_SCORE_USER_1} DC_SCORE_NORMAL %{DATA:date_stamp} DEBUG \[%{DATA:svr_type}\] com.basecity.hjd.service.score.IScoreService.%{DATA:svr_method} Arguments:\[%{DATA:server_tag} game result. gameId(( : pdk %{DATA:score_type})|( : sreqw %{DATA:score_type})|) : %{DATA:game_date}_%{DATA:game_time}_%{DATA:bet_name}_%{INT:bet_count}_%{DATA:room_id}_%{DATA:desk_id} tax : %{INT:tax}, %{DATA:gm_gr}\] Returns: \[%{DC_SCORE_USERS}\]cost : %{INT:cost} DC_SCORE_MODE %{DATA:date_stamp} DEBUG \[%{DATA:svr_type}\] com.basecity.hjd.service.score.IScoreService.%{DATA:svr_method} Arguments:\[%{DATA:score_type} game result. gameId : %{DATA:game_date}_%{DATA:game_time}_%{DATA:bet_name}_%{INT:bet_count}_%{DATA:room_id}_%{DATA:desk_id} user: %{DATA:username} get prize:%{INT:money}, \[%{DATA:gm_gr}\] Returns: \[%{DC_SCORE_USERS}\]cost : %{INT:cost}
經過match => ...
從非結構化的日誌中,提取出想要的數據,併爲其指定關鍵字命名,轉爲map結構。
mutate
處理以後的結果事件。ruby中對事件
的處理過程可參照event api。時間通過filter以後,轉化爲結構化的JSON格式。能夠經過各類輸出插件將其輸出到指定位置(或服務)。如:
## 輸出到標準輸出。 stdout { codec => rubydebug } ## 輸出到http服務 http { http_method => "post" url => "http://127.0.0.1:9090" } ## 輸出到kafka kafka { codec => json topic_id => "mytopic" }
其中,示例的三個插件分別爲:
至此,利用logstash,完成了日誌事件的收集和處理的過程,因本系統是搭配kafka
消息中間件工做的,所以,output中使用kafka插件,後續需根據具體狀況,完善output中kafka
模塊的編寫。