Download logstash-6.2.3.tar.gz and upload it via Xftp5 to /opt/uploads/ on node1, the first node of the Hadoop cluster:
tar zxvf logstash-6.2.3.tar.gz
mv logstash-6.2.3 /opt/app/ && cd /opt/app/
sudo vi /etc/profile
Add the following lines:
export LOGSTASH_HOME=/opt/app/logstash-6.2.3
export PATH=$PATH:$LOGSTASH_HOME/bin
Make the environment variables take effect: source /etc/profile
Input: nginx log files; output: Kafka
input {
  file {
    path => "/var/logs/nginx/*.log"
    discover_interval => 5
    start_position => "beginning"
  }
}
output {
  kafka {
    topic_id => "accesslog"
    codec => plain {
      format => "%{message}"
      charset => "UTF-8"
    }
    bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
  }
}
Input: txt files; output: Kafka
input {
  file {
    codec => plain { charset => "GB2312" }
    path => "D:/GameLog/BaseDir/*/*.txt"
    discover_interval => 30
    start_position => "beginning"
  }
}
output {
  kafka {
    topic_id => "gamelog"
    codec => plain {
      format => "%{message}"
      charset => "GB2312"
    }
    bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
  }
}
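The `charset` option matters here because the txt game logs are GB2312-encoded; reading them as UTF-8 would garble the Chinese text. A minimal Python sketch (illustrative only, the sample text is hypothetical) of why the declared charset must match the bytes on disk:

```python
# Hypothetical GB2312-encoded log fragment, as Logstash would read raw bytes from disk.
raw = "玩家 登录 成功".encode("gb2312")

# Decoding with the charset declared in the config recovers the original text;
# decoding the same bytes as UTF-8 would raise UnicodeDecodeError or produce mojibake.
decoded = raw.decode("gb2312")
print(decoded)
```

This is exactly the mismatch the `codec => plain { charset => "GB2312" }` setting avoids.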
Input: nginx log files; output: Elasticsearch
input {
  file {
    type => "flow"
    path => "/var/logs/nginx/*.log"
    discover_interval => 5
    start_position => "beginning"
  }
}
output {
  if [type] == "flow" {
    elasticsearch {
      index => "flow-%{+YYYY.MM.dd}"
      hosts => ["hadoop1:9200", "hadoop2:9200", "hadoop3:9200"]
    }
  }
}
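The `index => "flow-%{+YYYY.MM.dd}"` pattern makes Logstash write each event to a per-day index derived from the event timestamp. The naming scheme can be sketched in Python (illustrative, not part of Logstash):

```python
from datetime import datetime

def daily_index(prefix: str, ts: datetime) -> str:
    # Mirrors the Logstash sprintf date pattern %{+YYYY.MM.dd}.
    return f"{prefix}-{ts.strftime('%Y.%m.%d')}"

print(daily_index("flow", datetime(2018, 4, 1)))  # -> flow-2018.04.01
```

Daily indices keep each index small and make it cheap to drop old log data by deleting whole indices.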
Input: the Kafka topics accesslog and gamelog, with a separate filter applied to each; output: Elasticsearch. When the input block contains multiple kafka sources, each must set a distinct client_id (e.g. client_id => "es1", client_id => "es2"); otherwise Logstash fails with javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-0.
input {
  kafka {
    type => "accesslog"
    codec => "plain"
    auto_offset_reset => "earliest"
    client_id => "es1"
    group_id => "es1"
    topics => ["accesslog"]
    bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
  }
  kafka {
    type => "gamelog"
    codec => "plain"
    auto_offset_reset => "earliest"
    client_id => "es2"
    group_id => "es2"
    topics => ["gamelog"]
    bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
  }
}
filter {
  if [type] == "accesslog" {
    json {
      source => "message"
      remove_field => ["message"]
      target => "access"
    }
  }
  if [type] == "gamelog" {
    mutate {
      split => { "message" => " " }
      add_field => {
        "event_type"   => "%{[message][3]}"
        "current_map"  => "%{[message][4]}"
        "current_x"    => "%{[message][5]}"
        "current_y"    => "%{[message][6]}"
        "user"         => "%{[message][7]}"
        "item"         => "%{[message][8]}"
        "item_id"      => "%{[message][9]}"
        "current_time" => "%{[message][12]}"
      }
      remove_field => ["message"]
    }
  }
}
output {
  if [type] == "accesslog" {
    elasticsearch {
      index => "accesslog"
      codec => "json"
      hosts => ["hadoop1:9200", "hadoop2:9200", "hadoop3:9200"]
    }
  }
  if [type] == "gamelog" {
    elasticsearch {
      index => "gamelog"
      codec => plain { charset => "UTF-16BE" }
      hosts => ["hadoop1:9200", "hadoop2:9200", "hadoop3:9200"]
    }
  }
}
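The mutate/split filter above turns each space-delimited game-log line into an array, then lifts individual positions into named fields. The same extraction can be sketched in Python (the sample line and its field positions are hypothetical, chosen only to match the indices used in the filter):

```python
# Hypothetical space-delimited game-log line; the named fields below follow
# the array positions referenced by the mutate filter (indices 3-9 and 12).
line = "a b c PICKUP map01 120 340 user42 sword 9001 x y 2018-04-01T12:00:00"
tokens = line.split(" ")

event = {
    "event_type":   tokens[3],
    "current_map":  tokens[4],
    "current_x":    tokens[5],
    "current_y":    tokens[6],
    "user":         tokens[7],
    "item":         tokens[8],
    "item_id":      tokens[9],
    "current_time": tokens[12],
}
print(event["event_type"], event["current_time"])  # -> PICKUP 2018-04-01T12:00:00
```

After the fields are extracted, the original `message` array is dropped (`remove_field`), so only the structured fields reach Elasticsearch.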
Note: UTF-16BE is what fixes the garbled Chinese characters here, not UTF-8.
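To see what the UTF-16BE charset setting means at the byte level, here is a small Python illustration (not part of Logstash) of how a Chinese character is represented in that encoding:

```python
# 中 is code point U+4E2D; UTF-16BE stores it as the big-endian byte pair 4E 2D.
b = "中".encode("utf-16-be")
print(b.hex())  # -> 4e2d
```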
Start Logstash with one of the configs above, e.g.:
logstash -f /opt/app/logstash-6.2.3/conf/flow-kafka.conf
1) When collecting logs with Logstash using a file input, you cannot repeatedly test against the same file: the first run succeeds, but subsequent runs pick up nothing.
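This happens because the file input records how far it has read each file in a "sincedb" file, so an already-consumed file is not re-read on the next run. A common workaround for repeated testing (a sketch; the `/dev/null` path assumes a Linux host) is to point the sincedb at /dev/null so the read position is never persisted:

```conf
input {
  file {
    path => "/var/logs/nginx/*.log"
    start_position => "beginning"
    # Discard read-position bookkeeping so every run starts from the top.
    sincedb_path => "/dev/null"
  }
}
```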
References:
https://blog.csdn.net/lvyuan1234/article/details/78653324