隨着業務不斷完善與發展,日誌的重要性穩步上升。咱們須要從日誌中排查錯誤,以及分析用戶行爲,爲業務發展提供參考意見。所以,須要一套專門的日誌系統幫助咱們收集、分析、處理日誌。html
之前我曾經寫過一個logstash的blog: http://my.oschina.net/abcfy2/blog/372138 ,版本比較低,可是logstash的配置沒變過。此篇blog將比上述blog更詳盡一些,擴展到產品環境搭建完整的日誌系統,可是logstash自己的配置很少作介紹,由於舊Blog已經介紹的比較詳細了。前端
本篇Blog主要介紹咱們目前使用的日誌系統的整體架構和部分配置。Kibana的使用暫時不在本篇Blog的覆蓋範圍以內,之後也許會單獨寫一篇kibana的使用,讀者也能夠參考饒琛琳的《ELK stack權威指南》一書的相關章節。java
本篇Blog的內容也並不是本身獨自完成,關於log4j 1.2部分的配置和使用是開發同事共同探究實現的。nginx
最後要說的一點是,日誌系統的實現並不僅是運維的工做,開發也須要配合,規範日誌格式,規範項目埋點,便於排查問題。最後歸結與一點,要有執行力,要有人推進,不能隨隨便便的打日誌,更不容許產品環境有亂七八糟的println
這種調試方式的日誌輸出。git
ELK Stack指代三個獨立的組件: Elasticsearch,Logstash,Kibana。這三個獨立的組件組合使用,能夠造成一套完整的日誌解決方案。目前這三個產品前後歸於Elastic.co公司旗下,該公司圍繞Elasticsearch這個核心產品逐步打造一整套生態環境,使得ELK Stack這套架構日益成熟,並且周邊也逐步開始完善。github
其中,Logstash的做用是處理日誌,將日誌解析爲JSON
格式進行傳遞。Elasticsearch的做用是數據庫,將最終解析的結果存庫,用於往後查詢與分析使用。Kibana是Elasticsearch的dashboard,用於圖形化展現elasticsearch數據庫的查詢結果。這三個組件搭配使用,將十分靈活,有如下幾個優勢(如下內容節選自饒琛琳的《ELK stack權威指南》一書,感謝做者的努力):web
Kibana的可視化效果:sql
其中,ELK的靈活性得益於Logstash的插件式設計,並且插件之間都是鬆耦合(經過JSON
事件交互,接口統一)。數據流在Logstash會通過三個階段: Input -> Filter -> Output,並且Filter能夠無限制串聯,造成流式處理,甚至能夠乾脆沒有。這三個階段既能夠在單節點上完成,也能夠直接Output到其餘節點上,分佈處理與卸載壓力。整個Logstash的基本流程圖以下:mongodb
Logstash的數據處理過程描述以下:數據庫
Filter
進行處理,如篩選,簡單聚合(如Multiline插件將多行JAVA堆棧異常聚合爲一個事件),編解碼(如將unix時間戳轉爲時間字符串,將k1=v1,k2=v2這種kv格式解析爲{k1:v1, k2:v2}這種JSON格式,正則解析文本日誌等),執行Ruby代碼等等,而且Filter能夠無限制串聯。此過程可直接跳過,即不對事件作任何處理。整個Logstash的ruby DSL配置語法看起來像這樣:
input { 插件名1 { # 插件相關配置屬性 } 插件名2 { # 插件相關配置屬性 } ... SNIP ... 插件名3 { # 插件相關配置屬性 } } filter { 插件名1 { # 插件相關配置屬性 } 插件名2 { # 插件相關配置屬性 } ... SNIP ... 插件名3 { # 插件相關配置屬性 } } output { 插件名1 { # 插件相關配置屬性 } 插件名2 { # 插件相關配置屬性 } ... SNIP ... 插件名3 { # 插件相關配置屬性 } }
舉個例子,好比存儲於日誌文件中的某http access log日誌:
55.3.244.1 GET /index.html 15824 0.043
通過了Logstash的inputs-file
插件,輸入成爲Logstash的一個事件,在Logstash會變成這樣(以rubydebug
格式顯示):
{ "message" => "55.3.244.1 GET /index.html 15824 0.043", "@version" => "1", "@timestamp" => "2016-03-01T03:37:33.081Z", "host" => "fengyu-Vostro-3900" }
日誌自己內容會存放在message
這個field中,除此以外還會加上一些元數據,如host
,@timestamp
等。
加上filters-grok
這個Filter進行正則解析處理,解析message
這個field(詳細配置參考filters-grok的文檔),最終將該事件解析成以下的事件:
{ "message" => "55.3.244.1 GET /index.html 15824 0.043", "@version" => "1", "@timestamp" => "2016-03-01T03:51:03.914Z", "host" => "fengyu-Vostro-3900", "client" => "55.3.244.1", "method" => "GET", "request" => "/index.html", "bytes" => "15824", "duration" => "0.043" }
最後,經過outputs-elasticsearch
這個output插件,將解析過的日誌推送至Elasticsearch數據庫中存儲。
經過elasticsearch中的各類查詢方式,便可按照本身的需求展現這些數據了。
Logstash的這種設計,能夠很容易進行線性擴展,好比不作filter處理,直接output到其餘logstash實例的input端,將處理分散在不一樣的節點上。最極端的狀況,甚至能夠擴展成這個架構,兼顧HA(High Availability)與HP(High Performance):
三個logstash實例互爲冗餘,將解析的結果推送至消息隊列,由另外一個logstash實例將日誌從消息隊列取出,推送至elasticsearch集羣中。
整個日誌數據流的模型圖:
每臺服務器上部署有咱們本身開發的應用程序,以及這些應用程序的第三方依賴服務項(如數據庫,web服務器等)。
所以日誌源主要有兩種: 本身開發的應用程序的日誌,依賴的第三方軟件的日誌。
咱們本身開發的程序,直接將日誌以JSON格式寫入消息隊列中。第三方服務大部分沒法直接將日誌寫入消息隊列中,而是輸出爲日誌文件,這種日誌源經過logstash的filters-grok插件,解析日誌文件後推送至消息隊列中。
須要收集的第三方依賴的日誌,以及收集哪些日誌,詳見文檔末的附錄。
消息隊列使用kafka
+ zookeeper
的方式實現。日誌專用的消息隊列部署在日誌服務器中。
注: 若是日誌量比較小的話,能夠不必這麼複雜,好比省略掉
kafka
這個消息隊列,日誌服務器也無需部署logstash,直接在應用服務器上用logstash將解析過的log推送至日誌服務器上的es數據庫中。
安全問題:
全部服務儘量只對內網ip暴露(經過防火牆實現),減小對外暴露的服務,而且以低權限帳戶運行。跨節點的服務(如mongodb複製集,kafka+zookeeper,postgresql集羣等)鏈接一概採用SSL雙向認證的方案,提升安全性。
詳細配置參考文檔末附錄的內容。
根據上述描述,咱們須要搭建一臺日誌服務器,安裝ELK與日誌專用的消息隊列。
應用程序產生的日誌直接推送至日誌服務器的消息隊列中,通過logstash的處理最終推送至elasticsearch中,在kibana上進行展現。
能夠在logstash的Filter上定義報警規則,當日志有嚴重的錯誤時Output郵件報警。
服務器上應用程序列表如圖所示:
多臺產品服務器上,每臺服務器分別部署有應用程序和logstash,其餘第三方服務按照須要組成集羣(如postgresql集羣,mongodb複製集等)。日誌服務器上部署完整的ELK Stack和Kafka+Zookeeper。
logs
這個TOPIC
中。此架構在擴展上將即爲便利,共有三個可擴展的點:
這些擴展方案都可在無需原程序改動的條件下進行擴展。
推薦使用elastic.co的倉庫(RHEL/CentOS和Ubuntu/Debian倉庫爲官方維護):
推薦使用清華大學鏡像倉庫,個人issue已經被tuna接受,國內安裝速度會快許多(官方倉庫在S3上,因此你懂的)。tuna鏡像地址: http://mirrors.tuna.tsinghua.edu.cn/ELK/
按照官方文檔的步驟,安裝以後根據須要定製配置,啓動服務,啓用開機自啓動便可。
包管理器安裝的logstash,啓動配置存放於/etc/logstash/conf.d/
,這個目錄一開始是空的,本身將logstash的配置文件以.conf
結尾扔到這個目錄後,便可使用service logstash start
啓動服務,日誌存放於/var/log/logstash/
。logstash的配置文件能夠拆分紅多個.conf
文件,以規範配置,好比input.conf
,filter.conf
,output.conf
。
特別注意: logstash能夠在一個目錄下存放多個
.conf
文件,logstash內部會將多個.conf
文件合併爲一個大的配置文件,合併的順序爲文件名順序。因此特別注意你的filter
配置,若是多個配置文件都有filter
配置,特別注意filter的加載次序!不然會搞亂你的配置。若是你的filter只針對某個應用的日誌使用,那麼推薦你使用if [type] == "appname" { filter配置 }
這種方式限制住你的filter的做用範圍。
我用的是salt
,產品環境Ubuntu Server 14.04 LTS,固然你也可使用其餘相似的工具,如puppet
,chef
,ansible
等等。
salt的logstash
這個state的目錄結構以下:
$ tree /srv/salt/logstash/ /srv/salt/logstash/ ├── config │ ├── logagent │ │ ├── 00_log4j.conf │ │ ├── 01_vertx.conf │ │ ├── 02_mongod.conf │ │ ├── 03_postgresql.conf │ │ ├── 04_nginx.conf │ │ └── 99_output.conf │ └── logserver │ └── logserver.conf └── init.sls
$ cat /srv/salt/logstash/init.sls logstash_repo: pkgrepo.managed: - name: deb http://mirrors.tuna.tsinghua.edu.cn/ELK/apt/logstash/2.3/ stable main - file: /etc/apt/sources.list.d/logstash.list - key_url: https://packages.elastic.co/GPG-KEY-elasticsearch - clean_file: True logstash: pkg.latest: - require: - pkgrepo: logstash_repo logstash_grains: grains.list_present: - name: roles - value: logstash logstash-config: file.recurse: - name: /etc/logstash/conf.d {% if 'logserver' in grains.get('roles', '') %} - source: salt://logstash/config/logserver/ {% else %} - source: salt://logstash/config/logagent/ {% endif %} - clean: True - makedirs: True - template: jinja {% if 'postgresql' in grains.get('roles', '') %} logstash-user: group.present: - name: adm - addusers: - "logstash" {% endif %} logstash-service: service.running: - name: logstash - enable: True - watch: - pkg: logstash - file: logstash-config
最終推送到/etc/logstash/conf.d/
目錄下的文件爲00_log4j.conf,01_vertx.conf,02_mongod.conf,03_postgresql.conf,04_nginx.conf,99_output.conf
,這樣命名是爲了按照本身預期的文件順序疊加input
,filter
,output
配置,而不會形成混亂。有關00_log4j.conf
的配置內容參考博客開頭提供的舊的blog,這裏基本沒大改過。
爲了演示這套架構的流程與效果,因此將這套架構最小化,將產品服務器的應用與日誌服務器的應用所有部署在一個節點上測試。
日誌文件數據源以Nginx
的access log爲例,使用logstash將nginx access log中的內容推送至kafka隊列中,另外一個logstash實例從kafka將nginx的log取出存入elasticsearch中。
本身開發的應用程序直接按照上述日誌規範打印日誌進入kafka,由logstash從kafka中取出應用程序的日誌,推送至elasticsearch中。
修改Nginx的配置文件,使之打印出JSON
格式的access log,配置方法見附錄內容。 access log內容以下:
{"@timestamp":"2016-03-03T13:11:03+08:00","host":"sinoiot-172-16-250-3","clientip":"172.16.1.34","size":191,"responsetime":0.000,"http_host":"172.16.250.3","url":"/mirror/","xff":"-","referer":"-","agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36","status":200} {"@timestamp":"2016-03-03T13:11:03+08:00","host":"sinoiot-172-16-250-3","clientip":"172.16.1.34","size":0,"responsetime":0.000,"http_host":"172.16.250.3","url":"/favicon.ico","xff":"-","referer":"http://172.16.250.3/mirror/","agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36","status":204} ...
模擬產品環境的Logstash的配置文件以下所示:
input { file { path => "/var/log/nginx/access.log" codec => json type => "nginx" tags => "access" } } output { # stdout這個output插件僅做爲調試階段使用,用於將處理過的結果打印在終端 # 真實產品環境不須要這個output stdout { codec => "rubydebug" } kafka { topic_id => "logs" bootstrap_servers => "172.16.250.10:9092" # 真實產品環境須要修改對應的kafka集羣列表 } }
啓動logstash,將會看到終端上顯示解析過的事件:
{ "@timestamp" => "2016-03-03T05:11:03.000Z", "host" => "sinoiot-172-16-250-3", "clientip" => "172.16.1.34", "size" => 191, "responsetime" => 0.0, "http_host" => "172.16.250.3", "url" => "/mirror/", "xff" => "-", "referer" => "-", "agent" => "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36", "status" => 200, "@version" => "1", "path" => "/var/log/nginx/access.log", "type" => "nginx", "tags" => [ [0] "access" ] } { "@timestamp" => "2016-03-03T05:11:03.000Z", "host" => "sinoiot-172-16-250-3", "clientip" => "172.16.1.34", "size" => 0, "responsetime" => 0.0, "http_host" => "172.16.250.3", "url" => "/favicon.ico", "xff" => "-", "referer" => "http://172.16.250.3/mirror/", "agent" => "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36", "status" => 204, "@version" => "1", "path" => "/var/log/nginx/access.log", "type" => "nginx", "tags" => [ [0] "access" ] }
從kafka隊列中的logs
這個topic獲取日誌信息,將看到下列內容:
$ bin/kafka-console-consumer.sh --zookeeper 172.16.250.10:2181 --topic logs --from-beginning {"@timestamp":"2016-03-03T05:12:21.000Z","host":"sinoiot-172-16-250-3","clientip":"218.75.124.3","size":162,"responsetime":0.000,"http_host":"218.75.124.3","url":"/mirror/packages.elastic.co/elasticsearch/2.x/debian/dists/stable/main/i18n/Translation-en","xff":"-","referer":"-","agent":"Debian APT-HTTP/1.3 (1.0.1ubuntu2)","status":404,"@version":"1","path":"/var/log/nginx/access.log","type":"nginx","tags":["access"]} {"@timestamp":"2016-03-03T05:12:21.000Z","host":"sinoiot-172-16-250-3","clientip":"218.75.124.3","size":162,"responsetime":0.000,"http_host":"218.75.124.3","url":"/mirror/packages.elastic.co/kibana/4.4/debian/dists/stable/main/i18n/Translation-en_US","xff":"-","referer":"-","agent":"Debian APT-HTTP/1.3 (1.0.1ubuntu2)","status":404,"@version":"1","path":"/var/log/nginx/access.log","type":"nginx","tags":["access"]}
證實logstash已經將解析過的事件推送至kafka隊列中。
因爲消息隊列中存儲的日誌都是解析過的,因此日誌服務器上的配置就簡單多了,只須要經過logstash將kafka中的日誌推送至elasticsearch存儲便可。
日誌服務器的logstash配置就簡單的多(真實產品環境下須要配置email filter插件,用於郵件報警)。
模擬日誌服務器的logstash配置:
input { kafka { topic_id => "logs" zk_connect => "172.16.250.10:2181" # 真實產品環境替換爲對應的zookeeper集羣列表 } } output { elasticsearch { codec => json } # 產品環境調試完畢,不須要stdout這個output plugin stdout { codec => "rubydebug" } # 產品環境須要郵件報警的話,加入email output # if 報警條件 { # email { # # email output插件的配置 # } #} }
最後,在kibana中將看到以下的效果:
本身開發的應用程序直接按照JSON
格式推送至Kafka消息隊列中,所以不須要經過logstash output kafka這種方式。log4j 1.2版本須要手工格式化成JSON,log4j 2.x版本提供了JSON appender,不過目前來看log4j 1.x版本依舊佔據主流。輸出到kafka的配置參考附錄。
因爲推送的topic_id
是同樣的,所以日誌服務器中的logstash配置也無需修改。
從kafka隊列中取出log,看看格式:
$ bin/kafka-console-consumer.sh --zookeeper 172.16.250.10:2181 --topic logs --from-beginning {"@timestamp":"2016-03-03T17:03:32.772+08:00","host":"172.16.1.4","type":"rtds","loglevel":"INFO","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"a":1,"b":2}} {"@timestamp":"2016-03-03T17:03:32.773+08:00","host":"172.16.1.4","type":"rtds","loglevel":"DEBUG","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"c":1,"d":2}} {"@timestamp":"2016-03-03T17:03:32.813+08:00","host":"172.16.1.4","type":"rtds","loglevel":"ERROR","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"errormsg":" java.math.BigDecimal.divide(Unknown Source)\n org.codehaus.groovy.runtime.typehandling.BigDecimalMath.divideImpl(BigDecimalMath.java:68)\n org.codehaus.groovy.runtime.typehandling.IntegerMath.divideImpl(IntegerMath.java:49)\n org.codehaus.groovy.runtime.dgmimpl.NumberNumberDiv$NumberNumber.invoke(NumberNumberDiv.java:323)\n org.codehaus.groovy.runtime.callsite.PojoMetaMethodSite.call(PojoMetaMethodSite.java:56)\n org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)\n org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)\n org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:125)\n hawkeyes.rtds.MainVerticle.test(MainVerticle.groovy:69)\n hawkeyes.rtds.MainVerticle.deployInStandaloneMode(MainVerticle.groovy:63)\n sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)\n sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)\n java.lang.reflect.Method.invoke(Unknown Source)\n org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:93)\n groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:325)\n groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1210)\n groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1077)\n groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1019)\n groovy.lang.Closure.call(Closure.java:426)\n groovy.lang.Closure.call(Closure.java:420)\n java_util_concurrent_Callable$call.call(Unknown Source)\n org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)\n org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)\n org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)\n hawkeyes.rtds.MainVerticle.start(MainVerticle.groovy:30)\n io.vertx.lang.groovy.GroovyVerticle.start(GroovyVerticle.groovy:64)\n io.vertx.lang.groovy.GroovyVerticle$1.start(GroovyVerticle.groovy:93)\n io.vertx.core.impl.DeploymentManager.lambda$doDeploy$159(DeploymentManager.java:429)\n io.vertx.core.impl.ContextImpl.lambda$wrapTask$16(ContextImpl.java:335)\n io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)\n io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)\n io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)\n java.lang.Thread.run(Unknown Source)\n"}} {"@timestamp":"2016-03-03T17:03:32.814+08:00","host":"172.16.1.4","type":"rtds","loglevel":"INFO","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"a":1,"b":2}} {"@timestamp":"2016-03-03T17:03:32.814+08:00","host":"172.16.1.4","type":"rtds","loglevel":"DEBUG","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"c":1,"d":2}}
最終在kibana中展現的效果如圖:
ELK這套架構的設計因爲其外部組件的鬆耦合性,幾乎能夠知足各類規模日誌收集,組合消息隊列,更是帶來了彈性伸縮的可能性。
這套架構的引入,將對從此日誌的收集管理提供便利,經過日誌提供的數據,也便於業務跟蹤。並且此架構在將來也容易擴展。
此架構涉及到的組件也相對較多,須要有必定的維護量。數據分析時不但須要有規範化的數據結構,也須要熟悉elasticsearch的聚合表達式,須要一些專業知識與學習成本。
修改log4j.properties
文件,配置kafka appender
便可將log內容輸入到kafka消息隊列中。
log4j.logger.hawkeyes.rtds=INFO, Kafka log4j.appender.Kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender log4j.appender.Kafka.layout=org.apache.log4j.EnhancedPatternLayout log4j.appender.Kafka.layout.ConversionPattern=%m log4j.appender.Kafka.brokerList=127.0.0.1:9092 log4j.appender.Kafka.topic=logs log4j.appender.Kafka.requiredNumAcks=1
LOG4J主要由三大組件組成:
按照原來配置log4j.rootLogger=DEBUG, Kafka
這使程序中全部日誌都會向Kafka中寫入。但KafkaLog4jAppender在初始化時,自己會打印log,它在獲取logger對象時又會繼續建立KafkaLog4jAppender,新的KafkaLog4jAppender又會打log, 這就成了死循環,所以定義了一個輸出範圍log4j.category.hawkeyes.rtds=INFO, Kafka
,全部hawkeyes.rtds包下的類纔會向kafka消息隊列中輸出,這不會影響KafkaLog4jAppender中log輸出。
爲知足不重啓程序就能修改日誌級別的需求,可使用log4j的動態改變log輸出級別的功能。
改變Logger中level屬性便可。
參考代碼:
def rtdsLogger = Logger.getLogger("hawkeyes.rtds") rtdsLogger.setLevel(Level.toLevel("info"))
而後將這種方法進行封裝,對外提供一個能夠操做的api便可(如REST api)。
編輯/etc/nginx/nginx.conf
配置文件,加入如下內容:
log_format json '{"@timestamp":"$time_iso8601",' '"host":"$hostname",' '"clientip":"$remote_addr",' '"size":$body_bytes_sent,' '"responsetime":$request_time,' '"http_host":"$host",' '"url":"$uri",' '"xff":"$http_x_forwarded_for",' '"referer":"$http_referer",' '"agent":"$http_user_agent",' '"status":$status}'; access_log /var/log/nginx/access.log json;
刪掉原來默認的配置行access_log /var/log/nginx/access.log
。重啓nginx,以後nginx的access log文件/var/log/nginx/access.log
將以json_lines
的格式打印日誌。
以上配置參考了饒琛琳的《ELK stack權威指南》的相關章節
Zookeeper集羣配置範例:
須要改動的文件有兩個。在zookeeper的配置目錄中
myid
: 這個文件的內容修改成一個正整數,要求每一個節點的數值不一樣zoo.cfg
: 修改server.${id}=${ip}:2888:3888。這個id和myid
中的數字一一對應,後面的ip是節點的ip(注意不要使用環回ip,必須是能被其餘節點訪問到的ip,也能夠是域名)。參考範例:server.1=172.16.250.10:2888:3888 server.2=172.16.250.13:2888:3888 server.3=172.16.250.14:2888:3888
Zookeeper啓用SSL雙向認證: //TODO
Kafka集羣配置範例:
修改config
目錄下的主配置文件server.properties
。關鍵的幾個配置參數以下:
broker.id=1 advertised.host.name=172.16.250.10 zookeeper.connect=172.16.250.10:2181,172.16.250.13:2181,172.16.250.14:2181
broker.id
: 同zookeeper
集羣配置,每一個節點的id均爲不重複的正整數。advertised.host.name
: 同zookeeper
的集羣配置,設置爲能被其餘節點訪問到的ip或域名(該選項默認爲系統主機名,不用hosts或dns基本沒法被其餘節點訪問到)。zookeeper.connect
: 爲zookeeper集羣列表,格式爲ip:port
。多個節點使用,
分割。Kafka啓用SSL雙向認證: //TODO
產品服務器的logstash將日誌從文件取出,格式化後推送至日誌服務器的Kafka中:
input { file { path => "/path/to/log/file" # 日誌文件路徑 type => "app" # 應用名,如nginx,postgresql等 ... SNIP ... # 這裏根據不一樣的文件格式可能須要作不一樣處理 } } filter { # filter這裏主要是grok正則,nginx配置JSON日誌格式後不須要grok解析 grok { ... SNIP ... } } output { kafka { topic_id => "logs" bootstrap_servers => "kafka" # 真實產品環境須要修改對應的kafka集羣列表 ... SINP ... } }
日誌服務器logstash從kafka消息隊列中取出對應的日誌消息,推送至elasticsearch存儲。 日誌報警規則在日誌服務器指定,便於修改報警規則。
input { kafka { zk_connect => "zookeeper_cluster:2181" topic_id => "logs" ... SNIP ... } } filter { # 這裏詳細指定郵件報警規則 if "email_alert" in [tags] { email { ... SNIP ... } } } output { elasticsearch { ... SNIP ... } }