ELK 之Filebeat 結合Logstash 過濾出來你想要的日誌

先扯點沒用的

收集日誌的目的是有效的利用日誌,有效利用日誌的前提是日誌通過格式化符合咱們的要求,這樣才能真正的高效利用收集到elasticsearch平臺的日誌。默認的日誌到達elasticsearch 是原始格式,亂的讓人抓狂,這個時候你會發現Logstash filter的可愛之處,它很像一塊橡皮泥,若是咱們手巧的話就會塑造出來讓本身舒舒服服的做品,but 若是你沒搞好的話那就另說了,本文的宗旨就是帶你一塊兒飛,搞定這塊橡皮泥。當你搞定以後你會以爲kibana 的世界瞬間清爽了不少!
FIlebeat 的4大金剛
Filebeat 有4個很是重要的概念須要咱們知道,
Prospector(礦工);
Harvest (收割者);
libeat (匯聚層);
registry(註冊記錄者);
Prospector 負責探索日誌所在地,就如礦工同樣要找礦,而Harvest如礦主同樣的收割者礦工們的勞動成果,哎,世界無處不剝削啊!每一個Prospector 都有一個對應的Harvest,而後他們有一個共同的老大叫作Libbeat,這個傢伙會彙總全部的東西,而後把全部的日誌傳送給指定的客戶,這其中還有個很是重要的角色」registry「,這個傢伙至關於一個會計,它會記錄Harvest 都收割了些啥,收割到哪裏了,這樣一但有問題了以後,harvest就會跑到會計哪裏問:「上次老子的活幹到那塊了」?Registry 會告訴Harvest 你Y的上次幹到哪裏了,去哪裏接着幹就好了。這樣就避免了數據重複收集的問題!mysql

FIlebeat 詳細配置:

filebeat.prospectors:
- input_type: log
  enabled: True
  paths:
    - /var/log/mysql-slow-*
#這個地方是關鍵,咱們給上邊日誌加上了tags,方便在logstash裏邊經過這個tags 過濾並格式化本身想要的內容;
  tags: ["mysql_slow_logs"]
#有的時候日誌不是一行輸出的,若是不用multiline的話,會致使一條日誌被分割成多條收集過來,造成不完整日誌,這樣的日誌對於咱們來講是沒有用的!經過正則匹配語句開頭,這樣multiline 會在匹配開頭
以後,一直到下一個這樣開通的語句合併成一條語句。
#pattern:多行日誌開始的那一行匹配的pattern
#negate:是否須要對pattern條件轉置使用,不翻轉設爲true,反轉設置爲false
#match:匹配pattern後,與前面(before)仍是後面(after)的內容合併爲一條日誌
#max_lines:合併的最多行數(包含匹配pattern的那一行 默認值是500行
#timeout:到了timeout以後,即便沒有匹配一個新的pattern(發生一個新的事件),也把已經匹配的日誌事件發送出去
  multiline.pattern: '^\d{4}/\d{2}/\d{2}'  (2018\05\01  個人匹配是已這樣的日期開頭的)
  multiline.negate: true
  multiline.match: after
  multiline.Max_lines:20
  multiline.timeout: 10s
- input_type: log
  paths:
    - /var/log/mysql-sql-*
  tags: ["mysql_sql_logs"]
  multiline.pattern: '^\d{4}/\d{2}/\d{2}'
  multiline.negate: true
  multiline.match: after
  multiline.timeout: 10s
  encoding: utf-8
  document_type: mysql-proxy
  scan_frequency: 20s
  harverster_buffer_size: 16384
  max_bytes: 10485760
  tail_files: true
 #tail_files:若是設置爲true,Filebeat從文件尾開始監控文件新增內容,把新增的每一行文件做爲一個事件依次發送,而不是從文件開始處從新發送全部內容。默認是false;

Logstash 的詳細配置

input {
  kafka {
    topics_pattern => "mysql.*"
    bootstrap_servers => "x.x.x.x:9092"
    #auto_offset_rest => "latest"
    codec => json
    group_id => "logstash-g1"
   }
}
#終於到了關鍵的地方了,logstash的filter ,使用filter 過濾出來咱們想要的日誌,
filter {
     #if 還可使用or 或者and 做爲條件語句,舉個栗子: if 「a」 or 「b」  or 「c」 in [tags],這樣就能夠過濾多個tags 的標籤了,咱們這個主要用在同型號的交換設備的日誌正規化,好比說你有5臺交換機,把日誌指定到了同一個syslog-ng 上,收集日誌的時候只能經過同一個filebeat,多個prospector加不一樣的tags。這個時候過濾就能夠經過判斷相應的tags來完成了。
    if "mysql_slow_logs" in [tags]{ 
    grok {
     #grok 裏邊有定義好的現場的模板你能夠用,可是更多的是自定義模板,規則是這樣的,小括號裏邊包含全部一個key和value,例子:(?<key>value),好比如下的信息,第一個我定義的key是data,表示方法爲:?<key> 前邊一個問號,而後用<>把key包含在裏邊去。value就是純正則了,這個我就不舉例子了。這個有個在線的調試庫,能夠供你們參考,http://grokdebug.herokuapp.com/ 
        match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+)\.\d+\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"}
#過濾完成以後把以前的message 移出掉,節約磁盤空間。
        remove_field => ["message"]
    }
  }
    else if "mysql_sql_logs" in [tags]{
    grok {
        match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+\.\d+)\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"}
        remove_field => ["message"]}
  }
}

我要過濾的日誌樣本:

2018/05/01 16:16:01.892 - OK - 759.2ms - 172.29.1.7:35184[485388]->172.7.1.39:3306[1525162561129639717]:<DB>:select count(*) from test[];
過濾後的結果以下:
{
  "date": [
    [
      "2018/05/01 16:16:01.892"
    ]
  ],
  "datetime": [
    [
      "16:16:01.892"
    ]
  ],
  "TIME": [
    [
      "16:16:01.892"
    ]
  ],
  "HOUR": [
    [
      "16"
    ]
  ],
  "MINUTE": [
    [
      "16"
    ]
  ],
  "SECOND": [
    [
      "01.892"
    ]
  ],
  "status": [
    [
      "OK"
    ]
  ],
  "respond_time": [
    [
      "759"
    ]
  ],
  "client": [
    [
      "172.29.1.7"
    ]
  ],
  "IPV6": [
    [
      null,
      null
    ]
  ],
  "IPV4": [
    [
      "172.29.1.7",
      "172.7.1.39"
    ]
  ],
  "client-port": [
    [
      "35184"
    ]
  ],
  "server": [
    [
      "172.7.1.39"
    ]
  ],
  "server-port": [
    [
      "3306"
    ]
  ],
  "databases": [
    [
      "<DB>"
    ]
  ],
  "SQL": [
    [
      "select count(*) from test[];"
    ]
  ]
}

#有圖有真相:
ELK 之Filebeat 結合Logstash 過濾出來你想要的日誌sql

相關文章
相關標籤/搜索