filebeat-2: connecting to logstash through a kafka queue

Sending filebeat output straight to logstash can block under load because of how logstash is designed, so a message queue is placed between the two to decouple them.

Either redis or kafka can serve as the queue; kafka is used here.

1, Installation

Kafka runs straight out of the unpacked archive, but it needs zookeeper; a zookeeper instance is bundled with kafka and can be used directly.
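For reference, a minimal download-and-unpack sketch; the exact release (kafka 0.11.0.0 built for Scala 2.11) is an assumption, so substitute whichever version matches your environment:

# assumed release; any recent kafka tarball unpacks the same way
wget https://archive.apache.org/dist/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
tar -xzf kafka_2.11-0.11.0.0.tgz
cd kafka_2.11-0.11.0.0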

1) Start the bundled zookeeper

./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
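To confirm zookeeper is listening before going further, the four-letter ruok command is a quick sanity check (assuming nc is installed); a healthy server answers imok:

echo ruok | nc localhost 2181
# expected reply: imok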

2) Edit the kafka configuration file

vim ./config/server.properties

############################# Server Basics #############################
broker.id=0
delete.topic.enable=true
 
############################# Socket Server Settings #############################
listeners=PLAINTEXT://0.0.0.0:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
 
############################# Log Basics #############################
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
 
############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000
 
############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
 
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

3) Start the kafka server

./bin/kafka-server-start.sh ./config/server.properties &
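With the broker up, listing topics verifies it is reachable through zookeeper. Pre-creating the topic is optional (topic auto-creation is on by default); the name elk-my-nginx-log matches what the filebeat config in the next step produces from elk-%{[type]} with document_type: my-nginx-log:

./bin/kafka-topics.sh --list --zookeeper localhost:2181
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic elk-my-nginx-log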

4) Edit the filebeat config file; its final form:

cat ./elk/filebeat-5.5.2-linux-x86_64/filebeat.yml | grep -v '#' | grep -v '^$'


filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/nginx/*.log
  encoding: utf-8
  document_type: my-nginx-log
  scan_frequency: 5s
  harvester_buffer_size: 16384
  max_bytes: 10485760
  tail_files: true
output.kafka:
  enabled: true
  hosts: ["www.wenbronk.com:9092"]
  topic: elk-%{[type]}
  worker: 2
  max_retries: 3
  bulk_max_size: 2048
  timeout: 30s
  broker_timeout: 10s
  channel_buffer_size: 256
  keep_alive: 60
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 0
  client_id: beats
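Before restarting, the config can be validated; filebeat 5.x ships a -configtest flag for this (replaced by "filebeat test config" in later major versions):

./filebeat -configtest -c ./filebeat.yml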

5) Restart filebeat

./filebeat -c ./filebeat.yml &

6) Edit the logstash input

input {
    kafka  {
      #codec => "json"
      topics_pattern => "elk-.*"
      bootstrap_servers => "127.0.0.1:9092"
      auto_offset_reset => "latest"
      group_id => "logstash-g1"
    }
}
output {
    elasticsearch {                                  # logstash outputs to elasticsearch
      hosts => ["localhost:9200"]                    # elasticsearch runs locally
      index => "logstash-nginx-%{+YYYY.MM.dd}"       # index to create
      document_type => "nginx"                       # document type
      workers => 1                                   # number of workers
      user => elastic                                # elasticsearch user
      password => changeme                           # elasticsearch password
      flush_size => 20000
      idle_flush_time => 10
    }
}
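A restart goes smoother if the pipeline definition is checked first; a minimal sketch, assuming the config above is saved as ./config/kafka-es.conf (the path is an assumption):

./bin/logstash -f ./config/kafka-es.conf --config.test_and_exit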

7) Restart logstash

8) Access nginx in a browser; the messages can then be seen in the queue:

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic elk-my-nginx-log --from-beginning
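If messages show up there, the last thing to verify is that logstash created the daily index in elasticsearch (the credentials below are the ones from the output section above):

curl -u elastic:changeme 'http://localhost:9200/_cat/indices?v'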


Reference: http://www.ywnds.com/?p=9776
