針對公司項目微服務化,隨着項目及服務器的不斷增多,決定採用ELK(Elasticsearch+Logstash+Kibana)日誌分析平臺進行微服務日誌分析。redis
1.在微服務服務器上部署Logstash,做爲Shipper的角色,對微服務日誌文件數據進行數據採集,將採集到的數據輸出到Redis消息隊列。數據庫
2.在另一臺服務器上部署Logstash,做爲Indexer的角色,從Redis消息隊列中讀取數據(能夠對數據進行處理),輸出到Elasticsearch-Master主節點。bash
3.Elasticsearch-Master主節點內部與副節點同步數據。(Elasticsearch集羣建議3個服務以上奇數)服務器
4.Kibana部署一臺服務器內,讀取Elasticsearch集羣數據,展現Web查詢頁面,提供數據展現。架構
在我這個最終方案中,選擇了使用Redis做爲消息隊列進行緩衝,下降Elasticsearch壓力,起到削峯做用,主要緣由仍是因爲公司考慮成本問題,日誌收集也是隻針對咱們單個項目組來使用,因此選擇了公司如今就已經有的Redis集羣進行復用。elasticsearch
最初方案中,在消息隊列上選擇的是Kafka,畢竟Kafka天生就是作爲消息隊列的,具體兩者的畢竟在這裏我就很少說了,百度上一大堆。微服務
這裏就不在這裏寫出來了,提供三個地址僅供參考:ui
Linux安裝Logstash
Linux安裝Kibana
Linux安裝Elasticsearchspa
從日誌文件讀取到redis日誌
#從日誌文件讀取數據
#file{}
#type 日誌類型
#path 日誌位置
# 能夠直接讀取文件(a.log)
# 能夠全部後綴爲log的日誌(*.log)
# 讀取文件夾下全部文件(路徑)
#start_position 文件讀取開始位置 (beginning)
#sincedb_path 從什麼位置讀取(設置爲/dev/null自動從開始位置讀取)
input {
file {
type => "log"
path => ["/root/logs/info.log"]
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
#根據時間戳分隔日誌
#grok 區分日誌中得字段
filter {
multiline {
pattern => "^%{TIMESTAMP_ISO8601} "
negate => true
what => previous
}
#定義數據的格式
grok {
match => { "message" => "%{DATA:datetime} - %{DATA:logLevel} - %{DATA:serviceName} - %{DATA:ip} - %{DATA:pid} - %{DATA:thread} - %{DATA-msg}"}
}
}
#輸出數據到Redis
#host Redis主機地址
#port Redis端口
#db Redis數據庫編號
#data_type Redis數據類型
#key Redis的key
#password Redis密碼
output {
redis {
host => "ip"
port => "6379"
db => "6"
data_type => "list"
password => "password"
key => "test_log"
}
}
複製代碼
從redis讀取到es
#從redis內讀取數據
#host Redis主機ip
#port Redis端口
#data_type Redis數據類型
#batch_count
#password Redis密碼
#key Redis讀取Key
input {
redis {
host => "ip"
port => "6379"
db => "6"
data_type => "list"
password => "password"
key => "test_log"
}
}
#數據的輸出咱們指向了es集羣
#hosts Elasticsearch主機地址
#index Elasticsearch索引名稱
output {
elasticsearch {
hosts => "ip:9200"
index => "logs-%{+YYYY.MM.dd}"
}
}
複製代碼
其餘剩下的就是Es集羣和Kibana了,這兩個沒什麼特別值得注意的地方,上網隨便搜,一大堆文章。
以上僅僅表明本人項目使用方案,不必定完美適合全部場景,僅供參考。