Logstash usage scenarios (all of the configuration files below have been tested in practice)

Scenarios:

1) datasource->logstash->elasticsearch->kibana

2) datasource->filebeat->logstash->elasticsearch->kibana

3) datasource->filebeat->logstash->redis/kafka->logstash->elasticsearch->kibana

4) kafka->logstash->elasticsearch->kibana

5) datasource->filebeat->kafka->logstash->elasticsearch->kibana (most commonly used)

6) filebeat SSL encrypted transport

7) datasource->logstash->redis/kafka->logstash->elasticsearch->kibana

8) mysql->logstash->elasticsearch->kibana

The list above summarizes the transport and processing scenarios covered below: starting from the data source, how the data is collected, with which tool, where it is sent, what processing and filtering it goes through, where it ends up, and how it is displayed.

 

Input, output, and filtering are implemented mainly through plugins (of which there are many types); see the official plugin documentation:

https://www.elastic.co/guide/en/logstash/current/index.html

[Installation and deployment are already well covered by the official site and the community, so they are not repeated here; see the official documentation.]

PS: [the Redis cluster installation document was covered earlier; refer back to it as needed.]

 

Prerequisites

1) Java environment: JDK 8;

2) the ELK stack has been set up;

3) Elasticsearch, Kibana, and Logstash should ideally be the same version; the environment used here is 5.6.10;

4) Logstash is best run as root (so it has enough permission to collect the required log files);

5) Elasticsearch is installed as a non-root user; newer versions refuse to run as root;

6) Filebeat has been installed.

Startup commands:

7) Logstash:

nohup ./bin/logstash -f ***.conf --config.reload.automatic >/dev/null 2>/dev/null &

8) Filebeat: nohup ./filebeat -e -c filebeat.yml >/dev/null 2>/dev/null &

9) Elasticsearch: ./elasticsearch -d

10) Kibana: nohup ./bin/kibana &

 

Logstash startup flag: --config.reload.automatic reloads the configuration file automatically, so Logstash does not need to be restarted after config changes.

Filebeat startup flags: -e writes the log output to stderr; -c specifies the path of the configuration file.
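Before starting Logstash in the background, it can be worth checking that the configuration parses first; a minimal example (the config name here is just the demo file used below):

./bin/logstash -f test1.conf --config.test_and_exit    # parse the config and exit without starting the pipeline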

 

Scenario walkthroughs

1. Simple mode: Logstash as the log collector

Architecture: Logstash collects, processes, and forwards the logs to Elasticsearch for storage; Kibana handles the visualization.

Characteristics: this structure requires Logstash on every server, and Logstash consumes a fair amount of CPU and memory, so it suits servers with plenty of computing resources; otherwise it can degrade server performance or even stop the server from working properly.

 

Demo1:

test1.conf:

Console input, forwarded without any processing or transformation, and output to the console (or to Elasticsearch or a file, whichever you choose):

# console input

input { stdin { } }

output {

     # output to the console with the rubydebug codec

stdout { codec => rubydebug }

# output to Elasticsearch

elasticsearch {

        hosts => "node18:9200"

        codec => json

        }

# output to a file

file {
    path => "/usr/local/logstash-5.6.10/data/log/logstash/all.log"   # path of the output file
    flush_interval => 0                                              # flush interval; 0 means write in real time
    codec => json
    }

}

2. Secure mode: Beats (Filebeat, Metricbeat, Packetbeat, Winlogbeat, etc.) as the log shipper

Packetbeat (collects network traffic data);

Topbeat (collects system-, process-, and filesystem-level CPU and memory usage data);

Filebeat (collects log file data) ------- most commonly used;

Winlogbeat (collects Windows event log data).

Architecture:

 

Workflow: Beats sends the collected data to Logstash, which parses and filters it and then forwards it to Elasticsearch for storage; Kibana presents it to the user.

Characteristics: this architecture removes the problem of Logstash occupying a lot of system resources on every server node; compared with Logstash, the CPU and memory footprint of Beats is almost negligible. In addition, Beats and Logstash support SSL/TLS encrypted transport with mutual client/server authentication, which keeps the communication secure.

This architecture therefore suits scenarios with high data-security requirements where the performance of the individual servers is also a concern.

Demo2:

filebeat.yml:

################# Filebeat Configuration Example ########################

 

# This file is an example configuration file highlighting only the most common

# options. The filebeat.full.yml file from the same directory contains all the

# supported options with more comments. You can use it as a reference.

#

# You can find the full configuration reference here:

# https://www.elastic.co/guide/en/beats/filebeat/index.html

 

#===================== Filebeat prospectors =====================

 

filebeat.prospectors:

 

# Each - is a prospector. Most options can be set at the prospector level, so

# you can use different prospectors for various configurations.

# Below are the prospector specific configurations.

 

- input_type: log

 

  # Paths that should be crawled and fetched. Glob based paths.

  paths:

    - /home/admin/helloworld/logs/*.log

    #- c:\programdata\elasticsearch\logs\*

 

  # Exclude lines. A list of regular expressions to match. It drops the lines that are

  # matching any regular expression from the list.

  #exclude_lines: ["^DBG"]

 

  # Include lines. A list of regular expressions to match. It exports the lines that are

  # matching any regular expression from the list.

  #include_lines: ["^ERR", "^WARN"]

 

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that

  # are matching any regular expression from the list. By default, no files are dropped.

  #exclude_files: [".gz$"]

 

  # Optional additional fields. These field can be freely picked

  # to add additional information to the crawled log files for filtering

  #fields:

  #  level: debug

  #  review: 1

 

  ### Multiline options

 

  # Mutiline can be used for log messages spanning multiple lines. This is common

  # for Java Stack Traces or C-Line Continuation

 

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [

  #multiline.pattern: ^\[

 

  # Defines if the pattern set under pattern should be negated or not. Default is false.

  #multiline.negate: false

 

  # Match can be set to "after" or "before". It is used to define if lines should be append to a pattern

  # that was (not) matched before or after or as long as a pattern is not matched based on negate.

  # Note: After is the equivalent to previous and before is the equivalent to to next in Logstash

  #multiline.match: after

 

 

#====================== General =============================

 

# The name of the shipper that publishes the network data. It can be used to group

# all the transactions sent by a single shipper in the web interface.

#name:

 

# The tags of the shipper are included in their own field with each

# transaction published.

#tags: ["service-X", "web-tier"]

 

# Optional fields that you can specify to add additional information to the

# output.

#fields:

#  env: staging

 

#======================= Outputs ===========================

 

# Configure what outputs to use when sending the data collected by the beat.

# Multiple outputs may be used.

 

#-------------------------- Elasticsearch output ------------------------------

#output.elasticsearch:

  # Array of hosts to connect to.

  # hosts: ["localhost:9200"]

 

  # Optional protocol and basic auth credentials.

  #protocol: "https"

  #username: "elastic"

  #password: "changeme"

 

#--------------------------- Logstash output --------------------------------

output.logstash:

  # The Logstash hosts

  hosts: ["192.168.80.34:5044"]

 

  # Optional SSL. By default is off.

  # List of root certificates for HTTPS server verifications

  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

 

  # Certificate for SSL client authentication

  #ssl.certificate: "/etc/pki/client/cert.pem"

 

  # Client Certificate Key

  #ssl.key: "/etc/pki/client/cert.key"

 

#=========================== Logging =======================

 

# Sets log level. The default log level is info.

# Available log levels are: critical, error, warning, info, debug

#logging.level: debug

 

# At debug level, you can selectively enable logging only for some components.

# To enable all selectors use ["*"]. Examples of other selectors are "beat",

# "publish", "service".

#logging.selectors: ["*"]

 

 

On the 192.168.80.34 server ---- test2.conf:

input {

    beats {

    port => 5044

    codec => "json"

}

}

#filter{

#  ... (explained later)

#}

 

output {

    # output to the console
    # stdout { }

    # output to Redis
    redis {
        host => "192.168.80.32"        # Redis host address
        port => 6379                   # Redis port
        password => "123456"           # Redis password
        #db => 8                       # Redis database number
        data_type => "channel"         # use publish/subscribe mode
        key => "logstash_list_0"       # name of the publish channel
    }

    # output to Kafka
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topic_id          => "test"
    }

    # output to Elasticsearch
    elasticsearch {
        hosts => "node18:9200"
        codec => json
    }
}

3. Message-queue mode: older Beats versions could not output to a message queue directly (this changed in 5.0 and later), so on both sides of the queue there could only be Logstash instances. Logstash collects data from the various sources and forwards it, without any processing or transformation, to the message queue (Kafka, Redis, RabbitMQ, etc.); a second Logstash then reads from the queue, transforms, analyzes, and filters the data, and outputs it to Elasticsearch, with Kibana providing the visualization.

Architecture (the server running the Logstash instance that parses the logs must be powerful enough in every respect):

 

Characteristics: this architecture suits fairly large log volumes. Because the Logstash parsing nodes and Elasticsearch carry a heavy load, they can be configured as clusters to share it. Introducing a message queue evens out the network traffic, which reduces congestion and, in particular, the chance of losing data; however, the problem of Logstash consuming a lot of system resources remains.

Workflow: Filebeat collects -> Logstash forwards to Kafka -> a second Logstash reads the data buffered in Kafka and analyzes it -> output to Elasticsearch -> displayed in Kibana.

Msg1.conf:

input {

    beats {

    port => 5044

    codec => "json"

       }

    syslog{

       }

}

 

#filter{

#

#}

 

output {

    # output to the console
    # stdout { }

    # output to Redis
    redis {
        host => "192.168.80.32"        # Redis host address
        port => 6379                   # Redis port
        password => "123456"           # Redis password
        #db => 8                       # Redis database number
        data_type => "channel"         # use publish/subscribe mode
        key => "logstash_list_0"       # name of the publish channel
    }

    # output to Kafka
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topic_id          => "test"
    }
}

Msg2.conf:

input{
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topics            => ["test"]
        group_id          => "consumer-test"    # consumer group
        #decorate_events  => true
        auto_offset_reset => "earliest"          # consume from the earliest offset (like --from-beginning); if unset, only messages produced after startup are consumed
    }
}

#filter{

#}

output {

       elasticsearch {

       hosts => "192.168.80.18:9200"   

       codec => json

       }

}

4. Logstash reads data directly from the Kafka message queue, processes it, and outputs it to Elasticsearch (because it reads straight out of Kafka, the data is effectively already buffered; Logstash just processes it and writes it out, to a file, Elasticsearch, and so on).

Workflow: [the data already exists in the corresponding Kafka topic] a standalone Logstash reads from Kafka, processes the data, outputs it to Elasticsearch, and Kibana displays it.

input{
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topics            => ["test"]
        group_id          => "consumer-test"
        #decorate_events  => true
        auto_offset_reset => "earliest"
    }
}

#filter{

#

#}

 

output {
    elasticsearch {
        hosts => "192.168.80.18:9200"
        codec => json
    }
}

5. Newer Filebeat versions (5.0 and above) can output directly to Kafka, without needing Logstash to receive the data and forward it to Kafka.

 

Filebeat writes what it collects straight into the Kafka message queue; Logstash then pulls the data out of Kafka, processes and analyzes it, outputs it to Elasticsearch, and Kibana displays it.

filebeat.yml:

################# Filebeat Configuration Example #########################

 

# This file is an example configuration file highlighting only the most common

# options. The filebeat.full.yml file from the same directory contains all the

# supported options with more comments. You can use it as a reference.

#

# You can find the full configuration reference here:

# https://www.elastic.co/guide/en/beats/filebeat/index.html

 

#================== Filebeat prospectors===========================

 

filebeat.prospectors:

 

# Each - is a prospector. Most options can be set at the prospector level, so

# you can use different prospectors for various configurations.

# Below are the prospector specific configurations.

 

- input_type: log

 

  # Paths that should be crawled and fetched. Glob based paths.

  paths:

    - /home/admin/helloworld/logs/*.log

    #- c:\programdata\elasticsearch\logs\*

 

  # Exclude lines. A list of regular expressions to match. It drops the lines that are

  # matching any regular expression from the list.

  #exclude_lines: ["^DBG"]

 

  # Include lines. A list of regular expressions to match. It exports the lines that are

  # matching any regular expression from the list.

  #include_lines: ["^ERR", "^WARN"]

 

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that

  # are matching any regular expression from the list. By default, no files are dropped.

  #exclude_files: [".gz$"]

 

  # Optional additional fields. These field can be freely picked

  # to add additional information to the crawled log files for filtering

  #fields:

  #  level: debug

  #  review: 1

 

  ### Multiline options

 

  # Mutiline can be used for log messages spanning multiple lines. This is common

  # for Java Stack Traces or C-Line Continuation

 

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [

  #multiline.pattern: ^\[

 

  # Defines if the pattern set under pattern should be negated or not. Default is false.

  #multiline.negate: false

 

  # Match can be set to "after" or "before". It is used to define if lines should be append to a pattern

  # that was (not) matched before or after or as long as a pattern is not matched based on negate.

  # Note: After is the equivalent to previous and before is the equivalent to to next in Logstash

  #multiline.match: after

 

#============================ General=========================

 

# The name of the shipper that publishes the network data. It can be used to group

# all the transactions sent by a single shipper in the web interface.

#name:

 

# The tags of the shipper are included in their own field with each

# transaction published.

#tags: ["service-X", "web-tier"]

 

# Optional fields that you can specify to add additional information to the

# output.

#fields:

#  env: staging

 

#======================== Outputs ============================

 

# Configure what outputs to use when sending the data collected by the beat.

# Multiple outputs may be used.

 

#-------------------------- Elasticsearch output ------------------------------

#output.elasticsearch:

  # Array of hosts to connect to.

  # hosts: ["localhost:9200"]

 

  # Optional protocol and basic auth credentials.

  #protocol: "https"

  #username: "elastic"

  #password: "changeme"

 

#----------------------------- Logstash output --------------------------------

#output.logstash:

  # The Logstash hosts

#  hosts: ["192.168.80.34:5044"]

 

#-----------------------------kafka  output-----------------------------------

#output.kafka:

#  enabled: true

#  hosts: ["192.168.80.42:9092,192.168.80.43:9092,192.168.80.44:9092"]

#  topics: 'test'

output.kafka:

  hosts: ["192.168.80.42:9092"]

  topic: test

  required_acks: 1

 

 

  # Optional SSL. By default is off.

  # List of root certificates for HTTPS server verifications

  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

 

  # Certificate for SSL client authentication

  #ssl.certificate: "/etc/pki/client/cert.pem"

 

  # Client Certificate Key

  #ssl.key: "/etc/pki/client/cert.key"

 

#======================== Logging ============================

 

# Sets log level. The default log level is info.

# Available log levels are: critical, error, warning, info, debug

#logging.level: debug

 

# At debug level, you can selectively enable logging only for some components.

# To enable all selectors use ["*"]. Examples of other selectors are "beat",

# "publish", "service".

#logging.selectors: ["*"]

 

logstash.conf:

input{
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topics            => ["test"]
        group_id          => "consumer-test"
        #decorate_events  => true
        auto_offset_reset => "earliest"
    }
}

#filter{

#

#}

 

output {
    elasticsearch {
        hosts => "192.168.80.18:9200"
        codec => json
    }
}

 

6. SSL encrypted transport (improves security: only Filebeat and Logstash servers that have the key and certificate configured can exchange log data):

Reference: https://blog.csdn.net/zsq12138/article/details/78753369

Reference: https://blog.csdn.net/Gamer_gyt/article/details/69280693?locationNum=5&fps=1
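For reference, a rough sketch of how the self-signed key/certificate pairs used below could be generated with OpenSSL (the subject CNs and the validity period are only examples; the reference links above describe the full procedure):

# on the Logstash server
openssl req -subj '/CN=192.168.80.18/' -x509 -days 3650 -batch -nodes -newkey rsa:2048 \
  -keyout /usr/local/logstash-5.6.10/pki/tls/private/logstash.key \
  -out /usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt

# on the Filebeat server
openssl req -subj '/CN=192.168.80.183/' -x509 -days 3650 -batch -nodes -newkey rsa:2048 \
  -keyout /usr/local/filebeat-5.6.10/pki/tls/private/filebeat.key \
  -out /usr/local/filebeat-5.6.10/pki/tls/certs/filebeat.crt

# then copy filebeat.crt to the Logstash server and logstash.crt to the Filebeat server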

Logstash configuration file:

Notes:

ssl_certificate_authorities: location of the certificate that came from the Filebeat side

ssl_certificate => location of the certificate generated on this (Logstash) side

ssl_key => location of the key generated on this (Logstash) side

ssl_verify_mode => "force_peer"

beat.conf:

input {
    beats {
        port => 5044
        codec => "json"
        ssl => true
        ssl_certificate_authorities => ["/usr/local/logstash-5.6.10/pki/tls/certs/filebeat.crt"]
        ssl_certificate => "/usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt"
        ssl_key => "/usr/local/logstash-5.6.10/pki/tls/private/logstash.key"
        ssl_verify_mode => "force_peer"    # must be used together with ssl_certificate_authorities
    }
    syslog{
    }
}

 

output {

    # output to the console
    # stdout { }

    # output to Redis
    redis {
        host => "192.168.80.32"        # Redis host address
        port => 6379                   # Redis port
        password => "123456"           # Redis password
        #db => 8                       # Redis database number
        data_type => "channel"         # use publish/subscribe mode
        key => "logstash_list_0"       # name of the publish channel
    }

    # output to Kafka
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topic_id          => "test"
    }

    # output to Elasticsearch
    elasticsearch {
        hosts => "node18:9200"
        codec => json
    }
}

 

Filebeat configuration file:

filebeat.yml:

################ #Filebeat Configuration Example #####################

 

# This file is an example configuration file highlighting only the most common

# options. The filebeat.full.yml file from the same directory contains all the

# supported options with more comments. You can use it as a reference.

#

# You can find the full configuration reference here:

# https://www.elastic.co/guide/en/beats/filebeat/index.html

 

#=================== Filebeat prospectors ========================

 

filebeat.prospectors:

 

# Each - is a prospector. Most options can be set at the prospector level, so

# you can use different prospectors for various configurations.

# Below are the prospector specific configurations.

 

- input_type: log

 

  # Paths that should be crawled and fetched. Glob based paths.

  paths:

    - /home/admin/helloworld/logs/*.log

    #- c:\programdata\elasticsearch\logs\*

 

  # Exclude lines. A list of regular expressions to match. It drops the lines that are

  # matching any regular expression from the list.

  #exclude_lines: ["^DBG"]

 

  # Include lines. A list of regular expressions to match. It exports the lines that are

  # matching any regular expression from the list.

  #include_lines: ["^ERR", "^WARN"]

 

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that

  # are matching any regular expression from the list. By default, no files are dropped.

  #exclude_files: [".gz$"]

 

  # Optional additional fields. These field can be freely picked

  # to add additional information to the crawled log files for filtering

  #fields:

  #  level: debug

  #  review: 1

 

  ### Multiline options

 

  # Mutiline can be used for log messages spanning multiple lines. This is common

  # for Java Stack Traces or C-Line Continuation

 

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [

  #multiline.pattern: ^\[

 

  # Defines if the pattern set under pattern should be negated or not. Default is false.

  #multiline.negate: false

 

  # Match can be set to "after" or "before". It is used to define if lines should be append to a pattern

  # that was (not) matched before or after or as long as a pattern is not matched based on negate.

  # Note: After is the equivalent to previous and before is the equivalent to to next in Logstash

  #multiline.match: after

 

#======================== General ============================

 

# The name of the shipper that publishes the network data. It can be used to group

# all the transactions sent by a single shipper in the web interface.

#name:

 

# The tags of the shipper are included in their own field with each

# transaction published.

#tags: ["service-X", "web-tier"]

 

# Optional fields that you can specify to add additional information to the

# output.

#fields:

#  env: staging

 

#========================= Outputs ===========================

 

# Configure what outputs to use when sending the data collected by the beat.

# Multiple outputs may be used.

 

#----------------------------- Elasticsearch output ------------------------------

#output.elasticsearch:

  # Array of hosts to connect to.

  # hosts: ["localhost:9200"]

 

  # Optional protocol and basic auth credentials.

  #protocol: "https"

  #username: "elastic"

  #password: "changeme"

 

#----------------------------- Logstash output --------------------------------

output.logstash:

# The Logstash hosts

  hosts: ["192.168.80.18:5044"]

# encrypted transport

  ssl.certificate_authorities: ["/usr/local/filebeat-5.6.10/pki/tls/certs/logstash.crt"]

  ssl.certificate: "/usr/local/filebeat-5.6.10/pki/tls/certs/filebeat.crt"

  ssl.key: "/usr/local/filebeat-5.6.10/pki/tls/private/filebeat.key" 

 

#----------------------------- kafka  output-----------------------------------

#output.kafka:

#  hosts: ["192.168.80.42:9092"]

#  topic: test

#  required_acks: 1

 

  # Optional SSL. By default is off.

  # List of root certificates for HTTPS server verifications

  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

 

  # Certificate for SSL client authentication

  #ssl.certificate: "/etc/pki/client/cert.pem"

 

  # Client Certificate Key

  #ssl.key: "/etc/pki/client/cert.key"

 

#========================== Logging =========================

 

# Sets log level. The default log level is info.

# Available log levels are: critical, error, warning, info, debug

#logging.level: debug

 

# At debug level, you can selectively enable logging only for some components.

# To enable all selectors use ["*"]. Examples of other selectors are "beat",

# "publish", "service".

#logging.selectors: ["*"]

7. Logstash (rather than Filebeat) collects the files, outputs them to Kafka as a buffer, then reads the data back out of Kafka, processes it, and outputs it to a file or to Elasticsearch.

Collecting the data (file -> Kafka):

kafkaput.conf:

input {
    file {
        path => [
            # list the files to monitor here
            "/home/admin/helloworld/logs/catalina.out"
        ]
    }
}

 

output {
    # output to the console
    # stdout { }

    # output to Kafka
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topic_id          => "test"
    }
}

 

Fetching the data (Kafka/Redis -> output):

indexer.conf:

input{
    # read from Redis
    redis {
        host => "192.168.80.32"        # Redis host address
        port => 6379                   # Redis port
        password => "123456"           # Redis password
        #db => 8                       # Redis database number
        data_type => "channel"         # use publish/subscribe mode
        key => "logstash_list_0"       # name of the subscribed channel
    }

    # read from Kafka
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topics            => ["test"]
        auto_offset_reset => "earliest"
    }
}

 

output {
    # output to a file
    file {
        path => "/usr/local/logstash-5.6.10/data/log/logstash/all1.log"   # path of the output file
#       message_format => "%{host} %{message}"                            # output format
        flush_interval => 0                                                # flush interval; 0 means write in real time
        codec => json
    }

    # output to Elasticsearch
    elasticsearch {
        hosts => "node18:9200"
        codec => json
    }
}

8. Logstash syncs data from a MySQL database to Elasticsearch (Logstash 5 and later already bundles the JDBC plugin, so there is nothing to download or install; just use it).

mysql2es.conf:

input {

 stdin { }

    jdbc {

        jdbc_connection_string => "jdbc:mysql://192.168.80.18:3306/fyyq-mysql"

        jdbc_user => "fyyq"

        jdbc_password => "fyyq@2017"

   jdbc_driver_library => "/usr/local/logstash-5.6.10/mysql-connector-java-5.1.46.jar"

        jdbc_driver_class => "com.mysql.jdbc.Driver"

        jdbc_paging_enabled => "true"

        statement_filepath => "/usr/local/logstash-5.6.10/mysql2es.sql"

        #schedule => "* * * * *"

    }

 }

 

 output {

     stdout {

        codec => json_lines

    }

    elasticsearch {

        hosts => "node18:9200"

        #index => "mainIndex"

        #document_type => "user"

        #document_id => "%{id}"

    }

}

 

mysql2es.sql:

 

select * from sys_log
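The SQL above re-reads the whole table on every run. If incremental sync is needed, a common jdbc-input pattern is to track a column with :sql_last_value; a rough sketch under the assumption that sys_log has an auto-increment id column (the schedule and the column name are assumptions, not part of the original setup):

input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://192.168.80.18:3306/fyyq-mysql"
        jdbc_user => "fyyq"
        jdbc_password => "fyyq@2017"
        jdbc_driver_library => "/usr/local/logstash-5.6.10/mysql-connector-java-5.1.46.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        schedule => "* * * * *"                      # run once a minute
        use_column_value => true
        tracking_column  => "id"                     # assumed auto-increment primary key
        statement        => "select * from sys_log where id > :sql_last_value"
    }
}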

 

9. Logstash output to HDFS files

input {
    beats {
        port => 5044
        #codec => "json"
        ssl => true
        ssl_certificate_authorities => ["/usr/local/logstash-5.6.10/pki/tls/certs/filebeat.crt"]
        ssl_certificate => "/usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt"
        ssl_key => "/usr/local/logstash-5.6.10/pki/tls/private/logstash.key"
        ssl_verify_mode => "force_peer"
    }
}

 

filter{

   grok {

       match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}

}

}

 

 

output {

    # output to the console
    # stdout { }

    # output to Redis
    redis {
        host => "192.168.80.32"        # Redis host address
        port => 6379                   # Redis port
        password => "123456"           # Redis password
        #db => 8                       # Redis database number
        data_type => "channel"         # use publish/subscribe mode
        key => "logstash_list_0"       # name of the publish channel
    }

    # output to Kafka
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topic_id          => "test"
    }

    # output to Elasticsearch
    elasticsearch {
        hosts => "node18:9200"
        codec => json
    }

    # output to HDFS
    webhdfs {
        host => "192.168.80.42"
        port => 50070
        path => "/user/logstash/dt=%{+YYYY-MM-dd}/%{@source_host}-%{+HH}.log"
        user => "hadoop"
    }
}

10. Logstash input plugins and their parameters: an overview

Only the beats plugin is described in detail; the other plugins are given as links (all standard descriptions from the official site).

All input plugins support the following configuration options:

Setting          Input type   Required
add_field        hash         No (default {})
codec            codec        No (codec for the input data; default "plain")
enable_metric    boolean      No (default true)
id               string       No (auto-generated, but it is better to define your own)
tags             array        No
type             string       No

codec: available values

json (JSON codec)

msgpack (msgpack codec)

plain (plain-text codec)

multiline (merges multiple lines of text into a single event, e.g. merging a Java exception stack trace into one message)
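As a rough illustration of the common options and the multiline codec together (the file path, type, tags, and field values are only examples), an input block might look like this:

input {
    file {
        path => ["/home/admin/helloworld/logs/catalina.out"]
        type => "tomcat"                          # common option: type
        tags => ["helloworld", "web-tier"]        # common option: tags
        add_field => { "env" => "staging" }       # common option: add_field
        codec => multiline {
            pattern => "^\d{4}-\d{2}-\d{2}"       # a line starting with a date begins a new event
            negate  => true
            what    => "previous"                 # other lines (e.g. a Java stack trace) are appended to the previous event
        }
    }
}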

 

Commonly used input plugins:

1. beats input: receives events from the Elastic Beats framework.

Settings:

Setting                       Input type                                      Required
cipher_suites                 array                                           No
client_inactivity_timeout     number                                          No
host                          string                                          No
include_codec_tag             boolean                                         No
port                          number                                          Yes (required)
ssl                           boolean                                         No
ssl_certificate               a valid filesystem path                         No
ssl_certificate_authorities   array                                           No
ssl_handshake_timeout         number                                          No
ssl_key                       a valid filesystem path                         No
ssl_key_passphrase            password                                        No
ssl_verify_mode               string, one of ["none", "peer", "force_peer"]   No
tls_max_version               number                                          No
tls_min_version               number                                          No

 

 

2. file input: streams events from files (the path field is required).

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html

 

3. stdin input: reads events from standard input.

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-stdin.html

 

4. syslog input: reads syslog messages as events.

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-syslog.html

 

5. tcp input: reads events over TCP (the port field is required).

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html

 

6. udp input: reads events over UDP (the port field is required).

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-udp.html

 

7. twitter input: reads events from the Twitter Streaming API (a relatively common use case).

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-twitter.html

(consumer_key, consumer_secret, oauth_token, and oauth_token_secret are required)

 

8. redis input: reads events from a Redis instance.

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-redis.html

(data_type, one of ["list", "channel", "pattern_channel"], and key are required)

 

9. kafka input: reads events from a Kafka topic.

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

(too many parameters to list here; see the documentation)

 

10. jdbc input: creates events from JDBC data.

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

(jdbc_connection_string, jdbc_driver_class, and jdbc_user are required)

 

11. http input: receives events over HTTP or HTTPS.

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-http.html

 

12. elasticsearch input: reads query results from an Elasticsearch cluster.

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-elasticsearch.html

 

13. exec input: captures the output of a shell command as an event (the command field is required).

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-exec.html

 

 

Less commonly used input plugins:

Browse the Logstash plugin reference and configure them as needed.

Full list: https://www.elastic.co/guide/en/logstash/current/input-plugins.html

 

 

11. Logstash filter plugins and their parameters: an overview

Configuration options supported by all filter plugins:

Setting          Input type   Required
add_field        hash         no
add_tag          array        no
enable_metric    boolean      no
id               string       no
periodic_flush   boolean      no
remove_field     array        no
remove_tag       array        no

 

Commonly used filter plugins:

1. grok filter: parses unstructured log data into structured, queryable content.

https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html#_grok_basics

The syntax of a grok pattern is %{SYNTAX:SEMANTIC}.

SYNTAX is the name of the pattern that matches your text;

SEMANTIC is the identifier you give to the piece of text being matched.

grok matches the individual values in a log line either with the predefined regular-expression patterns or with regular expressions you define yourself.

Regular expressions are easy to get wrong, so it is best to debug them first:

Grok Debugger: http://grokdebug.herokuapp.com/

 

grok ships with many predefined regular-expression patterns; the pattern files are stored under:

/usr/local/logstash-5.6.10/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.2/patterns

and so on; browse the directory yourself.

 

Example 1:

filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}

 

The original input message is:

55.3.244.1 GET /index.html 15824 0.043

 

After grok parsing:

client: 55.3.244.1 (IP address)

method: GET (HTTP method)

request: /index.html (requested path)

bytes: 15824 (byte count)

duration: 0.043 (request duration)

 

Example 2:

filter {

    grok {

        match => { "message" => "%{COMBINEDAPACHELOG}"}

    }

}

 

The full definition of COMBINEDAPACHELOG is here:

https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/httpd

 

The original input message is:

192.168.80.183 - - [04/Jan/2018:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png

HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel

Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

 

After grok parsing:

"clientip" => "192.168.80.183",

"timestamp" => "04/Jan/2018:05:13:42 +0000",

"verb" => "GET",

"request" => "/presentations/logstash-monitorama-2013/images/kibana-search.png",

"referrer" => "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",

"response" => "200",

"bytes" => "203023",

"agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",

 

 

Example 3 (a custom grok expression; mypattern matches [A-Z]+):

filter {

  grok{
  match=>{
    "message"=>"%{IP:clientip}\s+(?<mypattern>[A-Z]+)"}
    }

}

 

The original input message:

12.12.12.12 ABC

 

After grok parsing:

"clientip" => "12.12.12.12",
"mypattern" => "ABC"

 

Example 4 (removing redundant fields):

filter {

    grok {

        #match => { "message" => "%{COMBINEDAPACHELOG}"}

         match => { "message" => "%{IP:clientip}\s+%{IP:clientip1}"}

    }

    mutate {

    remove_field => ["message"]

    remove_field => ["host"]

   }

}

 

The original input message:

1.1.1.1 2.2.2.2

 

After grok parsing (in JSON format):

 

 

{

  "_index": "logstash-2018.07.31",

  "_type": "log",

  "_id": "AWTuNdzp6Wkp4mVEj3Fh",

  "_version": 1,

  "_score": null,

  "_source": {

    "@timestamp": "2018-07-31T02:41:00.014Z",

    "offset": 1114,

    "clientip": "1.1.1.1",

    "@version": "1",

    "input_type": "log",

    "beat": {

      "name": "node183",

      "hostname": "node183",

      "version": "5.6.10"

    },

    "source": "/home/usieip/bdp-datashare/logs/a.log",

    "type": "log",

    "clientip1": "2.2.2.2",

    "tags": [

      "beats_input_codec_plain_applied"

    ]

  },

  "fields": {

    "@timestamp": [

      1533004860014

    ]

  },

  "sort": [

    1533004860014

  ]

}

 

Example 5 (filtering and splitting the contents of catalina.out; the message field has been removed):

filter {
    grok {
        match => { "message" => "%{DATA:ymd} %{DATA:sfm} %{DATA:http} %{DATA:info}  %{GREEDYDATA:index}" }
    }
}

[DATA is defined in the pattern file as .*? and GREEDYDATA as .*]

 

The original input message:

2018-07-30 17:04:31.317 [http-bio-8080-exec-19] INFO  c.u.i.b.m.s.i.LogInterceptor - ViewName: modules/datashare/front/index

 

After grok parsing (shown below in JSON format):

{

  "_index": "logstash-2018.07.31",

  "_type": "log",

  "_id": "AWTvhiPD6Wkp4mVEj3GU",

  "_version": 1,

  "_score": null,

  "_source": {

    "offset": 125,

    "input_type": "log",

    "index": "c.u.i.b.m.s.i.LogInterceptor - ViewName: modules/datashare/front/index",

    "source": "/home/usieip/bdp-datashare/logs/b.log",

    "type": "log",

    "tags": [],

    "ymd": "2018-07-30",

    "@timestamp": "2018-07-31T08:48:17.948Z",

    "@version": "1",

    "beat": {

      "name": "node183",

      "hostname": "node183",

      "version": "5.6.10"

    },

    "http": "[http-bio-8080-exec-19]",

    "sfm": "17:04:31.317",

    "info": "INFO"

  },

  "fields": {

    "ymd": [

      1532908800000

    ],

    "@timestamp": [

      1533026897948

    ]

  },

  "sort": [

    1533026897948

  ]

}

 

Commonly used parameters:

1) match: matches a field against one or more patterns.

2) patterns_dir: specifies the directory containing the pattern definitions. If you only use Logstash's built-in patterns, this parameter is not needed. patterns_dir can point at several pattern directories at once (see the sketch after this list):

patterns_dir => ["/opt/logstash/patterns", "/opt/logstash/extra_patterns"]

 

3) remove_field: if a certain log field is matched, remove that field from the event (separate multiple fields with commas):

remove_field => ["foo _%{somefield}"]
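A rough sketch of patterns_dir with a self-defined pattern (the directory, the ORDERID pattern, and the field names are made up for this example):

# /opt/logstash/patterns/extra  -- a custom pattern file, one "NAME regex" per line:
# ORDERID [A-Z]{2}\d{8}

filter {
    grok {
        patterns_dir => ["/opt/logstash/patterns"]
        match        => { "message" => "%{IP:clientip}\s+%{ORDERID:orderid}" }
    }
}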

 

2. clone filter: duplicates events.

3. drop filter: drops events entirely.

4. json filter: parses JSON events.

5. kv filter: parses key-value pairs.

Less commonly used filter plugins and parameters:

Reference: https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

 

12. Logstash output plugins and their parameters: an overview

Configuration options supported by all output plugins:

Setting          Input type   Required
codec            codec        No (default plain)
enable_metric    boolean      No (default true)
id               string       No

 

Commonly used output plugins:

1. elasticsearch output: the recommended way to store logs in Elasticsearch. If you plan to use the Kibana web interface, you need this output.

 

2. file output: writes events to a file on disk (the path field is required).

 

3. kafka output: writes events to a Kafka topic (topic_id is required).

 

4. redis output: sends events to a Redis queue using RPUSH (see the sketch after this list).

 

5. stdout output: a simple output that prints to the STDOUT of the shell running Logstash.
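A rough sketch combining items 1 and 4 above (the index name, Redis host, and key are only examples; note that the demos earlier in this document use data_type => "channel", while RPUSH corresponds to data_type => "list"):

output {
    elasticsearch {
        hosts => ["node18:9200"]
        index => "applog-%{+YYYY.MM.dd}"   # one index per day; the name is an example
    }
    redis {
        host      => "192.168.80.32"
        port      => 6379
        data_type => "list"                # RPUSH onto a Redis list
        key       => "logstash_list_0"
    }
}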

 

Less commonly used output plugins:

Reference: https://www.elastic.co/guide/en/logstash/current/output-plugins.html

 

13. A brief comparison of Logstash and Flume

1) Structure:

Logstash: Shipper, Broker, Indexer (the broker is Redis or Kafka deployed as a buffer)

Flume: Source, Channel, Sink

Logstash is an integrated tool; the broker is optional, and it can read, process, and output directly without any buffering.

Flume has to be configured piece by piece, and none of its three components can be left out.

 

2) Configuration:

Logstash: configuration is concise and clear. The properties of the three sections are predefined and you simply pick what you need; if a plugin is missing you can develop your own, which makes it convenient and easy to use. Its filter plugins are also fairly complete, for example grok, which can parse and structure arbitrary text with regular expressions; grok is currently the best way in Logstash to turn unstructured log data into something structured and queryable. Logstash can also rename, delete, replace, and modify event fields, and of course drop events entirely, such as debug events. Many other advanced features are available.

Flume: configuration is tedious. The source, channel, and sink are each configured by hand, and a complex collection environment needs several of them. Flume has plenty of plugins, but in practice only the memory and file channels are commonly used.

 

3) Design intent:

Flume focuses on data transport; the user needs to understand the whole data route clearly. It is comparatively more reliable: the channel exists for durability, and data is only deleted once delivery to the next hop has been confirmed.

Logstash focuses on data pre-processing: log fields are pre-processed before being parsed and analyzed.

 

4) Ecosystem:

Logstash can be used and extended together with the other ELK components; it is simple to apply and fits a wide range of scenarios.

Newer Flume versions are lightweight and suit users with some programming background; its use cases are more specialized, and it needs many other tools alongside it, which is less convenient.

 

5) An analogy:

Logstash is a desktop PC with the motherboard, power supply, disk, and case already assembled: ready to use out of the box.

Flume is a full set of parts (motherboard, power supply, disk, case) that you assemble yourself before it can be used.
