Unified log search deployment (ES, Logstash, Kafka, Flume)

Flume: collects the logs and forwards them to Kafka

Kafka: acts as a buffer, storing the logs coming from Flume

ES: serves as the storage backend where the logs are kept

Logstash: filters and processes the logs

Flume deployment

Download and extract the installation package

cd /usr/local/src && wget http://10.80.7.177/install_package/apache-flume-1.7.0-bin.tar.gz && tar zxf apache-flume-1.7.0-bin.tar.gz -C /usr/local/

Edit the flume-env.sh script to set the startup options

cd /usr/local/apache-flume-1.7.0-bin
vim conf/flume-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_121/
export JAVA_OPTS="-Xms1000m -Xmx2000m -Dcom.sun.management.jmxremote"   # JVM heap size used when starting the agent

Edit the agent configuration file

vim conf/flume_kfk.conf    (note: the configuration file can be named anything)
agent.sources = s1
agent.channels = c1
agent.sinks = k1

agent.sources.s1.type=exec

# absolute path of the log file to collect
agent.sources.s1.command=tail -F /root/test.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
# Kafka sink
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
# Kafka broker addresses and ports
agent.sinks.k1.brokerList=10.90.11.19:19092,10.90.11.32:19092,10.90.11.45:19092,10.90.11.47:19092,10.90.11.48:19092
# Kafka topic to write to
agent.sinks.k1.topic=kafkatest
# serialization class
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.channel=c1

Create the Kafka topic

cd /data1/kafka/kafka_2.11-0.10.1.0/ && ./bin/kafka-topics.sh --create --topic kafkatest --replication-factor 3 --partitions 20 --zookeeper 10.90.11.19:12181
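To confirm the topic was created with the expected partition and replica counts, the same script can describe it (a quick check, assuming the same ZooKeeper address):

./bin/kafka-topics.sh --describe --topic kafkatest --zookeeper 10.90.11.19:12181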

Start Flume

/usr/local/apache-flume-1.7.0-bin/bin/flume-ng agent -n agent -Dflume.monitoring.type=http -Dflume.monitoring.port=9876 -c conf -f /usr/local/apache-flume-1.7.0-bin/conf/flume_kfk.conf -Dflume.root.logger=ERROR,console -Dorg.apache.flume.log.printconfig=true
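Because the agent is started with HTTP monitoring on port 9876, its channel and sink counters can be checked as JSON (a quick sanity check; adjust the host if querying remotely):

curl http://127.0.0.1:9876/metrics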

Test

Test: append some lines to /root/test.log, then log in to Kafka Manager and check whether messages have arrived in the kafkatest topic. If they have, the pipeline is working; if not, troubleshoot before moving on.
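If Kafka Manager is not available, an alternative check is to read the topic directly with the console consumer that ships with Kafka (run from the Kafka install directory; the broker address is one of those configured in the sink):

echo "flume test $(date)" >> /root/test.log
./bin/kafka-console-consumer.sh --bootstrap-server 10.90.11.19:19092 --topic kafkatest --from-beginning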

Deploy supervisor to watch over Flume

Installing supervisor itself is not repeated here; see: https://www.cnblogs.com/sailq21/p/9227592.html

Edit /etc/supervisord.conf

[unix_http_server]
file=/data/ifengsite/flume/supervisor.sock   ; the path to the socket file
[inet_http_server]         ; inet (TCP) server disabled by default
port=9001        ; ip_address:port specifier, *:port for all iface
[supervisord]
logfile=/data/logs/supervisord.log ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB        ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10           ; # of main logfile backups; 0 means none, default 10
loglevel=info                ; log level; default info; others: debug,warn,trace
pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid
nodaemon=false               ; start in foreground if true; default false
minfds=1024                  ; min. avail startup file descriptors; default 1024
minprocs=200                 ; min. avail process descriptors;default 200
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///data/ifengsite/flume/supervisor.sock ; use a unix:// URL  for a unix socket
[include]
files = /etc/supervisord.d/*.conf

Create the supervisor program file that starts Flume

[program:flume-push]
directory = /usr/local/apache-flume-1.7.0-bin/
command = /usr/local/apache-flume-1.7.0-bin/bin/flume-ng agent -n agent -Dflume.monitoring.type=http -Dflume.monitoring.port=9876 -c conf -f /usr/local/apache-flume-1.7.0-bin/conf/flume_kfk.conf -Dflume.root.logger=ERROR,console -Dorg.apache.flume.log.printconfig=true
autostart = true
startsecs = 5
autorestart = true
startretries = 3
user = root
redirect_stderr = true
stdout_logfile_maxbytes = 20MB
stdout_logfile_backups = 20
stdout_logfile = /data/ifengsite/flume/logs/flume-supervisor.log

Create the log directory and start supervisor

mkdir -p /data/ifengsite/flume/logs/
supervisord -c /etc/supervisord.conf
supervisorctl reload    # restart supervisor after changing its configuration

Test: open http://<server-ip>:9001 in a browser to view the supervisor web UI.
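The same check can be done from the shell, for example with the program name flume-push defined above:

supervisorctl status flume-push     # should report RUNNING with a pid and uptime
supervisorctl restart flume-push    # restart the agent if it is misbehaving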

If the Flume-to-Kafka path is working, configure Logstash next.

Edit the Logstash pipeline config flume_kfk.conf

vim /etc/logstash/conf.d/flume_kfk.conf
input {
    kafka {
        bootstrap_servers => ["10.90.11.19:19092,10.90.11.32:19092,10.90.11.45:19092,10.90.11.47:19092,10.90.11.48:19092"]
        client_id => "test"
        group_id => "test"
        consumer_threads => 5
        decorate_events => true
        topics => "kafkatest"
        type => "testqh"
    }
}
filter {
    mutate {
        gsub => ["message","\\x","\\\x"]
    }
    json {
        source => "message"
        remove_field => ["message","beat","tags","source","kafka"]
    }
    date {
        match => ["timestamp","ISO8601"]
        timezone => "Asia/Shanghai"
        target => "@timestamp"
    }
}

# Output to stdout for debugging; switch this to an elasticsearch output when events should go to ES.
output {
    stdout {
        codec => rubydebug
    }
}
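Once the filtered events look right on stdout, the output block can be switched over to Elasticsearch. A minimal sketch, assuming an ES node reachable at es-node1:9200 and a daily index name (both are placeholders, adjust to the real cluster):

output {
    elasticsearch {
        hosts => ["es-node1:9200"]                # placeholder ES address
        index => "kafkatest-%{+YYYY.MM.dd}"       # daily index, placeholder name
    }
}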

Additional notes:

1. When using Logstash's grok plugin to apply regular-expression matching to nginx or similar logs, the built-in pattern definitions live under /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns, in the file grok-patterns (a short grok example follows these notes).

2. To test whether a custom pattern actually matches the target log lines, use the Grok Debugger site: https://grokdebug.herokuapp.com/
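Building on note 1, here is a minimal grok sketch for an nginx access log; the patterns_dir entry points at a hypothetical /etc/logstash/patterns directory for custom patterns, and the match expression only uses patterns shipped in grok-patterns:

filter {
    grok {
        # optional directory holding custom pattern files (hypothetical path)
        patterns_dir => ["/etc/logstash/patterns"]
        # matches a basic nginx combined-style access line using built-in patterns
        match => { "message" => "%{IPORHOST:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:http_version}\" %{NUMBER:response} %{NUMBER:bytes}" }
    }
}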
