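Shipping logs from Filebeat straight to Logstash can stall: because of how Logstash is designed, it may block under load. A message queue is therefore placed between the two to decouple them.
Either Redis or Kafka can serve as the queue; Kafka is used here.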
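As a rough illustration of why a queue in the middle helps, the sketch below uses Python's in-memory `queue.Queue` as a stand-in for a Kafka topic. The names `ship_logs`, `index_logs`, and `run_pipeline` are hypothetical, and nothing here talks to a real Filebeat, Kafka, or Logstash. It only shows that the producer hands events to the buffer and returns without waiting for the consumer to process them.

```python
import queue
import threading


def ship_logs(lines, buffer):
    """Producer (stands in for Filebeat): enqueue each line and move on."""
    for line in lines:
        buffer.put(line)
    buffer.put(None)  # sentinel: no more events


def index_logs(buffer, sink):
    """Consumer (stands in for Logstash): drain the buffer at its own pace."""
    while True:
        event = buffer.get()
        if event is None:
            break
        sink.append(event.upper())  # placeholder for real processing


def run_pipeline(lines):
    buffer = queue.Queue()  # in-memory stand-in for a Kafka topic (assumption)
    sink = []
    consumer = threading.Thread(target=index_logs, args=(buffer, sink))
    consumer.start()
    ship_logs(lines, buffer)  # returns as soon as everything is enqueued
    consumer.join()
    return sink


print(run_pipeline(["get / 200", "get /missing 404"]))
```

A real Kafka topic additionally persists the events on disk, so the consumer can be restarted without losing what the producer already sent.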
Installing Kafka: it runs as soon as the archive is extracted, but it needs ZooKeeper. The distribution bundles a ZooKeeper, which can be used directly:
./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
vim ./config/server.properties
############################# Server Basics #############################
broker.id=0
delete.topic.enable=true
############################# Socket Server Settings #############################
listeners=PLAINTEXT://0.0.0.0:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000
############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
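Several of the values above are easier to read once converted to larger units. The following sketch parses a few of the settings and does the arithmetic. `parse_properties` is a hypothetical helper written for this example, not something Kafka ships, and the sample text only copies four keys from the listing.

```python
def parse_properties(text):
    """Parse 'key=value' lines, skipping blank lines and '#' comment lines."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


SAMPLE = """
# values copied from the server.properties listing
log.retention.hours=168
log.segment.bytes=1073741824
socket.request.max.bytes=104857600
log.retention.check.interval.ms=300000
"""

props = parse_properties(SAMPLE)
retention_days = int(props["log.retention.hours"]) // 24
segment_gib = int(props["log.segment.bytes"]) // 1024**3
max_request_mib = int(props["socket.request.max.bytes"]) // 1024**2
check_minutes = int(props["log.retention.check.interval.ms"]) // 60_000

print(retention_days, segment_gib, max_request_mib, check_minutes)  # 7 1 100 5
```

In other words: messages are retained for 7 days, each log segment file is capped at 1 GiB, a single request may be up to 100 MiB, and the retention check runs every 5 minutes.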
./bin/kafka-server-start.sh ./config/server.properties &
cat ./elk/filebeat-5.5.2-linux-x86_64/filebeat.yml | grep -v '#' | grep -v '^$'
filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/nginx/*.log
  encoding: utf-8
  document_type: my-nginx-log
  scan_frequency: 5s
  harvester_buffer_size: 16384
  max_bytes: 10485760
  tail_files: true
output.kafka:
  enabled: true
  hosts: ["www.wenbronk.com:9092"]
  topic: elk-%{[type]}
  worker: 2
  max_retries: 3
  bulk_max_size: 2048
  timeout: 30s
  broker_timeout: 10s
  channel_buffer_size: 256
  keep_alive: 60
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 0
  client_id: beats
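The listing above is what remains of `filebeat.yml` after the two `grep -v` filters. A small Python equivalent makes the behavior explicit: any line containing `#` anywhere is dropped, and so is every empty line. `strip_comments` is a hypothetical name used only for this sketch.

```python
def strip_comments(text):
    """Mimic the shell pipeline: grep -v '#' | grep -v '^$'."""
    kept = []
    for line in text.splitlines():
        if "#" in line:  # grep -v '#': drop lines containing '#' anywhere
            continue
        if line == "":   # grep -v '^$': drop completely empty lines
            continue
        kept.append(line)
    return kept


sample = (
    "# a comment\n"
    "\n"
    "output.kafka:\n"
    "  enabled: true  # inline note\n"
    "  worker: 2\n"
)
print(strip_comments(sample))
```

Note that a setting followed by an inline `#` comment disappears from the output entirely, so this filtered view can hide settings that are actually active.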
./filebeat -c ./filebeat.yml &
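Once Filebeat is running, each event is published to a topic built from the `topic: elk-%{[type]}` format string, where the `type` field is set by `document_type`. The sketch below imitates that substitution in Python. `resolve_topic` is a hypothetical helper for illustration, not Filebeat's implementation, and it handles only simple `%{[field]}` references.

```python
import re

# Matches a single %{[field]} reference and captures the field name.
FIELD_REF = re.compile(r"%\{\[([^\]]+)\]\}")


def resolve_topic(template, event):
    """Replace each %{[field]} reference with the event's value for that field."""
    return FIELD_REF.sub(lambda m: str(event[m.group(1)]), template)


event = {"type": "my-nginx-log", "message": "GET / 200"}
print(resolve_topic("elk-%{[type]}", event))  # elk-my-nginx-log
```

With the configuration shown, events from the nginx prospector would therefore land in the topic `elk-my-nginx-log`.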
input {
  kafka {
    #codec => "json"
    topics_pattern => "elk-.*"
    bootstrap_servers => "127.0.0.1:9092"
    auto_offset_reset => "latest"
    group_id => "logstash-g1"
  }
}
output {
  elasticsearch {
    # Logstash writes to Elasticsearch
    hosts => ["localhost:9200"]                 # Elasticsearch runs locally
    index => "logstash-nginx-%{+YYYY.MM.dd}"    # index to create
    document_type => "nginx"                    # document type
    workers => 1                                # number of workers
    user => "elastic"                           # Elasticsearch user
    password => "changeme"                      # Elasticsearch password
    flush_size => 20000
    idle_flush_time => 10
  }
}
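Two patterns in the Logstash config do the routing: `topics_pattern` selects which Kafka topics are consumed, and the `%{+YYYY.MM.dd}` suffix yields one Elasticsearch index per day. The sketch below approximates both in Python. The helper names are hypothetical, and the sketch assumes the regex has to match the whole topic name and that the date comes from the event timestamp in UTC.

```python
import re
from datetime import datetime, timezone


def matching_topics(topics, pattern):
    """Return the topics whose full name matches the regex pattern."""
    regex = re.compile(pattern)
    return [t for t in topics if regex.fullmatch(t)]


def index_for(timestamp):
    """Build the daily index name from an event timestamp (converted to UTC)."""
    utc = timestamp.astimezone(timezone.utc)
    return utc.strftime("logstash-nginx-%Y.%m.%d")


topics = ["elk-my-nginx-log", "elk-syslog", "metrics"]
print(matching_topics(topics, "elk-.*"))
print(index_for(datetime(2017, 9, 1, 23, 30, tzinfo=timezone.utc)))
```

Any topic that Filebeat creates with the `elk-` prefix is picked up without changing the Logstash config, while unrelated topics such as `metrics` are ignored.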
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic elk-my-nginx-log --from-beginning
Reference: http://www.ywnds.com/?p=9776