完整的ELK+filebeat+kafka筆記

以前有寫過elasticsearch集羣和elk集羣的博客, 都是基於docker的,使用docker-compose進行編排(K8S暫未掌握)

<font style="color:gray">本文較長,上述兩個博文中的內容本文都會從新講解,建議收藏後進行閱讀。本文包含了ELK + kafka(zookeeper) + filebeat整套方案的架構與實施</font>

ELK介紹

<font style="color:gray">       ELK = Elasticsearch, Logstash, Kibana 是一套實時數據收集,存儲,索引,檢索,統計分析及可視化的解決方案。最新版本已經更名爲Elastic Stack,並新增了Beats項目。</font>

一般當系統或者應用發生故障時,運維或者開發須要登陸到各個服務器上,使用 grep / sed / awk 等 Linux 腳本工具或vim打開文件的方式去日誌裏查找故障緣由。
    在沒有日誌系統的狀況下,首先須要定位處理請求的服務器,若是這臺服務器部署了多個實例,則須要去每一個應用實例的日誌目錄下去找日誌文件。
    每一個應用實例還會設置日誌滾動策略(如:天天生成一個文件),還有日誌壓縮歸檔策略等。
    這樣一系列流程下來,對於咱們排查故障以及及時找到故障緣由,形成了比較大的麻煩,而且太靠人手工操做。
    所以,若是咱們能把這些日誌集中管理,並提供集中檢索功能,不只能夠提升診斷的效率,同時對系統狀況有個全面的理解,還能夠隨時進行擴容,避免過後救火的被動。

總的來講有如下三點用途:
    數據查找:經過檢索日誌信息,定位相應的 bug ,找出解決方案。
    服務診斷:經過對日誌信息進行統計、分析,瞭解服務器的負荷和服務運行狀態
    數據分析:能夠作進一步的應用數據分析,固然須要更深刻的數據分析,以便產生經濟價值。
