SpringBoot+kafka+ELK分佈式日誌收集

1、背景

隨着業務複雜度的提高以及微服務的興起,傳統單一項目會被按照業務規則進行垂直拆分,另外爲了防止單點故障咱們也會將重要的服務模塊進行集羣部署,經過負載均衡進行服務的調用。那麼隨着節點的增多,各個服務的日誌也會散落在各個服務器上。這對於咱們進行日誌分析帶來了巨大的挑戰,總不能一臺一臺的登陸去下載日誌吧。那麼咱們須要一種收集日誌的工具將散落在各個服務器節點上的日誌收集起來,進行統一的查詢及管理統計。那麼ELK就能夠作到這一點。html

ELK是ElasticSearch+Logstash+Kibana的簡稱,在這裏我分別對如上幾個組件作個簡單的介紹:java

1.一、ElasticSearch(簡稱ES)

Elasticsearch是一個高度可擴展的開源全文搜索和分析引擎。它容許您快速、實時地存儲、搜索和分析大量數據。它一般用做底層引擎/技術,爲具備複雜搜索特性和需求的應用程序提供動力。咱們能夠藉助如ElasticSearch完成諸如搜索,日誌收集,反向搜索,智能分析等功能。ES設計的目標:git

  • 快速實時搜索

Elasticsearch是一個實時搜索平臺。這意味着,從索引文檔到可搜索文檔,存在輕微的延遲(一般爲一秒)。github

  • 集羣

集羣是一個或多個節點(服務器)的集合,這些節點(服務器)一塊兒保存整個數據,並提供跨全部節點的聯合索引和搜索功能。集羣由一個唯一的名稱來標識,默認狀況下該名稱爲「elasticsearch」。這個名稱很重要,由於節點只能是集羣的一部分,若是節點被設置爲經過其名稱加入集羣的話。確保不要在不一樣的環境中重用相同的集羣名稱,不然可能會致使節點加入錯誤的集羣。例如,您可使用logging-dev、logging-test和logging-prod開發、測試和生產集羣。spring

  • 節點

節點是單個服務器,它是集羣的一部分,它用來存儲數據,並參與集羣的索引和搜索功能。與集羣同樣,節點的名稱默認爲在啓動時分配給節點的隨機唯一標識符(UUID)。若是不須要默認值,能夠定義任何節點名稱。這個名稱對於管理很是重要,由於您想要肯定網絡中的哪些服務器對應於Elasticsearch集羣中的哪些節點。apache

  • 索引

索引是具備相似特徵的文檔的集合。例如,您能夠有一個客戶數據索引、另外一個產品目錄索引和另外一個訂單數據索引。索引由一個名稱標識(必須是小寫的),該名稱用於在對其中的文檔執行索引、搜索、更新和刪除操做時引用索引。在單個集羣中,能夠定義任意數量的索引。json

  • 文檔

文檔是能夠創建索引的基本信息單元。例如,能夠爲單個客戶提供一個文檔,爲單個產品提供一個文檔,爲單個訂單提供另外一個文檔。這個文檔用JSON (JavaScript對象符號)表示。在索引中,能夠存儲任意數量的文檔。請注意,儘管文檔在物理上駐留在索引中,但實際上文檔必須被索引/分配到索引中的類型中。bootstrap

1.二、Logstash

Logstash是一個開源數據收集引擎,具備實時流水線功能。Logstash能夠動態地未來自不一樣數據源的數據統一塊兒來,並將數據規範化後(經過Filter過濾)傳輸到您選擇的目標。
basic_logstash_pipeline瀏覽器

在這裏inputs表明數據的輸入通道,你們能夠簡單理解爲來源。常見的能夠從kafka,FileBeat, DB等獲取日誌數據,這些數據通過fliter過濾後(好比說:日誌過濾,json格式解析等)經過outputs傳輸到指定的位置進行存儲(Elasticsearch,Mogodb,Redis等)springboot

簡單的實例:

cd logstash-6.4.1
    bin/logstash -e 'input { stdin { } } output { stdout {} }'

1.三、Kibana

kibana是用於Elasticsearch檢索數據的開源分析和可視化平臺。咱們可使用Kibana搜索、查看或者與存儲在Elasticsearch索引中的數據交互。同時也能夠輕鬆地執行高級數據分析並在各類圖表、表和映射中可視化數據。基於瀏覽器的Kibana界面使您可以快速建立和共享動態儀表板,實時顯示對Elasticsearch查詢的更改。

1.四、處理方案

_

用戶經過java應用程序的Slf4j寫入日誌,SpringBoot默認使用的是logback。咱們經過實現自定義的Appender將日誌寫入kafka,同時logstash經過input插件操做kafka訂閱其對應的主題。當有日誌輸出後被kafka的客戶端logstash所收集,通過相關過濾操做後將日誌寫入Elasticsearch,此時用戶能夠經過kibana獲取elasticsearch中的日誌信息

