【產品環境】使用ELK搭建日誌系統

隨着業務不斷完善與發展,日誌的重要性穩步上升。咱們須要從日誌中排查錯誤,以及分析用戶行爲,爲業務發展提供參考意見。所以,須要一套專門的日誌系統幫助咱們收集、分析、處理日誌。html

之前我曾經寫過一個logstash的blog: http://my.oschina.net/abcfy2/blog/372138 ,版本比較低,可是logstash的配置沒變過。此篇blog將比上述blog更詳盡一些,擴展到產品環境搭建完整的日誌系統,可是logstash自己的配置很少作介紹,由於舊Blog已經介紹的比較詳細了。前端

本篇Blog主要介紹咱們目前使用的日誌系統的整體架構和部分配置。Kibana的使用暫時不在本篇Blog的覆蓋範圍以內,之後也許會單獨寫一篇kibana的使用,讀者也能夠參考饒琛琳的《ELK stack權威指南》一書的相關章節java

本篇Blog的內容也並不是本身獨自完成,關於log4j 1.2部分的配置和使用是開發同事共同探究實現的。nginx

最後要說的一點是,日誌系統的實現並不僅是運維的工做,開發也須要配合,規範日誌格式,規範項目埋點,便於排查問題。最後歸結與一點,要有執行力,要有人推進,不能隨隨便便的打日誌,更不容許產品環境有亂七八糟的println這種調試方式的日誌輸出。git

關於ELK

ELK Stack指代三個獨立的組件: Elasticsearch,Logstash,Kibana。這三個獨立的組件組合使用,能夠造成一套完整的日誌解決方案。目前這三個產品前後歸於Elastic.co公司旗下,該公司圍繞Elasticsearch這個核心產品逐步打造一整套生態環境,使得ELK Stack這套架構日益成熟,並且周邊也逐步開始完善。github

其中,Logstash的做用是處理日誌,將日誌解析爲JSON格式進行傳遞。Elasticsearch的做用是數據庫,將最終解析的結果存庫,用於往後查詢與分析使用。Kibana是Elasticsearch的dashboard,用於圖形化展現elasticsearch數據庫的查詢結果。這三個組件搭配使用,將十分靈活,有如下幾個優勢(如下內容節選自饒琛琳的《ELK stack權威指南》一書,感謝做者的努力):web

  • 處理方式靈活。Elasticsearch是實時全文索引,不須要像Storm那樣預先編程才能使用。
  • 配置簡易上手。Elasticsearch所有采用JSON接口,Logstash是Ruby DSL設計,都是目前業界最通用的配置語法設計。
  • 檢索性能高效。雖然每次查詢都是實時計算,可是優秀的設計和實現基本能夠達到百億級數據查詢的秒級響應。
  • 集羣線性擴展。無論是Elasticsearch集羣仍是Logstash集羣都是能夠線性擴展的。
  • 前端操做炫麗。Kibana界面上,只需點擊鼠標,就能夠完成搜索、聚合功能,生成炫麗的儀表盤。

Kibana的可視化效果:sql

輸入圖片說明

Logstash的處理流程

其中,ELK的靈活性得益於Logstash的插件式設計,並且插件之間都是鬆耦合(經過JSON事件交互,接口統一)。數據流在Logstash會通過三個階段: Input -> Filter -> Output,並且Filter能夠無限制串聯,造成流式處理,甚至能夠乾脆沒有。這三個階段既能夠在單節點上完成,也能夠直接Output到其餘節點上,分佈處理與卸載壓力。整個Logstash的基本流程圖以下:mongodb

Logstash的數據處理過程描述以下:數據庫

  1. 進入Logstash的數據流,會被解析成一條一條的JSON記錄。每一條JSON在Logstash中稱爲一個事件(event)。
  2. Logstash對每條事件記錄可使用Filter進行處理,如篩選,簡單聚合(如Multiline插件將多行JAVA堆棧異常聚合爲一個事件),編解碼(如將unix時間戳轉爲時間字符串,將k1=v1,k2=v2這種kv格式解析爲{k1:v1, k2:v2}這種JSON格式,正則解析文本日誌等),執行Ruby代碼等等,而且Filter能夠無限制串聯。此過程可直接跳過,即不對事件作任何處理。
  3. 經過Output插件,將解析處理過的數據輸出到指定目標,如RDB,TCP/UDP端口,Elasticsearch,消息隊列,文件等等,只要有對應插件的支持,就能夠輸出到對應的目標中。