(摘自:http://www.mamicode.com/info-detail-2741933.html)


固然elasticsearch除了用於ELK,還有許多其它用途, 咱們用的只是它的一小部分功能:
    分佈式的搜索引擎和數據分析引擎
    搜索:網站的站內搜索,IT系統的檢索
    數據分析:電商網站,統計銷售排名前10的商家
    全文檢索,結構化檢索,數據分析
    全文檢索:我想搜索商品名稱包含某個關鍵字的商品
    結構化檢索:我想搜索商品分類爲日化用品的商品都有哪些
    數據分析:咱們分析每個商品分類下有多少個商品
    對海量數據進行近實時的處理
    分佈式:ES自動能夠將海量數據分散到多臺服務器上去存儲和檢索
    海聯數據的處理:分佈式之後,就能夠採用大量的服務器去存儲和檢索數據,天然而然就能夠實現海量數據的處理了
    近實時:檢索數據要花費1小時(這就不要近實時,離線批處理,batch-processing);在秒級別對數據進行搜索和分析
    (摘自:https://blog.csdn.net/paicmis/article/details/82535018)

<font style="color:gray">       想必在平時工做學習中你也聽到過elk或es(elasticsearch),本文爲何在ELK的基礎上又使用了kafka(zookeeper)和filebeat呢?</font>

  • 首先logstash具備日誌採集、過濾、篩選等功能,功能完善但同時體量也會比較大,消耗系統資源天然也多。filebeat做爲一個輕量級日誌採集工具,雖然沒有過濾篩選功能,可是僅僅部署在應用服務器做爲咱們採集日誌的工具能夠是說最好的選擇了。但咱們有些時候可能又須要logstash的過濾篩選功能,因此咱們在採集日誌時用filebeat,而後交給logstash過濾篩選。php

  • 其次,logstash的吞吐量是有限的,一旦短期內filebeat傳過來的日誌過多會產生堆積和堵塞,對日誌的採集也會受到影響,因此在filebeat與logstash中間又加了一層kafka消息隊列來緩存或者說解耦,固然redis也是能夠的。這樣當衆多filebeat節點採集大量日誌直接放到kafka中,logstash慢慢的進行消費,兩邊互不干擾。html

  • 至於zookeeper,想必java的同窗都知道,分佈式服務管理神器,監控管理kafka的節點註冊,topic管理等,同時彌補了kafka集羣節點對外界沒法感知的問題,kafka實際已經自帶了zookeeper,但本教程將會使用獨立的zookeeper進行管理,方便後期zookeeper集羣的擴展。java

ELK + kafka(zookeeper) + filebeat 的設計模型

ELK + kafka(zookeeper) + filebeat 的構建

首先說下準備工做

  • 一、服務器: 三臺,內存最少4G,其中選擇一臺做爲主節點,建議主節點8G內存。node

    • PS:單臺大內存服務器也能夠,本教程爲了更接近實際使用因此建議三臺。linux

    • 若是資源有限,能夠單獨嘗試文章開頭的兩篇博文,也能夠嘗試本教程,但不保證能全部的步驟都成功,可能會內存不足,內存低於4G不建議嘗試)nginx

  • 二、系統環境:linux內核不低於3.10.0git

  • 三、安裝docker環境以及docker-compose,詳情見個人另外一篇博文:http://www.javashuo.com/article/p-coxcbzuv-cy.htmlgithub

  • 四、首先在三臺服務器上搭建es集羣,而後選擇在一個es節點上搭建ELK,而後在三個節點上安裝kafka-zookeeper集羣,最後filebeat,各個環節會進行各功能間的可用性測試。redis

  • 五、elk三個軟件版本均爲7.1.1,版本過低的話搭建完也該升級了,版本過高文檔比較少,坑多,目前最新版本7.5.0,現階段生產環境使用6.x.x的應該比較多,故選擇7.1.1。docker

es集羣的搭建

- 基於單機搭建elasticsearch集羣見官網 https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html

本文旨在三臺不一樣的服務器,搭建elasticsearch集羣,版本爲7.1.1

  • 一、服務器列表即配置
172.168.50.40(8G), 172.168.50.41(16G), 172.168.50.240(8G)
    服務器內存儘可能不要低於4G
    選用172.168.50.41做爲master節點,
  • 二、創建存放yml文件的目錄
創建文件夾,/root/elasticsearch(隨意便可),用於存放啓動elasticsearch容器的yml文件以及es的配置文件
   mkdir /root/elasticsearch
  • 三、建立docker-compose.yml文件
cd /root/elasticsearch
    touch docker-compose.yml
  • 四、docker-compose.yml 的文件內容以下
version: '3'
services:
  elasticsearch:                    # 服務名稱
    image: elasticsearch:7.1.1      # 使用的鏡像
    container_name: elasticsearch   # 容器名稱
    restart: always                 # 失敗自動重啓策略
    environment:                                    
      - node.name=node-41                   # 節點名稱,集羣模式下每一個節點名稱惟一
      - network.publish_host=172.168.50.41  # 用於集羣內各機器間通訊,對外使用,其餘機器訪問本機器的es服務,通常爲本機宿主機IP
      - network.host=0.0.0.0                # 設置綁定的ip地址,能夠是ipv4或ipv6的,默認爲0.0.0.0,即本機
      - discovery.seed_hosts=172.168.50.40,172.168.50.240,172.168.50.41          # es7.0以後新增的寫法,寫入候選主節點的設備地址,在開啓服務後,若是master掛了,哪些能夠被投票選爲主節點
      - cluster.initial_master_nodes=172.168.50.40,172.168.50.240,172.168.50.41  # es7.0以後新增的配置,初始化一個新的集羣時須要此配置來選舉master
      - cluster.name=es-cluster     # 集羣名稱,相同名稱爲一個集羣, 三個es節點須一致
      # - http.cors.enabled=true    # 是否支持跨域,是:true // 這裏設置不起做用,可是能夠將此文件映射到宿主機進行修改,而後重啓,解決跨域
      # - http.cors.allow-origin="*" # 表示支持全部域名      // 這裏設置不起做用,可是能夠將此文件映射到宿主機進行修改,而後重啓,解決跨域
      - bootstrap.memory_lock=true  # 內存交換的選項,官網建議爲true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m" # 設置內存,如內存不足,能夠嘗試調低點
    ulimits:        # 棧內存的上限
      memlock:
        soft: -1    # 不限制
        hard: -1    # 不限制
    volumes:
      - /root/elasticsearch/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml  # 將容器中es的配置文件映射到本地,設置跨域, 不然head插件沒法鏈接該節點
      - esdata:/usr/share/elasticsearch/data  # 存放數據的文件, 注意:這裏的esdata爲 頂級volumes下的一項。
    ports:
      - 9200:9200    # http端口,能夠直接瀏覽器訪問
      - 9300:9300    # es集羣之間相互訪問的端口,jar之間就是經過此端口進行tcp協議通訊,遵循tcp協議。
volumes:
  esdata:
    driver: local    # 會生成一個對應的目錄和文件,如何查看,下面有說明。
  • 另外兩臺服務器也照着這個配置進行配置,但 publish_host,以及 node.name 須要改一下便可。

  • 內存設置,三臺服務器都須要進行設置

兩種設置內存的方式:

一、設置簡單,但機器重啓後需再次設置
    sysctl -w vm.max_map_count=262144

二、直接修改配置文件, 進入sysctl.conf文件添加一行(解決容器內存權限太小問題)

    vi /etc/sysctl.conf

    sysctl vm.max_map_count=262144  # 添加此行

    退出文件後,執行命令: sysctl -p 當即生效
{
  "cluster_name" : "es-cluster",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 3,
  "number_of_data_nodes" : 3,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}
  • elasticsearch.yml 文件內容以下:
network.host: 0.0.0.0
    http.cors.enabled: true        # 是否支持跨域
    http.cors.allow-origin: "*"    # 表示支持全部域名
  • 上面說到的頂級volumes, 如何查看掛載卷在宿主機的位置呢?
# docker volume create elk_data    // 建立一個自定義容器卷,本教程內不須要執行,咱們的docker-compose.yml會幫咱們自動執行改命令。
    # docker volume ls                 // 查看全部容器卷,
    # docker volume inspect elk_data   // 查看指定容器卷詳情信息, 包括真實目錄
    注意: 若是你要刪除一個掛載卷,或者從新生成,請執行刪除卷操做
    # docker volume rm elk_data        // 直接執行這個命令,同時會刪除文件,可是先刪除文件的話,必須再次執行此命令,不然可能致使該節點沒法加入集羣。
  • 以上就是es集羣的搭建,若是出現權限不足等簡單問題,能夠百度自行解決。

  • 接下來咱們將在172.168.50.41服務器上搭建elk服務,用elk中的es替換掉50.41服務器上以前的es。

搭建ELK服務

// 下載elasticsearch,logstash,kibana的鏡像,建議提早下載好。 自es5開始,通常三個軟件的版本都保持一致了。若是下載網絡有問題,嘗試從新配置一下docker的鏡像源地址。

    docker pull docker.elastic.co/elasticsearch/elasticsearch:7.1.1 && docker pull docker.elastic.co/logstash/logstash:7.1.1 && docker pull docker.elastic.co/kibana/kibana:7.1.1

配置

  • 咱們創建一個目錄,用來存放yml文件(docker-compose啓動一組編排過的容器時使用)
mkdir -p /root/elk(位置和名稱都隨意)     # 與剛纔的elasticsearch目錄進行區別
    cd /root/elk
    vim docker-compose.yml
  • docker-compose.yml文件內容以下
version: '3'
services:                                                         # 服務(s表明複數)
  elasticsearch:                                                  # 服務名稱:elasticsearch
    image: docker.elastic.co/elasticsearch/elasticsearch:7.1.1    # 使用的鏡像以及版本號
    container_name: elasticsearch7.1.1                            # 容器名稱
    environment:                                                  # 環境變量
      - node.name=node-41                                         # 節點名稱
      - network.publish_host=172.168.50.41                        # 用於集羣內各機器間通訊,對外使用,其餘機器訪問本機器的es服務,通常爲本機宿主機IP
      - network.host=0.0.0.0                                      # 設置綁定的ip地址,能夠是ipv4或ipv6的,默認爲0.0.0.0,即本機
      - discovery.seed_hosts=172.168.50.40,172.168.50.240,172.168.50.41             # es7.0以後新增的寫法,寫入候選主節點的設備地址,在開啓服務後,若是master掛了,哪些能夠被投票選爲主節點
      - cluster.initial_master_nodes=172.168.50.40,172.168.50.240,172.168.50.41     # es7.0以後新增的配置,初始化一個新的集羣時須要此配置來選舉master
      - cluster.name=es-cluster                                   # 集羣名稱,相同名稱爲一個集羣, 三個es節點須一致                                 
      - bootstrap.memory_lock=true                                # 內存交換的選項,官網建議爲true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"                          # 設置內存,如內存不足,能夠嘗試調低點
      #- discovery.type=single-node                               # 單es節點模式,這裏咱們就不啓用,啓用的話,除了兩個內存外其餘的環境配置都不須要。
    volumes:
      - esdata:/usr/share/elasticsearch/data                      # 設置es數據存放的目錄
      - /root/elk/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml     # 映射es容器中的配置文件到宿主機
    hostname: elasticsearch                                       # 服務hostname
    ulimits:                                                      # 棧內存的上限
      memlock:
        soft: -1                                                  # 不限制
        hard: -1                                                  # 不限制
    restart: always                                               # 重啓策略,失敗時老是重啓
    ports:
      - 9200:9200                                                 # http端口,能夠直接瀏覽器訪問
      - 9300:9300                                                 # es集羣之間相互訪問的端口,jar之間就是經過此端口進行tcp協議通訊,遵循tcp協議。
  kibana:                                                         # 服務名稱:kibana
    image: docker.elastic.co/kibana/kibana:7.1.1                  # 使用的鏡像及版本號
    container_name: kibana7.1.1                                   # 容器名稱
    environment:                                                  # 環境配置
      - elasticsearch.hosts=http://elasticsearch:9200             # 設置鏈接的es節點,此處的elasticsearch即上面的elasticsearch服務中的hostname:elasticsearch 
     # - TZ=Asia/Shanghai                                          # 設置容器內時區                                      
    hostname: kibana                                              # 服務hostname
    depends_on:                                                   
      - elasticsearch                                             # 依賴es服務,必須先啓動es纔會啓動kibana
    restart: always
    ports:
      - 5601:5601                                                 # 訪問的端口號
  logstash:                                                       # 服務名稱:logstash
    image: docker.elastic.co/logstash/logstash:7.1.1 
    container_name: logstash7.1.1
    hostname: logstash
    restart: always
    # volumes:     # 掛載卷,先註釋,等ELK啓動起來以後,從容器中複製出來一份,下面會說
    #  - /root/elk/logstash/config/:/usr/share/logstash/config/     # logstash 配置文件目錄
    #  - /root/elk/logstash/pipeline/:/usr/share/logstash/pipeline/ # logstash 的採集與輸入的配置文件目錄
    depends_on:
      - elasticsearch
    # ports:
     # - 9600:9600                                                 # 端口號,後來沒用到
     # - 5044:5044                                                 # filebeat直接傳遞數據給logstash時默認使用的端口號,測試了一下,沒用
volumes:
  esdata:
    driver: local
  • 上面用到了/root/elk/logstash/ 下面的config目錄與pipeline目錄,如何獲得這兩個目錄呢?

    // 先中止以前該服務器上的es節點 docker stop es節點ID docker rm es節點ID

    • 一、在/root/elk目錄下,docker-compose up -d 啓動elk

    • 二、mkdir logstash

    • 三、從容器中複製兩個目錄到logstash下,這樣咱們就有了初始配置文件

docker cp 容器ID:/usr/share/logstash/config/ logstash     // 容器ID爲Logstash的ID
    docker cp 容器ID:/usr/share/logstash/pipeline/ logstash
  • 修改配置文件 config目錄中的logstash.yml文件,內容以下
http.host: "0.0.0.0"
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: [ "172.168.50.41:9200","172.168.50.40:9200","172.168.50.240:9200" ]

// elasticsearch.hosts 爲你的es服務器列表以及端口號,用於將數據存儲到es
  • 修改pipeline目錄中的logstash.conf文件(此文件是經過config目錄下的 pipelines.yml 來指定的),內容以下
input{                                         # 輸入組件
    kafka{                                     # 從kafka消費數據
        bootstrap_servers => ["172.168.50.41:9092,172.168.50.41:9097,172.168.50.41:9098"]  # kafka節點的IP+端口號
        #topics => "%{[@metadata][topic]}"     # 使用kafka傳過來的topic
        topics_pattern => "elk-.*"             # 使用正則匹配topic
        codec => "json"                        # 數據格式
        consumer_threads => 3                  # 消費線程數量
        decorate_events => true                # 可向事件添加Kafka元數據,好比主題、消息大小的選項,這將向logstash事件中添加一個名爲kafka的字段,
        auto_offset_reset => "latest"          # 自動重置偏移量到最新的偏移量
        group_id => "logstash-node-41"         # 消費組ID,多個有相同group_id的logstash實例爲一個消費組
  }
}
filter {
   # 當非業務字段時,無traceId則移除
   if ([message] =~ "traceId=null") {          # 過濾組件,這裏只是展現,無實際意義,根據本身的業務需求進行過濾
      drop {}
   }
}
output {                                       # 輸出組件
    elasticsearch {
        # Logstash輸出到es
        hosts => ["172.168.50.41:9200", "172.168.50.40:9200", "172.168.50.240:9200"]
        # index => "%{[fields][source]}"    #直接在日誌中匹配,索引會去掉elk
        index => "%{[@metadata][topic]}-%{+YYYY-MM-dd}"  # 以日期建索引
        #flush_size => 20000
        #idle_flush_time => 10
        #sniffing => true
        #template_overwrite => false
    }
#    stdout {
#        codec => rubydebug
#    }
}
# 能夠寫多個input,可是output中的index即索引要處理好,根據過濾組件或者kafka傳過來的不一樣話題,應有不一樣的索引
# 也能夠寫多個output,向其餘程序(非es)傳送數據
  • 有了這兩個目錄後,將docker-compose.yml文件中的三行註釋打開
volumes:     # 掛載卷,先註釋,等ELK啓動起來以後,從容器中複製出來一份,下面會說
      - /root/elk/logstash/config/:/usr/share/logstash/config/     # logstash 配置文件目錄
      - /root/elk/logstash/pipeline/:/usr/share/logstash/pipeline/ # logstash 的採集與輸入的配置文件目錄
  • 在/root/elk目錄,從新編譯和啓動elk
// 啓動ELK
    docker-compose up -d --force --build
  • 出現done表示成功,docker-compose logs 查看日誌(分別輸出elk三個服務的日誌)執行docker-compose ps 能夠看到三個服務的運行狀態

  • 在瀏覽器輸入http://IP:5601/ 訪問kibana。

  • 可使用head監控es

// 拉取鏡像
    docker pull mobz/elasticsearch-head:5
    // 啓動
    docker run -d --name es_admin -p 9100:9100 mobz/elasticsearch-head:5

  • 注意,IP地址要使用你es所在服務器的地址,而後點擊鏈接,出現相似的顯示說明,沒問題,本教程是三個節點,當關掉一個es會變成兩個節點。

  • 若是點擊鏈接沒有反應,F12查看network是403,或者200,可是console中提示跨域,那麼應該是跨域未設置好,從新回去檢查一遍,看看容器內配置文件是否加入了那兩行跨域。

  • 至此,咱們的Elasticsearch(集羣)+ Logstash + Kibana 搭建完成, 你能夠在kibana中的dev-tools菜單中操做es,甚至向es中添加數據,或者經過其餘方式向es中添加數據進行測試。

  • Logstash能夠採集數據, 過濾數據; Elasticsearch 能夠存儲和索引數據; Kibana提供可視化展現和操做

  • 這個時候就能夠配置Logstash採集數據而後想es輸入了,可是咱們並不知足於此,我想在須要採集日誌的應用服務器上安裝 filebeat,而後經filebeat將數據轉發給Logstash,Logstash只進行過濾便可(也能夠filebeat直接將數據傳給es,不要Logstash,看我的需求)。

  • 但我沒有保留filebeat直接傳給Logstash的配置文件,因此咱們直接在此時將kafka也接入進來,再也不演示filebeat與Logstash直接交互的情景。

  • 不用擔憂,咱們儘可能下降複雜度,好比,如今咱們不考慮ELK,也不考慮filebeat,拋開這些東西,僅僅搭建一個kafka集羣。

  • kafka依賴zookeeper,咱們前面說過,本文使用獨立的zookeeper集羣,因此接下來介紹如何搭建zookeeper

zookeeper集羣的搭建

  • 依然是三節點集羣,可是爲了減小複雜性,咱們直接在172.168.50.41(內存最大的服務器)上經過docker容器搭建單機zookeeper集羣。

  • 創建目錄,並建立yml文件

mkdir -p /root/zookeeper
    cd /root/zookeeper
    vim docker-compose.yml
  • docker-compose.yml文件內容以下:
version: '2'
services:
  zoo1:
    image: zookeeper:3.4.14                                    # 儘可能使用3.4版本,3.4.14的鏡像就能夠。 3.5坑太多
    restart: always
    hostname: zoo1
    container_name: zoo1
    ports:
      - 2181:2181                                              # 端口號
    volumes:
      - /usr/local/docker-compose/zk/zk1/data:/data            # 數據文件存放目錄
      - /usr/local/docker-compose/zk/zk1/datalog:/datalog      # 日誌文件存放目錄
      - /root/zookeeper/zoo.cfg:/conf/zoo.cfg                  # zookeeper的配置文件,下面會給出該文件內容
    environment:
      ZOO_MY_ID: 1                                             # 表示 ZK服務的 id, 它是1-255 之間的整數, 必須在集羣中惟一
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181   # ZK集羣的主機列表即端口號
      # ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      default:
        ipv4_address: 172.23.0.11                              # 集羣使用的網絡
  zoo2:
    image: zookeeper:3.4.14
    restart: always
    hostname: zoo2
    container_name: zoo2
    ports:
      - 2180:2181
    volumes:
      - /usr/local/docker-compose/zk/zk2/data:/data
      - /usr/local/docker-compose/zk/zk2/datalog:/datalog
      - /root/zookeeper/zoo.cfg:/conf/zoo.cfg
    environment:
      ZOO_MY_ID: 2
      #ZOOKEEPER_CLIENT_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181
    networks:
      default:
        ipv4_address: 172.23.0.12
  zoo3:
    image: zookeeper:3.4.14
    restart: always
    hostname: zoo3
    container_name: zoo3
    ports:
      - 2179:2181
    volumes:
      - /usr/local/docker-compose/zk/zk3/data:/data
      - /usr/local/docker-compose/zk/zk3/datalog:/datalog
      - /root/zookeeper/zoo.cfg:/conf/zoo.cfg
    environment:
      ZOO_MY_ID: 3
      #ZOOKEEPER_CLIENT_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
    networks:
      default:
        ipv4_address: 172.23.0.13
networks:
  default:
    external:
      name: zookeeper_network          # 網絡名稱,此網絡須要進行建立
  • zoo.cfg文件內容以下:
tickTime=2000       # 心跳2秒,服務器之間或客戶端與服務器之間維持心跳的時間間隔
initLimit=10        # 集羣中的follower服務器(F)與leader服務器(L)之間 初始鏈接 時能容忍的最多心跳數(tickTime的數量)。
syncLimit=5         # 集羣中的follower服務器(F)與leader服務器(L)之間 請求和應答 之間能容忍的最多心跳數(tickTime的數量)。
dataDir=/data       # 數據目錄  
dataLogDir=/datalog # 日誌文件目錄
clientPort=2181     # 客戶端鏈接端口
autopurge.snapRetainCount=3     # 至少須要保留3個快照數據文件和對應的事務日誌文件 
autopurge.purgeInterval=1       # 進行歷史文件自動清理的頻率,單位小時
server.1= zoo1:2888:3888        # 定義個服務通訊端口,以及投票端口
server.2= zoo2:2888:3888 
server.3= zoo3:2888:3888
  • 啓動前先建立網絡
docker network create --driver bridge --subnet 172.23.0.0/25 --gateway 172.23.0.1  zookeeper_network
  • 啓動zk集羣
當前目錄下執行 docker-compose up -d
  • 鏈接zk,並打開客戶端
docker run -it --rm --link zoo1:zk1 --link zoo2:zk2 --link zoo3:zk3 --net zookeeper_network zookeeper:3.4.14 zkCli.sh -server zk1:2181,zk2:2181,zk3:2181
        // 執行完上面的命令後,會進去zk的客戶端,咱們能夠進行一些操做,好比
	執行 ls /  能夠查看根節點信息,而後根據現實內容,查看下一級數據節點的信息
        建議等下搭建完kafka集羣並建立topic後經過 ls /brokers/topics 來查看topic列表(kafka在zookeeper註冊後,建立的話題會被zookeeper記錄下來)
        
        zk經常使用命令

            create /test "test"

            get /test

            set /test "hello test"

            delete /test

            quit
  • 至此zookeeper集羣搭建完畢,比較簡單。

kafka集羣的搭建

  • 老規矩先創建目錄,用於存放yml文件以及kafka的配置文件
mkdir -p /root/kafka
    cd /root/kafka
    vim docker-compose.yml
  • docker-compose.yml 文件內容以下:
version: '2'
 
services:
  kafka1:
    image: wurstmeister/kafka:2.12-2.3.0
    restart: always
    hostname: kafka1
    container_name: kafka1
    ports:
    - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.168.50.41:9092       # 宿主機的IP地址而非容器的IP,及暴露出來的端口
      KAFKA_ADVERTISED_HOST_NAME: 172.168.50.41                        # 外網訪問地址
      KAFKA_ADVERTISED_PORT: 9092                                      # 端口
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181           # 鏈接的zookeeper服務及端口
      JMX_PORT: 9966                                                   # kafka須要監控broker和topic的數據的時候,是須要開啓jmx_port的
    volumes:
    - /usr/local/docker-compose/kafka/kafka1:/kafka                    # kafka數據文件存儲目錄
    - /root/kafka/kafka-run-class.sh:/opt/kafka_2.12-2.3.0/bin/kafka-run-class.sh  # kafka啓動時的配置文件,咱們修改了一部分代碼,用來解決java.rmi.server.ExportException: Port already in use這個問題。過程詳見https://github.com/apache/kafka/pull/1983/commits/2c5d40e946bcc149b1a9b2c01eced4ae47a734c5。
    external_links:                                                    # 鏈接本compose文件之外的容器
    - zoo1
    - zoo2
    - zoo3
    networks:
      default:
        ipv4_address: 172.23.0.14                                      # 使用zookeeper建立的網絡中一個IP
 
  kafka2:
    image: wurstmeister/kafka:2.12-2.3.0
    restart: always
    hostname: kafka2
    container_name: kafka2
    ports:
    - "9097:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.168.50.41:9097 
      KAFKA_ADVERTISED_HOST_NAME: 172.168.50.41
      KAFKA_ADVERTISED_PORT: 9097
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      JMX_PORT: 9988
    volumes:
    - /usr/local/docker-compose/kafka/kafka2:/kafka
    - /root/kafka/kafka-run-class.sh:/opt/kafka_2.12-2.3.0/bin/kafka-run-class.sh
    external_links:                                                  # 鏈接compose文件之外的container
    - zoo1
    - zoo2
    - zoo3
    networks:
      default:
        ipv4_address: 172.23.0.15
 
  kafka3:
    image: wurstmeister/kafka:2.12-2.3.0
    restart: always
    hostname: kafka3
    container_name: kafka3
    ports:
    - "9098:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.168.50.41:9098
      KAFKA_ADVERTISED_HOST_NAME: 172.168.50.41
      KAFKA_ADVERTISED_PORT: 9098
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      JMX_PORT: 9977
    volumes:
    - /usr/local/docker-compose/kafka/kafka3
    - /root/kafka/kafka-run-class.sh:/opt/kafka_2.12-2.3.0/bin/kafka-run-class.sh
    external_links:  # 鏈接compose文件之外的container
    - zoo1
    - zoo2
    - zoo3
    networks:
      default:
        ipv4_address: 172.23.0.16
 
  kafka-manager:                                            # kafka管理和監控的一個工具
    image: sheepkiller/kafka-manager:latest                 # 此時我使用的最新版本爲5.0
    restart: always
    container_name: kafa-manager
    hostname: kafka-manager
    ports:
      - "9099:9000"                                         # 瀏覽器訪問的端口
    links:                                                  # 鏈接本compose文件建立的container
      - kafka1
      - kafka2
      - kafka3
    external_links:                                         # 鏈接compose文件之外的container
      - zoo1
      - zoo2
      - zoo3
    environment:
      ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181               # zk服務列表
      KAFKA_BROKERS: kafka1:9092,kafka2:9097,kafka3:9098    # kafka節點列表及端口號
      APPLICATION_SECRET: letmein                           # 應用祕鑰,默認letmein        
      KM_ARGS: -Djava.net.preferIPv4Stack=true              # 在ipv4的機器上想獲取到完整的機器名, 應該是IPV4網絡相關的東西
    networks:
      default:
        ipv4_address: 172.23.0.10                           # IP地址,zookeeper建立的網絡中的一個IP
 
networks:
  default:
    external:   
      name: zookeeper_network                               # 使用zookeeper建立的網絡
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ $# -lt 1 ];
then
  echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
  exit 1
fi

# CYGWIN == 1 if Cygwin is detected, else 0.
if [[ $(uname -a) =~ "CYGWIN" ]]; then
  CYGWIN=1
else
  CYGWIN=0
fi

if [ -z "$INCLUDE_TEST_JARS" ]; then
  INCLUDE_TEST_JARS=false
fi

# Exclude jars not necessary for running commands.
regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"
should_include_file() {
  if [ "$INCLUDE_TEST_JARS" = true ]; then
    return 0
  fi
  file=$1
  if [ -z "$(echo "$file" | egrep "$regex")" ] ; then
    return 0
  else
    return 1
  fi
}

################ lizheng  #################          
# need to check if called to start server or client  
# in order to correctly decide about JMX_PORT      
ISKAFKASERVER="false"                                                
if [[ "$*" =~ "kafka.Kafka" ]]; then                               
    ISKAFKASERVER="true"                                                  
fi                                                                   
################ lizheng  ################# 

base_dir=$(dirname $0)/..

if [ -z "$SCALA_VERSION" ]; then
  SCALA_VERSION=2.12.8
fi

if [ -z "$SCALA_BINARY_VERSION" ]; then
  SCALA_BINARY_VERSION=$(echo $SCALA_VERSION | cut -f 1-2 -d '.')
fi

# run ./gradlew copyDependantLibs to get all dependant jars in a local dir
shopt -s nullglob
for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;
do
  CLASSPATH="$CLASSPATH:$dir/*"
done

for file in "$base_dir"/examples/build/libs/kafka-examples*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
  clients_lib_dir=$(dirname $0)/../clients/build/libs
  streams_lib_dir=$(dirname $0)/../streams/build/libs
  rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
else
  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
  streams_lib_dir=$clients_lib_dir
  rocksdb_lib_dir=$streams_lib_dir
fi


for file in "$clients_lib_dir"/kafka-clients*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

for file in "$streams_lib_dir"/kafka-streams*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
  do
    if should_include_file "$file"; then
      CLASSPATH="$CLASSPATH":"$file"
    fi
  done
else
  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
  for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
  do
    if should_include_file "$file"; then
      CLASSPATH="$file":"$CLASSPATH"
    fi
  done
fi

for file in "$rocksdb_lib_dir"/rocksdb*.jar;
do
  CLASSPATH="$CLASSPATH":"$file"
done

for file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

for dir in "$base_dir"/tools/build/dependant-libs-${SCALA_VERSION}*;
do
  CLASSPATH="$CLASSPATH:$dir/*"
done

for cc_pkg in "api" "transforms" "runtime" "file" "json" "tools" "basic-auth-extension"
do
  for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
  do
    if should_include_file "$file"; then
      CLASSPATH="$CLASSPATH":"$file"
    fi
  done
  if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then
    CLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"
  fi
done

# classpath addition for release
for file in "$base_dir"/libs/*;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

for file in "$base_dir"/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done
shopt -u nullglob

if [ -z "$CLASSPATH" ] ; then
  echo "Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=$SCALA_VERSION'"
  exit 1
fi

# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "
fi

# JMX port to use
#if [  $JMX_PORT ]; then
if [  $JMX_PORT ] && [ -z "$ISKAFKASERVER" ]; then
  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi

# Log directory to use
if [ "x$LOG_DIR" = "x" ]; then
  LOG_DIR="$base_dir/logs"
fi

# Log4j settings
if [ -z "$KAFKA_LOG4J_OPTS" ]; then
  # Log to console. This is a tool.
  LOG4J_DIR="$base_dir/config/tools-log4j.properties"
  # If Cygwin is detected, LOG4J_DIR is converted to Windows format.
  (( CYGWIN )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")
  KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"
else
  # create logs directory
  if [ ! -d "$LOG_DIR" ]; then
    mkdir -p "$LOG_DIR"
  fi
fi

# If Cygwin is detected, LOG_DIR is converted to Windows format.
(( CYGWIN )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"

# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
  KAFKA_OPTS=""
fi

# Set Debug options if enabled
if [ "x$KAFKA_DEBUG" != "x" ]; then

    # Use default ports
    DEFAULT_JAVA_DEBUG_PORT="5005"

    if [ -z "$JAVA_DEBUG_PORT" ]; then
        JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
    fi

    # Use the defaults if JAVA_DEBUG_OPTS was not set
    DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"
    if [ -z "$JAVA_DEBUG_OPTS" ]; then
        JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
    fi

    echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"
    KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"
fi

# Which java to use
if [ -z "$JAVA_HOME" ]; then
  JAVA="java"
else
  JAVA="$JAVA_HOME/bin/java"
fi

# Memory options
if [ -z "$KAFKA_HEAP_OPTS" ]; then
  KAFKA_HEAP_OPTS="-Xmx256M"
fi

# JVM performance options
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
  KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
fi

while [ $# -gt 0 ]; do
  COMMAND=$1
  case $COMMAND in
    -name)
      DAEMON_NAME=$2
      CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
      shift 2
      ;;
    -loggc)
      if [ -z "$KAFKA_GC_LOG_OPTS" ]; then
        GC_LOG_ENABLED="true"
      fi
      shift
      ;;
    -daemon)
      DAEMON_MODE="true"
      shift
      ;;
    *)
      break
      ;;
  esac
done

# GC options
GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''
if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
  GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX

  # The first segment of the version number, which is '1' for releases before Java 9
  # it then becomes '9', '10', ...
  # Some examples of the first line of `java --version`:
  # 8 -> java version "1.8.0_152"
  # 9.0.4 -> java version "9.0.4"
  # 10 -> java version "10" 2018-03-20
  # 10.0.1 -> java version "10.0.1" 2018-04-17
  # We need to match to the end of the line to prevent sed from printing the characters that do not match
  JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
  if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
    KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400"
  else
    KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
  fi
fi

# Remove a possible colon prefix from the classpath (happens at lines like `CLASSPATH="$CLASSPATH:$file"` when CLASSPATH is blank)
# Syntax used on the right side is native Bash string manipulation; for more details see
# http://tldp.org/LDP/abs/html/string-manipulation.html, specifically the section titled "Substring Removal"
CLASSPATH=${CLASSPATH#:}

# If Cygwin is detected, classpath is converted to Windows format.
(( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")

# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi
  • 啓動kafka,在當前目錄下
docker-compose up -d
注意點:
一、單機搭建zk集羣須要建立網絡
二、zk的版本要使用3.4的,3.5坑太多,另外須要掛載配置文件,並在配置文件中設置客戶端端口
三、使kafka與zk在同一網絡下,
四、映射kafka-run-class.sh文件,主要修改配置解決JMX_PORT相關的報錯
  • 經過 http://你的IP:9099 訪問kafka-manager,而後點擊 Cluster 按鈕, 在下拉框中選擇 Add Cluster, 添加你的kafka集羣接口,其中kafka的版本不用管他

  • 而後查看該集羣,能看到相關的zookeeper、topic等信息。

  • 而後咱們能夠建立話題並查看

一、 經過docker exec -it 容器ID /bin/bash 進入容器
    二、 cd /opt/kafka_2.12-2.3.0/
    三、 bin/kafka-topics.sh --create --zookeeper zoo1:2181 --replication-factor 3 --partitions 3 --topic test (測試數據記得刪除)
    四、 bin/kafka-topics.sh --list --zookeeper zoo1:2181  # 查看話題列表
  • 話題建立成功後,就能夠退出容器了,咱們在宿主機上運行一個生產者和一個消費者
運行一個消息生產者
	bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
運行一個消息消費者
	bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

此時咱們在生產者中輸入任何字符,都會及時的顯示在消費者的頁面中,而且kafka在zookeeper註冊後,建立的話題會被zookeeper記錄下來。
  • 另外咱們能夠在zookeeper中查看話題是否建立成功
鏈接zk,並打開客戶端
        docker run -it --rm --link zoo1:zk1 --link zoo2:zk2 --link zoo3:zk3 --net zookeeper_network zookeeper:3.4.14 zkCli.sh -server zk1:2181,zk2:2181,zk3:2181
        // 執行完上面的命令後,會進去zk的客戶端,咱們能夠進行一些操做,好比
	執行 ls /  能夠查看根節點信息,而後根據現實內容,查看下一級數據節點的信息
        建議等下搭建完kafka集羣並建立topic後經過 ls /brokers/topics 來查看topic列表(kafka在zookeeper註冊後,建立的話題會被zookeeper記錄下來)
  • 至此,kafka + zookeeper 集羣搭建完畢,也進行了topic的建立以及監聽,記得刪除測試數據。

下面講解filebeat的部署在應用服務器採集應用或系統日誌,此處咱們以Nginx日誌爲例

咱們選擇172.168.50.240 做爲應用服務器,你能夠將filebeat部署到任何能夠與kafka集羣通訊的機器

  • 老規矩,創建文件夾,存放yml以及filebeat的配置文件
mkdir -p /root/filebeat  // 能夠是任意目錄
    cd /root/filebeat
    vim docker-compose.yml
  • docker-compose.yml的內容以下:
version: '3'
services:
  filebeat-240:
    image: docker.elastic.co/beats/filebeat:7.1.1        # 鏡像依然選擇與elk的版本一致
    container_name: filebeat-240
    restart: always
    volumes:
      - /home/wwwlogs:/home/wwwlogs                      # 此處是宿主機的Nginx日誌目錄(修改成你本身的目錄)
      # - /root/dockerfiles/nginx/logs:/logs/nginx/logs  # 宿主機部署了nginx+php的 docker環境,此處爲容器中Nginx映射到宿主機的日誌目錄
      - /root/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml   # filebeat的配置文件
  • 上面提到的filebeat.yml 配置文件以下:
filebeat.inputs:                   # inputs爲複數,表名type能夠有多個
- type: log                        # 輸入類型
  access:
  enabled: true                    # 啓用這個type配置
  paths:
    - /logs/nginx/logs/access.log  # 監控nginx1 的access日誌,建議採集日誌時先採集單一nginx的access日誌。
    - /home/wwwlogs/access.log     # 監控nginx2 的access日誌(docker產生的日誌)
  close_rename: true               # 重命名後,文件將不會繼續被採集信息
  tail_files: true                 # 配置爲true時,filebeat將重新文件的最後位置開始讀取,若是配合日誌輪循使用,新文件的第一行將被跳過
  multiline.pattern: '^\[.+\]'     # 這三行表示,若是是多行日誌,合併爲一行
  multiline.negate: true
  multiline.match: "after"

  fields:                          # 額外的字段
    source: nginx-access-240       # 自定義source字段,用於es建議索引(字段名小寫,我記得大寫好像不行)
- type: log
  access:
  enabled: true
  paths:
    - /logs/nginx/logs/error.log  # 監控nginx1 的error日誌 
    - /home/wwwlogs/error.log     # 監控nginx2 的error日誌(docker產生的日誌)
  fields:
    source: nginx-error-240
  # 多行合併一行配置
  multiline.pattern: '^\[.+\]'
  multiline.negate: true
  multiline.match: "after"
  tail_files: true

filebeat.config:                 # 這裏是filebeat的各模塊配置,我理解modules爲inputs的一種新寫法。
  modules:
    path: ${path.config}/modules.d/*.yml    # 進入容器查看這裏有不少模塊配置文件,Nginx,redis,apache什麼的
    reload.enabled: false                   # 設置爲true來啓用配置重載
    reload.period: 10s                      # 檢查路徑下的文件更改的期間(多久檢查一次)

# 在7.4版本中,自定義es的索引須要把ilm設置爲false, 這裏未驗證,抄來的
setup.ilm.enabled: false

output.kafka:            # 輸出到kafka
  enabled: true          # 該output配置是否啓用
  hosts: ["172.168.50.41:9092", "172.168.50.41:9097", "172.168.50.41:9098"]  # kafka節點列表
  topic: "elk-%{[fields.source]}"   # kafka會建立該topic,而後logstash(能夠過濾修改)會傳給es做爲索引名稱
  partition.hash:
    reachable_only: true # 是否只發往可達分區
  compression: gzip      # 壓縮
  max_message_bytes: 1000000  # Event最大字節數。默認1000000。應小於等於kafka broker message.max.bytes值
  required_acks: 1  # kafka ack等級
  worker: 3  # kafka output的最大併發數
  # version: 0.10.1      # kafka版本
  bulk_max_size: 2048    # 單次發往kafka的最大事件數
logging.to_files: true   # 輸出全部日誌到file,默認true, 達到日誌文件大小限制時,日誌文件會自動限制替換,詳細配置:https://www.cnblogs.com/qinwengang/p/10982424.html
close_older: 30m         # 若是一個文件在某個時間段內沒有發生過更新,則關閉監控的文件handle。默認1h
force_close_files: false # 這個選項關閉一個文件,當文件名稱的變化。只在window建議爲true 

#沒有新日誌採集後多長時間關閉文件句柄,默認5分鐘,設置成1分鐘,加快文件句柄關閉;
#close_inactive: 1m
#
##傳輸了3h後荏沒有傳輸完成的話就強行關閉文件句柄,這個配置項是解決以上案例問題的key point;
#close_timeout: 3h
#
###這個配置項也應該配置上,默認值是0表示不清理,不清理的意思是採集過的文件描述在registry文件裏永不清理,在運行一段時間後,registry會變大,可能會帶來問題。
#clean_inactive: 72h
#
##設置了clean_inactive後就須要設置ignore_older,且要保證ignore_older < clean_inactive
#ignore_older: 70h
#
## 限制 CPU和內存資源
#max_procs: 1 
#queue.mem.events: 256
#queue.mem.flush.min_events: 128
  • 配置完成後,docker-compose up -d 啓動filebeat服務

  • 訪問你的nginx,看看access.log文件是否正常產生日誌

  • 經過kafka-manage查看是否接受到消息 http://172.168.50.41:9099/ (部署kafka-manage服務的機器)

  • kafka正常收到消息後,經過elasticsearch-head查看是否正常創建索引 http://172.168.50.41:9201/ (部署elasticsearch-head服務的機器)

  • 如kafka未正常收到消息,檢查file的配置,如es未正常創建索引,檢查logstash配置並查看日誌

  • 查看單個容器日誌

docker logs --since 30m 容器ID

以上就是日誌採集與存儲的過程,以Nginx的access.log日誌(應用日誌,接口日誌也同樣)爲例,filebeat從服務器的日誌文件採集到日誌,發送給kafka,而後由logstash進行消費,再經由logstash過濾後輸出到elasticsearch進行存儲並創建索引。

  • 須要注意的一點就是容器內時區設置的問題

  • kibana的使用教程,不打算寫了,我會找一篇優質的文章貼過來。

  • 轉載請註明原文

相關文章
相關標籤/搜索