在ELFK集羣結合Kafka集羣的配置過程當中,經過filebet收集日誌傳輸到kafka,而後Kafka傳輸日誌到logstash,最後到elasticsearch,並結合zabbix告警。html
但在kafka傳輸日誌到logstash的配置完成後,啓動logstash時報錯:java
[WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException
kafka經過logstash輸入到elasticsearch的配置文件:web
input { kafka { bootstrap_servers => "192.168.1.253:9092" group_id => "webservice" topics => "253-webservice" auto_offset_reset => "latest" consumer_threads => 5 decorate_events => true type => "253_webservice" }}filter { if [type] == "253_ciphermachine" { ruby { code => "event.set('log_time',event.get('@timestamp').time.localtime + 8*60*60)" } #grok{ # match => [ "log_time","(?^\d{4}-\d{1,2}-\d{1,2})" ] #} grok { match => [ "message", "%{TIME:thistime} %{NOTSPACE:level}" ] } mutate { add_field => [ "[zabbix_key]", "ciphermachine_error" ] add_field => [ "[zabbix_host]", "192.168.1.253" ] remove_field => "@version" remove_field => "host" remove_field => "path" remove_field => "_type" remove_field => "_score" remove_field => "_id" remove_field => "thread-id" remove_field => "log_time" remove_field => "thisdate" remove_field => "thistime" remove_field => "score" remove_field => "id" remove_field => "name" remove_field => "beat.*" remove_field => "fields.*" remove_field => "host.*" remove_field => "input.type" remove_field => "log.file.path" remove_field => "offset" remove_field => "prospector.type" remove_field => "source" } #ruby { # code => "event.set('logtime',event.get('thisdate') + ' ' + event.get('thistime') )" #} date { match => [ "logtime","yyyy-MM-dd HH:mm:ss,SSS",'ISO8601' ] target => "@timestamp" } ruby { code => "event.set('logtime',event.get('@timestamp').time.localtime + 8*60*60)" } }}output { if [type] == "253_ciphermachine" { elasticsearch { hosts => ["192.168.1.253:9200"] user => "elastic" password => "changeme" index => "webservice.log-%{+YYYY.MM.dd}" } if [level] =~ /(ERR|error|ERROR|Failed)/ { zabbix { zabbix_host => "[zabbix_host]" zabbix_key => "[zabbix_key]" zabbix_server_host => "192.168.1.252" zabbix_server_port => "10051" zabbix_value => "message" } } }}
網上搜了一下,提示是kafka併發問題。因而從新修改了配置文件:shell
input { kafka { bootstrap_servers => "192.168.1.253:9092" group_id => "ciphermachine" client_id => "ciphermachine-1" #須要指定client_id,不然logstash啓動報錯 topics => "253-ciphermachine" auto_offset_reset => "latest" codec => "json" consumer_threads => 5 decorate_events => false type => "253_ciphermachine" }}filter { if [type] == "253_ciphermachine" { ruby { code => "event.set('log_time',event.get('@timestamp').time.localtime + 8*60*60)" } #grok{ # match => [ "log_time","(?^\d{4}-\d{1,2}-\d{1,2})" ] #} grok { match => [ "message", "%{TIME:thistime} %{NOTSPACE:level}" ] } mutate { add_field => [ "[zabbix_key]", "ciphermachine_error" ] add_field => [ "[zabbix_host]", "192.168.1.253" ] remove_field => "@version" remove_field => "host" remove_field => "path" remove_field => "_type" remove_field => "_score" remove_field => "_id" remove_field => "thread-id" remove_field => "log_time" remove_field => "thisdate" remove_field => "thistime" remove_field => "score" remove_field => "id" remove_field => "name" remove_field => "beat.*" remove_field => "fields.*" remove_field => "host.*" remove_field => "input.type" remove_field => "log.file.path" remove_field => "offset" remove_field => "prospector.type" remove_field => "source" } #ruby { # code => "event.set('logtime',event.get('thisdate') + ' ' + event.get('thistime') )" #} date { match => [ "logtime","yyyy-MM-dd HH:mm:ss,SSS",'ISO8601' ] target => "@timestamp" } ruby { code => "event.set('logtime',event.get('@timestamp').time.localtime + 8*60*60)" } }}output { if [type] == "253_ciphermachine" { elasticsearch { hosts => ["192.168.1.253:9200"] user => "elastic" password => "changeme" index => "webservice.log-%{+YYYY.MM.dd}" } if [level] =~ /(ERR|error|ERROR|Failed)/ { zabbix { zabbix_host => "[zabbix_host]" zabbix_key => "[zabbix_key]" zabbix_server_host => "192.168.1.252" zabbix_server_port => "10051" zabbix_value => "message" } } }}
反覆測試,發現是input部分沒有指定client_id
,任意指定client_id
便可,重啓logstash後再也不報錯。apache
在部署過程當中,原先直接使用logstash收集日誌傳輸到kafka存在問題,kafka沒法接收到logstash傳入的數據,而手動去kafka的topic中生產數據,又會致使kafka重複消費的問題,可是我其實配置了auto_offset_reset => "latest"
,多是個人配置文件存在問題。因而最終放棄了直接用logstash收集日誌到kafka,改由filebeat收集日誌到kafka。json
可是在此以前,我須要刪除以前的topic。可是在刪除topic時,一直提示zookeeper marked,實際topic沒有刪除,只是被zookeeper標記,能夠經過下面解決。bootstrap
# vim /usr/local/kafka/config/server.propertiesdelete.topic.enable=true #容許刪除topic
# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.1.253:2181 --topic 253-webservice
若是這樣沒法刪除,進入zookeeper命令行刪除:vim
# /usr/local/kafka/bin/zookeeper-shell.sh 192.168.1.253:2181ls /brokers/topics rmr /brokers/topics/253-ciphermachine rmr /brokers/topics/253-webservice
# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.1.253:2181