ELFK with Kafka error: InstanceAlreadyExistsException

The InstanceAlreadyExistsException error

In this ELFK-plus-Kafka cluster setup, Filebeat collects the logs and ships them to Kafka, Kafka forwards the logs to Logstash, Logstash writes them to Elasticsearch, and Zabbix is attached for alerting.
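For context, a minimal sketch of the Filebeat side of this pipeline. The log path is hypothetical, and the exact top-level key (filebeat.inputs vs. the older filebeat.prospectors) depends on the Filebeat version:

filebeat.inputs:
- type: log
  paths:
    - /var/log/webservice/*.log        # hypothetical log path

output.kafka:
  hosts: ["192.168.1.253:9092"]
  topic: "253-webservice"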

But once the Kafka-to-Logstash part of the configuration was in place, starting Logstash produced this error:

[WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException

The configuration file that takes input from Kafka and feeds it into Elasticsearch through Logstash:

input {
    kafka {
        bootstrap_servers => "192.168.1.253:9092"
        group_id => "webservice"
        topics => "253-webservice"
        auto_offset_reset => "latest"
        consumer_threads => 5
        decorate_events => true 
        type => "253_webservice"
    }
}

filter {
    if [type] == "253_webservice" {
        ruby {
            code => "event.set('log_time',event.get('@timestamp').time.localtime  + 8*60*60)"
        }
        #grok{
        #    match => [ "log_time","(?^\d{4}-\d{1,2}-\d{1,2})"  ]
        #}

        grok {
            match => [ "message", "%{TIME:thistime} %{NOTSPACE:level}" ]
        }

        mutate {
            add_field => [ "[zabbix_key]", "ciphermachine_error" ]
            add_field => [ "[zabbix_host]", "192.168.1.253" ]

            remove_field => "@version"
            remove_field => "host"
            remove_field => "path"
            remove_field => "_type"
            remove_field => "_score"
            remove_field => "_id"
            remove_field => "thread-id"
            remove_field => "log_time"
            remove_field => "thisdate"
            remove_field => "thistime"
            remove_field => "score"
            remove_field => "id"
            remove_field => "name"
            remove_field => "beat.*"
            remove_field => "fields.*"
            remove_field => "host.*"
            remove_field => "input.type"
            remove_field => "log.file.path"
            remove_field => "offset"
            remove_field => "prospector.type"
            remove_field => "source"
        }

        #ruby {
        #    code => "event.set('logtime',event.get('thisdate') + ' ' + event.get('thistime') )"
        #}

        date {
            match => [ "logtime","yyyy-MM-dd HH:mm:ss,SSS",'ISO8601' ]
            target => "@timestamp"
        }

        ruby {
            code => "event.set('logtime',event.get('@timestamp').time.localtime  + 8*60*60)"
        }
    }
}

output {
    if [type] == "253_webservice" {
        elasticsearch {
            hosts => ["192.168.1.253:9200"]
            user => "elastic"
            password => "changeme"
            index => "webservice.log-%{+YYYY.MM.dd}"
        }

        if [level]  =~ /(ERR|error|ERROR|Failed)/ {
            zabbix {
                zabbix_host => "[zabbix_host]"
                zabbix_key => "[zabbix_key]"
                zabbix_server_host => "192.168.1.252"
                zabbix_server_port => "10051"
                zabbix_value => "message"
            }
        }
    }
}
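For reference, a hedged way to check the pipeline syntax before starting Logstash, assuming an installation under /usr/local/logstash and a config file named kafka-to-es.conf (both hypothetical):

# /usr/local/logstash/bin/logstash -f kafka-to-es.conf --config.test_and_exit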

A web search suggested this is a Kafka concurrency problem, so I revised the configuration file:

input {
    kafka {
        bootstrap_servers => "192.168.1.253:9092"
        group_id => "ciphermachine"
        client_id => "ciphermachine-1"                # client_id must be set, otherwise logstash reports an error at startup
        topics => "253-ciphermachine"
        auto_offset_reset => "latest"
        codec => "json"
        consumer_threads => 5
        decorate_events => false
        type => "253_ciphermachine"
    }
}

filter {
    if [type] == "253_ciphermachine" {
        ruby {
            code => "event.set('log_time',event.get('@timestamp').time.localtime  + 8*60*60)"
        }
        #grok{
        #    match => [ "log_time","(?^\d{4}-\d{1,2}-\d{1,2})"  ]
        #}

        grok {
            match => [ "message", "%{TIME:thistime} %{NOTSPACE:level}" ]
        }

        mutate {
            add_field => [ "[zabbix_key]", "ciphermachine_error" ]
            add_field => [ "[zabbix_host]", "192.168.1.253" ]

            remove_field => "@version"
            remove_field => "host"
            remove_field => "path"
            remove_field => "_type"
            remove_field => "_score"
            remove_field => "_id"
            remove_field => "thread-id"
            remove_field => "log_time"
            remove_field => "thisdate"
            remove_field => "thistime"
            remove_field => "score"
            remove_field => "id"
            remove_field => "name"
            remove_field => "beat.*"
            remove_field => "fields.*"
            remove_field => "host.*"
            remove_field => "input.type"
            remove_field => "log.file.path"
            remove_field => "offset"
            remove_field => "prospector.type"
            remove_field => "source"
        }

        #ruby {
        #    code => "event.set('logtime',event.get('thisdate') + ' ' + event.get('thistime') )"
        #}

        date {
            match => [ "logtime","yyyy-MM-dd HH:mm:ss,SSS",'ISO8601' ]
            target => "@timestamp"
        }

        ruby {
            code => "event.set('logtime',event.get('@timestamp').time.localtime  + 8*60*60)"
        }
    }
}

output {
    if [type] == "253_ciphermachine" {
        elasticsearch {
            hosts => ["192.168.1.253:9200"]
            user => "elastic"
            password => "changeme"
            index => "webservice.log-%{+YYYY.MM.dd}"
        }

        if [level]  =~ /(ERR|error|ERROR|Failed)/ {
            zabbix {
                zabbix_host => "[zabbix_host]"
                zabbix_key => "[zabbix_key]"
                zabbix_server_host => "192.168.1.252"
                zabbix_server_port => "10051"
                zabbix_value => "message"
            }
        }
    }
}

After repeated testing, the cause turned out to be the missing client_id in the input section: the Kafka client registers a JMX MBean named after its client.id, so when several consumers in the same Logstash process end up with the same default client.id, the second registration fails with InstanceAlreadyExistsException. Any explicitly set client_id works, as long as it is unique within the process; after restarting Logstash the error no longer appears.
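A minimal sketch of the relevant part, assuming two kafka inputs running in the same Logstash process (the topic and group names are just placeholders taken from the configs above):

input {
    kafka {
        bootstrap_servers => "192.168.1.253:9092"
        group_id  => "webservice"
        client_id => "webservice-1"          # must be unique within this Logstash process
        topics    => "253-webservice"
    }
    kafka {
        bootstrap_servers => "192.168.1.253:9092"
        group_id  => "ciphermachine"
        client_id => "ciphermachine-1"       # a different client_id avoids the MBean collision
        topics    => "253-ciphermachine"
    }
}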


Errors when deleting a topic

During deployment, shipping logs straight from Logstash to Kafka never worked properly: Kafka did not receive the data Logstash sent, and when I produced data into the Kafka topic by hand, it was consumed repeatedly, even though auto_offset_reset => "latest" was configured, so the fault may well have been in my configuration file. In the end I gave up on shipping logs directly from Logstash to Kafka and had Filebeat collect the logs into Kafka instead.
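When chasing this kind of duplicate-consumption problem, it helps to inspect the consumer group's committed offsets and lag. A hedged example, assuming the group name webservice from the config above:

# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.253:9092 --describe --group webservice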

Before doing that, though, I needed to delete the old topics. But every delete attempt only reported that the topic had been marked for deletion by ZooKeeper; the topic itself was not actually removed. This can be resolved as follows.

  • Modify the Kafka configuration:
# vim /usr/local/kafka/config/server.properties
delete.topic.enable=true            # allow topic deletion (the broker must be restarted for this to take effect)

  • Delete the topic:
# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.1.253:2181 --topic 253-webservice
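To check whether the topic is actually gone rather than merely marked for deletion, you can describe it (if the delete succeeded, nothing should be returned):

# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.1.253:2181 --topic 253-webservice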

If the topic still cannot be deleted this way, go into the ZooKeeper command line and remove it there:

# /usr/local/kafka/bin/zookeeper-shell.sh 192.168.1.253:2181
ls /brokers/topics

rmr /brokers/topics/253-ciphermachine

rmr /brokers/topics/253-webservice
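The topic may also leave metadata behind in other znodes; a hedged cleanup in the same zookeeper-shell session, assuming the delete marker and per-topic config sit under the usual paths:

rmr /admin/delete_topics/253-webservice

rmr /config/topics/253-webservice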

  • List the current topics:
# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.1.253:2181