Filebeat+Kafka+Logstash+ElasticSearch+Kibana 日誌採集方案

前言  

  Elastic Stack 提供 Beats 和 Logstash 套件來採集任何來源、任何格式的數據。其實Beats 和 Logstash的功能差很少,都可以與 Elasticsearch 產生協同做用,並且html

logstash比filebeat功能更強大一點,2個都使用是由於:Beats 是一個輕量級的採集器,支持從邊緣機器向 Logstash 和 Elasticsearch 發送數據。考慮到 Logstash 佔用系java

統資源較多,咱們採用 Filebeat 來做爲咱們的日誌採集器。而且這裏採用kafka做爲傳輸方式是爲了不堵塞和丟失,以實現日誌的實時更新。node

介紹

  1.Filebeatfilebat是一個用於轉發和集中日誌數據的輕量級shipper。做爲代理安裝在服務器上,filebeat監視指定的日誌文件或位置,收集日誌事件,並將它們轉發給ElasticSearchlogstash進行索引。linux

  2.Logstash:Logstash 是開源的服務器端數據處理管道,可以同時從多個來源採集數據,轉換數據,而後將數據發送到存儲庫。apache

  3.ElasticSearch:Elasticsearch 是基於 JSON 的分佈式搜索和分析引擎,專爲實現水平擴展、高可靠性和管理便捷性而設計。json

  4.Kibana:Kibana 可以以圖表的形式呈現數據,而且具備可擴展的用戶界面,供您全方位配置和管理 Elastic Stack。bootstrap

Elastic Stack架構

 

 軟件環境

  Ubuntu 18.04.1 ubuntu

  

  Elastic Stack官網:https://www.elastic.co/vim

kafka、Zookeeper安裝

  kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。  api

  ZooKeeper是一個分佈式的,開放源碼的分佈式應用程序協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要組件。它是一個爲分佈式應用

提供一致性服務的軟件,提供的功能包括:配置維護、域名服務、分佈式同步、組服務等。

注:安裝kafka時會發現它須要安裝Zookeeper的。kafka的官方文檔有說明。zookeeper是爲了解決分佈式一致性問題的工具,kafka使用ZooKeeper用於管理、協調代理。當Kafka系統中新增了代理或某個代理失效時,Zookeeper服務將通知生產者和消費者。生產者與消費者據此開始與其餘代理協調工做。

Zookeeper單機安裝配置:

Step 1:下載壓縮包並解壓

1 >wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
2 >tar -zxvf  zookeeper-3.4.13.tar.gz
3 >cd zookeeper-3.4.13

Step 2:修改配置文件

先複製模板配置文件,並重命名,而後裏面存放數據的路徑dataDir能夠本身定義

1 >cp -rf conf/zoo_sample.cfg conf/zoo.cfg
2 >vim zoo.cfg

Step 3:啓動服務

1 >./bin/zkServer.sh start 2 
3 >./bin/zkServer.sh status

先啓動,系統會默認加載zoo.cfg配置,而後使用 zkServer.sh status 查看服務狀態

kafka安裝配置:

根據官網教程開始安裝(複製的官網教程):

Step 1: Download the code

1  > wget http://mirror.apache-kr.org/kafka/2.1.0/kafka_2.11-2.1.0.tgz
2  > tar -xzf kafka_2.11-2.1.0.tgz
3  > cd kafka_2.11-2.1.0/

