Logstash installation and a quick test

Background

The business goal is to analyze the logs that nginx and apache produce every day, monitor the URLs, IPs, REST endpoints and other information in them, and ship the data to an Elasticsearch service.

Comparison with Flume

No duplicate consumption, and no data loss.

At the moment Flume has better support for HDFS (in my understanding).

Offline installation

Configure JAVA_HOME first; Java 8 or later is required.

Then simply download and extract the archive.

Standard input to standard output

bin/logstash -e 'input { stdin {} } output { stdout{} }'
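The same pipeline can also be written as a config file instead of the `-e` one-liner; a minimal sketch (the filename stdin-stdout.conf is just an example name):

```conf
# stdin-stdout.conf — read events from the console and print them back
input {
    stdin {}
}
output {
    stdout {}
}
```

Run it with bin/logstash -f stdin-stdout.conf; whatever you type on the console is emitted back as a structured event.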

File to standard output

First, in the Logstash directory, create the config directory and file: mkdir conf && touch conf/file-stdout.conf

vim conf/file-stdout.conf
input {
    file {
        path => "/home/bingo/data/test.log"
        start_position => "beginning"
        ignore_older => 0
    }
}
output {
    stdout{}
}

Finally, start it:
bin/logstash -f conf/file-stdout.conf 

#multiple files        path => "/home/bingo/data/*.log"
#multiple directories  path => "/home/bingo/data/*/*.log"

#parameter notes
#start_position: defaults to end, i.e. parsing starts from the end of the file
#ignore_older: by default, logs older than 24 hours are not parsed; 0 means no old logs are ignored

After running the command you will see the contents of the log file printed to the console.

  • This mode keeps monitoring the file, so new input is picked up continuously.
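Besides the glob patterns noted above, the file input's path option also accepts an array, so one pipeline can watch several locations at once. A sketch (the second path is a hypothetical example):

```conf
input {
    file {
        # watch the data logs and, hypothetically, a second log tree in one input
        path => ["/home/bingo/data/*.log", "/home/bingo/logs/*/*.log"]
        start_position => "beginning"
    }
}
output {
    stdout {}
}
```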

File to file

  • Started the same way as file-to-stdout; only the config file differs:
touch file-file.conf
vim file-file.conf

input {
    file {
        path => "/home/connect/install/data/test.log"
        start_position => "beginning"
        ignore_older => 0
    }
}
output {
    file {
        path => "/home/connect/install/data/test1.log"
    }
    stdout {
        codec => rubydebug
    }
}

Upstream to Elasticsearch

touch file-es.conf

vim file-es.conf

input {
  file {
    type => "flow"
    path => "/home/bingo/data/logstash/logs/*/*.txt"
    discover_interval => 5
    start_position => "beginning" 
  }
}

output {
  if [type] == "flow" {
    elasticsearch {
      index => "flow-%{+YYYY.MM.dd}"
      hosts => ["master01:9200", "worker01:9200", "worker02:9200"]
    }
  }  
}

Upstream to Kafka
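This section has no config in the original; as a hedged sketch, a file-to-Kafka pipeline might look like the following. The broker addresses reuse the hostnames from the other examples and are assumptions, and exact option names vary by plugin version (newer kafka output plugins use bootstrap_servers):

```conf
input {
    file {
        path => "/home/bingo/data/test.log"
        start_position => "beginning"
    }
}
output {
    kafka {
        # assumed broker list and topic; adjust to your cluster
        bootstrap_servers => "master01:9092,worker01:9092,worker02:9092"
        topic_id => "clm_bs_tracking_log_json"
        codec => "json"
    }
}
```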

Kafka to ES

touch kafka-es.conf
vim kafka-es.conf

input {
        kafka {
               zk_connect => "master01:2181,worker01:2181,worker02:2181"
               auto_offset_reset => "smallest"
               group_id => "bdes_clm_bs_tracking_log_json"
               topic_id => "clm_bs_tracking_log_json"
               consumer_threads => 2
               codec => "json"
               queue_size => 500
               fetch_message_max_bytes => 104857600
        }
}

output {
        elasticsearch {
                hosts => ["A:9900","B:9900","C:9900"]
                document_type => "bs_tracking_log"
                #document_id => "%{[mblnr]}%{[mjahr]}"
                flush_size => 102400
                index => "clm"
                timeout => 10
        }
}
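Note that options such as zk_connect, topic_id and fetch_message_max_bytes above belong to the older ZooKeeper-based kafka input plugin (Logstash 1.5/2.x era). Newer kafka input plugins talk to the brokers directly; a rough equivalent sketch, with the broker addresses assumed:

```conf
input {
    kafka {
        # newer kafka input: connect to brokers, not ZooKeeper
        bootstrap_servers => "master01:9092,worker01:9092,worker02:9092"
        topics => ["clm_bs_tracking_log_json"]
        group_id => "bdes_clm_bs_tracking_log_json"
        auto_offset_reset => "earliest"   # was "smallest" in the old plugin
        consumer_threads => 2
        codec => "json"
    }
}
```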

Reference: Logstash Getting Started
