Logstash安裝和使用

Logstash 是開源的服務器端數據處理管道,可以同時 從多個來源採集數據、轉換數據,而後將數據發送到您最喜歡的 「存儲庫」 中。(咱們的存儲庫固然是 Elasticsearch。)html

做用:集中、轉換和存儲數據web

官方網站:

https://www.elastic.co/cn/products/logstash

一個民間的中文Logstash最佳實踐:

https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html

 

1.下載Logstash,版本爲6.2.4,下載地址

https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz

2.解壓到目錄

3.啓動Logstash進程,Hello World Demo

bin/logstash -e 'input { stdin { } } output { stdout {} }'

bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'

輸入:Hello Worldapache

輸出:json

在這個Demo中,Hello World做爲數據,在線程之間以 事件 的形式流傳。不要叫,由於 logstash 能夠處理多行事件。bootstrap

Logstash 會給事件添加一些額外信息。最重要的就是 @timestamp,用來標記事件的發生時間。由於這個字段涉及到 Logstash 的內部流轉,因此必須是一個 joda 對象,若是你嘗試本身給一個字符串字段重命名爲 @timestamp 的話,Logstash 會直接報錯。因此,請使用 filters/date 插件 來管理這個特殊字段數組

此外,大多數時候,還能夠見到另外幾個:ruby

  1. host 標記事件發生在哪裏。
  2. type 標記事件的惟一類型。
  3. tags 標記事件的某方面屬性。這是一個數組,一個事件能夠有多個標籤。

4.語法

Logstash 設計了本身的 DSL —— 有點像 Puppet 的 DSL,或許由於都是用 Ruby 語言寫的吧 —— 包括有區域,註釋,數據類型(布爾值,字符串,數值,數組,哈希),條件判斷,字段引用等。bash

區段(section)

Logstash 用 {} 來定義區域。區域內能夠包括插件區域定義,你能夠在一個區域內定義多個插件。插件區域內則能夠定義鍵值對設置。示例以下:服務器

input {
    stdin {}
    syslog {}
}

數據類型

Logstash 支持少許的數據值類型:elasticsearch

bool   debug => true 
string  host => "hostname" 
number    port => 514
array    match => ["datetime", "UNIX", "ISO8601"]
hash
options => {
    key1 => "value1",
    key2 => "value2"
}

條件判斷(condition)

表達式支持下面這些操做符:
    equality, etc: ==, !=, <, >, <=, >=
    regexp: =~, !~
    inclusion: in, not in
    boolean: and, or, nand, xor
    unary: !()

好比:
if "_grokparsefailure" not in [tags] {
} else if [status] !~ /^2\d\d/ and [url] == "/noc.gif" {
} else {
}

命令行參數:logstash命令

參數:
執行    -e              bin/logstash -e ''
文件    --config 或 -f       bin/logstash -f agent.conf
測試    --configtest 或 -t     用來測試 Logstash 讀取到的配置文件語法是否能正常解析。
日誌    --log 或 -l         Logstash 默認輸出日誌到標準錯誤。生產環境下你能夠經過 bin/logstash -l logs/logstash.log 命令來統一存儲日誌。

 

使用Logstash的Kafka插件

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

啓動一個kafka做爲輸入,並輸入1231212

~/software/apache/kafka_2.11-0.10.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

 stdin.conf文件

input{
        kafka{
                bootstrap_servers => ["127.0.0.1:9092"]
                topics => [ 'test' ]
        }
}
output {
        stdout {
                codec => rubydebug
        }
}

啓動logstash

bin/logstash -f stdin.conf

輸出

關於auto_offset_reset參數

因爲Kafka是消息隊列,消費過的就不會再消費

<i>能夠在stdin.conf中設置auto_offset_reset="earliest",好比

input{
        kafka{
                bootstrap_servers => ["127.0.0.1:9092"]
                topics => [ 'test' ]
                auto_offset_reset => "earliest"
        }
}
output {
        stdout {
                codec => rubydebug
        }
}

在kafka中依次輸入

1111
2222
3333

輸出爲,注意這裏timestamp的時間是1111 -> 2222 -> 3333,logstash會從頭開始消費沒有消費的消息

 

<ii>當auto_offset_reset="latest"

logstash會從進程啓動的時候開始消費消息,以前的消息會丟棄

在kafka中依次輸入

1111
2222
3333

輸出爲

 

Kafka -> logstash -> Es的conf文件

input{
	kafka{
		bootstrap_servers => ["127.0.0.1:9092"]
		topics => [ 'topicB' ]
		auto_offset_reset => "earliest"
		consumer_threads => 1
		codec => json
	}
}
output {
	elasticsearch{
		hosts => ["127.0.0.1:9200"]
		index => "XXX"
	}
}

Kafka -> logstash -> File的conf文件

參考

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-file.html

注意:若是是kafka輸入是line格式的,使用codec => line { format => "custom format: %{message}"}

關於codec的說明

https://www.elastic.co/guide/en/logstash/6.2/codec-plugins.html

若是kafka輸入是json格式的,使用codec => json

input{
	kafka{
		bootstrap_servers => ["127.0.0.1:9092"]
		topics => [ 'topicB' ]
		auto_offset_reset => "earliest"
		consumer_threads => 1
		codec => json
	}
}
output {
	stdout {
		codec => rubydebug {}
    	}
	file {
		path => "/home/lintong/桌面/logs/path/to/1.txt"
		#codec => line { format => "custom format: %{message}"}		
		codec => json
	}
}

 

使用Logstash的HDFS插件

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-webhdfs.html

配置文件

input{
	kafka{
		bootstrap_servers => ["127.0.0.1:9092"]
		topics => [ 'topicB' ]
		auto_offset_reset => "earliest"
		consumer_threads => 1
		codec => json
	}
}
output {
	stdout {
		codec => rubydebug {}
    	}
	webhdfs {
		host => "127.0.0.1"                 # (required)
		port => 50070                       # (optional, default: 50070)
		path => "/user/lintong/xxx/logstash/dt=%{+YYYY-MM-dd}/logstash-%{+HH}.log"  # (required)
		user => "lintong"                       # (required)
		codec => json
	}
}

到 http://localhost:50070 下看文件內容

相關文章
相關標籤/搜索