收集日誌的目的是有效的利用日誌,有效利用日誌的前提是日誌通過格式化符合咱們的要求,這樣才能真正的高效利用收集到elasticsearch平臺的日誌。默認的日誌到達elasticsearch 是原始格式,亂的讓人抓狂,這個時候你會發現Logstash filter的可愛之處,它很像一塊橡皮泥,若是咱們手巧的話就會塑造出來讓本身舒舒服服的做品,but 若是你沒搞好的話那就另說了,本文的宗旨就是帶你一塊兒飛,搞定這塊橡皮泥。當你搞定以後你會以爲kibana 的世界瞬間清爽了不少!
FIlebeat 的4大金剛
Filebeat 有4個很是重要的概念須要咱們知道,
Prospector(礦工);
Harvest (收割者);
libeat (匯聚層);
registry(註冊記錄者);
Prospector 負責探索日誌所在地,就如礦工同樣要找礦,而Harvest如礦主同樣的收割者礦工們的勞動成果,哎,世界無處不剝削啊!每一個Prospector 都有一個對應的Harvest,而後他們有一個共同的老大叫作Libbeat,這個傢伙會彙總全部的東西,而後把全部的日誌傳送給指定的客戶,這其中還有個很是重要的角色」registry「,這個傢伙至關於一個會計,它會記錄Harvest 都收割了些啥,收割到哪裏了,這樣一但有問題了以後,harvest就會跑到會計哪裏問:「上次老子的活幹到那塊了」?Registry 會告訴Harvest 你Y的上次幹到哪裏了,去哪裏接着幹就好了。這樣就避免了數據重複收集的問題!mysql
filebeat.prospectors: - input_type: log enabled: True paths: - /var/log/mysql-slow-* #這個地方是關鍵,咱們給上邊日誌加上了tags,方便在logstash裏邊經過這個tags 過濾並格式化本身想要的內容; tags: ["mysql_slow_logs"] #有的時候日誌不是一行輸出的,若是不用multiline的話,會致使一條日誌被分割成多條收集過來,造成不完整日誌,這樣的日誌對於咱們來講是沒有用的!經過正則匹配語句開頭,這樣multiline 會在匹配開頭 以後,一直到下一個這樣開通的語句合併成一條語句。 #pattern:多行日誌開始的那一行匹配的pattern #negate:是否須要對pattern條件轉置使用,不翻轉設爲true,反轉設置爲false #match:匹配pattern後,與前面(before)仍是後面(after)的內容合併爲一條日誌 #max_lines:合併的最多行數(包含匹配pattern的那一行 默認值是500行 #timeout:到了timeout以後,即便沒有匹配一個新的pattern(發生一個新的事件),也把已經匹配的日誌事件發送出去 multiline.pattern: '^\d{4}/\d{2}/\d{2}' (2018\05\01 個人匹配是已這樣的日期開頭的) multiline.negate: true multiline.match: after multiline.Max_lines:20 multiline.timeout: 10s - input_type: log paths: - /var/log/mysql-sql-* tags: ["mysql_sql_logs"] multiline.pattern: '^\d{4}/\d{2}/\d{2}' multiline.negate: true multiline.match: after multiline.timeout: 10s encoding: utf-8 document_type: mysql-proxy scan_frequency: 20s harverster_buffer_size: 16384 max_bytes: 10485760 tail_files: true #tail_files:若是設置爲true,Filebeat從文件尾開始監控文件新增內容,把新增的每一行文件做爲一個事件依次發送,而不是從文件開始處從新發送全部內容。默認是false;
input { kafka { topics_pattern => "mysql.*" bootstrap_servers => "x.x.x.x:9092" #auto_offset_rest => "latest" codec => json group_id => "logstash-g1" } } #終於到了關鍵的地方了,logstash的filter ,使用filter 過濾出來咱們想要的日誌, filter { #if 還可使用or 或者and 做爲條件語句,舉個栗子: if 「a」 or 「b」 or 「c」 in [tags],這樣就能夠過濾多個tags 的標籤了,咱們這個主要用在同型號的交換設備的日誌正規化,好比說你有5臺交換機,把日誌指定到了同一個syslog-ng 上,收集日誌的時候只能經過同一個filebeat,多個prospector加不一樣的tags。這個時候過濾就能夠經過判斷相應的tags來完成了。 if "mysql_slow_logs" in [tags]{ grok { #grok 裏邊有定義好的現場的模板你能夠用,可是更多的是自定義模板,規則是這樣的,小括號裏邊包含全部一個key和value,例子:(?<key>value),好比如下的信息,第一個我定義的key是data,表示方法爲:?<key> 前邊一個問號,而後用<>把key包含在裏邊去。value就是純正則了,這個我就不舉例子了。這個有個在線的調試庫,能夠供你們參考,http://grokdebug.herokuapp.com/ match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+)\.\d+\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"} #過濾完成以後把以前的message 移出掉,節約磁盤空間。 remove_field => ["message"] } } else if "mysql_sql_logs" in [tags]{ grok { match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+\.\d+)\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"} remove_field => ["message"]} } }
2018/05/01 16:16:01.892 - OK - 759.2ms - 172.29.1.7:35184[485388]->172.7.1.39:3306[1525162561129639717]:<DB>:select count(*) from test[]; 過濾後的結果以下: { "date": [ [ "2018/05/01 16:16:01.892" ] ], "datetime": [ [ "16:16:01.892" ] ], "TIME": [ [ "16:16:01.892" ] ], "HOUR": [ [ "16" ] ], "MINUTE": [ [ "16" ] ], "SECOND": [ [ "01.892" ] ], "status": [ [ "OK" ] ], "respond_time": [ [ "759" ] ], "client": [ [ "172.29.1.7" ] ], "IPV6": [ [ null, null ] ], "IPV4": [ [ "172.29.1.7", "172.7.1.39" ] ], "client-port": [ [ "35184" ] ], "server": [ [ "172.7.1.39" ] ], "server-port": [ [ "3306" ] ], "databases": [ [ "<DB>" ] ], "SQL": [ [ "select count(*) from test[];" ] ] }
#有圖有真相:sql