ELK構建MySQL慢日誌收集平臺詳解

轉載自:https://mp.weixin.qq.com/s/umH7ImZZVhdfgMdZ3Hz5fAmysql

ELK介紹

ELK最先是Elasticsearch(如下簡稱ES)、Logstash、Kibana三款開源軟件的簡稱,三款軟件後來被同一公司收購,並加入了Xpark、Beats等組件,更名爲Elastic Stack,成爲如今最流行的開源日誌解決方案,雖然有了新名字但你們依然喜歡叫她ELK,如今所說的ELK就指的是基於這些開源軟件構建的日誌系統。web

咱們收集mysql慢日誌的方案以下:正則表達式

 

  • mysql服務器安裝Filebeat做爲agent收集slowLogsql

  • Filebeat讀取mysql慢日誌文件作簡單過濾傳給Kafka集羣數據庫

  • Logstash讀取Kafka集羣數據並按字段拆分後轉成JSON格式存入ES集羣json

  • Kibana讀取ES集羣數據展現到web頁面上bootstrap

慢日誌分類

目前主要使用的mysql版本有5.五、5.6和5.7,通過仔細對比發現每一個版本的慢查詢日誌都稍有不一樣,以下:服務器

5.5版本慢查詢日誌session

# Time: 180810  8:45:12
# User@Host: select[select] @  [10.63.253.59]
# Query_time: 1.064555  Lock_time: 0.000054 Rows_sent: 1  Rows_examined: 319707
SET timestamp=1533861912;
SELECT COUNT(*) FROM hs_forum_thread t  WHERE t.`fid`='50' AND t.`displayorder`>='0';

5.6版本慢查詢日誌elasticsearch

# Time: 160928 18:36:08
# User@Host: root[root] @ localhost []  Id:  4922
# Query_time: 5.207662  Lock_time: 0.000085 Rows_sent: 1  Rows_examined: 526068
use db_name;
SET timestamp=1475058968;
select count(*) from redeem_item_consume where id<=526083;

5.7版本慢查詢日誌

# Time: 2018-07-09T10:04:14.666231Z
# User@Host: bbs_code[bbs_code] @  [10.82.9.220]  Id: 9304381
# Query_time: 5.274805  Lock_time: 0.000052 Rows_sent: 0  Rows_examined: 2
SET timestamp=1531130654;
SELECT * FROM pre_common_session WHERE  sid='Ba1cSC'  OR lastactivity<1531129749;

慢查詢日誌異同點:

  1. 每一個版本的Time字段格式都不同

  2. 相較於5.六、5.7版本,5.5版本少了Id字段

  3. use db語句不是每條慢日誌都有的

  4. 可能會出現像下邊這樣的狀況,慢查詢塊# Time:下可能跟了多個慢查詢語句

# Time: 160918  2:00:03
# User@Host: dba_monitor[dba_monitor] @  [10.63.144.82]  Id:   968
# Query_time: 0.007479  Lock_time: 0.000181 Rows_sent: 172  Rows_examined: 344
SET timestamp=1474135203;
SELECT table_schema as 'DB',table_name as 'TABLE',CONCAT(ROUND(( data_length + index_length ) / ( 1024 * 1024 *1024 ), 2), '') as 'TOTAL',TABLE_COMMENT  FROM information_schema.TABLES ORDER BY data_length + index_length DESC;
# User@Host: dba_monitor[dba_monitor] @  [10.63.144.82]  Id:   969
# Query_time: 0.003303  Lock_time: 0.000395 Rows_sent: 233  Rows_examined: 233
SET timestamp=1474135203;
select TABLE_SCHEMA,TABLE_NAME,COLUMN_NAME,ORDINAL_POSITION,COLUMN_TYPE,ifnull(COLUMN_COMMENT,0) from COLUMNS where table_schema not in ('mysql','information_schema','performance_schema','test');

處理思路

上邊咱們已經分析了各個版本慢查詢語句的構成,接下來咱們就要開始收集這些數據了,究竟應該怎麼收集呢?

  1. 拼裝日誌行:mysql的慢查詢日誌多行構成了一條完整的日誌,日誌收集時要把這些行拼裝成一條日誌傳輸與存儲。

  2. Time行處理:# Time:開頭的行可能不存在,且咱們能夠經過SET timestamp這個值來肯定SQL執行時間,因此選擇過濾丟棄Time行

  3. 一條完整的日誌:最終將以# User@Host:開始的行,和以SQL語句結尾的行合併爲一條完整的慢日誌語句

  4. 肯定SQL對應的DB:use db這一行不是全部慢日誌SQL都存在的,因此不能經過這個來肯定SQL對應的DB,慢日誌中也沒有字段記錄DB,因此這裏建議爲DB建立帳號時添加db name標識,例如咱們的帳號命名方式爲:projectName_dbName,這樣看到帳號名就知道是哪一個DB了

  5. 肯定SQL對應的主機:我想經過日誌知道這條SQL對應的是哪臺數據庫服務器怎麼辦?慢日誌中一樣沒有字段記錄主機,能夠經過filebeat注入字段來解決,例如咱們給filebeat的name字段設置爲服務器IP,這樣最終經過beat.name這個字段就能夠肯定SQL對應的主機了