整個Logstash的ruby DSL配置語法看起來像這樣:

input {
    插件名1 {
        # 插件相關配置屬性
    }

    插件名2 {
        # 插件相關配置屬性
    }
   ... SNIP ...

    插件名3 {
        # 插件相關配置屬性
    }
}

filter {
    插件名1 {
        # 插件相關配置屬性
    }

    插件名2 {
        # 插件相關配置屬性
    }
   ... SNIP ...

    插件名3 {
        # 插件相關配置屬性
    }
}

output {
    插件名1 {
        # 插件相關配置屬性
    }

    插件名2 {
        # 插件相關配置屬性
    }
   ... SNIP ...

    插件名3 {
        # 插件相關配置屬性
    }
}

舉個例子,好比存儲於日誌文件中的某http access log日誌:

55.3.244.1 GET /index.html 15824 0.043

通過了Logstash的inputs-file插件,輸入成爲Logstash的一個事件,在Logstash會變成這樣(以rubydebug格式顯示):

{
       "message" => "55.3.244.1 GET /index.html 15824 0.043",
      "@version" => "1",
    "@timestamp" => "2016-03-01T03:37:33.081Z",
          "host" => "fengyu-Vostro-3900"
}

日誌自己內容會存放在message這個field中,除此以外還會加上一些元數據,如host@timestamp等。

加上filters-grok這個Filter進行正則解析處理,解析message這個field(詳細配置參考filters-grok的文檔),最終將該事件解析成以下的事件:

{
       "message" => "55.3.244.1 GET /index.html 15824 0.043",
      "@version" => "1",
    "@timestamp" => "2016-03-01T03:51:03.914Z",
          "host" => "fengyu-Vostro-3900",
        "client" => "55.3.244.1",
        "method" => "GET",
       "request" => "/index.html",
         "bytes" => "15824",
      "duration" => "0.043"
}

最後,經過outputs-elasticsearch這個output插件,將解析過的日誌推送至Elasticsearch數據庫中存儲。

經過elasticsearch中的各類查詢方式,便可按照本身的需求展現這些數據了。

Logstash的這種設計,能夠很容易進行線性擴展,好比不作filter處理,直接output到其餘logstash實例的input端,將處理分散在不一樣的節點上。最極端的狀況,甚至能夠擴展成這個架構,兼顧HA(High Availability)與HP(High Performance):

三個logstash實例互爲冗餘,將解析的結果推送至消息隊列,由另外一個logstash實例將日誌從消息隊列取出,推送至elasticsearch集羣中。

架構設計

整個日誌數據流的模型圖:

輸入圖片說明

每臺服務器上部署有咱們本身開發的應用程序,以及這些應用程序的第三方依賴服務項(如數據庫,web服務器等)。

所以日誌源主要有兩種: 本身開發的應用程序的日誌,依賴的第三方軟件的日誌。

咱們本身開發的程序,直接將日誌以JSON格式寫入消息隊列中。第三方服務大部分沒法直接將日誌寫入消息隊列中,而是輸出爲日誌文件,這種日誌源經過logstash的filters-grok插件,解析日誌文件後推送至消息隊列中。

須要收集的第三方依賴的日誌,以及收集哪些日誌,詳見文檔末的附錄。

消息隊列使用kafka + zookeeper的方式實現。日誌專用的消息隊列部署在日誌服務器中。

注: 若是日誌量比較小的話,能夠不必這麼複雜,好比省略掉kafka這個消息隊列,日誌服務器也無需部署logstash,直接在應用服務器上用logstash將解析過的log推送至日誌服務器上的es數據庫中。

安全問題:

全部服務儘量只對內網ip暴露(經過防火牆實現),減小對外暴露的服務,而且以低權限帳戶運行。跨節點的服務(如mongodb複製集,kafka+zookeeper,postgresql集羣等)鏈接一概採用SSL雙向認證的方案,提升安全性。