2、SpringBoot中的配置

在SpringBoot當中,咱們能夠經過logback-srping.xml來擴展logback的配置。不過咱們在此以前應當先添加logback對kafka的依賴,代碼以下:

compile group: 'com.github.danielwegener', name: 'logback-kafka-appender', version: '0.2.0-RC1'

添加好依賴以後咱們須要在類路徑下建立logback-spring.xml的配置文件並作以下配置(添加kafka的Appender):

<configuration>
        <!-- springProfile用於指定當前激活的環境,若是spring.profile.active的值是哪一個,就會激活對應節點下的配置 -->
        <springProfile name="default">
            <!-- configuration to be enabled when the "staging" profile is active -->
            <springProperty scope="context" name="module" source="spring.application.name"
            defaultValue="undefinded"/>
            <!-- 該節點會讀取Environment中配置的值,在這裏咱們讀取application.yml中的值 -->
            <springProperty scope="context" name="bootstrapServers" source="spring.kafka.bootstrap-servers"
                            defaultValue="localhost:9092"/>
            <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
                <!-- encoders are assigned the type
                     ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
                <encoder>
                    <pattern>%boldYellow(${module}) | %d | %highlight(%-5level)| %cyan(%logger{15}) - %msg %n</pattern>
                </encoder>
            </appender>
            <!-- kafka的appender配置 -->
            <appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender">
                <encoder>
                    <pattern>${module} | %d | %-5level| %logger{15} - %msg</pattern>
                </encoder>
                <topic>logger-channel</topic>
                <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
                <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
    
                <!-- Optional parameter to use a fixed partition -->
                <!-- <partition>0</partition> -->
    
                <!-- Optional parameter to include log timestamps into the kafka message -->
                <!-- <appendTimestamp>true</appendTimestamp> -->
    
                <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
                <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
                <!-- bootstrap.servers is the only mandatory producerConfig -->
                <producerConfig>bootstrap.servers=${bootstrapServers}</producerConfig>
    
                <!-- 若是kafka不可用則輸出到控制檯 -->
                <appender-ref ref="STDOUT"/>
    
            </appender>
            <!-- 指定項目中的logger -->
            <logger name="org.springframework.test" level="INFO" >
                <appender-ref ref="kafka" />
            </logger>
            <root level="info">
                <appender-ref ref="STDOUT" />
            </root>
        </springProfile>
    </configuration>

在這裏面咱們主要注意如下幾點:

  • 日誌輸出的格式是爲模塊名 | 時間 | 日誌級別 | 類的全名 | 日誌內容
  • SpringProfile節點用於指定當前激活的環境,若是spring.profile.active的值是哪一個,就會激活對應節點下的配置
  • springProperty能夠讀取Environment中的值

3、ELK搭建過程

3.一、檢查環境

ElasticSearch須要jdk8,官方建議咱們使用JDK的版本爲1.8.0_131,原文以下:

Elasticsearch requires at least Java 8. Specifically as of this writing, it is recommended that you use the Oracle JDK version 1.8.0_131

檢查完畢後,咱們能夠分別在官網下載對應的組件

3.二、啓動zookeeper

首先進入啓動zookeeper的根目錄下,將conf目錄下的zoo_sample.cfg文件拷貝一份從新命名爲zoo.cfg

mv zoo_sample.cfg zoo.cfg

配置文件以下:

# The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial 
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just 
    # example sakes.
    dataDir=../zookeeper-data
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the 
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1

緊接着咱們進入bin目錄啓動zookeeper:

./zkServer.sh start

3.三、啓動kafka

在kafka根目錄下運行以下命令啓動kafka:

./bin/kafka-server-start.sh config/server.properties

啓動完畢後咱們須要建立一個logger-channel主題:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logger-channel

3.四、配置並啓動logstash

進入logstash跟目錄下的config目錄,咱們將logstash-sample.conf的配置文件拷貝到根目錄下從新命名爲core.conf,而後咱們打開配置文件進行編輯:
```ruby
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  kafka {
    id => "my_plugin_id"
    bootstrap_servers => "localhost:9092"
    topics => ["logger-channel"]
    auto_offset_reset => "latest" 
  }
}
filter {

    grok {
      patterns_dir => ["./patterns"]
        match => { "message" => "%{WORD:module} \| %{LOGBACKTIME:timestamp} \| %{LOGLEVEL:level} \| %{JAVACLASS:class} - %{JAVALOGMESSAGE:logmessage}" }
    }
    
    
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts =>["localhost:9200"]
  }
}
```

