ELK: Collecting Logs to Kafka with Filebeat and Forwarding Them to Elasticsearch

Scenario

On servers that do not need a Java environment, such as web applications built mainly on Nginx and PHP, Filebeat can be used to collect the service logs.
Filebeat collects log data from local files. Installed as an agent on the server, it watches log directories or specific log files, tails them, and forwards the data to Elasticsearch or Logstash for indexing.
Both Logstash and Filebeat can collect logs. Filebeat, written in Go, is lighter, uses fewer resources, and handles high concurrency well; Logstash, on the other hand, has a filter stage that can parse and analyze logs. A common architecture is therefore: Filebeat collects the logs and sends them to a message queue such as Redis or Kafka; Logstash then consumes them, parses them with filters, and stores the result in Elasticsearch.
Kafka is a distributed publish-subscribe messaging system open-sourced by LinkedIn and now an Apache top-level project. It uses a pull-based consumption model and is designed for high throughput; its original purpose was log collection and transport. Replication is supported starting with version 0.8, transactions are not supported, and there are no strict guarantees against duplicated, lost, or corrupted messages, which makes it well suited to collecting the large data volumes generated by Internet services.

Environment

node1: Elasticsearch, zookeeper, Kafka, Nginx, Kibana, filebeat
node2: Elasticsearch, zookeeper, Kafka, logstash
node3: zookeeper, Kafka

Configuration Steps

This article covers the zookeeper and Kafka configuration steps; for the Elasticsearch, Logstash, Kibana, and Nginx configuration on node1 and node2, refer to the earlier articles. On node1, nginx proxies the local Kibana.
Before configuring, set up hostname resolution on every machine:

echo "192.168.20.60 node1" >> /etc/hosts
 echo "192.168.20.61 node2" >> /etc/hosts
 echo "192.168.20.62 node3" >> /etc/hosts

ZooKeeper requires a Java environment, so install OpenJDK:

yum install java-1.8.0-openjdk -y

Installing and Configuring ZooKeeper

Download the ZooKeeper package from the official mirror page: https://www.apache.org/dyn/closer.cgi/zookeeper/

Extract and install on all three nodes:

wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.5.2-alpha/zookeeper-3.5.2-alpha.tar.gz
tar xvf zookeeper-3.5.2-alpha.tar.gz
mv zookeeper-3.5.2-alpha /usr/local/
ln -sv /usr/local/zookeeper-3.5.2-alpha /usr/local/zookeeper
mkdir /usr/local/zookeeper/data/
vim /usr/local/zookeeper/conf/zoo.cfg

Edit the ZooKeeper configuration file as follows:

# grep "^[a-Z]" /usr/local/zookeeper/conf/zoo.cfg

tickTime=2000  #服務器之間或客戶端與服務器之間的單次心跳檢測時間間隔,單位爲毫秒
initLimit=10   #集羣中leader服務器與follower服務器第一次鏈接最屢次數
syncLimit=5  # leader 與 follower 之間發送和應答時間,若是該follower 在設置的時間內不能與leader 進行通訊,那麼此 follower 將被視爲不可用。
dataDir=/usr/local/zookeeper/data  #自定義的zookeeper保存數據的目錄
clientPort=2181  #客戶端鏈接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求
server.1=192.168.20.60:2888:3888  #服務器編號=服務器IP:LF數據同步端口:LF選舉端口
server.2=192.168.20.61:2888:3888
server.3=192.168.20.62:2888:3888

Configure the ZooKeeper cluster node IDs.
On node1:

[root@node1 ~]# echo "1" > /usr/local/zookeeper/data/myid

On node2:

[root@node2 ~]# echo "2" > /usr/local/zookeeper/data/myid

On node3:

[root@node3 ~]# echo "3" > /usr/local/zookeeper/data/myid

Start ZooKeeper on all three nodes:

# /usr/local/zookeeper/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

Check the status on all three nodes:

[root@node1 data]#  /usr/local/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

[root@node2 ~]#  /usr/local/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

[root@node3 ~]# /usr/local/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader

Verify the ZooKeeper cluster with a simple operation.
On any node, run:

[root@node1 data]# /usr/local/zookeeper/bin/zkCli.sh -server 192.168.20.62:2181

[zk: 192.168.20.62:2181(CONNECTED) 0] create /test "welcome"
Created /test

Retrieve the value from another node:

[root@node3 ~]# /usr/local/zookeeper/bin/zkCli.sh -server 192.168.20.60:2181

[zk: 192.168.20.60:2181(CONNECTED) 0] get /test
welcome

Installing and Configuring Kafka

Official Kafka downloads: http://kafka.apache.org/downloads.html
Install the Kafka cluster on all three nodes:

wget http://mirrors.hust.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar xvf kafka_2.11-1.0.0.tgz
mv kafka_2.11-1.0.0 /usr/local/
ln -sv /usr/local/kafka_2.11-1.0.0/ /usr/local/kafka

On node1, modify the following options in the configuration file (options left at their defaults are not listed):

[root@node1 local]# grep "^[a-Z]" /usr/local/kafka/config/server.properties
broker.id=1         # unique ID for this node; must differ across nodes
listeners=PLAINTEXT://192.168.20.60:9092  # IP and port to listen on
log.dirs=/tmp/kafka-logs
log.retention.hours=24  # log retention time, in hours
zookeeper.connect=192.168.20.60:2181,192.168.20.61:2181,192.168.20.62:2181  # ZooKeeper cluster IPs and ports

On node2, modify the following options in the Kafka configuration file:

#  /usr/local/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://192.168.20.61:9092
log.dirs=/tmp/kafka-logs
log.retention.hours=24
zookeeper.connect=192.168.20.60:2181,192.168.20.61:2181,192.168.20.62:2181

On node3, modify the following parameters:

# /usr/local/kafka/config/server.properties
broker.id=3
listeners=PLAINTEXT://192.168.20.62:9092
num.network.threads=3
log.dirs=/tmp/kafka-logs
log.retention.hours=24
zookeeper.connect=192.168.20.60:2181,192.168.20.61:2181,192.168.20.62:2181

Start Kafka on each node with the -daemon option:

[root@node1 ~]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties 
[root@node2 ~]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@node3 ~]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

Check that Kafka has started:

# tail -f /usr/local/kafka/logs/server.log   # the last log line shows the broker has started
...
[2017-12-19 16:10:05,542] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)

Check the background Java processes:

[root@node1 ~]# jps
6594 QuorumPeerMain
7570 Kafka
7653 Jps
1884 Elasticsearch

[root@node3 ~]# jps
5184 QuorumPeerMain
6003 Jps
5885 Kafka

Tip: jps is a small utility for listing Java processes. If the jps command is missing, only java-1.8.0-openjdk is installed; you also need java-1.8.0-openjdk-devel.
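
On these CentOS hosts it installs the same way as the JDK itself, e.g.:

yum install java-1.8.0-openjdk-devel -y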

Kafka Command-Line Operations

Test creating a topic:

[root@node3 logs]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.20.60:2181,192.168.20.61:2181,192.168.20.62:2181 --partitions 3 --replication-factor 3 --topic  kafkatest
Created topic "kafkatest".

List the topics (visible from any server in the cluster):

[root@node1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.20.60:2181,192.168.20.61:2181,192.168.20.62:2181
kafkatest

Describe the topic:

[root@node2 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.20.60:2181,192.168.20.61:2181,192.168.20.62:2181 --topic kafkatest
Topic:kafkatest PartitionCount:3    ReplicationFactor:3 Configs:
    Topic: kafkatest    Partition: 0    Leader: 2   Replicas: 2,1,3 Isr: 2,1,3
    Topic: kafkatest    Partition: 1    Leader: 3   Replicas: 3,2,1 Isr: 3,2,1
    Topic: kafkatest    Partition: 2    Leader: 1   Replicas: 1,3,2 Isr: 1,3,2

Status explanation: kafkatest has three partitions, numbered 0, 1, and 2. The leader of partition 0 is broker 2 (its broker.id), and partition 0 has three replicas, all of which are in the Isr (in-sync replica) list, meaning they are eligible to be elected leader.

Delete a topic:

[root@node2 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.20.60:2181,192.168.20.61:2181,192.168.20.62:2181 --topic kafkatest
Topic kafkatest is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
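
As the note says, whether the topic is actually removed is controlled by the broker setting delete.topic.enable in server.properties. In Kafka 1.0 it already defaults to true; on older brokers (check against your version) it may need to be set explicitly on every broker, followed by a restart:

delete.topic.enable=true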

Test sending messages from the command line.
First create a topic:

/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.20.60:2181,192.168.20.61:2181,192.168.20.62:2181 --partitions 3 --replication-factor 3 --topic  kafkatest
Created topic "kafkatest".

Send some messages:

[root@node3 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.20.60:9092,192.168.20.61:9092,192.168.20.62:9092 --topic kafkatest
>hello
>good
>welcome

Consume the data from another node (any node works):

[root@node1 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.20.60:2181,192.168.20.61:2181,192.168.20.62:2181 --topic kafkatest --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
good
welcome
hello
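
As the deprecation warning suggests, the same data can be read with the new consumer by pointing at the Kafka brokers instead of ZooKeeper:

[root@node1 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.20.60:9092,192.168.20.61:9092,192.168.20.62:9092 --topic kafkatest --from-beginning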

Interacting with Kafka from Logstash

Edit the Logstash configuration file:

[root@node1 ~]# cat /etc/logstash/conf.d/kafka.conf 
input{
  stdin{}
}

output{
  kafka{
    topic_id => "kafkatest"
    bootstrap_servers => "192.168.20.60:9092"
    batch_size => 5
  }
  stdout{
    codec => "rubydebug"
  }
}

Start Logstash in the foreground and type some input:

[root@node1 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka.conf 

The stdin plugin is now waiting for input:
123456
{
    "@timestamp" => 2017-12-19T15:19:34.885Z,
       "message" => "123456",
          "host" => "node1",
      "@version" => "1"
}
trying
{
    "@timestamp" => 2017-12-19T15:20:15.234Z,
       "message" => "trying",
          "host" => "node1",
      "@version" => "1"
}

Check the data written to Kafka:

[root@node3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.20.60:2181,192.168.20.61:2181,192.168.20.62:2181 --topic kafkatest --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

2017-12-19T15:19:34.885Z node1 123456
2017-12-19T15:20:15.234Z node1 trying

The data was written successfully; the Kafka setup is complete.

Installing Filebeat

Download the Filebeat package from the official site and install it: https://www.elastic.co/downloads

[root@node1 tmp]# yum install filebeat-6.1.1-x86_64.rpm -y

In Filebeat 6.1, the default filebeat.reference.yml ships with configuration examples for the various modules. First, configure Filebeat to write to a local file:

[root@node1 filebeat]# cat /etc/filebeat/filebeat.yml

filebeat.modules:
- module: nginx
  access:
    enabled: true
    var.paths: 
      - /var/log/nginx/access.log
filebeat.prospectors:
- type: log
  enabled: false
  paths:
    - /var/log/*.log
output.file:
  enabled: true
  path: "/tmp/filebeat"
  rotate_every_kb: 10000
  number_of_files: 7
logging.to_files: true

Note (paraphrasing the official docs): in Filebeat versions before 6.0, multiple outputs could be enabled at the same time, but only of different types. For example, you could enable the Elasticsearch and Logstash outputs, but not two Logstash outputs. The downside was that Beats which wait for acknowledgement before continuing (Filebeat and Winlogbeat) were slowed to the pace of the slowest output; this implication was not obvious and undermined the use cases where multiple outputs would have been useful.
As part of the pipeline re-architecture done for 6.0, the option to enable multiple outputs simultaneously was removed. This helps simplify the pipeline and makes the scope of an output in Beats explicit.
If you need multiple outputs, you have the following options:
Use the Logstash output, then have Logstash ship the events to multiple destinations.
Run multiple instances of the same Beat.
If you use the file or console output for debugging in addition to the main output, the recommendation is to run with the -d "publish" option, which logs the published events in the Filebeat log.
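
For example, with the rpm layout used here, Filebeat can be started in the foreground with publish debugging enabled (a debugging sketch, not a production setup):

/usr/share/filebeat/bin/filebeat -e -c /etc/filebeat/filebeat.yml -d "publish"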

Access nginx and follow the output file with tail -f /tmp/filebeat/filebeat; if new entries appear, the configuration works.

Now output the logs to Kafka:

[root@node1 filebeat]# cat /etc/filebeat/filebeat.yml

filebeat.modules:
- module: nginx
  access:
    enabled: true
    var.paths: 
      - /var/log/nginx/access.log
filebeat.prospectors:
- type: log
  enabled: false
  paths:
    - /var/log/*.log
output.kafka:
  enabled: true
  hosts: ["192.168.20.60:9092","192.168.20.61:9092","192.168.20.62:9092"]
  topic: "nginx-filebeat"
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
logging.to_files: true

Access nginx and check whether the logs are written to Kafka:

[root@node3 ~]#  /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.20.60:2181,192.168.20.61:2181,192.168.20.62:2181
kafkatest
nginx-filebeat
[root@node3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.20.60:2181,192.168.20.61:2181,192.168.20.62:2181 --topic nginx-filebeat

...

Once the logs are successfully written to Kafka, configure Logstash.

Reading Logs from Kafka into Elasticsearch with Logstash

With Filebeat 6.0 and later, logs of all types collected by Filebeat can be funneled into Kafka, and Logstash can then be configured to route each category to a different index or destination, as in the sketch below.
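
A minimal routing sketch (the second topic, syslog-filebeat, is hypothetical): with decorate_events => true the Kafka input stores the source topic in [@metadata][kafka][topic], which the output section can branch on:

input{
  kafka{
    bootstrap_servers => "192.168.20.62:9092"
    topics => ["nginx-filebeat", "syslog-filebeat"]   # syslog-filebeat is a hypothetical second topic
    decorate_events => true    # adds [@metadata][kafka][topic] etc. to each event
    codec => "json"
  }
}

output{
  if [@metadata][kafka][topic] == "nginx-filebeat" {
    elasticsearch {
      hosts => ["192.168.20.61:9200"]
      index => "nginx-filebeat-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["192.168.20.61:9200"]
      index => "other-filebeat-%{+YYYY.MM.dd}"
    }
  }
}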

Configure Logstash to read the logs from Kafka:

[root@node2 log]# cat /etc/logstash/conf.d/kafka2es.conf 
input{
  kafka{
    bootstrap_servers => "192.168.20.62:9092"
    topics => "nginx-filebeat"
    consumer_threads => 1
    decorate_events => true
    codec => "json"
    auto_offset_reset => "latest"
  }
}

output{
  elasticsearch {
    hosts => ["192.168.20.61:9200"]
    index => "nginx-filebeat-%{+YYYY.MM.dd}"
  }
  stdout{
    codec => "rubydebug"
  }
}

Test the configuration file:

[root@node2 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka2es.conf -t

Run it in the foreground to make sure the logs are output correctly:

[root@node2 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka2es.conf 
...

Restart Logstash and check whether Elasticsearch has received the data:

[root@node2 ~]# systemctl restart logstash

Elasticsearch now has the corresponding data:
(screenshot: the nginx-filebeat index visible in Elasticsearch)
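
The same can be checked from the command line with the _cat indices API:

# curl -XGET 'http://192.168.20.61:9200/_cat/indices?v'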

Add the index pattern in Kibana to display the data:

(screenshot: the nginx-filebeat data displayed in Kibana)