詳細配置參考文檔末附錄的內容。

解決方案

根據上述描述,咱們須要搭建一臺日誌服務器,安裝ELK與日誌專用的消息隊列。

應用程序產生的日誌直接推送至日誌服務器的消息隊列中,通過logstash的處理最終推送至elasticsearch中,在kibana上進行展現。

能夠在logstash的Filter上定義報警規則,當日志有嚴重的錯誤時Output郵件報警。

部署方案

服務器上應用程序列表如圖所示:

輸入圖片說明

多臺產品服務器上,每臺服務器分別部署有應用程序和logstash,其餘第三方服務按照須要組成集羣(如postgresql集羣,mongodb複製集等)。日誌服務器上部署完整的ELK Stack和Kafka+Zookeeper。

  • 日誌信息由應用程序生成時,直接寫入日誌服務器的kafka隊列中。相關規範與配置參考文檔末的附錄內容。
  • 由第三方依賴程序產生的log,一般以文件形式存儲在產品服務器上。經過產品服務器的logstash解析日誌文件後,推送至日誌服務器的kafka。須要收集的日誌列表參考文檔末的附錄。
  • 日誌統一輸出到logs這個TOPIC中。
  • 日誌服務器的logstash負責從日誌服務器的kafka隊列中取出日誌信息,推送至elasticsearch儲存,同時作報警規則,遇到須要報警的日誌經過郵件方式報警。kibana做爲elasticsearch的dashboard使用,對es數據庫存儲的內容進行可視化展現。

擴展問題

此架構在擴展上將即爲便利,共有三個可擴展的點:

  1. 經過消息隊列將外部依賴解耦,使得橫向擴展很容易,若是日誌解析負載較高,能夠利用消息隊列,在別的節點上進行解析後推送至es數據庫。甚至藉助於Hadoop,Spark這樣的大數據處理引擎去解析。
  2. 若是存儲容量成爲一個問題,能夠選擇hdfs,ceph這種分佈式存儲解決方案,分散數據存儲容量,也可使用增長elasticsearch分片節點解決這個問題。
  3. 若是elasticsearch存儲效率成爲了瓶頸,能夠選擇增長elasticsearch分片集羣節點解決這個問題。

這些擴展方案都可在無需原程序改動的條件下進行擴展。

部署步驟

單節點部署

推薦使用elastic.co的倉庫(RHEL/CentOS和Ubuntu/Debian倉庫爲官方維護):

推薦使用清華大學鏡像倉庫,個人issue已經被tuna接受,國內安裝速度會快許多(官方倉庫在S3上,因此你懂的)。tuna鏡像地址: http://mirrors.tuna.tsinghua.edu.cn/ELK/

按照官方文檔的步驟,安裝以後根據須要定製配置,啓動服務,啓用開機自啓動便可。

包管理器安裝的logstash,啓動配置存放於/etc/logstash/conf.d/,這個目錄一開始是空的,本身將logstash的配置文件以.conf結尾扔到這個目錄後,便可使用service logstash start啓動服務,日誌存放於/var/log/logstash/。logstash的配置文件能夠拆分紅多個.conf文件,以規範配置,好比input.conf,filter.conf,output.conf

特別注意: logstash能夠在一個目錄下存放多個.conf文件,logstash內部會將多個.conf文件合併爲一個大的配置文件,合併的順序爲文件名順序。因此特別注意你的filter配置,若是多個配置文件都有filter配置,特別注意filter的加載次序!不然會搞亂你的配置。若是你的filter只針對某個應用的日誌使用,那麼推薦你使用if [type] == "appname" { filter配置 }這種方式限制住你的filter的做用範圍。

批量部署

我用的是salt,產品環境Ubuntu Server 14.04 LTS,固然你也可使用其餘相似的工具,如puppetchefansible等等。

salt的logstash這個state的目錄結構以下:

