flume: collects the logs and ships them to Kafka
kafka: acts as a buffer, holding the logs coming from Flume
es: acts as the storage backend, persisting the logs
logstash: filters and processes the logs
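The four roles above form a collect → buffer → filter → store chain. As an illustration only (a toy sketch, not the real Flume/Kafka/Logstash/Elasticsearch services), the data flow can be modeled with an in-memory queue; the "DEBUG" filter rule is an invented example:

```python
from queue import Queue

def collect(lines, buffer):          # Flume role: tail the log, ship each line
    for line in lines:
        buffer.put(line)

def filter_logs(buffer, store):      # Logstash role: drop/transform before storage
    while not buffer.empty():
        line = buffer.get()
        if "DEBUG" not in line:      # example filter rule (an assumption)
            store.append(line.strip())

buffer = Queue()                     # Kafka role: decouples producer from consumer
store = []                           # Elasticsearch role: final storage
collect(["INFO start\n", "DEBUG noisy\n", "ERROR boom\n"], buffer)
filter_logs(buffer, store)
print(store)                         # ['INFO start', 'ERROR boom']
```

The point of the buffer stage is that the collector and the filter never talk to each other directly, so either side can stall or restart without losing the other.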
Download and unpack the installation package
cd /usr/local/src && wget http://10.80.7.177/install_package/apache-flume-1.7.0-bin.tar.gz && tar zxf apache-flume-1.7.0-bin.tar.gz -C /usr/local/
Edit the flume-env.sh script to set the startup parameters
cd /usr/local/apache-flume-1.7.0-bin
vim conf/flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_121/
export JAVA_OPTS="-Xms1000m -Xmx2000m -Dcom.sun.management.jmxremote"  # JVM heap size used when the agent starts
Edit the configuration file
vim conf/flume_kfk.conf   (note: the file name can be anything you like)
agent.sources = s1
agent.channels = c1
agent.sinks = k1

agent.sources.s1.type = exec
# absolute path of the log file to collect
agent.sources.s1.command = tail -F /root/test.log
agent.sources.s1.channels = c1

agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 100

# Kafka sink
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker addresses and ports
agent.sinks.k1.brokerList = 10.90.11.19:19092,10.90.11.32:19092,10.90.11.45:19092,10.90.11.47:19092,10.90.11.48:19092
# Kafka topic
agent.sinks.k1.topic = kafkatest
# serialization method
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
agent.sinks.k1.channel = c1
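A common mistake with this kind of properties file is wiring a sink to a channel that was never declared. A hypothetical pre-flight check (the parser and the trimmed config sample are illustrative, not part of Flume) can catch that before starting the agent:

```python
# Parse Flume-style "key = value" properties into a dict and confirm the
# sink's channel appears in the declared channel list.
def parse_props(text):
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

conf = """
agent.channels = c1
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.channel = c1
"""
props = parse_props(conf)
channels = props["agent.channels"].split()
assert props["agent.sinks.k1.channel"] in channels
print("sink channel is declared:", props["agent.sinks.k1.channel"])
```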
Create the Kafka topic
cd /data1/kafka/kafka_2.11-0.10.1.0/ && ./kafka-topics.sh --create --topic kafkatest --replication-factor 3 --partitions 20 --zookeeper 10.90.11.19:12181
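The topic is created with 20 partitions, and keyed messages are spread across them by hashing the key modulo the partition count. As a simplified illustration of that idea (Kafka's real default partitioner uses murmur2 hashing; `zlib.crc32` is only a stand-in here, and the key is invented):

```python
import zlib

NUM_PARTITIONS = 20  # matches --partitions 20 in the command above

def pick_partition(key: bytes, num_partitions: int) -> int:
    # Simplified stand-in for Kafka's key partitioner: hash the key,
    # then take it modulo the partition count.
    return zlib.crc32(key) % num_partitions

p = pick_partition(b"host-01", NUM_PARTITIONS)
assert 0 <= p < NUM_PARTITIONS
print("key host-01 -> partition", p)
```

The same key always lands on the same partition, which is what preserves per-key ordering.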
Start Flume
/usr/local/apache-flume-1.7.0-bin/bin/flume-ng agent -n agent -Dflume.monitoring.type=http -Dflume.monitoring.port=9876 -c conf -f /usr/local/apache-flume-1.7.0-bin/conf/flume_kfk.conf -Dflume.root.logger=ERROR,console -Dorg.apache.flume.log.printconfig=true
Test
Test: append some lines to /root/test.log, then log in to Kafka Manager and check whether the kafkatest topic has received messages. If it has, everything is working; if not, troubleshoot the steps above.
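What `tail -F` gives the Flume exec source is essentially "only the lines appended since the last read". That behavior can be simulated without a running cluster by tracking a read offset on a temp file (the file name and contents are invented for the demo):

```python
import os
import tempfile

# Create an empty log file in a temp directory.
path = os.path.join(tempfile.mkdtemp(), "test.log")
open(path, "w").close()

def read_new(path, offset):
    # Read only the bytes appended after `offset`, like tail -F would.
    with open(path) as f:
        f.seek(offset)
        return f.readlines(), f.tell()

offset = 0
with open(path, "a") as f:
    f.write("first event\n")
lines, offset = read_new(path, offset)
print(lines)                  # ['first event\n']

with open(path, "a") as f:
    f.write("second event\n")
lines, offset = read_new(path, offset)
print(lines)                  # ['second event\n'] -- earlier lines not re-read
```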
Supervisor deployment is not covered again here; for details see: https://www.cnblogs.com/sailq21/p/9227592.html
Edit /etc/supervisord.conf
[unix_http_server]
file=/data/ifengsite/flume/supervisor.sock   ; the path to the socket file

[inet_http_server]          ; inet (TCP) server disabled by default
port=9001                   ; ip_address:port specifier, *:port for all iface

[supervisord]
logfile=/data/logs/supervisord.log   ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB                ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10                   ; # of main logfile backups; 0 means none, default 10
loglevel=info                        ; log level; default info; others: debug,warn,trace
pidfile=/tmp/supervisord.pid         ; supervisord pidfile; default supervisord.pid
nodaemon=false                       ; start in foreground if true; default false
minfds=1024                          ; min. avail startup file descriptors; default 1024
minprocs=200                         ; min. avail process descriptors; default 200

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///data/ifengsite/flume/supervisor.sock   ; use a unix:// URL for a unix socket

[include]
files = /etc/supervisord.d/*.conf
Edit the Flume program file for Supervisor
[program:flume-push]
directory = /usr/local/apache-flume-1.7.0-bin/
command = /usr/local/apache-flume-1.7.0-bin/bin/flume-ng agent -n agent -Dflume.monitoring.type=http -Dflume.monitoring.port=9876 -c conf -f /usr/local/apache-flume-1.7.0-bin/conf/push.conf -Dflume.root.logger=ERROR,console -Dorg.apache.flume.log.printconfig=true
autostart = true
startsecs = 5
autorestart = true
startretries = 3
user = root
redirect_stderr = true
stdout_logfile_maxbytes = 20MB
stdout_logfile_backups = 20
stdout_logfile = /data/ifengsite/flume/logs/flume-supervisor.log
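Supervisor program files are standard INI, so a quick sanity check before reloading is to parse the section and confirm the restart settings. This hypothetical check (the trimmed INI sample is illustrative) uses Python's stdlib `configparser`:

```python
import configparser

# Trimmed copy of the [program:flume-push] section for the demo.
ini = """
[program:flume-push]
directory = /usr/local/apache-flume-1.7.0-bin/
autostart = true
startsecs = 5
autorestart = true
startretries = 3
"""

cfg = configparser.ConfigParser()
cfg.read_string(ini)
prog = cfg["program:flume-push"]

# Confirm the process will come up on boot and be restarted on crashes.
assert prog.getboolean("autostart") and prog.getboolean("autorestart")
print("startretries:", prog.getint("startretries"))   # startretries: 3
```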
Create the log directory and start Supervisor
mkdir -p /data/ifengsite/flume/logs/
supervisord -c /etc/supervisord.conf
To restart Supervisor: supervisorctl reload
Test: open http://&lt;ip&gt;:9001 in a browser to view the Supervisor web UI.
If the Flume-to-Kafka path works, configure Logstash next.
Edit flume_kfk.conf
vim /etc/logstash/conf.d/flume_kfk.conf
input {
  kafka {
    bootstrap_servers => ["10.90.11.19:19092,10.90.11.32:19092,10.90.11.45:19092,10.90.11.47:19092,10.90.11.48:19092"]
    client_id => "test"
    group_id => "test"
    consumer_threads => 5
    decorate_events => true
    topics => ["kafkatest"]
    type => "testqh"
  }
}

filter {
  mutate {
    gsub => ["message","\\x","\\\x"]
  }
  json {
    source => "message"
    remove_field => ["message","beat","tags","source","kafka"]
  }
  date {
    match => ["timestamp","ISO8601"]
    timezone => "Asia/Shanghai"
    target => "@timestamp"
  }
}

# Output to stdout for debugging; point this at an elasticsearch output
# when you are ready to index into ES.
output {
  stdout {
    codec => rubydebug
  }
}
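The json and date filters above do two things: decode the JSON payload out of the raw message field, and promote its ISO8601 timestamp to `@timestamp`. A rough Python equivalent of that transformation (the event payload and field names are invented for the demo; this is not Logstash code):

```python
import json
from datetime import datetime, timezone

def transform(event):
    # json filter: parse the raw message and drop the original field.
    payload = json.loads(event.pop("message"))
    # date filter: parse the ISO8601 timestamp and store it as @timestamp
    # normalized to UTC.
    ts = datetime.fromisoformat(payload.pop("timestamp"))
    payload["@timestamp"] = ts.astimezone(timezone.utc).isoformat()
    return payload

event = {"message": '{"timestamp": "2018-07-01T12:00:00+08:00", "level": "INFO"}'}
out = transform(event)
print(out["level"], out["@timestamp"])  # INFO 2018-07-01T04:00:00+00:00
```

Note how the +08:00 (Asia/Shanghai) offset in the source timestamp is folded into UTC, which is what the `timezone` setting in the date filter is for.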
Additional notes:
1. When using Logstash's grok plugin to match nginx or other logs with regular expressions, custom pattern files live in /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns; the file is named grok-patterns.
2. To test whether a custom pattern matches the target lines, use the Grok Debugger site: https://grokdebug.herokuapp.com/
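Under the hood, grok patterns compile down to named-group regular expressions. As a rough illustration of what a pattern for an nginx access-log fragment expands to (the regex and sample line are illustrative assumptions, not copied from grok-patterns):

```python
import re

# Sample access-log fragment (invented for the demo).
LINE = '10.0.0.1 - - "GET /index.html HTTP/1.1" 200'

# Named groups play the role of grok's %{PATTERN:field} captures.
PATTERN = re.compile(
    r'(?P<clientip>\d+\.\d+\.\d+\.\d+) \S+ \S+ '
    r'"(?P<verb>\w+) (?P<request>\S+) HTTP/(?P<httpversion>[\d.]+)" '
    r'(?P<response>\d{3})'
)

m = PATTERN.match(LINE)
print(m.group("verb"), m.group("response"))  # GET 200
```

When a pattern fails in the Grok Debugger, trimming it down field by field like this is a quick way to find which capture is not matching.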