一般當系統或者應用發生故障時,運維或者開發須要登陸到各個服務器上,使用 grep / sed / awk 等 Linux 腳本工具或vim打開文件的方式去日誌裏查找故障緣由。 在沒有日誌系統的狀況下,首先須要定位處理請求的服務器,若是這臺服務器部署了多個實例,則須要去每一個應用實例的日誌目錄下去找日誌文件。 每一個應用實例還會設置日誌滾動策略(如:天天生成一個文件),還有日誌壓縮歸檔策略等。 這樣一系列流程下來,對於咱們排查故障以及及時找到故障緣由,形成了比較大的麻煩,而且太靠人手工操做。 所以,若是咱們能把這些日誌集中管理,並提供集中檢索功能,不只能夠提升診斷的效率,同時對系統狀況有個全面的理解,還能夠隨時進行擴容,避免過後救火的被動。 總的來講有如下三點用途: 數據查找:經過檢索日誌信息,定位相應的 bug ,找出解決方案。 服務診斷:經過對日誌信息進行統計、分析,瞭解服務器的負荷和服務運行狀態 數據分析:能夠作進一步的應用數據分析,固然須要更深刻的數據分析,以便產生經濟價值。 (摘自:http://www.mamicode.com/info-detail-2741933.html) 固然elasticsearch除了用於ELK,還有許多其它用途, 咱們用的只是它的一小部分功能: 分佈式的搜索引擎和數據分析引擎 搜索:網站的站內搜索,IT系統的檢索 數據分析:電商網站,統計銷售排名前10的商家 全文檢索,結構化檢索,數據分析 全文檢索:我想搜索商品名稱包含某個關鍵字的商品 結構化檢索:我想搜索商品分類爲日化用品的商品都有哪些 數據分析:咱們分析每個商品分類下有多少個商品 對海量數據進行近實時的處理 分佈式:ES自動能夠將海量數據分散到多臺服務器上去存儲和檢索 海聯數據的處理:分佈式之後,就能夠採用大量的服務器去存儲和檢索數據,天然而然就能夠實現海量數據的處理了 近實時:檢索數據要花費1小時(這就不要近實時,離線批處理,batch-processing);在秒級別對數據進行搜索和分析 (摘自:https://blog.csdn.net/paicmis/article/details/82535018)
首先logstash具備日誌採集、過濾、篩選等功能,功能完善但同時體量也會比較大,消耗系統資源天然也多。filebeat做爲一個輕量級日誌採集工具,雖然沒有過濾篩選功能,可是僅僅部署在應用服務器做爲咱們採集日誌的工具能夠是說最好的選擇了。但咱們有些時候可能又須要logstash的過濾篩選功能,因此咱們在採集日誌時用filebeat,而後交給logstash過濾篩選。php
其次,logstash的吞吐量是有限的,一旦短期內filebeat傳過來的日誌過多會產生堆積和堵塞,對日誌的採集也會受到影響,因此在filebeat與logstash中間又加了一層kafka消息隊列來緩存或者說解耦,固然redis也是能夠的。這樣當衆多filebeat節點採集大量日誌直接放到kafka中,logstash慢慢的進行消費,兩邊互不干擾。html
至於zookeeper,想必java的同窗都知道,分佈式服務管理神器,監控管理kafka的節點註冊,topic管理等,同時彌補了kafka集羣節點對外界沒法感知的問題,kafka實際已經自帶了zookeeper,但本教程將會使用獨立的zookeeper進行管理,方便後期zookeeper集羣的擴展。java
一、服務器: 三臺,內存最少4G,其中選擇一臺做爲主節點,建議主節點8G內存。node
PS:單臺大內存服務器也能夠,本教程爲了更接近實際使用因此建議三臺。linux
若是資源有限,能夠單獨嘗試文章開頭的兩篇博文,也能夠嘗試本教程,但不保證能全部的步驟都成功,可能會內存不足,內存低於4G不建議嘗試)nginx
二、系統環境:linux內核不低於3.10.0git
三、安裝docker環境以及docker-compose,詳情見個人另外一篇博文:http://www.javashuo.com/article/p-coxcbzuv-cy.htmlgithub
四、首先在三臺服務器上搭建es集羣,而後選擇在一個es節點上搭建ELK,而後在三個節點上安裝kafka-zookeeper集羣,最後filebeat,各個環節會進行各功能間的可用性測試。redis
五、elk三個軟件版本均爲7.1.1,版本過低的話搭建完也該升級了,版本過高文檔比較少,坑多,目前最新版本7.5.0,現階段生產環境使用6.x.x的應該比較多,故選擇7.1.1。docker
- 基於單機搭建elasticsearch集羣見官網 https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html
172.168.50.40(8G), 172.168.50.41(16G), 172.168.50.240(8G) 服務器內存儘可能不要低於4G 選用172.168.50.41做爲master節點,
創建文件夾,/root/elasticsearch(隨意便可),用於存放啓動elasticsearch容器的yml文件以及es的配置文件 mkdir /root/elasticsearch
cd /root/elasticsearch touch docker-compose.yml
version: '3' services: elasticsearch: # 服務名稱 image: elasticsearch:7.1.1 # 使用的鏡像 container_name: elasticsearch # 容器名稱 restart: always # 失敗自動重啓策略 environment: - node.name=node-41 # 節點名稱,集羣模式下每一個節點名稱惟一 - network.publish_host=172.168.50.41 # 用於集羣內各機器間通訊,對外使用,其餘機器訪問本機器的es服務,通常爲本機宿主機IP - network.host=0.0.0.0 # 設置綁定的ip地址,能夠是ipv4或ipv6的,默認爲0.0.0.0,即本機 - discovery.seed_hosts=172.168.50.40,172.168.50.240,172.168.50.41 # es7.0以後新增的寫法,寫入候選主節點的設備地址,在開啓服務後,若是master掛了,哪些能夠被投票選爲主節點 - cluster.initial_master_nodes=172.168.50.40,172.168.50.240,172.168.50.41 # es7.0以後新增的配置,初始化一個新的集羣時須要此配置來選舉master - cluster.name=es-cluster # 集羣名稱,相同名稱爲一個集羣, 三個es節點須一致 # - http.cors.enabled=true # 是否支持跨域,是:true // 這裏設置不起做用,可是能夠將此文件映射到宿主機進行修改,而後重啓,解決跨域 # - http.cors.allow-origin="*" # 表示支持全部域名 // 這裏設置不起做用,可是能夠將此文件映射到宿主機進行修改,而後重啓,解決跨域 - bootstrap.memory_lock=true # 內存交換的選項,官網建議爲true - "ES_JAVA_OPTS=-Xms512m -Xmx512m" # 設置內存,如內存不足,能夠嘗試調低點 ulimits: # 棧內存的上限 memlock: soft: -1 # 不限制 hard: -1 # 不限制 volumes: - /root/elasticsearch/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml # 將容器中es的配置文件映射到本地,設置跨域, 不然head插件沒法鏈接該節點 - esdata:/usr/share/elasticsearch/data # 存放數據的文件, 注意:這裏的esdata爲 頂級volumes下的一項。 ports: - 9200:9200 # http端口,能夠直接瀏覽器訪問 - 9300:9300 # es集羣之間相互訪問的端口,jar之間就是經過此端口進行tcp協議通訊,遵循tcp協議。 volumes: esdata: driver: local # 會生成一個對應的目錄和文件,如何查看,下面有說明。
另外兩臺服務器也照着這個配置進行配置,但 publish_host,以及 node.name 須要改一下便可。
內存設置,三臺服務器都須要進行設置
兩種設置內存的方式: 一、設置簡單,但機器重啓後需再次設置 sysctl -w vm.max_map_count=262144 二、直接修改配置文件, 進入sysctl.conf文件添加一行(解決容器內存權限太小問題) vi /etc/sysctl.conf sysctl vm.max_map_count=262144 # 添加此行 退出文件後,執行命令: sysctl -p 當即生效
而後三臺服務器依次執行 docker-compose up -d
而後訪問 http://172.168.50.41:9200/_cluster/health?pretty 查看是否集羣正常運行, 正常運行會返回以下信息
{ "cluster_name" : "es-cluster", "status" : "green", "timed_out" : false, "number_of_nodes" : 3, "number_of_data_nodes" : 3, "active_primary_shards" : 0, "active_shards" : 0, "relocating_shards" : 0, "initializing_shards" : 0, "unassigned_shards" : 0, "delayed_unassigned_shards" : 0, "number_of_pending_tasks" : 0, "number_of_in_flight_fetch" : 0, "task_max_waiting_in_queue_millis" : 0, "active_shards_percent_as_number" : 100.0 }
network.host: 0.0.0.0 http.cors.enabled: true # 是否支持跨域 http.cors.allow-origin: "*" # 表示支持全部域名
# docker volume create elk_data // 建立一個自定義容器卷,本教程內不須要執行,咱們的docker-compose.yml會幫咱們自動執行改命令。 # docker volume ls // 查看全部容器卷, # docker volume inspect elk_data // 查看指定容器卷詳情信息, 包括真實目錄 注意: 若是你要刪除一個掛載卷,或者從新生成,請執行刪除卷操做 # docker volume rm elk_data // 直接執行這個命令,同時會刪除文件,可是先刪除文件的話,必須再次執行此命令,不然可能致使該節點沒法加入集羣。
以上就是es集羣的搭建,若是出現權限不足等簡單問題,能夠百度自行解決。
接下來咱們將在172.168.50.41服務器上搭建elk服務,用elk中的es替換掉50.41服務器上以前的es。
// 下載elasticsearch,logstash,kibana的鏡像,建議提早下載好。 自es5開始,通常三個軟件的版本都保持一致了。若是下載網絡有問題,嘗試從新配置一下docker的鏡像源地址。 docker pull docker.elastic.co/elasticsearch/elasticsearch:7.1.1 && docker pull docker.elastic.co/logstash/logstash:7.1.1 && docker pull docker.elastic.co/kibana/kibana:7.1.1
mkdir -p /root/elk(位置和名稱都隨意) # 與剛纔的elasticsearch目錄進行區別 cd /root/elk vim docker-compose.yml
version: '3' services: # 服務(s表明複數) elasticsearch: # 服務名稱:elasticsearch image: docker.elastic.co/elasticsearch/elasticsearch:7.1.1 # 使用的鏡像以及版本號 container_name: elasticsearch7.1.1 # 容器名稱 environment: # 環境變量 - node.name=node-41 # 節點名稱 - network.publish_host=172.168.50.41 # 用於集羣內各機器間通訊,對外使用,其餘機器訪問本機器的es服務,通常爲本機宿主機IP - network.host=0.0.0.0 # 設置綁定的ip地址,能夠是ipv4或ipv6的,默認爲0.0.0.0,即本機 - discovery.seed_hosts=172.168.50.40,172.168.50.240,172.168.50.41 # es7.0以後新增的寫法,寫入候選主節點的設備地址,在開啓服務後,若是master掛了,哪些能夠被投票選爲主節點 - cluster.initial_master_nodes=172.168.50.40,172.168.50.240,172.168.50.41 # es7.0以後新增的配置,初始化一個新的集羣時須要此配置來選舉master - cluster.name=es-cluster # 集羣名稱,相同名稱爲一個集羣, 三個es節點須一致 - bootstrap.memory_lock=true # 內存交換的選項,官網建議爲true - "ES_JAVA_OPTS=-Xms512m -Xmx512m" # 設置內存,如內存不足,能夠嘗試調低點 #- discovery.type=single-node # 單es節點模式,這裏咱們就不啓用,啓用的話,除了兩個內存外其餘的環境配置都不須要。 volumes: - esdata:/usr/share/elasticsearch/data # 設置es數據存放的目錄 - /root/elk/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml # 映射es容器中的配置文件到宿主機 hostname: elasticsearch # 服務hostname ulimits: # 棧內存的上限 memlock: soft: -1 # 不限制 hard: -1 # 不限制 restart: always # 重啓策略,失敗時老是重啓 ports: - 9200:9200 # http端口,能夠直接瀏覽器訪問 - 9300:9300 # es集羣之間相互訪問的端口,jar之間就是經過此端口進行tcp協議通訊,遵循tcp協議。 kibana: # 服務名稱:kibana image: docker.elastic.co/kibana/kibana:7.1.1 # 使用的鏡像及版本號 container_name: kibana7.1.1 # 容器名稱 environment: # 環境配置 - elasticsearch.hosts=http://elasticsearch:9200 # 設置鏈接的es節點,此處的elasticsearch即上面的elasticsearch服務中的hostname:elasticsearch # - TZ=Asia/Shanghai # 設置容器內時區 hostname: kibana # 服務hostname depends_on: - elasticsearch # 依賴es服務,必須先啓動es纔會啓動kibana restart: always ports: - 5601:5601 # 訪問的端口號 logstash: # 服務名稱:logstash image: docker.elastic.co/logstash/logstash:7.1.1 container_name: logstash7.1.1 hostname: logstash restart: always # volumes: # 掛載卷,先註釋,等ELK啓動起來以後,從容器中複製出來一份,下面會說 # - /root/elk/logstash/config/:/usr/share/logstash/config/ # logstash 配置文件目錄 # - /root/elk/logstash/pipeline/:/usr/share/logstash/pipeline/ # logstash 的採集與輸入的配置文件目錄 depends_on: - elasticsearch # ports: # - 9600:9600 # 端口號,後來沒用到 # - 5044:5044 # filebeat直接傳遞數據給logstash時默認使用的端口號,測試了一下,沒用 volumes: esdata: driver: local
上面用到了/root/elk/logstash/ 下面的config目錄與pipeline目錄,如何獲得這兩個目錄呢?
// 先中止以前該服務器上的es節點 docker stop es節點ID docker rm es節點ID
一、在/root/elk目錄下,docker-compose up -d 啓動elk
二、mkdir logstash
三、從容器中複製兩個目錄到logstash下,這樣咱們就有了初始配置文件
docker cp 容器ID:/usr/share/logstash/config/ logstash // 容器ID爲Logstash的ID docker cp 容器ID:/usr/share/logstash/pipeline/ logstash
http.host: "0.0.0.0" xpack.monitoring.enabled: true xpack.monitoring.elasticsearch.hosts: [ "172.168.50.41:9200","172.168.50.40:9200","172.168.50.240:9200" ] // elasticsearch.hosts 爲你的es服務器列表以及端口號,用於將數據存儲到es
input{ # 輸入組件 kafka{ # 從kafka消費數據 bootstrap_servers => ["172.168.50.41:9092,172.168.50.41:9097,172.168.50.41:9098"] # kafka節點的IP+端口號 #topics => "%{[@metadata][topic]}" # 使用kafka傳過來的topic topics_pattern => "elk-.*" # 使用正則匹配topic codec => "json" # 數據格式 consumer_threads => 3 # 消費線程數量 decorate_events => true # 可向事件添加Kafka元數據,好比主題、消息大小的選項,這將向logstash事件中添加一個名爲kafka的字段, auto_offset_reset => "latest" # 自動重置偏移量到最新的偏移量 group_id => "logstash-node-41" # 消費組ID,多個有相同group_id的logstash實例爲一個消費組 } } filter { # 當非業務字段時,無traceId則移除 if ([message] =~ "traceId=null") { # 過濾組件,這裏只是展現,無實際意義,根據本身的業務需求進行過濾 drop {} } } output { # 輸出組件 elasticsearch { # Logstash輸出到es hosts => ["172.168.50.41:9200", "172.168.50.40:9200", "172.168.50.240:9200"] # index => "%{[fields][source]}" #直接在日誌中匹配,索引會去掉elk index => "%{[@metadata][topic]}-%{+YYYY-MM-dd}" # 以日期建索引 #flush_size => 20000 #idle_flush_time => 10 #sniffing => true #template_overwrite => false } # stdout { # codec => rubydebug # } } # 能夠寫多個input,可是output中的index即索引要處理好,根據過濾組件或者kafka傳過來的不一樣話題,應有不一樣的索引 # 也能夠寫多個output,向其餘程序(非es)傳送數據
volumes: # 掛載卷,先註釋,等ELK啓動起來以後,從容器中複製出來一份,下面會說 - /root/elk/logstash/config/:/usr/share/logstash/config/ # logstash 配置文件目錄 - /root/elk/logstash/pipeline/:/usr/share/logstash/pipeline/ # logstash 的採集與輸入的配置文件目錄
// 啓動ELK docker-compose up -d --force --build
出現done表示成功,docker-compose logs 查看日誌(分別輸出elk三個服務的日誌)執行docker-compose ps 能夠看到三個服務的運行狀態
在瀏覽器輸入http://IP:5601/ 訪問kibana。
可使用head監控es
// 拉取鏡像 docker pull mobz/elasticsearch-head:5 // 啓動 docker run -d --name es_admin -p 9100:9100 mobz/elasticsearch-head:5
注意,IP地址要使用你es所在服務器的地址,而後點擊鏈接,出現相似的顯示說明,沒問題,本教程是三個節點,當關掉一個es會變成兩個節點。
若是點擊鏈接沒有反應,F12查看network是403,或者200,可是console中提示跨域,那麼應該是跨域未設置好,從新回去檢查一遍,看看容器內配置文件是否加入了那兩行跨域。
至此,咱們的Elasticsearch(集羣)+ Logstash + Kibana 搭建完成, 你能夠在kibana中的dev-tools菜單中操做es,甚至向es中添加數據,或者經過其餘方式向es中添加數據進行測試。
Logstash能夠採集數據, 過濾數據; Elasticsearch 能夠存儲和索引數據; Kibana提供可視化展現和操做
這個時候就能夠配置Logstash採集數據而後想es輸入了,可是咱們並不知足於此,我想在須要採集日誌的應用服務器上安裝 filebeat,而後經filebeat將數據轉發給Logstash,Logstash只進行過濾便可(也能夠filebeat直接將數據傳給es,不要Logstash,看我的需求)。
但我沒有保留filebeat直接傳給Logstash的配置文件,因此咱們直接在此時將kafka也接入進來,再也不演示filebeat與Logstash直接交互的情景。
不用擔憂,咱們儘可能下降複雜度,好比,如今咱們不考慮ELK,也不考慮filebeat,拋開這些東西,僅僅搭建一個kafka集羣。
kafka依賴zookeeper,咱們前面說過,本文使用獨立的zookeeper集羣,因此接下來介紹如何搭建zookeeper
依然是三節點集羣,可是爲了減小複雜性,咱們直接在172.168.50.41(內存最大的服務器)上經過docker容器搭建單機zookeeper集羣。
創建目錄,並建立yml文件
mkdir -p /root/zookeeper cd /root/zookeeper vim docker-compose.yml
version: '2' services: zoo1: image: zookeeper:3.4.14 # 儘可能使用3.4版本,3.4.14的鏡像就能夠。 3.5坑太多 restart: always hostname: zoo1 container_name: zoo1 ports: - 2181:2181 # 端口號 volumes: - /usr/local/docker-compose/zk/zk1/data:/data # 數據文件存放目錄 - /usr/local/docker-compose/zk/zk1/datalog:/datalog # 日誌文件存放目錄 - /root/zookeeper/zoo.cfg:/conf/zoo.cfg # zookeeper的配置文件,下面會給出該文件內容 environment: ZOO_MY_ID: 1 # 表示 ZK服務的 id, 它是1-255 之間的整數, 必須在集羣中惟一 ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181 # ZK集羣的主機列表即端口號 # ZOOKEEPER_CLIENT_PORT: 2181 networks: default: ipv4_address: 172.23.0.11 # 集羣使用的網絡 zoo2: image: zookeeper:3.4.14 restart: always hostname: zoo2 container_name: zoo2 ports: - 2180:2181 volumes: - /usr/local/docker-compose/zk/zk2/data:/data - /usr/local/docker-compose/zk/zk2/datalog:/datalog - /root/zookeeper/zoo.cfg:/conf/zoo.cfg environment: ZOO_MY_ID: 2 #ZOOKEEPER_CLIENT_PORT: 2181 ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181 networks: default: ipv4_address: 172.23.0.12 zoo3: image: zookeeper:3.4.14 restart: always hostname: zoo3 container_name: zoo3 ports: - 2179:2181 volumes: - /usr/local/docker-compose/zk/zk3/data:/data - /usr/local/docker-compose/zk/zk3/datalog:/datalog - /root/zookeeper/zoo.cfg:/conf/zoo.cfg environment: ZOO_MY_ID: 3 #ZOOKEEPER_CLIENT_PORT: 2181 ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181 networks: default: ipv4_address: 172.23.0.13 networks: default: external: name: zookeeper_network # 網絡名稱,此網絡須要進行建立
tickTime=2000 # 心跳2秒,服務器之間或客戶端與服務器之間維持心跳的時間間隔 initLimit=10 # 集羣中的follower服務器(F)與leader服務器(L)之間 初始鏈接 時能容忍的最多心跳數(tickTime的數量)。 syncLimit=5 # 集羣中的follower服務器(F)與leader服務器(L)之間 請求和應答 之間能容忍的最多心跳數(tickTime的數量)。 dataDir=/data # 數據目錄 dataLogDir=/datalog # 日誌文件目錄 clientPort=2181 # 客戶端鏈接端口 autopurge.snapRetainCount=3 # 至少須要保留3個快照數據文件和對應的事務日誌文件 autopurge.purgeInterval=1 # 進行歷史文件自動清理的頻率,單位小時 server.1= zoo1:2888:3888 # 定義個服務通訊端口,以及投票端口 server.2= zoo2:2888:3888 server.3= zoo3:2888:3888
docker network create --driver bridge --subnet 172.23.0.0/25 --gateway 172.23.0.1 zookeeper_network
當前目錄下執行 docker-compose up -d
docker run -it --rm --link zoo1:zk1 --link zoo2:zk2 --link zoo3:zk3 --net zookeeper_network zookeeper:3.4.14 zkCli.sh -server zk1:2181,zk2:2181,zk3:2181 // 執行完上面的命令後,會進去zk的客戶端,咱們能夠進行一些操做,好比 執行 ls / 能夠查看根節點信息,而後根據現實內容,查看下一級數據節點的信息 建議等下搭建完kafka集羣並建立topic後經過 ls /brokers/topics 來查看topic列表(kafka在zookeeper註冊後,建立的話題會被zookeeper記錄下來) zk經常使用命令 create /test "test" get /test set /test "hello test" delete /test quit
mkdir -p /root/kafka cd /root/kafka vim docker-compose.yml
version: '2' services: kafka1: image: wurstmeister/kafka:2.12-2.3.0 restart: always hostname: kafka1 container_name: kafka1 ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.168.50.41:9092 # 宿主機的IP地址而非容器的IP,及暴露出來的端口 KAFKA_ADVERTISED_HOST_NAME: 172.168.50.41 # 外網訪問地址 KAFKA_ADVERTISED_PORT: 9092 # 端口 KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181 # 鏈接的zookeeper服務及端口 JMX_PORT: 9966 # kafka須要監控broker和topic的數據的時候,是須要開啓jmx_port的 volumes: - /usr/local/docker-compose/kafka/kafka1:/kafka # kafka數據文件存儲目錄 - /root/kafka/kafka-run-class.sh:/opt/kafka_2.12-2.3.0/bin/kafka-run-class.sh # kafka啓動時的配置文件,咱們修改了一部分代碼,用來解決java.rmi.server.ExportException: Port already in use這個問題。過程詳見https://github.com/apache/kafka/pull/1983/commits/2c5d40e946bcc149b1a9b2c01eced4ae47a734c5。 external_links: # 鏈接本compose文件之外的容器 - zoo1 - zoo2 - zoo3 networks: default: ipv4_address: 172.23.0.14 # 使用zookeeper建立的網絡中一個IP kafka2: image: wurstmeister/kafka:2.12-2.3.0 restart: always hostname: kafka2 container_name: kafka2 ports: - "9097:9092" environment: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.168.50.41:9097 KAFKA_ADVERTISED_HOST_NAME: 172.168.50.41 KAFKA_ADVERTISED_PORT: 9097 KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181 JMX_PORT: 9988 volumes: - /usr/local/docker-compose/kafka/kafka2:/kafka - /root/kafka/kafka-run-class.sh:/opt/kafka_2.12-2.3.0/bin/kafka-run-class.sh external_links: # 鏈接compose文件之外的container - zoo1 - zoo2 - zoo3 networks: default: ipv4_address: 172.23.0.15 kafka3: image: wurstmeister/kafka:2.12-2.3.0 restart: always hostname: kafka3 container_name: kafka3 ports: - "9098:9092" environment: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.168.50.41:9098 KAFKA_ADVERTISED_HOST_NAME: 172.168.50.41 KAFKA_ADVERTISED_PORT: 9098 KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181 JMX_PORT: 9977 volumes: - /usr/local/docker-compose/kafka/kafka3 - /root/kafka/kafka-run-class.sh:/opt/kafka_2.12-2.3.0/bin/kafka-run-class.sh external_links: # 鏈接compose文件之外的container - zoo1 - zoo2 - zoo3 networks: default: ipv4_address: 172.23.0.16 kafka-manager: # kafka管理和監控的一個工具 image: sheepkiller/kafka-manager:latest # 此時我使用的最新版本爲5.0 restart: always container_name: kafa-manager hostname: kafka-manager ports: - "9099:9000" # 瀏覽器訪問的端口 links: # 鏈接本compose文件建立的container - kafka1 - kafka2 - kafka3 external_links: # 鏈接compose文件之外的container - zoo1 - zoo2 - zoo3 environment: ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181 # zk服務列表 KAFKA_BROKERS: kafka1:9092,kafka2:9097,kafka3:9098 # kafka節點列表及端口號 APPLICATION_SECRET: letmein # 應用祕鑰,默認letmein KM_ARGS: -Djava.net.preferIPv4Stack=true # 在ipv4的機器上想獲取到完整的機器名, 應該是IPV4網絡相關的東西 networks: default: ipv4_address: 172.23.0.10 # IP地址,zookeeper建立的網絡中的一個IP networks: default: external: name: zookeeper_network # 使用zookeeper建立的網絡
kafka-run-class.sh 文件內容以下,可能會出現有權限問題, 使用下面我修改後的文件
或者 參考該鏈接 https://github.com/apache/kafka/pull/1983/commits/2c5d40e946bcc149b1a9b2c01eced4ae47a734c5
#!/bin/bash # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. if [ $# -lt 1 ]; then echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]" exit 1 fi # CYGWIN == 1 if Cygwin is detected, else 0. if [[ $(uname -a) =~ "CYGWIN" ]]; then CYGWIN=1 else CYGWIN=0 fi if [ -z "$INCLUDE_TEST_JARS" ]; then INCLUDE_TEST_JARS=false fi # Exclude jars not necessary for running commands. regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$" should_include_file() { if [ "$INCLUDE_TEST_JARS" = true ]; then return 0 fi file=$1 if [ -z "$(echo "$file" | egrep "$regex")" ] ; then return 0 else return 1 fi } ################ lizheng ################# # need to check if called to start server or client # in order to correctly decide about JMX_PORT ISKAFKASERVER="false" if [[ "$*" =~ "kafka.Kafka" ]]; then ISKAFKASERVER="true" fi ################ lizheng ################# base_dir=$(dirname $0)/.. if [ -z "$SCALA_VERSION" ]; then SCALA_VERSION=2.12.8 fi if [ -z "$SCALA_BINARY_VERSION" ]; then SCALA_BINARY_VERSION=$(echo $SCALA_VERSION | cut -f 1-2 -d '.') fi # run ./gradlew copyDependantLibs to get all dependant jars in a local dir shopt -s nullglob for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*; do CLASSPATH="$CLASSPATH:$dir/*" done for file in "$base_dir"/examples/build/libs/kafka-examples*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then clients_lib_dir=$(dirname $0)/../clients/build/libs streams_lib_dir=$(dirname $0)/../streams/build/libs rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION} else clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs streams_lib_dir=$clients_lib_dir rocksdb_lib_dir=$streams_lib_dir fi for file in "$clients_lib_dir"/kafka-clients*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done for file in "$streams_lib_dir"/kafka-streams*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done else VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'` SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar; do if should_include_file "$file"; then CLASSPATH="$file":"$CLASSPATH" fi done fi for file in "$rocksdb_lib_dir"/rocksdb*.jar; do CLASSPATH="$CLASSPATH":"$file" done for file in "$base_dir"/tools/build/libs/kafka-tools*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done for dir in "$base_dir"/tools/build/dependant-libs-${SCALA_VERSION}*; do CLASSPATH="$CLASSPATH:$dir/*" done for cc_pkg in "api" "transforms" "runtime" "file" "json" "tools" "basic-auth-extension" do for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then CLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*" fi done # classpath addition for release for file in "$base_dir"/libs/*; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done for file in "$base_dir"/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done shopt -u nullglob if [ -z "$CLASSPATH" ] ; then echo "Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=$SCALA_VERSION'" exit 1 fi # JMX settings if [ -z "$KAFKA_JMX_OPTS" ]; then KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false " fi # JMX port to use #if [ $JMX_PORT ]; then if [ $JMX_PORT ] && [ -z "$ISKAFKASERVER" ]; then KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT " fi # Log directory to use if [ "x$LOG_DIR" = "x" ]; then LOG_DIR="$base_dir/logs" fi # Log4j settings if [ -z "$KAFKA_LOG4J_OPTS" ]; then # Log to console. This is a tool. LOG4J_DIR="$base_dir/config/tools-log4j.properties" # If Cygwin is detected, LOG4J_DIR is converted to Windows format. (( CYGWIN )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}") KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}" else # create logs directory if [ ! -d "$LOG_DIR" ]; then mkdir -p "$LOG_DIR" fi fi # If Cygwin is detected, LOG_DIR is converted to Windows format. (( CYGWIN )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}") KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS" # Generic jvm settings you want to add if [ -z "$KAFKA_OPTS" ]; then KAFKA_OPTS="" fi # Set Debug options if enabled if [ "x$KAFKA_DEBUG" != "x" ]; then # Use default ports DEFAULT_JAVA_DEBUG_PORT="5005" if [ -z "$JAVA_DEBUG_PORT" ]; then JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT" fi # Use the defaults if JAVA_DEBUG_OPTS was not set DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT" if [ -z "$JAVA_DEBUG_OPTS" ]; then JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS" fi echo "Enabling Java debug options: $JAVA_DEBUG_OPTS" KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS" fi # Which java to use if [ -z "$JAVA_HOME" ]; then JAVA="java" else JAVA="$JAVA_HOME/bin/java" fi # Memory options if [ -z "$KAFKA_HEAP_OPTS" ]; then KAFKA_HEAP_OPTS="-Xmx256M" fi # JVM performance options if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true" fi while [ $# -gt 0 ]; do COMMAND=$1 case $COMMAND in -name) DAEMON_NAME=$2 CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out shift 2 ;; -loggc) if [ -z "$KAFKA_GC_LOG_OPTS" ]; then GC_LOG_ENABLED="true" fi shift ;; -daemon) DAEMON_MODE="true" shift ;; *) break ;; esac done # GC options GC_FILE_SUFFIX='-gc.log' GC_LOG_FILE_NAME='' if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX # The first segment of the version number, which is '1' for releases before Java 9 # it then becomes '9', '10', ... # Some examples of the first line of `java --version`: # 8 -> java version "1.8.0_152" # 9.0.4 -> java version "9.0.4" # 10 -> java version "10" 2018-03-20 # 10.0.1 -> java version "10.0.1" 2018-04-17 # We need to match to the end of the line to prevent sed from printing the characters that do not match JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p') if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400" else KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M" fi fi # Remove a possible colon prefix from the classpath (happens at lines like `CLASSPATH="$CLASSPATH:$file"` when CLASSPATH is blank) # Syntax used on the right side is native Bash string manipulation; for more details see # http://tldp.org/LDP/abs/html/string-manipulation.html, specifically the section titled "Substring Removal" CLASSPATH=${CLASSPATH#:} # If Cygwin is detected, classpath is converted to Windows format. (( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}") # Launch mode if [ "x$DAEMON_MODE" = "xtrue" ]; then nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null & else exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" fi
docker-compose up -d
一、單機搭建zk集羣須要建立網絡 二、zk的版本要使用3.4的,3.5坑太多,另外須要掛載配置文件,並在配置文件中設置客戶端端口 三、使kafka與zk在同一網絡下, 四、映射kafka-run-class.sh文件,主要修改配置解決JMX_PORT相關的報錯
經過 http://你的IP:9099 訪問kafka-manager,而後點擊 Cluster 按鈕, 在下拉框中選擇 Add Cluster, 添加你的kafka集羣接口,其中kafka的版本不用管他
而後查看該集羣,能看到相關的zookeeper、topic等信息。
而後咱們能夠建立話題並查看
一、 經過docker exec -it 容器ID /bin/bash 進入容器 二、 cd /opt/kafka_2.12-2.3.0/ 三、 bin/kafka-topics.sh --create --zookeeper zoo1:2181 --replication-factor 3 --partitions 3 --topic test (測試數據記得刪除) 四、 bin/kafka-topics.sh --list --zookeeper zoo1:2181 # 查看話題列表
運行一個消息生產者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 運行一個消息消費者 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning 此時咱們在生產者中輸入任何字符,都會及時的顯示在消費者的頁面中,而且kafka在zookeeper註冊後,建立的話題會被zookeeper記錄下來。
鏈接zk,並打開客戶端 docker run -it --rm --link zoo1:zk1 --link zoo2:zk2 --link zoo3:zk3 --net zookeeper_network zookeeper:3.4.14 zkCli.sh -server zk1:2181,zk2:2181,zk3:2181 // 執行完上面的命令後,會進去zk的客戶端,咱們能夠進行一些操做,好比 執行 ls / 能夠查看根節點信息,而後根據現實內容,查看下一級數據節點的信息 建議等下搭建完kafka集羣並建立topic後經過 ls /brokers/topics 來查看topic列表(kafka在zookeeper註冊後,建立的話題會被zookeeper記錄下來)
mkdir -p /root/filebeat // 能夠是任意目錄 cd /root/filebeat vim docker-compose.yml
version: '3' services: filebeat-240: image: docker.elastic.co/beats/filebeat:7.1.1 # 鏡像依然選擇與elk的版本一致 container_name: filebeat-240 restart: always volumes: - /home/wwwlogs:/home/wwwlogs # 此處是宿主機的Nginx日誌目錄(修改成你本身的目錄) # - /root/dockerfiles/nginx/logs:/logs/nginx/logs # 宿主機部署了nginx+php的 docker環境,此處爲容器中Nginx映射到宿主機的日誌目錄 - /root/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml # filebeat的配置文件
filebeat.inputs: # inputs爲複數,表名type能夠有多個 - type: log # 輸入類型 access: enabled: true # 啓用這個type配置 paths: - /logs/nginx/logs/access.log # 監控nginx1 的access日誌,建議採集日誌時先採集單一nginx的access日誌。 - /home/wwwlogs/access.log # 監控nginx2 的access日誌(docker產生的日誌) close_rename: true # 重命名後,文件將不會繼續被採集信息 tail_files: true # 配置爲true時,filebeat將重新文件的最後位置開始讀取,若是配合日誌輪循使用,新文件的第一行將被跳過 multiline.pattern: '^\[.+\]' # 這三行表示,若是是多行日誌,合併爲一行 multiline.negate: true multiline.match: "after" fields: # 額外的字段 source: nginx-access-240 # 自定義source字段,用於es建議索引(字段名小寫,我記得大寫好像不行) - type: log access: enabled: true paths: - /logs/nginx/logs/error.log # 監控nginx1 的error日誌 - /home/wwwlogs/error.log # 監控nginx2 的error日誌(docker產生的日誌) fields: source: nginx-error-240 # 多行合併一行配置 multiline.pattern: '^\[.+\]' multiline.negate: true multiline.match: "after" tail_files: true filebeat.config: # 這裏是filebeat的各模塊配置,我理解modules爲inputs的一種新寫法。 modules: path: ${path.config}/modules.d/*.yml # 進入容器查看這裏有不少模塊配置文件,Nginx,redis,apache什麼的 reload.enabled: false # 設置爲true來啓用配置重載 reload.period: 10s # 檢查路徑下的文件更改的期間(多久檢查一次) # 在7.4版本中,自定義es的索引須要把ilm設置爲false, 這裏未驗證,抄來的 setup.ilm.enabled: false output.kafka: # 輸出到kafka enabled: true # 該output配置是否啓用 hosts: ["172.168.50.41:9092", "172.168.50.41:9097", "172.168.50.41:9098"] # kafka節點列表 topic: "elk-%{[fields.source]}" # kafka會建立該topic,而後logstash(能夠過濾修改)會傳給es做爲索引名稱 partition.hash: reachable_only: true # 是否只發往可達分區 compression: gzip # 壓縮 max_message_bytes: 1000000 # Event最大字節數。默認1000000。應小於等於kafka broker message.max.bytes值 required_acks: 1 # kafka ack等級 worker: 3 # kafka output的最大併發數 # version: 0.10.1 # kafka版本 bulk_max_size: 2048 # 單次發往kafka的最大事件數 logging.to_files: true # 輸出全部日誌到file,默認true, 達到日誌文件大小限制時,日誌文件會自動限制替換,詳細配置:https://www.cnblogs.com/qinwengang/p/10982424.html close_older: 30m # 若是一個文件在某個時間段內沒有發生過更新,則關閉監控的文件handle。默認1h force_close_files: false # 這個選項關閉一個文件,當文件名稱的變化。只在window建議爲true #沒有新日誌採集後多長時間關閉文件句柄,默認5分鐘,設置成1分鐘,加快文件句柄關閉; #close_inactive: 1m # ##傳輸了3h後荏沒有傳輸完成的話就強行關閉文件句柄,這個配置項是解決以上案例問題的key point; #close_timeout: 3h # ###這個配置項也應該配置上,默認值是0表示不清理,不清理的意思是採集過的文件描述在registry文件裏永不清理,在運行一段時間後,registry會變大,可能會帶來問題。 #clean_inactive: 72h # ##設置了clean_inactive後就須要設置ignore_older,且要保證ignore_older < clean_inactive #ignore_older: 70h # ## 限制 CPU和內存資源 #max_procs: 1 #queue.mem.events: 256 #queue.mem.flush.min_events: 128
配置完成後,docker-compose up -d 啓動filebeat服務
訪問你的nginx,看看access.log文件是否正常產生日誌
經過kafka-manage查看是否接受到消息 http://172.168.50.41:9099/ (部署kafka-manage服務的機器)
kafka正常收到消息後,經過elasticsearch-head查看是否正常創建索引 http://172.168.50.41:9201/ (部署elasticsearch-head服務的機器)
如kafka未正常收到消息,檢查file的配置,如es未正常創建索引,檢查logstash配置並查看日誌
查看單個容器日誌
docker logs --since 30m 容器ID
須要注意的一點就是容器內時區設置的問題
kibana的使用教程,不打算寫了,我會找一篇優質的文章貼過來。
轉載請註明原文