$ tree /srv/salt/logstash/
/srv/salt/logstash/
├── config
│   ├── logagent
│   │   ├── 00_log4j.conf
│   │   ├── 01_vertx.conf
│   │   ├── 02_mongod.conf
│   │   ├── 03_postgresql.conf
│   │   ├── 04_nginx.conf
│   │   └── 99_output.conf
│   └── logserver
│       └── logserver.conf
└── init.sls
$ cat /srv/salt/logstash/init.sls 
logstash_repo:
  pkgrepo.managed:
    - name: deb http://mirrors.tuna.tsinghua.edu.cn/ELK/apt/logstash/2.3/ stable main
    - file: /etc/apt/sources.list.d/logstash.list
    - key_url: https://packages.elastic.co/GPG-KEY-elasticsearch
    - clean_file: True

logstash:
  pkg.latest:
    - require:
      - pkgrepo: logstash_repo

logstash_grains:
  grains.list_present:
    - name: roles
    - value: logstash

logstash-config:
  file.recurse:
    - name: /etc/logstash/conf.d
  {% if 'logserver' in grains.get('roles', '') %}
    - source: salt://logstash/config/logserver/
  {% else %}
    - source: salt://logstash/config/logagent/
  {% endif %}
    - clean: True
    - makedirs: True
    - template: jinja

{% if 'postgresql' in grains.get('roles', '') %}
logstash-user:
  group.present:
    - name: adm
    - addusers: 
      - "logstash"
{% endif %}

logstash-service:
  service.running:
    - name: logstash
    - enable: True
    - watch:
      - pkg: logstash
      - file: logstash-config

最終推送到/etc/logstash/conf.d/目錄下的文件爲00_log4j.conf,01_vertx.conf,02_mongod.conf,03_postgresql.conf,04_nginx.conf,99_output.conf,這樣命名是爲了按照本身預期的文件順序疊加input,filter,output配置,而不會形成混亂。有關00_log4j.conf的配置內容參考博客開頭提供的舊的blog,這裏基本沒大改過。

測試用例

爲了演示這套架構的流程與效果,因此將這套架構最小化,將產品服務器的應用與日誌服務器的應用所有部署在一個節點上測試。

日誌文件數據源以Nginx的access log爲例,使用logstash將nginx access log中的內容推送至kafka隊列中,另外一個logstash實例從kafka將nginx的log取出存入elasticsearch中。

本身開發的應用程序直接按照上述日誌規範打印日誌進入kafka,由logstash從kafka中取出應用程序的日誌,推送至elasticsearch中。

日誌文件用例

修改Nginx的配置文件,使之打印出JSON格式的access log,配置方法見附錄內容。 access log內容以下:

{"@timestamp":"2016-03-03T13:11:03+08:00","host":"sinoiot-172-16-250-3","clientip":"172.16.1.34","size":191,"responsetime":0.000,"http_host":"172.16.250.3","url":"/mirror/","xff":"-","referer":"-","agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36","status":200}
{"@timestamp":"2016-03-03T13:11:03+08:00","host":"sinoiot-172-16-250-3","clientip":"172.16.1.34","size":0,"responsetime":0.000,"http_host":"172.16.250.3","url":"/favicon.ico","xff":"-","referer":"http://172.16.250.3/mirror/","agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36","status":204}
...

模擬產品環境的Logstash的配置文件以下所示:

input {
    file {
        path => "/var/log/nginx/access.log"
        codec => json
        type => "nginx"
        tags => "access"
    }
}

output {
    # stdout這個output插件僅做爲調試階段使用,用於將處理過的結果打印在終端
    # 真實產品環境不須要這個output
    stdout {
        codec => "rubydebug"
    }

    kafka {
        topic_id => "logs"
        bootstrap_servers => "172.16.250.10:9092"  # 真實產品環境須要修改對應的kafka集羣列表
    }
}

啓動logstash,將會看到終端上顯示解析過的事件:

{
      "@timestamp" => "2016-03-03T05:11:03.000Z",
            "host" => "sinoiot-172-16-250-3",
        "clientip" => "172.16.1.34",
            "size" => 191,
    "responsetime" => 0.0,
       "http_host" => "172.16.250.3",
             "url" => "/mirror/",
             "xff" => "-",
         "referer" => "-",
           "agent" => "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36",
          "status" => 200,
        "@version" => "1",
            "path" => "/var/log/nginx/access.log",
            "type" => "nginx",
            "tags" => [
        [0] "access"
    ]
}
{
      "@timestamp" => "2016-03-03T05:11:03.000Z",
            "host" => "sinoiot-172-16-250-3",
        "clientip" => "172.16.1.34",
            "size" => 0,
    "responsetime" => 0.0,
       "http_host" => "172.16.250.3",
             "url" => "/favicon.ico",
             "xff" => "-",
         "referer" => "http://172.16.250.3/mirror/",
           "agent" => "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36",
          "status" => 204,
        "@version" => "1",
            "path" => "/var/log/nginx/access.log",
            "type" => "nginx",
            "tags" => [
        [0] "access"
    ]
}

