什麼是Mysql慢查詢日誌?mysql
當SQL語句執行時間超過所設定的閾值時,便會記錄到指定的日誌文件中或者表中,全部記錄內容稱之爲慢查詢日誌正則表達式
爲何要收集Mysql慢查詢日誌?sql
數據庫在運行期間,可能會存在這不少SQL語句標準性的問題,那麼咱們如何快速的去定位,分析哪些SQL語句須要優化處理,又是哪些SQL語句給業務系統形成影響呢?當咱們進行統一的收集分析,這樣開發和運維就不會產生矛盾,就不會相互的「撕咬」了;SQL語句執行的時間,對應的語句以及具體的寫法一切一覽無餘~數據庫
如何收集Mysql慢查詢日誌或者是說具體思路是什麼?json
1.首先咱們的數據採集者仍然是經過filebeat這個工具,對慢查詢日誌進行採集;bootstrap
1.1:開啓慢查詢日誌(永久生效,進入my.cnf配置文件進行修改,重啓以後便會生效)vim
- slow_query_log=ON( log_slow_queries=ON ) #前者是Mysql5.6以後的開啓方法;後者是Mysql5.6以前的開啓方法;這裏版本採用是5.5版本,故此都生效
- slow_query_log_file=/dada/mysql/the_slow_query.log #指定mysql慢查詢日誌的存儲目錄位置
- long_query_time=1 #指定慢查詢的時間閾值
- log_outpu=FILE #表示慢查詢日誌存放於指定的文件中
1.2:由於Mysql慢查詢的格式多行格式,而且不一樣mysql版本的慢查詢日誌格式也不經相同,須要考慮將其「#Time」字段過濾掉並整合成爲爲一整條完整的日誌輸出,能夠藉助filebeat的muitline.negate選項,將慢日誌查詢日誌多行合併到一塊兒,輸出爲一條日誌;ruby
2.在logstash事件配置中,咱們將拉取到的日誌在此經過file過濾處理(經過grok的match插件將日誌分爲四種狀況,當有多條匹配規則存在,就會依次匹配)根據不一樣狀況(數據庫版本不一樣,慢查詢亦不一樣,有id有user,有id無user,無id有user,無id無user)四種狀況進行判斷匹配,服務器
[mysql5.6版本my.conf配置]運維
slow_query_log=ON #開啓慢查詢日誌
slow_query_log_file=/data/mysql_query.log #從新定義慢查詢日誌路徑
long_query_time=1 #設定sql語句執行超時時間
[root@test ~]# systemctl restart mysqld
mysql> select sleep(5); #執行測試語句,驗證慢日誌輸出,只要超過1秒設定的閥值便可
# tail -f /dada/mysql_query.log
配置完慢查詢日誌路徑時,切記不要忘記給予mysql屬組屬主權限,不然重啓mysql服務以後,即便經過sql語句查詢到慢查詢日誌位置,可是也沒有生效,報錯以下:
【拓展】
MariaDB [(none)]> show variables like "%log_output%"; #查看日誌存放方式
MariaDB [(none)]> show global status like "%slow_queries%"; #mysql服務自啓動到當前時間點的慢查詢次數統計
【filebeat配置】
# vim /usr/local/filebeat/filebeat.yml
filebeat.inputs: #定義數據的原型 - type: log #定義的數據如如類型是log,爲默認值。 enabled: true #啓動手工配置filebeat paths: - /data/mysql_query.log #這是指定mysql慢查詢日誌文件的路徑 fields: log_topic: mysqlslowlogs #定義一個新字段log_topic,值爲mysql_slowlogs;用於kifka的topic主題 exclude_lines: ['^\# Time'] #支持正則,排除匹配的行,若是有多行,合併成整行過濾;這裏過濾掉# Time開頭的行 multiline.negate: true #匹配多行時指定正則表達式,這裏匹配以# Time或者# User開頭的行,Time行要先匹配再過濾 multiline.match: after #定義如何將匹配行組合成時間,在以前或者以後,有 "after"、 "before"兩個值。multiline.pattern: '^\# Time|^\# User'
multiline.pattern: '^\# Time|^\# User'
processors: #filebeat對應的消息類型 - drop_fields: #刪除無用的字段 fields: ["beat", "input", "source", "offset", "prospector"] filebeat.config.modules: #模塊配置,默認狀況下會加載modules.d目錄中啓用的模塊 path: ${path.config}/modules.d/*.yml reload.enabled: false #從新加載,爲關閉狀態 name: 192.168.37.134 #host主機名稱,指定本地日誌收集的服務器IP便可 output.kafka: #filebeat支持多種輸出,如kafka,logstash,elasticsearch等 enabled: true #表示啓動該模塊 hosts: ["192.168.37.134:9092", "192.168.37.135:9092", "192.168.37.136:9092"] #指定輸出數據到kafka集羣地址中,加上端口號 version: "0.10" topic: '%{[fields.log_topic]}' #自動獲取日誌分類,此處格式爲filebeat6.x版本專配 partition.round_robin: #分區 reachable_only: true worker: 2 required_acks: 1 compression: gzip #壓縮格式 max_message_bytes: 10000000 #最大消息字節 logging.level: debug #debug日誌級別 filebeat的配置,重點是multiline.negate選項,經過此選項,將mysql慢查詢日誌多行合併到了一塊兒,輸出爲一條日誌。
[root@test filebeat]# nohup ./filebeat -e -c filebeat.yml &
【Logstash事件配置】
# vim /usr/local/logstash/config/etc/mysql_logs_query.conf
input { kafka { bootstrap_servers => "192.168.37.134:9092,192.168.37.135:9092,192.168.37.136:9092" topics => ["mysqlslowlogs"] } } filter { json { source => "message" } grok { # 有ID有use match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ] # 有ID無use match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ] # 無ID有use match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ] # 無ID無use match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ] } date { match => ["timestamp_mysql","UNIX"] #這裏咱們須要對日誌輸出進行時間格式轉換,默認將timestamp_mysql的unix時間格式以後直接賦值給timestamp target => "@timestamp" } mutate { remove_field => "@version" #版本字段,刪除不須要的字段 remove_field => "message" #在上述,咱們已經對mysql的慢日誌輸出分段了,因此message輸出已是多餘的了,故此刪除 } } output { elasticsearch { hosts => ["192.168.37.134:9200","192.168.37.135:9200","192.168.37.136:9200"] #將filebeat採集到的數據輸出到ES中 index => "mysql-slowlog-%{+YYYY.MM.dd}" #索引文件名稱 } #stdout{ #codec=> rubydebug #} }
[root@localhost etc]# nohup /usr/local/logstash/bin/logstash -f mysql_logs_query.conf & #後臺運行
ps:可stdount終端輸出驗證正常
【Kibana登陸驗證】