Filebeat是一個日誌文件託運工具,在你的服務器上安裝客戶端後,filebeat會監控日誌目錄或者指定的日誌文件,追蹤讀取這些文件(追蹤文件的變化,不停的讀)。java
Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。node
ElasticSearch它提供了一個分佈式多用戶能力的全文搜索引擎,基於RESTful web接口。Elasticsearch 中的 Index 是一組具備類似特徵的文檔集合,相似於關係數據庫模型中的數據庫實例,Index 中能夠指定 Type 區分不一樣的文檔,相似於數據庫實例中的關係表,Document 是存儲的基本單位,都是 JSON 格式,相似於關係表中行級對象。咱們處理後的 JSON 文檔格式的日誌都要在 Elasticsearch 中作索引,相應的 Logstash 有 Elasticsearch output 插件,對於用戶是透明的。linux
Hadoop 生態圈爲大規模數據集的處理提供多種分析功能,但實時搜索一直是 Hadoop 的軟肋。現在,Elasticsearch for Apache Hadoop(ES-Hadoop)彌補了這一缺陷,爲用戶整合了 Hadoop 的大數據分析能力以及 Elasticsearch 的實時搜索能力。web
Logstash 是一種功能強大的信息採集工具,相似於 Hadoop 生態圈裏的 Flume。一般在其配置文件規定 Logstash 如何處理各類類型的事件流,通常包含 input、filter、output 三個部分。Logstash 爲各個部分提供相應的插件,於是有 input、filter、output 三類插件完成各類處理和轉換;另外 codec 類的插件能夠放在 input 和 output 部分經過簡單編碼來簡化處理過程。spring
Kibana是ElasticSearch的用戶界面。shell
在實際應用場景下,爲了知足大數據實時檢索的場景,利用Filebeat去監控日誌文件,將Kafka做爲Filebeat的輸出端,Kafka實時接收到Filebeat後以Logstash做爲輸出端輸出,到Logstash的數據也許還不是咱們想要的格式化或者特定業務的數據,這時能夠經過Logstash的一些過了插件對數據進行過濾最後達到想要的數據格式以ElasticSearch做爲輸出端輸出,數據到ElasticSearch就能夠進行豐富的分佈式檢索了。數據庫
下載 elasticsearch、logstash、kibana、filebeat 的壓縮包,並將四個壓縮包上傳到 /data/elk 目錄下
json
略vim
說明:ElasticSearch的運行不能用root執行,本身用useradd命令新建一個用戶以下所示:
添加普通用戶elk並設置密碼
useradd elk
passwd elk 而後根據提示輸入密碼便可springboot
修改文件全部者
chown -R elk:elk /data/elk
解壓
tar -zxvf elasticsearch-6.7.0.tar.gz
修改 es 的配置文件
cd /data/elk/elasticsearch-6.7.0/config
vim elasticsearch.yml
修改方法參考以下:
cluster.name: my-application
node.name: node-1
node.attr.rack: r1
path.data: /data/elk/elasticsearch-6.7.0/data
path.logs: /data/elk/elasticsearch-6.7.0/logs
network.host: localhost
http.port: 9200
su elk
./bin/elasticsearch -d
啓動時間有點慢,耐心等待10-20s或者更長,查看9200,9300端口是否開啓
netstat -tnlp|grep 9[23]00
tcp6 0 0 localhost:9200 :::* LISTEN 30155/java
tcp6 0 0 localhost:9300 :::* LISTEN 30155/java
訪問地址:http://localhost:9200
ps -ef |grep elasticsearch
kill PID
filebeat:部署在具體的業務機器上,經過定時監控的方式獲取增量的日誌,並轉發到logstash、elasticsearch、kafka等等。
解壓
tar –zxvf filebeat-6.7.0-linux-x86_64.tar.gz
cd filebeat-6.7.0-linux-x86_64
修改kibana 的配置文件
vim filebeat.yml
===================Filebeat prospectors ===========
filebeat.prospectors:
# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.
- input_type: log
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /data/elk/logfile
#- c:\programdata\elasticsearch\logs\*
#輸出到logstash
#-------------------- Logstash output --------------------------------
output.logstash:
# The Logstash hosts
hosts: ["localhost:5044"]
./filebeat -e -c filebeat.yml -d "publish"
解壓
tar -zxvf logstash-6.7.0.tar.gz
cd logstash-6.7.0
配置
在logstash文件夾的下bin目錄建立配置文件logstash.conf ,內容以下:
input:對應輸入的配置,其中path是監控的文**件路勁, codec編碼個格式
output:elasticsearch : { hosts => "localhost:9200" }
對應輸出到elasticsearch,hosts是elasticsearch安裝地址
input {
beats {
host => "localhost"
port => 5044
codec => json
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => "localhost:9200"
index="test_index"
}
}
測試你的配置文件 是否正確( 解析配置文件並報告任何錯誤。)
./logstash -f logstash.conf --config.test_and_exit
Sending Logstash logs to /data/elk/logstash-6.7.0/logs which is now configured via log4j2.properties
[2019-03-28T17:55:34,114][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/data/elk/logstash-6.7.0/data/queue"}
[2019-03-28T17:55:34,169][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/data/elk/logstash-6.7.0/data/dead_letter_queue"}
[2019-03-28T17:55:34,856][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
Configuration OK
[2019-03-28T17:55:43,327][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
啓動命令(啓用自動配置從新加載,這樣就沒必要每次修改配置文件時都中止並從新啓動Logstash )
./logstash -f logstash.conf --config.reload.automatic
中止服務
ps aux | grep logstash
kill -9 pid
解壓
tar –zxvf kibana-6.7.0-linux-x86_64.tar.gz
cd kibana-6.7.0-linux-x86_64/config
配置
vim kibana.yml
server.port: 5601
server.host: "localhost"
#elasticsearch.username: "elastic"
#elasticsearch.password: "changeme"
elasticsearch.hosts: ["http://localhost:9200"]
kibana.index: ".kibana"
./bin/kibana
訪問地址:
http://localhost:5601
netstat -anltp|grep 5601查找對應端口
kill -9 pid
咱們經過echo往這文件/data/elk/test.log追加內容,來測試整個日誌收集系統是否可行
vim test2.conf
input {
file {
type =>"syslog"
path => ["/data/elk/test.log" ]
}
syslog {
type =>"syslog"
port =>"5544"
}
}
output {
stdout { codec=> rubydebug }
elasticsearch {hosts => "http:// localhost:9200"}
}
啓動
./logstash -f test2.conf --config.reload.automatic
向監控文件中寫入數據
echo "{ "firstName": "1", "lastName":"McLaughlin", "email": "aaaa" }" >> test.log
Logstash打印接收到的數據:
{
"message" => "{ firstName: 1, lastName:McLaughlin, email: aaaa }",
"@timestamp" => 2019-03-29T02:39:35.229Z,
"@version" => "1",
"host" => "node223",
"path" => "/data/elk/test.log"
}
查看es全部index
curl -X GET 'http://localhost:9200/_cat/indices?v'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open .kibana_task_manager ZYN7zZl3QLuB1d-EE_ysHA 1 0 2 0 12.5kb 12.5kb
yellow open logstash-2019.03.29 rfkkxW7rQDKnl40_Ekaf7g 5 1 2 0 10.8kb 10.8kb
yellow open test-index hFBQF-jZSkqajPRYzegfwg 5 1 0 0 1.2kb 1.2kb
green open .kibana_1 sEUUnVMxQ0amHbQGcOedOQ 1 0 4 1 19.8kb 19.8kb
查看單個index數據
curl 'localhost:9200/logstash-2019.03.29/_search?pretty=true'
{
"took" : 4,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "logstash-2019.03.29",
"_type" : "doc",
"_id" : "W85Rx2kBAxlZ_OvPIlZf",
"_score" : 1.0,
"_source" : {
"message" : "{ firstName: 1, lastName:McLaughlin, email: aaaa }",
"@timestamp" : "2019-03-29T02:39:35.229Z",
"@version" : "1",
"host" : "node223",
"path" : "/data/elk/test.log"
}
}
]
}
}
在你開始用Kibana以前,你須要告訴Kibana你想探索哪一個Elasticsearch索引。第一次訪問Kibana是,系統會提示你定義一個索引模式以匹配一個或多個索引的名字。
一、訪問Kibana UI。例如,localhost:56011 或者 http://YOURDOMAIN.com:5601
二、指定一個索引模式來匹配一個或多個你的Elasticsearch索引。當你指定了你的索引模式之後,任何匹配到的索引都將被展現出來。
(畫外音:*匹配0個或多個字符; 指定索引默認是爲了匹配索引,確切的說是匹配索引名字)
三、點擊「Next Step」以選擇你想要用來執行基於時間比較的包含timestamp字段的索引。若是你的索引沒有基於時間的數據,那麼選擇「I don’t want to use the Time Filter」選項。
四、點擊「Create index pattern」按鈕來添加索引模式。第一個索引模式自動配置爲默認的索引默認,之後當你有多個索引模式的時候,你就能夠選擇將哪個設爲默認。(提示:Management > Index Patterns)
如今,Kibana已經鏈接到你的Elasticsearch數據。Kibana展現了一個只讀的字段列表,這些字段是匹配到的這個索引配置的字段。
http://localhost:5601
要求日誌格式爲json格式
在Spring Boot工程resources目錄下配置logback.xml文件,旨在將log4j日誌按json格式輸出到指定目錄,logback.xml文件內容以下:
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<!--定義日誌文件的存儲地址 勿在 LogBack 的配置中使用相對路徑
<property name="LOG_HOME" value="E:/demo/home" />
-->
<!--格式化輸出:%d表示日期,%thread表示線程名,%-5level:級別從左顯示5個字符寬度%msg:日誌消息,%n是換行符 -->
<!--<property name="LOG_PATTERN" value="[ %d{yyyy-MM-dd HH:mm:ss.SSS}] [UUID:%X{requestId}] [%level]|[${HOSTNAME}] [%thread]|[%logger{36}] | %msg|%n " />-->
<property name="TIME_PATTERN" value="yyyy-MM-dd'T'HH:mm:ss.SSS+08:00" />
<property name="LOG_PATTERN" value='{"event":"log","timestamp":"%d{${TIME_PATTERN}}","level":"%level","serviceName":"${springAppName:-}","pid": "${PID:-}","host":"${HOSTNAME}","class": "%logger{40}","thread":"%thread","message":"%msg"}%n ' />
<!-- Console 輸出設置 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>
<!-- 按照天天生成日誌文件 -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/newsinfo-dataservice.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>logs/newsinfo-dataservice.%d{yyyy-MM-dd}.log</FileNamePattern>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${LOG_PATTERN}</pattern>
</encoder>
</appender>
<!--log4jdbc -->
<logger name="com.sohu" level="debug" additivity="false">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
</logger>
<!-- 日誌輸出級別 -->
<root level="info">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
</root>
</configuration>
輸出日誌示例以下:
{"event":"log","timestamp":"2019-04-09T19:47:07.908Z","level":"INFO","serviceName":"springAppName_IS_UNDEFINED","pid": "7196","host":"node1","class": "o.s.b.web.servlet.FilterRegistrationBean","thread":"localhost-startStop-1","message":"Mapping filter: 'MyFilter' to urls: [/getUser, /hello]"}
啓動服務的同時,配置日誌收集到kafka,而後調用ELK讀取kafka日誌存入ES,進行日誌的全文檢索和實時告警分析。
路徑:/code/logs/newsinfo-dataservice.log自動收集 :自動將日誌收集到 KafkaKafka Topic:newsinfo_appdata_service_log