從kafka隊列中的logs這個topic獲取日誌信息,將看到下列內容:

$ bin/kafka-console-consumer.sh --zookeeper 172.16.250.10:2181 --topic logs --from-beginning
{"@timestamp":"2016-03-03T05:12:21.000Z","host":"sinoiot-172-16-250-3","clientip":"218.75.124.3","size":162,"responsetime":0.000,"http_host":"218.75.124.3","url":"/mirror/packages.elastic.co/elasticsearch/2.x/debian/dists/stable/main/i18n/Translation-en","xff":"-","referer":"-","agent":"Debian APT-HTTP/1.3 (1.0.1ubuntu2)","status":404,"@version":"1","path":"/var/log/nginx/access.log","type":"nginx","tags":["access"]}
{"@timestamp":"2016-03-03T05:12:21.000Z","host":"sinoiot-172-16-250-3","clientip":"218.75.124.3","size":162,"responsetime":0.000,"http_host":"218.75.124.3","url":"/mirror/packages.elastic.co/kibana/4.4/debian/dists/stable/main/i18n/Translation-en_US","xff":"-","referer":"-","agent":"Debian APT-HTTP/1.3 (1.0.1ubuntu2)","status":404,"@version":"1","path":"/var/log/nginx/access.log","type":"nginx","tags":["access"]}

證實logstash已經將解析過的事件推送至kafka隊列中。

因爲消息隊列中存儲的日誌都是解析過的,因此日誌服務器上的配置就簡單多了,只須要經過logstash將kafka中的日誌推送至elasticsearch存儲便可。

日誌服務器的logstash配置就簡單的多(真實產品環境下須要配置email filter插件,用於郵件報警)。

模擬日誌服務器的logstash配置:

input {
    kafka {
        topic_id => "logs"
        zk_connect => "172.16.250.10:2181"  # 真實產品環境替換爲對應的zookeeper集羣列表
    }
}

output {
    elasticsearch {
        codec => json
    }

    # 產品環境調試完畢,不須要stdout這個output plugin
    stdout {
        codec => "rubydebug"
    }

    # 產品環境須要郵件報警的話,加入email output
    # if 報警條件 {
    #    email {
    #        # email output插件的配置
    #    }
    #}
}

最後,在kibana中將看到以下的效果: 輸入圖片說明

應用程序日誌

本身開發的應用程序直接按照JSON格式推送至Kafka消息隊列中,所以不須要經過logstash output kafka這種方式。log4j 1.2版本須要手工格式化成JSON,log4j 2.x版本提供了JSON appender,不過目前來看log4j 1.x版本依舊佔據主流。輸出到kafka的配置參考附錄。

因爲推送的topic_id是同樣的,所以日誌服務器中的logstash配置也無需修改。

從kafka隊列中取出log,看看格式:

