Filebeat6.31整合Kafka集羣消息隊列(三)

wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.3.2-linux-x86_64.tar.gzcss

[root@es-node1 ~]# tar zxvf filebeat-6.3.2-linux-x86_64.tar.gz -C /usr/local/html

# mv /usr/local/filebeat-6.3.2-linux-x86_64/ /usr/local/filebeatnode

# egrep -v "#|^$" filebeat.ymllinux

filebeat.prospectors         #用於定義數據原型,檢測日誌或是發現日誌
- input_type: log            #指定數據的輸入類型,默認爲log,另外還能夠指定stdin
  paths:                      #自定須要監控的日誌文件路徑;能夠是完整的日誌路徑也能夠是模糊的匹配格式
   - /var/log/messages        #指定系統日誌位置
  fields:                      #定義日誌來源,能夠添加自定義字段,其實就是定義Kafka消息隊列的topic主題名稱,若是kafka消息隊列中沒有該名稱,會自動生成
    log_topic: test
  paths:                    #與上述同樣定義須要監控的日誌文件路徑,不夠此次是定義apache-web服務的日誌
   - /etc/httpd/logs/*
  fields:                    #定義日誌來源,生成kafka消息隊列topic主題
    log_topic: webapache
processors:            #這個地方須要注意,此配置是將日誌輸出格式過濾掉,通常狀況下,一些無用的日誌字段咱們能夠刪除,只看關鍵性的信息
- drop_fields:
   fields: ["beat", "input_type", "source", "offset",]
name: "192.168.37.134"         #設置filebeat收集日誌中對應的主機名稱,,若是設置爲空,這使用該機器的主機名稱,這裏這是本地IP,便於區分多臺主機的日誌信息
output.kafka:                  #多種輸出類型,可支持想kafka,logstash,elasticsearch輸出日誌信,在這裏是將日誌信息輸出到Kafka中,
  enabled: true                 啓動該模塊
  hosts: ["192.168.37.134:9092", "192.168.37.135:9092", "192.168.37.136:9092"]        #指定輸出數據到kafka集羣上,地址與端口號想對應
  version: "0.10"          
  topic: '%{[fields][log_topic]}'    #指定要發送數據到kafka集羣的哪一個topic,與上述的"fields: log_topic:"相對應,這是6.x的配置
  partition.round_robin:         #開啓kafka的partition分區
    reachable_only: true   
  worker: 2
  required_acks: 1
  compression: gzip      #壓縮格式
  max_message_bytes: 10000000    #壓縮格式字節大小
logging.level: debug        #日誌類型爲debug

root@es-node1 bin]#nohup ./filebeat -e -c filebeat.yml &web

[root@es-node1 bin]# ./kafka-topics.sh --zookeeper 192.168.37.129:2181,192.168.37.133,192.168.37.133:2181 --list
osmessages
test
webapacheexpress

【Kafka節點 】啓動消費,本次消費是apache

[root@es-node3 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.37.134:9092,192.168.37.135:9092,192.168.37.136:9092 --topic test --from-beginningjson

{
    "@timestamp": "2018-08-16T04:24:19.871Z",
    "@metadata": {
        "beat": "filebeat",
        "type": "doc",
        "version": "6.3.2",
        "topic": "test"
    },
    "message": "Aug 16 12:24:13 es-node1 dbus[623]: [system] Successfully activated service 'org.freedesktop.nm_dispatcher'",
    "fields": {
        "log_topic": "test"
    },
    "beat": {
        "name": "192.168.37.134",
        "hostname": "es-node1",
        "version": "6.3.2"
    },
    "host": {
        "name": "192.168.37.134"
    },
    "source": "/var/log/messages",
    "offset": 290635
}

 

如下是apache是經過json校驗的日誌輸出信息bootstrap

 1 {
 2     "@timestamp": "2018-08-16T04:19:34.153Z",
 3     "@metadata": {
 4         "beat": "filebeat",
 5         "type": "doc",
 6         "version": "6.3.2",
 7         "topic": "webapache"
 8     },
 9     "beat": {
10         "name": "192.168.37.129",
11         "hostname": "es-node1",
12         "version": "6.3.2"
13     },
14     "host": {
15         "name": "192.168.37.129"
16     },
17     "source": "/etc/httpd/logs/access_log",
18     "offset": 17968,
19     "message": "192.168.37.1 - - [16/Aug/2018:12:19:33 +0800] \"GET /noindex/css/fonts/Bold/OpenSans-Bold.ttf HTTP/1.1\" 404 238 \"http://192.168.37.129/noindex/css/open-sans.css\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36\"",
20     "fields": {
21         "log_topic": "webapache"
22     }
23 }

 

 

上述Filebeat收集到的日誌格式信息量過大,咱們只須要收集關鍵性的日誌信息便可,無用的能夠過濾掉,配置以下app

processors:
- drop_fields:
   fields: ["beat", "input_type", "source", "offset",]

 

過濾到以後 的apache日誌輸出以下所示

 1 {
 2     "@timestamp": "2018-08-16T05:10:02.261Z",
 3     "@metadata": {
 4         "beat": "filebeat",
 5         "type": "doc",
 6         "version": "6.3.2",
 7         "topic": "webapache"
 8     },
 9     "message": "192.168.37.1 - - [16/Aug/2018:13:09:53 +0800] \"GET /noindex/css/fonts/Bold/OpenSans-Bold.ttf HTTP/1.1\" 404 238 \"http://192.168.37.129/noindex/css/open-sans.css\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36\"",
10     "fields": {
11         "log_topic": "webapache"
12     },
13     "host": {
14         "name": "192.168.37.129"
15     }
16 }

【filebeat.yml配置文件】

文章最後結尾是filebeat的過濾和未過濾的配置,方便直接複製粘貼;

爲何這咱們再次將filebeat的配置文件粘貼出來呢?由於我被這個坑了整整一天的時間,啓動fiebeat一直報錯,網上關於6.x版本和kafka整合的博客和資料幾乎沒有,出現報錯,也沒有解決方案,當時直接氣暈,對外尋求幫助,也沒有解決方案,最後上層技術大佬,才得以解決,這個filebeat,yml實在是太多坑了,都是由於JSON格式,下面是我具體的報錯信息

 error initializing publisher: missing required field accessing 'output.kafka.hosts'

提示缺乏訪問「輸出. Kafka . hosts」的必需字段,就這個,我糾結了一天,好在問題解決了,心累~

[root@es-node1 filebeat]# egrep -v "#|^$" filebeat.yml
filebeat.prospectors:
- input_type: log
  paths: 
   - /var/log/messages
  fields: 
    log_topic: test
  paths: 
   - /etc/httpd/logs/*
  fields: 
    log_topic: webapache
processors:
- drop_fields:
   fields: ["beat", "input_type", "source", "offset",]
name: "192.168.37.134"
output.kafka:
  enabled: true
  hosts: ["192.168.37.134:9092", "192.168.37.135:9092", "192.168.37.136:9092"]
  version: "0.10"
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000
logging.level: debug

 

###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.full.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

#=========================== Filebeat prospectors =============================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- input_type: log

  # Paths that should be crawled and fetched. Glob based paths.
  paths: 
   - /var/log/messages
  fields: 
    log_topic: test
  paths: 
   - /etc/httpd/logs/*
  fields: 
    log_topic: webapache
    #- c:\programdata\elasticsearch\logs\*

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ["^DBG"]

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ["^ERR", "^WARN"]

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #exclude_files: [".gz$"]

  # Optional additional fields. These field can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

  ### Multiline options

  # Mutiline can be used for log messages spanning multiple lines. This is common
  # for Java Stack Traces or C-Line Continuation

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
  #multiline.pattern: ^\[

  # Defines if the pattern set under pattern should be negated or not. Default is false.
  #multiline.negate: false

  # Match can be set to "after" or "before". It is used to define if lines should be append to a pattern
  # that was (not) matched before or after or as long as a pattern is not matched based on negate.
  # Note: After is the equivalent to previous and before is the equivalent to to next in Logstash
  #multiline.match: after

processors:
- drop_fields:
   fields: ["beat", "input_type", "source", "offset",]

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
name: "192.168.37.134"

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
output.kafka:
  enabled: true
  hosts: ["192.168.37.134:9092", "192.168.37.135:9092", "192.168.37.136:9092"]
  version: "0.10"
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000


#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: []
  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["172.16.213.51:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]
相關文章
相關標籤/搜索