wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.3.2-linux-x86_64.tar.gz
[root@es-node1 ~]# tar zxvf filebeat-6.3.2-linux-x86_64.tar.gz -C /usr/local/
# mv /usr/local/filebeat-6.3.2-linux-x86_64/ /usr/local/filebeat
# egrep -v "#|^$" filebeat.yml
filebeat.prospectors:                 # Defines the data sources: detect or discover logs
- input_type: log                     # Input type for the data; defaults to log, stdin can also be specified
  paths:                              # Log file paths to monitor; full paths or glob patterns are both accepted
    - /var/log/messages               # Location of the system log
  fields:                             # Defines the log source as a custom field; in effect this names the Kafka topic. If the topic does not yet exist in the Kafka message queue, it is created automatically
    log_topic: test
- input_type: log
  paths:                              # Same as above, but this time the monitored path is the apache-web service log
    - /etc/httpd/logs/*
  fields:                             # Defines the log source, i.e. the Kafka topic name
    log_topic: webapache
processors:                           # Note this part: it filters fields out of the log output. Fields we have no use for can generally be dropped so that only the key information remains
- drop_fields:
    fields: ["beat", "input_type", "source", "offset"]
name: "192.168.37.134"                # Host name recorded in the logs Filebeat collects; if left empty, the machine's hostname is used. The local IP is used here to tell the logs of multiple hosts apart
output.kafka:                         # Several output types are supported (kafka, logstash, elasticsearch); here the log data is sent to Kafka
  enabled: true                       # Enable this output module
  hosts: ["192.168.37.134:9092", "192.168.37.135:9092", "192.168.37.136:9092"]   # Send the data to the Kafka cluster; addresses and ports must match the brokers
  version: "0.10"
  topic: '%{[fields][log_topic]}'     # Which Kafka topic to send the data to; matches the "fields: log_topic:" entries above (this is the 6.x syntax)
  partition.round_robin:              # Enable round-robin Kafka partitioning
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip                   # Compression format
  max_message_bytes: 10000000        # Maximum message size in bytes
logging.level: debug                  # Log level: debug
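As noted above, Filebeat's first publish will auto-create a missing topic (provided the broker allows auto-creation), but pre-creating the topics lets you choose partition and replica counts yourself. A minimal sketch, reusing the ZooKeeper address from this cluster; the --partitions and --replication-factor values are illustrative, not from the original setup:

[root@es-node3 bin]# ./kafka-topics.sh --zookeeper 192.168.37.129:2181 --create --topic test --partitions 3 --replication-factor 2
[root@es-node3 bin]# ./kafka-topics.sh --zookeeper 192.168.37.129:2181 --create --topic webapache --partitions 3 --replication-factor 2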
[root@es-node1 bin]# nohup ./filebeat -e -c filebeat.yml &
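Running Filebeat under nohup is fine for testing, but it will not survive a reboot. A minimal systemd unit is one alternative; this sketch assumes the /usr/local/filebeat install path used above and is not part of the original setup:

# /etc/systemd/system/filebeat.service  (hypothetical unit file)
[Unit]
Description=Filebeat log shipper
After=network.target

[Service]
ExecStart=/usr/local/filebeat/filebeat -c /usr/local/filebeat/filebeat.yml
Restart=always

[Install]
WantedBy=multi-user.target

[root@es-node1 ~]# systemctl daemon-reload
[root@es-node1 ~]# systemctl enable filebeat && systemctl start filebeat

Once Filebeat is publishing, check on a Kafka node that the topics exist: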
[root@es-node1 bin]# ./kafka-topics.sh --zookeeper 192.168.37.129:2181,192.168.37.133:2181 --list
osmessages
test
webapache
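As a further sanity check, you can inspect a topic's partition and replica layout with the same tool (same ZooKeeper address as above):

[root@es-node1 bin]# ./kafka-topics.sh --zookeeper 192.168.37.129:2181 --describe --topic webapache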
【Kafka node】Start a consumer; here we first consume the test topic (the system log):
[root@es-node3 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.37.134:9092,192.168.37.135:9092,192.168.37.136:9092 --topic test --from-beginning
{ "@timestamp": "2018-08-16T04:24:19.871Z", "@metadata": { "beat": "filebeat", "type": "doc", "version": "6.3.2", "topic": "test" }, "message": "Aug 16 12:24:13 es-node1 dbus[623]: [system] Successfully activated service 'org.freedesktop.nm_dispatcher'", "fields": { "log_topic": "test" }, "beat": { "name": "192.168.37.134", "hostname": "es-node1", "version": "6.3.2" }, "host": { "name": "192.168.37.134" }, "source": "/var/log/messages", "offset": 290635 }
Below is the apache log output, which passes JSON validation:
1 { 2 "@timestamp": "2018-08-16T04:19:34.153Z", 3 "@metadata": { 4 "beat": "filebeat", 5 "type": "doc", 6 "version": "6.3.2", 7 "topic": "webapache" 8 }, 9 "beat": { 10 "name": "192.168.37.129", 11 "hostname": "es-node1", 12 "version": "6.3.2" 13 }, 14 "host": { 15 "name": "192.168.37.129" 16 }, 17 "source": "/etc/httpd/logs/access_log", 18 "offset": 17968, 19 "message": "192.168.37.1 - - [16/Aug/2018:12:19:33 +0800] \"GET /noindex/css/fonts/Bold/OpenSans-Bold.ttf HTTP/1.1\" 404 238 \"http://192.168.37.129/noindex/css/open-sans.css\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36\"", 20 "fields": { 21 "log_topic": "webapache" 22 } 23 }
The log entries Filebeat collects above carry too much information; we only need the key fields, and the useless ones can be filtered out with the following configuration:
processors:
- drop_fields:
    fields: ["beat", "input_type", "source", "offset"]
After filtering, the apache log output looks like this:
1 { 2 "@timestamp": "2018-08-16T05:10:02.261Z", 3 "@metadata": { 4 "beat": "filebeat", 5 "type": "doc", 6 "version": "6.3.2", 7 "topic": "webapache" 8 }, 9 "message": "192.168.37.1 - - [16/Aug/2018:13:09:53 +0800] \"GET /noindex/css/fonts/Bold/OpenSans-Bold.ttf HTTP/1.1\" 404 238 \"http://192.168.37.129/noindex/css/open-sans.css\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36\"", 10 "fields": { 11 "log_topic": "webapache" 12 }, 13 "host": { 14 "name": "192.168.37.129" 15 } 16 }
【filebeat.yml configuration file】
The filtered and unfiltered filebeat configurations are pasted at the end of this article, for easy copy and paste.
Why paste the filebeat configuration file here again? Because it cost me a whole day: filebeat kept failing to start. There are almost no blogs or other resources online about integrating the 6.x version with Kafka, and no solutions for the errors it throws. I was at my wits' end and asked around for help with no luck; only a senior engineer finally got it solved. This filebeat.yml is full of pitfalls, all of them down to the strict formatting of the file. Below is the exact error I hit:
error initializing publisher: missing required field accessing 'output.kafka.hosts'
The message says a required field is missing when accessing 'output.kafka.hosts'. I agonized over just this for a whole day; fortunately the problem was solved in the end, exhausting as it was.
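Most errors of this kind come down to indentation or key placement in the YAML. Filebeat 6.x can check the file before you start the daemon, which should catch this class of mistake early; run it from the filebeat install directory and it prints Config OK when the file parses:

[root@es-node1 filebeat]# ./filebeat test config -c filebeat.yml
Config OK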
[root@es-node1 filebeat]# egrep -v "#|^$" filebeat.yml
filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/messages
  fields:
    log_topic: test
- input_type: log
  paths:
    - /etc/httpd/logs/*
  fields:
    log_topic: webapache
processors:
- drop_fields:
    fields: ["beat", "input_type", "source", "offset"]
name: "192.168.37.134"
output.kafka:
  enabled: true
  hosts: ["192.168.37.134:9092", "192.168.37.135:9092", "192.168.37.136:9092"]
  version: "0.10"
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000
logging.level: debug
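Filebeat does not reload the main filebeat.yml on its own (in 6.x, live reloading only covers external config files managed under filebeat.config.*), so after editing it, restart the process. A sketch of a manual restart matching the nohup start used earlier:

[root@es-node1 filebeat]# ps -ef | grep [f]ilebeat | awk '{print $2}' | xargs kill
[root@es-node1 filebeat]# nohup ./filebeat -e -c filebeat.yml &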
###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.full.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

#=========================== Filebeat prospectors =============================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- input_type: log

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/messages
  fields:
    log_topic: test

- input_type: log
  paths:
    - /etc/httpd/logs/*
    #- c:\programdata\elasticsearch\logs\*
  fields:
    log_topic: webapache

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ["^DBG"]

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ["^ERR", "^WARN"]

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #exclude_files: [".gz$"]

  # Optional additional fields. These field can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

  ### Multiline options

  # Mutiline can be used for log messages spanning multiple lines. This is common
  # for Java Stack Traces or C-Line Continuation

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
  #multiline.pattern: ^\[

  # Defines if the pattern set under pattern should be negated or not. Default is false.
  #multiline.negate: false

  # Match can be set to "after" or "before". It is used to define if lines should be append to a pattern
  # that was (not) matched before or after or as long as a pattern is not matched based on negate.
  # Note: After is the equivalent to previous and before is the equivalent to to next in Logstash
  #multiline.match: after

processors:
- drop_fields:
    fields: ["beat", "input_type", "source", "offset"]

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
name: "192.168.37.134"

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.

output.kafka:
  enabled: true
  hosts: ["192.168.37.134:9092", "192.168.37.135:9092", "192.168.37.136:9092"]
  version: "0.10"
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: []

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["172.16.213.51:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]