filebeat+kafka+logstash+elasticsearch

服務架構

filebeat -> kafka -> logstash -> elasticsearch -> UI(kibana/我的定製化)

因爲博主是在本機測試,因此的filebeat和logstash是用的傳統方式安裝,其餘組件用的docker容器。html

除了filebeat,其餘組件會用到java虛擬機,因此爲了方便,最好都走docker。我這爲了好調試logstash就是直接安裝的,因此還必須安裝JAVA。java

文章的內容是基於單機本機搭建的,要用集羣的方式修改,根據對應組件的官網文檔來修改相應的hosts便可。git

 

服務搭建

JAVA安裝:

github

filebeat:

安裝

filebeat的安裝docker

配置 

配置文件: filebeat.yml (路徑根據本身的安裝方式,會有不一樣。Linux是在/etc/filebeat/)bootstrap

配置信息:bash

#=========================== Filebeat inputs =============================
filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /*/log/*
  fields:
    testname: xxxnn
  fields_under_root: true

#----------------------------- kafka output --------------------------------
output.kafka:
  enabled: true
  hosts: ["localhost:9092"]
  max_retries: 5
  timeout: 300
  topic: "filebeat"

配置驗證:

./filebeat test config -c filebeat.yml

 

kafka:

docker 鏡像拉取

docker pull wurstmeister/zookeeper:latest
docker pull wurstmeister/kafka:latest

鏈接、啓動服務

sudo docker run -d --name zookeeper -p 2181 -t wurstmeister/zookeeper:latest

sudo docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092  wurstmeister/kafka:latest

這一步,kafka服務意境啓動成功了。服務器

(這裏有個坑, 若是你想經過內網其餘服務器鏈接到kafka容器,這裏KAFKA_ADVERTISED_HOST_NAME應該設置爲eth0的IP地址,詳情能夠看這裏https://github.com/wurstmeister/kafka-docker/issues/17)架構

進入容器查看配置

docker exec -it kafka bash
cd /opt/kafka

配置文件(啓動容器後,沒有作任何修改)

bash-4.4# grep -Ev "^$|^#" config/server.properties
broker.id=1
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-fbfc07e603b5
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
advertised.port=9092
advertised.host.name=127.0.0.1
port=9092

bash-4.4# grep -Ev "^$|^#" config/consumer.properties
bootstrap.servers=localhost:9092
group.id=test-consumer-group

bash-4.4# grep -Ev "^$|^#" config/producer.properties
bootstrap.servers=localhost:9092
compression.type=none

 

logstash

安裝

logstash安裝socket

添加配置文件test.conf

input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    topics => ["filebeat"]
    group_id => "test-consumer-group"
    codec => "plain"
    consumer_threads => 1
    decorate_events => true
  }
}

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "test"
    workers => 1
  }
}

測試配置文件

bin/logstash -f test.conf --config.test_and_exit

 

es

拉取鏡像

docker pull docker.elastic.co/elasticsearch/elasticsearch-oss:6.2.4

這個拉取的是沒有x-pack的鏡像,具體的選擇看這裏es鏡像

 

服務啓動

依次啓動

es:

sudo docker run -d --name es -p 9200:9200 -t docker.elastic.co/elasticsearch/elasticsearch-oss:6.2.4

logstash:

bin/logstash -f test.conf --config.reload.automatic

kafka:

上文已經啓動了

filebeat:

./filebeat -e -c filebeat.yml -d "publish"

當在控制檯上看到filebeat,publish成功後,再去es裏面看,數據已經被索引成功啦!

 

結語

 目前主流的日誌採集系統架構都是基於ELK(es+logstash+kibana),或者ELFK,多了一個filebeat。

爲何要加入kafka消息中間件呢?

博主認爲kafka有兩個重要的特色,特別適合引入到日誌採集系統中,高吞吐率,以及數據持久化(固然還有其餘優秀的特性,好比消息主題,分佈式高可用等等),提升系統中數據的可靠傳輸。

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息