Chapter 1: Environment Overview
The existing architecture is ELK + Kafka + Filebeat, with all ELK components at version 5.2.x:
[root@logstash-node-1 ~]# rpm -qa | grep logstash
logstash-5.2.2-1.noarch
[root@logstash-node-1 ~]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
Logstash 5.x does not support independent pipelines, so a single pipeline needs a lot of if-else conditionals and the configuration files become complicated to manage. The new multiple-pipelines feature keeps each pipeline's configuration separate, so the logs of each business type can be managed independently; see the sketch below for what the old style looked like.
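For comparison, here is a minimal sketch of the old 5.x single-pipeline style (the type values and patterns are hypothetical, purely for illustration):

# Logstash 5.x style: one pipeline, every event routed with if-else
filter {
  if [type] == "nginx_access" {
    grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
  } else if [type] == "tomcat_access" {
    grok { match => { "message" => "%{COMMONAPACHELOG}" } }
  }
}
output {
  if [type] == "nginx_access" {
    elasticsearch { hosts => ["http://127.0.0.1:9200"] index => "nginx_access-%{+YYYY.MM.dd}" }
  } else {
    elasticsearch { hosts => ["http://127.0.0.1:9200"] index => "%{type}-%{+YYYY.MM.dd}" }
  }
}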
Only the Logstash component is upgraded here; I verified that it works fine with Elasticsearch 5.x, no problems.
cat /etc/logstash/pipelines.yml
- pipeline.id: nginx_access
  path.config: "/etc/logstash/conf.d/nginx_access.yml"
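Each additional business log type is just one more entry in pipelines.yml. For example, a second pipeline (hypothetical, not part of this setup) would look like:

- pipeline.id: nginx_error
  path.config: "/etc/logstash/conf.d/nginx_error.yml"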
cat /etc/logstash/conf.d/nginx_access.yml
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    group_id => "logstash"
    consumer_threads => 5
    topics => ["nginx_access"]
    codec => "json"
  }
}
filter {
  grok {
    patterns_dir => [ "/etc/logstash/patterns.d/" ]
    match => {
      "message" => ["%{WPT_NGX_COMM}"]
    }
  }
  mutate {
    split => ["request", "?"]
    add_field => {
      "uri_path" => "%{[request][0]}"
      "uri_query" => "%{[request][1]}"
    }
    remove_field => ["request"]
    convert => {
      "response" => "integer"
      "body_bytes_sent" => "integer"
      "request_time" => "float"
      "upstream_response_time" => "float"
    }
  }
  useragent {
    source => "user_agent"
    lru_cache_size => 5000
  }
  date {
    timezone => "Asia/Shanghai"
    match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z", "UNIX", "yyyy-MM-dd HH:mm:ss", "dd-MMM-yyyy HH:mm:ss" ]
    target => "@timestamp"
    remove_field => ["timestamp"]
  }
}
output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "%{type}-%{+YYYY.MM.dd}"
  }
}
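WPT_NGX_COMM is a custom pattern loaded from patterns_dir; the original pattern file is not shown here, but a patterns.d entry for a typical nginx combined-style log might look like this (a hypothetical sketch, assuming the field names used in the filter above):

# /etc/logstash/patterns.d/nginx (hypothetical; adjust to the actual nginx log_format)
WPT_NGX_COMM %{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:http_version}" %{NUMBER:response} %{NUMBER:body_bytes_sent} "%{DATA:http_referer}" "%{DATA:user_agent}" %{NUMBER:request_time} %{NUMBER:upstream_response_time}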
During debugging you can start it with ./bin/logstash -r, which automatically reloads the configuration whenever the files change.
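To only validate the configuration syntax without starting the pipelines, the -t (--config.test_and_exit) flag can be used; with the RPM layout the settings path has to be passed explicitly:

/usr/share/logstash/bin/logstash --path.settings /etc/logstash -t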
systemctl stop logstash.service
rpm -e logstash-5.2.2-1.noarch
yum localinstall logstash-6.5.4.rpm
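After the install you can confirm the new version (the output below is what the 6.5.4 RPM should report, an assumption on the exact release number):

[root@logstash-node-1 ~]# rpm -qa | grep logstash
logstash-6.5.4-1.noarch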
I had already packaged the configuration files locally and uploaded them to the server.
In the pipelines file I enabled only a single pipeline, to verify whether the service has any problems; and in the output I also write a copy of the data to a file, to verify that the data is parsed correctly. After that, just start the Logstash service.
output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "%{type}-%{+YYYY.MM.dd}"
  }
  file {
    path => "/tmp/test.log"
  }
}
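Once events start flowing, the parsed fields can be inspected straight from the file (the file output writes one JSON event per line by default):

tail -n 1 /tmp/test.log | python -m json.tool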
There are no errors in the log and Logstash works normally, but there is a warning; from a quick search online, it may be caused by a Kafka version mismatch.
[2018-12-26T14:39:05,424][WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-7
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_121]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_121]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_121]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_121]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_121]
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_121]
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) [kafka-clients-2.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:791) [kafka-clients-2.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650) [kafka-clients-2.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630) [kafka-clients-2.0.1.jar:?]
at sun.reflect.GeneratedConstructorAccessor47.newInstance(Unknown Source) [?:1.8.0_121]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) [?:1.8.0_121]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) [?:1.8.0_121]
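The InstanceAlreadyExistsException means two Kafka consumers tried to register the same JMX app-info MBean id; it is a warning only and does not affect consumption. A commonly suggested mitigation (an assumption here, not something verified during this upgrade) is to give each kafka input its own client_id so the MBean names no longer collide:

input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    group_id => "logstash"
    client_id => "logstash-nginx-access"   # hypothetical value; make it unique per kafka input
    consumer_threads => 5
    topics => ["nginx_access"]
    codec => "json"
  }
}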