本文與前文是有關聯的,以前的兩篇文章客官能夠擡腿出門右轉 導讀,ELK 以前端,ELK 之分佈式發
#前端和消息隊列搞定以後,咱們須要安裝數據採集工具filebeats和數據過濾機運輸工具Logstash,通常狀況咱們都使用filebeats 用來收集日誌文件,我自定義了一個log文件,文件內容以下:
55.3.244.1 GET /index.html 15824 0.043
55.3.244.1 GET /index.html 15824 0.043
文件位置放到/tmp/test.loghtml
cd /opt/elk/filebeat-6.2.2-linux-x86_64
#filebeat 的配置文件大致上分爲兩部分,輸入部分和輸出部分,輸入指定輸入源便可,輸出能夠選擇直接到elasticsearch,logstash或者是消息隊列(redis&kafka)等,本文采用kafka 做爲消息隊列;前端
vi filebeat.yml linux
filebeat.prospectors: #日誌類型 - type: log enabled: True # 日誌路徑能夠寫多個,支持通配符 paths: - /tmp/test.log #設置字符集編碼 encoding: utf-8 #文檔類型 document_type: my-nginx-log #每十秒掃描一次 scan_frequency: 10s # 實際讀取文件時,每次讀取 16384 字節 harverster_buffer_size: 16384 #The maximum number of bytes that a single log message can have. All bytes after max_bytes are discarded and not sent. This setting is especially useful for multiline log messages, which can get large. The default is 10MB (10485760). 一次發生的log大小值; max_bytes: 10485760 # 是否從文件末尾開始讀取 tail_files: true #給日誌加上tags方便在logstash過濾日誌的時候作判斷 tags: ["nginx-access"] #剔除以.gz 結尾的文件 exclude_files: [".gz$"] #output 部分的配置文檔 #配置輸出爲kafka,不管你是tar包仍是rpm安裝,目錄裏邊會看到官方提供的filebeat.full.yml OR filebeat.reference.yml 這裏邊有filebeat 全部的input 方法和output 方法供你參考; output.kafka: enabled: true #kafka 的server,能夠配置集羣,例子以下: hosts:["ip:9092","ip2:9092","ip3:9092"] #這個很是重要,filebeat做爲provider,把數據輸入到kafka裏邊,logstash 做爲消費者去消費這些信息,logstash的input 中須要這個topic,否則logstash沒有辦法取到數據。 topic: elk-%{[type]} # The number of concurrent load-balanced Kafka output workers. kafka 的併發運行進程 worker: 2 #當傳輸給kafka 有問題的時候,重試的次數; max_retries: 3 #單個kafka請求裏面的最大事件數,默認2048 bulk_max_size: 2048 #等待kafka broker響應的時間,默認30s timeout: 30s #kafka broker等待請求的最大時長,默認10s broker_timeout: 10s #每一個kafka broker在輸出管道中的消息緩存數,默認256 channel_buffer_size: 256 #網絡鏈接的保活時間,默認爲0,不開啓保活機制 keep_alive: 60 #輸出壓縮碼,可選項有none, snappy, lz4 and gzip,默認爲gzip (kafka支持的壓縮,數據會先被壓縮,而後被生產者發送,而且在服務端也是保持壓縮狀態,只有在最終的消費者端纔會被解壓縮) compression: gzip #容許的最大json消息大小,默認爲1000000,超出的會被丟棄,應該小於broker的 message.max.bytes(broker能接收消息的最大字節數) max_message_bytes: 1000000 #kafka的響應返回值,0位無等待響應返回,繼續發送下一條消息;1表示等待本地提交(leader broker已經成功寫入,但follower未寫入),-1表示等待全部副本的提交,默認爲1 required_acks: 0 #The configurable ClientID used for logging, debugging, and auditing purposes. The default is "beats"。客戶端ID 用於日誌怕錯,審計等,默認是beats。 client_id: beats #測試配置文件:/opt/elk/filebeat/filebeat -c /opt/elk/filebeat/filebeat.yml test config #若是配置文件沒有問題的話,會出現config ok ,若是有問題會提示具體問題在哪裏。 #啓動filebeat 能夠先經過 /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat -c -e /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat.yml 查看一下輸入filebeat是否工做正常,會有不少信息打印到屏幕上; nohup /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat -c /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat.yml >>/dev/null 2>&1&
input { #數據來源 kafka { #這個對應filebeat的output 的index topics_pattern => "elk-.*" #kafka 的配置 bootstrap_servers => "IP1:9092,IP2:9092,IP3:9092" kafka 中的group ID group_id => "logstash-g1" } } #過濾器,若是你想要日誌按照你的需求輸出的話,須要在logstash中過濾而且給與相應的key,好比如下的是nginx 日誌,用的是logstash內置好的過濾器,%{IP:client} 這個裏邊包含了兩個信息:ip地址,client 是在kibana 中顯示的自定義鍵值,最終顯示成這樣,http://grokdebug.herokuapp.com/ 是在線調試logstash filter的工具,你能夠在線調試你的過濾規則。 filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"} #由於咱們取出了字段,因此不須要原來這個message字段,這個字段裏邊包含以前beat 輸入的全部字段。 remove_field => ["message"] } } output { elasticsearch { hosts => ["IP:9200"] #這個是在kibana上的index Patterns 的索引,建議什麼服務就用幹什麼名字,由於beat 提供了些kibana的模板能夠導入,導入的模板匹配的時候用的索引是對應服務的名稱開頭 。 index => "nginx-%{+YYYY.MM.dd}" document_type => "nginx" #每次20000 發送一次數據到elasticsearch flush_size => 20000 若是不夠20000,沒10秒會發送一次數據;、 idle_flush_time =>10 } }