Step 2: Start the server

Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.(因爲我本身安裝的Zookeeper,因此這一步能夠省略

1 > ./bin/zookeeper-server-start.sh config/zookeeper.properties
2 [2019-01-23 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
3 ...

Now start the Kafka server:

首先能夠先修改一下配置文件server.propertise

broker.id=0  #當前機器在集羣中的惟一標識,和zookeeper的myid性質同樣
port=9092 #當前kafka對外提供服務的端口默認是9092
num.network.threads=3 #這個是borker進行網絡處理的線程數
num.io.threads=8 #這個是borker進行I/O處理的線程數
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄能夠配置爲「,」逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,若是配置多個目錄,新建立的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個
socket.send.buffer.bytes=102400 #發送緩衝區buffer大小,數據不是一會兒就發送的,先回存儲到緩衝區了到達必定的大小後在發送,能提升性能
socket.receive.buffer.bytes=102400 #kafka接收緩衝區大小,當數據到達必定大小後在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
num.partitions=1 #默認的分區數,一個topic默認1個分區數
log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務
replica.fetch.max.bytes=5242880  #取消息的最大直接數
log.segment.bytes=1073741824 #這個參數是:由於kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過時的消息若是有,刪除
log.cleaner.enable=false #是否啓用log壓縮,通常不用啓用,啓用的話能夠提升性能
zookeeper.connect=localhost:12181#設置zookeeper的鏈接端口

 

1 > ./bin/kafka-server-start.sh config/server.properties 2 [2019-01-23 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
3 [2019-01-23 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
4 ...

後臺啓動的命令(藍色部分爲控制檯輸出地址):

>nohup bin/kafka-server-start.sh config/server.properties 1>/home/ec2-user/tools/kafka_2.11-2.1.0/logs/kafka.log 2>&1 &

 

到這裏服務就啓動成功了。

查看java進程pid也能夠發現kafka

Step 3: Create a topic

Let's create a topic named "test" with a single partition and only one replica:

1 > ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

if you want delete a topic:

1 >./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

We can now see that topic if we run the list topic command:

1 > ./bin/kafka-topics.sh --list --zookeeper localhost:2181
2 test

Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.

在上一次配置的日子目錄(server.properties裏配置的log.dirs)上能夠看到,已經存在了該主題的日誌信息文件了,因爲server.properties裏面配置的num.partitions=1(默認的分區數,一個topic默認1個分區數因此只有一個文件夾

要查看topic詳細信息的話:

1  >bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

 

Step 4: Star a producer(建立發佈者)

1 > ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Step 5: Start a consumer(建立訂閱者)

1 > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic test --from-beginning

 建立好後,能夠在發佈者端發送消息進行測試,咱們這裏的發佈者是filebeat,訂閱者是logstash,能夠不用配置。

Filebeat安裝配置

  beat是一個輕量級的數據傳輸者,它有好幾種分類,這裏暫時只用到Filebeat。

  Filebeat的處理流程基本分爲3部分:Input、Filter、Output。

 

 開始安裝

  1.官網下載 :filebeat-6.5.4-linux-x86_64.tar.gz,複製到LINUX服務器,而後解壓( tar -xzvf filebeat-6.5.4-linux-x86_64.tar.gz)

  2.進入filebeat-6.5.4-linux-x86_64文件夾,修改filebeat.yml,filebeat默認是將數據傳輸到elasticsearch,須要修改成kafka

 

 這裏放一個filebeat詳細配置參考:http://www.javashuo.com/article/p-wywerlao-mh.html

配置完成後啓動filebeat:

1 > sudo ./filebeat -e -c filebeat.yml

ELK(elasticsearch+logstash+kibana 6.5.4)配置

elasticsearch安裝配置

  Elasticsearch 是一個分佈式、RESTful 風格的搜索和數據分析引擎,可以解決不斷涌現出的各類用例。

  順帶提一下Elasticsearch的數據結構:FST(Finite StateTransducers),其特色高度重複利用數據前綴和後綴存儲數據,能很大層度上壓縮數據。

elasticsearch安裝配置:

  從官網上下載軟件:https://www.elastic.co/cn/downloads/elasticsearch 

  解壓以後,修改config/elasticsearch.yml配置文件,配置一下ip和端口號:

1 skyworth@skyworth-ubuntu:~$ cd /home/tools/elk/elasticsearch-6.5.4/
2 skyworth@skyworth-ubuntu:/home/tools/elk/elasticsearch-6.5.4$ vim config/elasticsearch.yml

而後啓動命令:

1 skyworth@skyworth-ubuntu:/home/tools/elk/elasticsearch-6.5.4$ ./bin/elasticsearch

後臺運行命令:

>./bin/elasticsearch -d

而後瀏覽器打開端口(http://192.168.80.15:9200/),出現以下信息說明啓動成功

elasticsearch啓動常見錯誤參考:http://www.javashuo.com/article/p-fryyljlc-me.html

elastucsearch安全框架(Search Guard ,x-pack收費)

官方參考文檔:https://docs.search-guard.com/latest/demo-installer.html#install-search-guard-on-elasticsearch

配置參考:http://xiaoqiangge.com/aritcle/1536058241842.html

Install Search Guard on Elasticsearch:

>bin/elasticsearch-plugin install -b com.floragunn:search-guard-6:6.5.4-24.0

 接着,配置ssl,因爲ElasticSearch節點之間通信默值非加密,形成數據不安全,Search Guard強制ElasticSearch節點之間通信爲加密方式。

 能夠在 https://search-guard.com/tls-certificate-generator 上申請密匙。

Email用來接收密鑰,

Organization Name能夠隨便填寫,

Hostname填寫ElasticSearch集羣中每一個節點的node name,這是一一對應的,這裏我只有一個節點,node name爲node-1

填寫好以後,郵箱會收到一個壓縮包,將這個壓縮包解壓到Elastucsearch的config目錄下,並在elasticsearch.yml追加配置,以下:

http.compression: true
xpack.security.enabled: false

searchguard.ssl.transport.pemcert_filepath: CN=node-1.crtfull.pem
searchguard.ssl.transport.pemkey_filepath: CN=node-1.key.pem
searchguard.ssl.transport.pemkey_password: 3816b97cbd40d1a97402
searchguard.ssl.transport.pemtrustedcas_filepath: chain-ca.pem
searchguard.ssl.transport.enforce_hostname_verification: false
searchguard.ssl.http.enabled: true
searchguard.ssl.http.pemcert_filepath: CN=node-1.crtfull.pem
searchguard.ssl.http.pemkey_filepath: CN=node-1.key.pem
searchguard.ssl.http.pemkey_password: 3816b97cbd40d1a97402
searchguard.ssl.http.pemtrustedcas_filepath: chain-ca.pem

searchguard.authcz.admin_dn:
  - CN=sgadmin

searchguard.audit.type: internal_elasticsearch
searchguard.enable_snapshot_restore_privilege: true
searchguard.check_snapshot_restore_write_privileges: true
searchguard.restapi.roles_enabled: ["sg_all_access"]
cluster.routing.allocation.disk.threshold_enabled: false

注:有關配置屬性以及pemkey_password屬性能夠在下載的密鑰包中README.txt文件中查看到,以下:

## Passwords

### Common passwords

Root CA password: 9c5eeb48cee19d1ae6ce0315154f7271ff6f8b8c
Truststore password: 1d6df567841c5e8a7663
Admin keystore and private key password: 83f5fdded6c38208a9ad
Demouser keystore and private key password: e598a7e1684295923d36

## Host/Node specific passwords

Host: node-1
node-1 keystore and private key password: 3816b97cbd40d1a97402
node-1 keystore: node-certificates/CN=node-1-keystore.jks
node-1 PEM certificate: node-certificates/CN=node-1.crtfull.pem
node-1 PEM private key: node-certificates/CN=node-1.key.pem

這時還沒法啓動ES,須要對節點密鑰文件進行受權,權限打小爲0644(rw-r--r--),不然會出現對密鑰文件沒有讀取權限,以下:

>chmod 644  root-ca.pem chain-ca.pem node-certificates/CN\=node-1.key.pem node-certificates/CN\=node-1.crtfull.pem /home/ec2-user/tools/elasticsearch-6.5.4/config/

把節點受權的4個文件複製到ES的config文件夾下

cp root-ca.pem chain-ca.pem node-certificates/CN\=node-1.key.pem node-certificates/CN\=node-1.crtfull.pem /home/ec2-user/tools/elasticsearch-6.5.4/config/

接着對sgadmin客戶端密鑰進行受權:

>chmod 644  root-ca.pem chain-ca.pem client-certificates/CN\=sgadmin.key.pem client-certificates/CN\=sgadmin.crtfull.pem

而後把客戶端密匙的4個文件複製到插件的tools目錄下

cp root-ca.pem chain-ca.pem client-certificates/CN\=sgadmin.key.pem client-certificates/CN\=sgadmin.crtfull.pem  /home/ec2-user/tools/elasticsearch-6.5.4/plugins/search-guard-6/tools 

注:這裏都沒有去使用root用戶權限,本身建立用戶

而後再啓動ES,分別檢查狀態和日誌是否存在錯誤,若是沒有存在錯誤開始執行最後一步,初始化sgadmin配置,以下:

進入目錄
/home/ec2-user/tools/elasticsearch-6.5.4/plugins/search-guard-6/tools
對sgadmin.sh進行受權
chmod 744 sgadmin.sh
執行初始化語句
sh sgadmin.sh -cd ../sgconfig/ -icl -nhnv -cacert root-ca.pem -cert CN=sgadmin.crtfull.pem -key CN=sgadmin.key.pem -h localhost -keypass 83f5fdded6c38208a9ad

執行完以後會出現以下效果:

 最後在瀏覽器書如https://admin:admin@localhost:9200能夠查看結果:

 

kibana安裝配置

  經過 Kibana,可以對 Elasticsearch 中的數據進行可視化並在 Elastic Stack 進行操做.

kibana安裝配置:

  首先下載軟件,而後解壓,修改配置文件config/kibana.yml,再啓動就行,與elasticsearch相似。

 

1 skyworth@skyworth-ubuntu:/home/tools/elk/kibana-6.5.4-linux-x86_64$ vim config/kibana.yml

1 skyworth@skyworth-ubuntu:/home/tools/elk/kibana-6.5.4-linux-x86_64$ ./bin/kibana

kibana後臺啓動

>nohup bin/kibana 1>/home/ec2-user/tools/kibana-6.5.4-linux-x86_64/logs/kibana.log 2>&1 &

 

Install Search Guard on Kibana

這裏只須要修改一下 kibana.yml,增長以下配置就好:

# Use HTTPS instead of HTTP
elasticsearch.url: "https://localhost:9200"

# Configure the Kibana internal server user
elasticsearch.username: "kibanaserver"
elasticsearch.password: "kibanaserver"

# Disable SSL verification because we use self-signed demo certificates
elasticsearch.ssl.verificationMode: none

# Whitelist the Search Guard Multi Tenancy Header
elasticsearch.requestHeadersWhitelist: [ "Authorization", "sgtenant" ]

 

logstash安裝配置

filebeat中message要麼是一段字符串,要麼在日誌生成的時候拼接成json而後在filebeat中指定爲json。可是大部分系統日誌沒法去修改日誌格式,filebeat則沒法經過正則去匹配出對應的field,這時須要結合logstash的grok來過濾。  

Logstash 是開源的服務器端數據處理管道,可以同時從多個來源採集數據,轉換數據,而後將數據發送到存儲庫中。

   安裝過程依然是下載壓縮包,解壓後修改配置文件,再啓動。

  進入config文件夾,咱們能夠修改原有的logstash-sample.conf,或者本身新建一個(這裏選擇本身新建一個logstash-skyworth.conf)。

  

  修改配置文件:

input{
   kafka{
    bootstrap_servers => ["192.168.80.15:9092"]
        client_id => "test"
        group_id => "test"
        auto_offset_reset => "latest" //從最新的偏移量開始消費
        consumer_threads => 5    
        decorate_events => true //此屬性會將當前topic、offset、group、partition等信息也帶到message中
        topics => ["test"]
        type => "skyworth" 
  }
}
filter {
        if ([message]== "")
        {
            drop {}
        }
}
output {
  if [type] == "skyworth"{
        elasticsearch {
        hosts => ["192.168.80.15:9200"]                  # ElasticSearch的地址加端口
        index => "skyworth-log-%{+YYYYMMdd}"            # ElasticSearch的保存文檔的index名稱,
    }
  }
}

啓動命令:

1 >./bin/logstash -f config/logstash-skyworth.conf

注:elastic stack 安全框架X-Pack 安裝配置請參考:http://www.javashuo.com/article/p-edvzmify-mc.html

 總結

  業務流程偷了張圖以下:

  注:有的人會在filebeat和kafka之間再加一層前置的logstash用於格式化日誌,目前我也沒明白爲何要那樣作。

相關文章
相關標籤/搜索