咱們分別配置logstash的input,filter和output(懂ruby的童鞋們確定對語法結構不陌生吧):

  • 在input當中咱們指定日誌來源爲kafka,具體含義能夠參考官網:kafka-input-plugin
  • 在filter中咱們配置grok插件,該插件能夠利用正則分析日誌內容,其中patterns_dir屬性用於指定自定義的分析規則,咱們能夠在該文件下創建文件配置驗證的正則規則。舉例子說明:55.3.244.1 GET /index.html 15824 0.043的 日誌內容通過以下配置解析:
grok {
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
      }

解析事後會變成:

client: 55.3.244.1
    method: GET
    request: /index.html
    bytes: 15824
    duration: 0.043

這些屬性都會在elasticsearch中存爲對應的屬性字段。更詳細的介紹請參考官網:grok ,固然該插件已經幫咱們定義好了好多種核心規則,咱們能夠在這裏查看全部的規則。

  • 在output當中咱們將過濾事後的日誌內容打印到控制檯並傳輸到elasticsearch中,咱們能夠參考官網上關於該插件的屬性說明:[地址]((https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html)
  • 另外咱們在patterns文件夾中建立好自定義的規則文件logback,內容以下:

    # yyyy-MM-dd HH:mm:ss,SSS ZZZ eg: 2014-01-09 17:32:25,527
      LOGBACKTIME 20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND})

編輯好配置後咱們運行以下命令啓動logstash:

bin/logstash -f first-pipeline.conf --config.reload.automatic

該命令會實時更新配置文件而不需啓動

3.五、啓動ElasticSearch

啓動ElasticSearch很簡單,咱們能夠運行以下命令:

./bin/elasticsearch

咱們能夠發送get請求來判斷啓動成功:

GET http://localhost:9200

咱們能夠獲得相似於以下的結果:

{
      "name" : "Cp8oag6",
      "cluster_name" : "elasticsearch",
      "cluster_uuid" : "AT69_T_DTp-1qgIJlatQqA",
      "version" : {
        "number" : "6.4.0",
        "build_flavor" : "default",
        "build_type" : "zip",
        "build_hash" : "f27399d",
        "build_date" : "2016-03-30T09:51:41.449Z",
        "build_snapshot" : false,
        "lucene_version" : "7.4.0",
        "minimum_wire_compatibility_version" : "1.2.3",
        "minimum_index_compatibility_version" : "1.2.3"
      },
      "tagline" : "You Know, for Search"
    }

3.5.1 配置IK分詞器(可選)

咱們能夠在github上下載elasticsearch的IK分詞器,地址以下:ik分詞器,而後把它解壓至your-es-root/plugins/ik的目錄下,咱們能夠在{conf}/analysis-ik/config/IKAnalyzer.cfg.xmlor {plugins}/elasticsearch-analysis-ik-*/config/IKAnalyzer.cfg.xml 裏配置自定義分詞器:

<?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
    <properties>
        <comment>IK Analyzer 擴展配置</comment>
        <!--用戶能夠在這裏配置本身的擴展字典 -->
        <entry key="ext_dict">custom/mydict.dic;custom/single_word_low_freq.dic</entry>
         <!--用戶能夠在這裏配置本身的擴展中止詞字典-->
        <entry key="ext_stopwords">custom/ext_stopword.dic</entry>
        <!--用戶能夠在這裏配置遠程擴展字典 -->
        <entry key="remote_ext_dict">location</entry>
        <!--用戶能夠在這裏配置遠程擴展中止詞字典-->
        <entry key="remote_ext_stopwords">http://xxx.com/xxx.dic</entry>
    </properties>

首先咱們添加索引:

curl -XPUT http://localhost:9200/my_index

咱們能夠把經過put請求來添加索引映射:

PUT my_index 
    {
      "mappings": {
        "doc": { 
          "properties": { 
            "title":    { "type": "text"  }, 
            "name":     { "type": "text"  }, 
            "age":      { "type": "integer" },  
            "created":  {
              "type":   "date", 
              "format": "strict_date_optional_time||epoch_millis"
            }
           "content": {
                    "type": "text",
                    "analyzer": "ik_max_word",
                    "search_analyzer": "ik_max_word"
                }
          }
        }
      }
    }

其中doc是映射名 my_index是索引名稱

3.5.2 logstash與ElasticSearch

logstash默認狀況下會在ES中創建logstash-*的索引,*表明了yyyy-MM-dd的時間格式,根據上述logstash配置filter的示例,其會在ES中創建module ,logmessage,class,level等索引。(具體咱們能夠根據grok插件進行配置)

3.6 啓動Kibana

在kibana的bin目錄下運行./kibana便可啓動。啓動以後咱們能夠經過瀏覽器訪問http://localhost:5601 來訪問kibanaUI。咱們能夠看到以下界面:
768E3949_E435_4822_A7FF_4B07BBD6D7DB

相關文章
相關標籤/搜索