Filebeat配置

filebeat完整的配置文件以下:

filebeat.prospectors:

- input_type: log
  paths:
    - /home/opt/data/slow/mysql_slow.log

  exclude_lines: ['^\# Time']

  multiline.pattern: '^\# Time|^\# User'
  multiline.negate: true
  multiline.match: after

  tail_files: true

name: 10.82.9.89

output.kafka:
  hosts: ["10.82.9.202:9092","10.82.9.203:9092","10.82.9.204:9092"]
  topic: mysql_slowlog_v2

重要參數解釋:

  • input_type:指定輸入的類型是log或者是stdin

  • paths:慢日誌路徑,支持正則好比/data/*.log

  • exclude_lines:過濾掉# Time開頭的行

  • multiline.pattern:匹配多行時指定正則表達式,這裏匹配以# Time或者# User開頭的行,Time行要先匹配再過濾

  • multiline.negate:定義上邊pattern匹配到的行是否用於多行合併,也就是定義是否是做爲日誌的一部分

  • multiline.match:定義如何將皮排行組合成時間,在以前或者以後

  • tail_files:定義是從文件開頭讀取日誌仍是結尾,這裏定義爲true,從如今開始收集,以前已存在的無論

  • name:設置filebeat的名字,若是爲空則爲服務器的主機名,這裏咱們定義爲服務器IP

  • output.kafka:配置要接收日誌的kafka集羣地址可topic名稱

Kafka接收到的日誌格式:

{"@timestamp":"2018-08-07T09:36:00.140Z","beat":{"hostname":"db-7eb166d3","name":"10.63.144.71","version":"5.4.0"},"input_type":"log","message":"# User@Host: select[select] @  [10.63.144.16]  Id: 23460596\n# Query_time: 0.155956  Lock_time: 0.000079 Rows_sent: 112  Rows_examined: 366458\nSET timestamp=1533634557;\nSELECT DISTINCT(uid) FROM common_member WHERE hideforum=-1 AND uid != 0;","offset":1753219021,"source":"/data/slow/mysql_slow.log","type":"log"}

Logstash配置

logstash完整的配置文件以下:

input {
    kafka {
        bootstrap_servers => "10.82.9.202:9092,10.82.9.203:9092,10.82.9.204:9092"
        topics => ["mysql_slowlog_v2"]
    }
}

filter {
    json {
        source => "message"
    }

    grok {
        # 有ID有use
        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s%{NUMBER:id:int}\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int};\n(?<query>.*)" ]

        # 有ID無use
        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s%{NUMBER:id:int}\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int};\n(?<query>.*)" ]

        # 無ID有use
        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int};\n(?<query>.*)" ]

        # 無ID無use
        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int};\n(?<query>.*)" ]
    }

    date {
        match => ["timestamp_mysql","UNIX"]
        target => "@timestamp"
    }

}

output {
    elasticsearch {
        hosts => ["10.82.9.208:9200","10.82.9.217:9200"]
        index => "mysql-slowlog-%{+YYYY.MM.dd}"
    }
}

重要參數解釋:

  • input:配置kafka的集羣地址和topic名字

  • filter:過濾日誌文件,主要是對message信息(看前文kafka接收到的日誌格式)進行拆分,拆分紅一個一個易讀的字段,例如UserHostQuery_timeLock_timetimestamp等。grok段根據咱們前文對mysql慢日誌的分類分別寫不通的正則表達式去匹配,當有多條正則表達式存在時,logstash會從上到下依次匹配,匹配到一條後邊的則再也不匹配。date字段定義了讓SQL中的timestamp_mysql字段做爲這條日誌的時間字段,kibana上看到的實踐排序的數據依賴的就是這個時間

  • output:配置ES服務器集羣的地址和index,index自動按天分割

kibana查詢展現

  • 打開Kibana添加mysql-slowlog-*的Index,並選擇timestamp,建立Index Pattern

  •  

  • 進入Discover頁面,能夠很直觀的看到各個時間點慢日誌的數量變化,能夠根據左側Field實現簡單過濾,搜索框也方便搜索慢日誌,例如我要找查詢時間大於2s的慢日誌,直接在搜索框輸入query_time: > 2回車便可

  •  

  • 點擊每一條日誌起邊的很色箭頭能查看具體某一條日誌的詳情

  •  

  • 若是你想作個大盤統計慢日誌的總體狀況,例如top 10 SQL等,也能夠很方便的經過web界面配置

  •  

總結

  1. 不要望而卻步,當你開始去作已經成功一半了

  2. 本篇文章詳細介紹了關於mysql慢日誌的收集,收集以後的處理呢?咱們目前是DBA天天花時間去Kibana上查看分析,有優化的空間就跟開發一塊兒溝通優化,後邊達成默契以後考慮作成自動報警或處理

  3. 關於報警ELK生態的xpark已經提供,且最新版本也開源了,感興趣的能夠先研究起來,歡迎一塊兒交流

相關文章
相關標籤/搜索