【大數據實踐】遊戲事件處理系統(2)——事件處理-logstash

前言

在以前的文章【大數據實踐】遊戲事件處理系統(1)——事件收集-filebeat中,對本系統的背景、目標及技術方案進行了概述,並利用filebeat收集到日誌,發送到logstash。所以,本文章將對logstash如何接收、處理、輸出事件進行介紹。html

logstash根據配置文件****.conf(如game-score.conf)對每一個事件執行輸入、處理、輸出過程,logstash爲每個過程提供了豐富的插件。配置文件的大致結構以下:web

input {}
filter {}
output {}
  • input表示事件輸入。
  • filter表示事件處理過程。
  • output表示事件輸出。

事件輸入

假設日誌文件存在目錄地址爲:/Users/admin/Documents/workspace/elk/logstash-6.2.3/gameScore.logjson

其內容爲:segmentfault

2015-11-02 14:26:53,355 DEBUG [IScoreService] service.score.IScoreService.recordScore Arguments:[SSZ game result. gameId : 2015-11-02_14:26:37_新手入門_1_002_512 tax : 0, [Lservice.score.GameResultBean;@1a4347b9] Returns: [snailiu,999979438,15]cost : 26

logstash的配置文件中,輸入模塊的配置內容以下:api

input {
    file {
        path => "/Users/admin/Documents/workspace/elk/logstash-6.2.3/gameScore.log"
        type => "game_score"
        start_position => "beginning"
        codec => plain {
            charset => "GBK"
        }
    }
    
    ## 來自於filebeat的事件做爲輸入
    beats {
        ## 端口與filebeat中filebeat.yml文件output中配置的端口一致。
        port => 5044
    }
}

日誌文件做爲輸入,須要使用的file插件(input插件列表)。input中能夠有多個file,用於同時收集多個日誌文件(參考官方文檔)。其中:數組

  • path表示文件路徑。
  • type表示該輸入事件的類型,可自由定義。
  • start_position表示開始位置,設爲beginning表示需從文件頭導入舊的事件。
  • codec表示在事件進入input前要使用的編碼處理。若日誌文件使用GBK編碼,那麼就須要在codec指定,以便後續處理過程當中,再也不須要處理編碼問題。

接收filebeat發送過來的數據時,須要使用到beats插件。ruby

事件處理

每個事件都會通過filter中的處理過程(從上到下處理)。可利用多個filter 插件來完成處理過程。filter示例以下:ide

filter {
    ## 將非結構化數據轉爲結構化數據
    grok {
        ## 自定義匹配模式文件地址。
        patterns_dir => "/Users/admin/Documents/workspace/elk/logstash-6.2.3/game_score_patterns"
        ## 根據自定義的匹配模式,匹配數據。數組表示匹配多個模式。
        match => [
            "message","%{DC_SCORE_NORMAL}",
            "message","%{DC_SCORE_MODE}"
        ]
        ## 刪掉一些不要的字段。
        remove_field => [host,path,message,svr_type,gm_gr,user1_balance,user2_balance,user3_balance,user4_balance,cost,svr_method,type,money]
        ## 添加字段:將非結構化數據`game_id`中的內容提取出來,轉爲結構化數據,並kv對加入到map結構中。
        add_field => {"game_id" => "%{game_date}_%{game_time}_%{bet_name}_%{bet_count}_%{room_id}_%{desk_id}"}
    }

    ## 若是解析出錯,則拋棄該事件。
    if "_grokparsefailure" in [tags] {
        drop { }
    }

    
    mutate {
        ## 對map中數據進行類型轉換。
        convert => [
        "tax" , "integer" ,
        "user1_delta" , "integer" ,
        "user2_delta" , "integer" ,
        "user3_delta" , "integer" ,
        "user4_delta" , "integer"
        ]
        ## 字符串替換
        gsub => [
        "user1_name", "[\\]", "",
        "user2_name", "[\\]", "",
        "user3_name", "[\\]", "",
        "user4_name", "[\\]", ""
        ]
    }

    ## ruby插件,支持ruby腳本
    ruby {
      ## ruby腳本文件路徑
      path => "/Users/admin/Documents/workspace/elk/logstash-6.2.3/drop_percentage.rb"
      
      ## ruby腳本程序的輸入參數
      script_params => { "PDK" => ",20,30,1,"}
    }

    ## 若出錯,則丟棄該事件
    if "_jsonparsefailure" in [tags] {
        drop { }
    }
    
    ## 日期處理插件
    date {
        match => [ "date_stamp", "YY-MM-dd HH:mm:ss,SSS" ]
        timezone => "Asia/Shanghai"
        remove_field => ["@version","@timestamp","server_tag","date_stamp","tmp_users","user1_name","user2_name","user3_name","user4_name","user1_delta","user2_delta","user3_delta","user4_delta"]
    }
}

其中,用到的filter插件有:post

  • grok:將非結構化日誌數據解析爲結構化數據的插件,很是經常使用。支持自定義一些解析模式(patterns),如game_score_patterns文件:大數據

    DC_SCORE_USER_1 %{DATA:user1_name},%{INT:user1_balance},%{INT:user1_delta}
    DC_SCORE_USER_2 %{DC_SCORE_USER_1}, %{DATA:user2_name},%{INT:user2_balance},%{INT:user2_delta}
    DC_SCORE_USER_3 %{DC_SCORE_USER_2}, %{DATA:user3_name},%{INT:user3_balance},%{INT:user3_delta}
    DC_SCORE_USER_4 %{DC_SCORE_USER_3}, %{DATA:user4_name},%{INT:user4_balance},%{INT:user4_delta}
    DC_SCORE_USERS %{DC_SCORE_USER_4}|%{DC_SCORE_USER_3}|%{DC_SCORE_USER_2}|%{DC_SCORE_USER_1}
    DC_SCORE_NORMAL %{DATA:date_stamp} DEBUG \[%{DATA:svr_type}\] com.basecity.hjd.service.score.IScoreService.%{DATA:svr_method} Arguments:\[%{DATA:server_tag} game result. gameId(( : pdk %{DATA:score_type})|( : sreqw %{DATA:score_type})|) : %{DATA:game_date}_%{DATA:game_time}_%{DATA:bet_name}_%{INT:bet_count}_%{DATA:room_id}_%{DATA:desk_id} tax : %{INT:tax}, %{DATA:gm_gr}\] Returns: \[%{DC_SCORE_USERS}\]cost : %{INT:cost}
    DC_SCORE_MODE %{DATA:date_stamp} DEBUG \[%{DATA:svr_type}\] com.basecity.hjd.service.score.IScoreService.%{DATA:svr_method} Arguments:\[%{DATA:score_type} game result. gameId : %{DATA:game_date}_%{DATA:game_time}_%{DATA:bet_name}_%{INT:bet_count}_%{DATA:room_id}_%{DATA:desk_id} user: %{DATA:username} get prize:%{INT:money}, \[%{DATA:gm_gr}\] Returns: \[%{DC_SCORE_USERS}\]cost : %{INT:cost}

    經過match => ... 從非結構化的日誌中,提取出想要的數據,併爲其指定關鍵字命名,轉爲map結構。

  • mutate:數據轉換插件,支持對字段內容進行格式轉換、類型轉換等。
  • ruby:當你感受其餘插件都不夠靈活,須要你經過代碼自由處理的時候,可使用ruby插件,編寫事件處理腳本。其中,ruby腳本的輸入事件即爲上面mutate處理以後的結果事件。ruby中對事件的處理過程可參照event api
  • date:日期處理插件,如轉換時區,轉換時間格式。

事件輸出

時間通過filter以後,轉化爲結構化的JSON格式。能夠經過各類輸出插件將其輸出到指定位置(或服務)。如:

## 輸出到標準輸出。
stdout { 
    codec => rubydebug 
    }

## 輸出到http服務
http {
    http_method => "post"
    url => "http://127.0.0.1:9090"
}

## 輸出到kafka
kafka {
    codec => json
    topic_id => "mytopic"
}

其中,示例的三個插件分別爲:

  • stdout:將事件輸出到標準輸出,可用於調試。
  • http:將事件輸出到其餘web服務。
  • kafka:將處理後的事件輸出到kafka消息中間件。

小結

至此,利用logstash,完成了日誌事件的收集和處理的過程,因本系統是搭配kafka消息中間件工做的,所以,output中使用kafka插件,後續需根據具體狀況,完善output中kafka模塊的編寫。

相關文章
相關標籤/搜索