The business goal is to analyze the logs that nginx and apache produce every day, monitor information such as URLs, IPs, and REST endpoints, and send the data to an Elasticsearch service.
Requirements: no duplicate consumption and no data loss. At present Flume has better support for HDFS (as I understand it).
First configure JAVA_HOME; Java 8 or later is required.
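A minimal sketch of that setup, assuming a JDK 8 installed under /usr/java/jdk1.8.0_181 (the path is a placeholder for your actual installation):

# Append to ~/.bashrc or /etc/profile; the JDK path below is an assumption
export JAVA_HOME=/usr/java/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$PATH

# Reload the profile and confirm the version is 1.8 or newer
source ~/.bashrc
java -version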
Then simply download and extract the Logstash package; no further installation is needed.
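For example, assuming the 5.6.3 tarball from the official download site (the version number is only an illustration):

wget https://artifacts.elastic.co/downloads/logstash/logstash-5.6.3.tar.gz
tar -zxvf logstash-5.6.3.tar.gz
cd logstash-5.6.3

From the unpacked directory, a quick smoke test can be run: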
bin/logstash -e 'input { stdin {} } output { stdout{} }'
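This reads events from stdin and prints them to stdout. A hedged variant that pipes a single test message through (the exact output format depends on the Logstash version and stdout codec):

echo "hello world" | bin/logstash -e 'input { stdin {} } output { stdout{} }'
# Prints one event whose message field is "hello world", then exits on EOF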
First, create a conf directory inside the Logstash directory and add the config file:

mkdir conf && touch conf/file-stdout.conf
vim conf/file-stdout.conf

input {
  file {
    path => "/home/bingo/data/test.log"
    start_position => "beginning"
    ignore_older => 0
  }
}
output {
  stdout{}
}

Finally, start Logstash:

bin/logstash -f conf/file-stdout.conf

# Multiple files:       path => "/home/bingo/data/*.log"
# Multiple directories: path => "/home/bingo/data/*/*.log"
# Parameter notes:
# start_position: defaults to end, i.e. parsing starts from the end of the file
# ignore_older: by default, entries older than 24 hours are not parsed; 0 means no entry is ignored because of age
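To generate a fresh event while Logstash is tailing the file, append a line to the monitored path (taken from the config above); this is just an illustrative test command:

echo "test line $(date)" >> /home/bingo/data/test.log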
After starting Logstash you will see the contents of the log file printed to the console.
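Since the stated goal is to monitor URLs, IPs and REST endpoints in nginx/apache access logs, a filter block can be placed between the input and output sections. A minimal sketch, assuming the logs use the standard combined access-log format and the grok and date plugins shipped with Logstash:

filter {
  grok {
    # COMBINEDAPACHELOG matches the default nginx/apache combined format and
    # extracts fields such as clientip, verb, request, response and agent
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    # Use the timestamp from the log line instead of the ingestion time
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

The extracted fields (clientip, request, response, ...) are what the Elasticsearch outputs below would index for monitoring.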
touch file-file.conf
vim file-file.conf

input {
  file {
    path => "/home/connect/install/data/test.log"
    start_position => "beginning"
    ignore_older => 0
  }
}
output {
  file {
    path => "/home/connect/install/data/test1.log"
  }
  stdout{
    codec => rubydebug
  }
}
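Start it the same way and watch both outputs (paths taken from the config above):

bin/logstash -f file-file.conf
# In another terminal, confirm that events are also appended to the output file
tail -f /home/connect/install/data/test1.log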
touch file-es.conf
vim file-es.conf

input {
  file {
    type => "flow"
    path => "/home/bingo/data/logstash/logs/*/*.txt"
    discover_interval => 5
    start_position => "beginning"
  }
}
output {
  if [type] == "flow" {
    elasticsearch {
      index => "flow-%{+YYYY.MM.dd}"
      hosts => ["master01:9200", "worker01:9200", "worker02:9200"]
    }
  }
}
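After starting with bin/logstash -f file-es.conf, the indexed data can be checked against the cluster; an illustrative check using one of the hosts from the config (index names follow the flow-%{+YYYY.MM.dd} pattern):

curl 'http://master01:9200/_cat/indices/flow-*?v'
curl 'http://master01:9200/flow-*/_search?pretty&size=1'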
touch kafka-es.conf
vim kafka-es.conf

input {
  kafka {
    zk_connect => "master01:2181,worker01:2181,worker02:2181"
    auto_offset_reset => "smallest"
    group_id => "bdes_clm_bs_tracking_log_json"
    topic_id => "clm_bs_tracking_log_json"
    consumer_threads => 2
    codec => "json"
    queue_size => 500
    fetch_message_max_bytes => 104857600
  }
}
output {
  elasticsearch {
    hosts => ["A:9900","B:9900","C:9900"]
    document_type => "bs_tracking_log"
    #document_id => "%{[mblnr]}%{[mjahr]}"
    flush_size => 102400
    index => "clm"
    timeout => 10
  }
}
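To test the Kafka-to-Elasticsearch path end to end, a JSON message can be produced to the topic with the standard console producer; a sketch assuming a broker is reachable at master01:9092 (only the ZooKeeper addresses appear in the config above, so the broker address and the sample fields are assumptions):

echo '{"mblnr":"123","mjahr":"2017","msg":"test"}' | \
  kafka-console-producer.sh --broker-list master01:9092 --topic clm_bs_tracking_log_json
# The event should then show up in the "clm" index in Elasticsearch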
Reference: Logstash 基礎入門 (Logstash basics tutorial)