$ bin/kafka-console-consumer.sh --zookeeper 172.16.250.10:2181 --topic logs --from-beginning
{"@timestamp":"2016-03-03T17:03:32.772+08:00","host":"172.16.1.4","type":"rtds","loglevel":"INFO","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"a":1,"b":2}}
{"@timestamp":"2016-03-03T17:03:32.773+08:00","host":"172.16.1.4","type":"rtds","loglevel":"DEBUG","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"c":1,"d":2}}
{"@timestamp":"2016-03-03T17:03:32.813+08:00","host":"172.16.1.4","type":"rtds","loglevel":"ERROR","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"errormsg":"   java.math.BigDecimal.divide(Unknown Source)\n   org.codehaus.groovy.runtime.typehandling.BigDecimalMath.divideImpl(BigDecimalMath.java:68)\n   org.codehaus.groovy.runtime.typehandling.IntegerMath.divideImpl(IntegerMath.java:49)\n   org.codehaus.groovy.runtime.dgmimpl.NumberNumberDiv$NumberNumber.invoke(NumberNumberDiv.java:323)\n   org.codehaus.groovy.runtime.callsite.PojoMetaMethodSite.call(PojoMetaMethodSite.java:56)\n   org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)\n   org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)\n   org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:125)\n   hawkeyes.rtds.MainVerticle.test(MainVerticle.groovy:69)\n   hawkeyes.rtds.MainVerticle.deployInStandaloneMode(MainVerticle.groovy:63)\n   sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n   sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)\n   sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)\n   java.lang.reflect.Method.invoke(Unknown Source)\n   org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:93)\n   groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:325)\n   groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1210)\n   groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1077)\n   groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1019)\n   groovy.lang.Closure.call(Closure.java:426)\n   groovy.lang.Closure.call(Closure.java:420)\n   java_util_concurrent_Callable$call.call(Unknown Source)\n   org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)\n   org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)\n   org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)\n   hawkeyes.rtds.MainVerticle.start(MainVerticle.groovy:30)\n   io.vertx.lang.groovy.GroovyVerticle.start(GroovyVerticle.groovy:64)\n   io.vertx.lang.groovy.GroovyVerticle$1.start(GroovyVerticle.groovy:93)\n   io.vertx.core.impl.DeploymentManager.lambda$doDeploy$159(DeploymentManager.java:429)\n   io.vertx.core.impl.ContextImpl.lambda$wrapTask$16(ContextImpl.java:335)\n   io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)\n   io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)\n   io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)\n   java.lang.Thread.run(Unknown Source)\n"}}
{"@timestamp":"2016-03-03T17:03:32.814+08:00","host":"172.16.1.4","type":"rtds","loglevel":"INFO","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"a":1,"b":2}}
{"@timestamp":"2016-03-03T17:03:32.814+08:00","host":"172.16.1.4","type":"rtds","loglevel":"DEBUG","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"c":1,"d":2}}

最終在kibana中展現的效果如圖: 輸入圖片說明

總結

ELK這套架構的設計因爲其外部組件的鬆耦合性,幾乎能夠知足各類規模日誌收集,組合消息隊列,更是帶來了彈性伸縮的可能性。

這套架構的引入,將對從此日誌的收集管理提供便利,經過日誌提供的數據,也便於業務跟蹤。並且此架構在將來也容易擴展。

此架構涉及到的組件也相對較多,須要有必定的維護量。數據分析時不但須要有規範化的數據結構,也須要熟悉elasticsearch的聚合表達式,須要一些專業知識與學習成本。

附錄

附錄一 應用程序log輸出到Kafka的方法

修改log4j.properties文件,配置kafka appender便可將log內容輸入到kafka消息隊列中。

log4j.logger.hawkeyes.rtds=INFO, Kafka
log4j.appender.Kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.Kafka.layout=org.apache.log4j.EnhancedPatternLayout
log4j.appender.Kafka.layout.ConversionPattern=%m
log4j.appender.Kafka.brokerList=127.0.0.1:9092
log4j.appender.Kafka.topic=logs
log4j.appender.Kafka.requiredNumAcks=1

LOG4J主要由三大組件組成:

  • Logger: 決定什麼日誌信息應該被輸出、什麼日誌信息應該被忽略;
  • Appender: 指定日誌信息應該輸出到什麼地方, 這些地方能夠是控制檯、文件、網絡設備;
  • Layout: 指定日誌信息的輸出格式;

按照原來配置log4j.rootLogger=DEBUG, Kafka這使程序中全部日誌都會向Kafka中寫入。但KafkaLog4jAppender在初始化時,自己會打印log,它在獲取logger對象時又會繼續建立KafkaLog4jAppender,新的KafkaLog4jAppender又會打log, 這就成了死循環,所以定義了一個輸出範圍log4j.category.hawkeyes.rtds=INFO, Kafka,全部hawkeyes.rtds包下的類纔會向kafka消息隊列中輸出,這不會影響KafkaLog4jAppender中log輸出。

附錄二 應用程序動態調整日誌級別的實現方法

爲知足不重啓程序就能修改日誌級別的需求,可使用log4j的動態改變log輸出級別的功能。

動態修改loglevel原理

改變Logger中level屬性便可。

參考代碼:

def rtdsLogger = Logger.getLogger("hawkeyes.rtds")
rtdsLogger.setLevel(Level.toLevel("info"))

而後將這種方法進行封裝,對外提供一個能夠操做的api便可(如REST api)。

附錄三 部分相關服務的配置參考

Nginx輸出JSON格式的log配置方法

編輯/etc/nginx/nginx.conf配置文件,加入如下內容:

log_format json '{"@timestamp":"$time_iso8601",'
        '"host":"$hostname",'
        '"clientip":"$remote_addr",'
        '"size":$body_bytes_sent,'
        '"responsetime":$request_time,'
        '"http_host":"$host",'
        '"url":"$uri",'
        '"xff":"$http_x_forwarded_for",'
        '"referer":"$http_referer",'
        '"agent":"$http_user_agent",'
        '"status":$status}';

access_log /var/log/nginx/access.log json;

刪掉原來默認的配置行access_log /var/log/nginx/access.log。重啓nginx,以後nginx的access log文件/var/log/nginx/access.log將以json_lines的格式打印日誌。

以上配置參考了饒琛琳的《ELK stack權威指南》的相關章節

Zookeeper相關配置參考

Zookeeper集羣配置範例:

須要改動的文件有兩個。在zookeeper的配置目錄中

  • myid: 這個文件的內容修改成一個正整數,要求每一個節點的數值不一樣
  • zoo.cfg: 修改server.${id}=${ip}:2888:3888。這個id和myid中的數字一一對應,後面的ip是節點的ip(注意不要使用環回ip,必須是能被其餘節點訪問到的ip,也能夠是域名)。參考範例:
server.1=172.16.250.10:2888:3888
server.2=172.16.250.13:2888:3888
server.3=172.16.250.14:2888:3888

Zookeeper啓用SSL雙向認證: //TODO

Kafka相關配置參考

Kafka集羣配置範例:

修改config目錄下的主配置文件server.properties。關鍵的幾個配置參數以下:

broker.id=1
advertised.host.name=172.16.250.10
zookeeper.connect=172.16.250.10:2181,172.16.250.13:2181,172.16.250.14:2181
  • broker.id: 同zookeeper集羣配置,每一個節點的id均爲不重複的正整數。
  • advertised.host.name: 同zookeeper的集羣配置,設置爲能被其餘節點訪問到的ip或域名(該選項默認爲系統主機名,不用hosts或dns基本沒法被其餘節點訪問到)。
  • zookeeper.connect: 爲zookeeper集羣列表,格式爲ip:port。多個節點使用,分割。

Kafka啓用SSL雙向認證: //TODO

Logstash配置參考

產品服務器的logstash將日誌從文件取出,格式化後推送至日誌服務器的Kafka中:

input {
    file {
        path => "/path/to/log/file"  # 日誌文件路徑
        type => "app"                # 應用名,如nginx,postgresql等
        ... SNIP ...                 # 這裏根據不一樣的文件格式可能須要作不一樣處理
    }
}

filter {
    # filter這裏主要是grok正則,nginx配置JSON日誌格式後不須要grok解析
    grok {
        ... SNIP ...
    }
}

output {
    kafka {
        topic_id => "logs"
        bootstrap_servers => "kafka"  # 真實產品環境須要修改對應的kafka集羣列表
        ... SINP ...
    }
}

日誌服務器logstash從kafka消息隊列中取出對應的日誌消息,推送至elasticsearch存儲。 日誌報警規則在日誌服務器指定,便於修改報警規則。

input {
    kafka {
        zk_connect => "zookeeper_cluster:2181"
        topic_id => "logs"
       ... SNIP ...
    }
}

filter {
    # 這裏詳細指定郵件報警規則
    if "email_alert" in [tags] {
        email {
            ... SNIP ...
        }
    }
}

output {
    elasticsearch {
       ... SNIP ...
    }
}

參考文獻

相關文章
相關標籤/搜索