ELK簡單安裝測試

1 介紹組件

Filebeat是一個日誌文件託運工具,在你的服務器上安裝客戶端後,filebeat會監控日誌目錄或者指定的日誌文件,追蹤讀取這些文件(追蹤文件的變化,不停的讀)。java

Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。node

ElasticSearch它提供了一個分佈式多用戶能力的全文搜索引擎,基於RESTful web接口。Elasticsearch 中的 Index 是一組具備類似特徵的文檔集合,相似於關係數據庫模型中的數據庫實例,Index 中能夠指定 Type 區分不一樣的文檔,相似於數據庫實例中的關係表,Document 是存儲的基本單位,都是 JSON 格式,相似於關係表中行級對象。咱們處理後的 JSON 文檔格式的日誌都要在 Elasticsearch 中作索引,相應的 Logstash 有 Elasticsearch output 插件,對於用戶是透明的。linux

Hadoop 生態圈爲大規模數據集的處理提供多種分析功能,但實時搜索一直是 Hadoop 的軟肋。現在,Elasticsearch for Apache Hadoop(ES-Hadoop)彌補了這一缺陷,爲用戶整合了 Hadoop 的大數據分析能力以及 Elasticsearch 的實時搜索能力。web

Logstash 是一種功能強大的信息採集工具,相似於 Hadoop 生態圈裏的 Flume。一般在其配置文件規定 Logstash 如何處理各類類型的事件流,通常包含 input、filter、output 三個部分。Logstash 爲各個部分提供相應的插件,於是有 input、filter、output 三類插件完成各類處理和轉換;另外 codec 類的插件能夠放在 input 和 output 部分經過簡單編碼來簡化處理過程。spring

Kibana是ElasticSearch的用戶界面。shell

在實際應用場景下,爲了知足大數據實時檢索的場景,利用Filebeat去監控日誌文件,將Kafka做爲Filebeat的輸出端,Kafka實時接收到Filebeat後以Logstash做爲輸出端輸出,到Logstash的數據也許還不是咱們想要的格式化或者特定業務的數據,這時能夠經過Logstash的一些過了插件對數據進行過濾最後達到想要的數據格式以ElasticSearch做爲輸出端輸出,數據到ElasticSearch就能夠進行豐富的分佈式檢索了。數據庫

2 下載安裝包

下載 elasticsearch、logstash、kibana、filebeat 的壓縮包,並將四個壓縮包上傳到 /data/elk 目錄下
json


官方文檔:
Filebeat:
https://www.elastic.co/cn/products/beats/filebeat
Logstash:
https://www.elastic.co/cn/products/logstash
Kibana:
https://www.elastic.co/cn/products/kibana
Elasticsearch:
https://www.elastic.co/cn/products/elasticsearch
elasticsearch中文社區:
https://elasticsearch.cn/

3 安裝ELK

3.1 安裝jdk1.8

vim

3.2 安裝ElasticSearch

3.2.1 安裝配置es

說明:ElasticSearch的運行不能用root執行,本身用useradd命令新建一個用戶以下所示:
添加普通用戶elk並設置密碼
useradd elk
passwd elk 而後根據提示輸入密碼便可springboot

修改文件全部者
chown -R elk:elk /data/elk

解壓
tar -zxvf elasticsearch-6.7.0.tar.gz

修改 es 的配置文件
cd /data/elk/elasticsearch-6.7.0/config
vim elasticsearch.yml
修改方法參考以下:

cluster.name: my-application
node.name: node-1
node.attr.rack: r1
path.data: /data/elk/elasticsearch-6.7.0/data
path.logs: /data/elk/elasticsearch-6.7.0/logs
network.host: localhost
http.port: 9200

3.2.2 啓動

su elk
./bin/elasticsearch -d

啓動時間有點慢,耐心等待10-20s或者更長,查看9200,9300端口是否開啓

netstat -tnlp|grep 9[23]00

tcp6       0      0 localhost:9200       :::*                    LISTEN      30155/java          
tcp6       0      0 localhost:9300       :::*                    LISTEN      30155/java

訪問地址:http://localhost:9200


中止服務:

ps -ef |grep elasticsearch 
kill PID

3.3 安裝filebeat

3.3.1 安裝配置

filebeat:部署在具體的業務機器上,經過定時監控的方式獲取增量的日誌,並轉發到logstash、elasticsearch、kafka等等。

解壓
tar –zxvf filebeat-6.7.0-linux-x86_64.tar.gz
cd filebeat-6.7.0-linux-x86_64
修改kibana 的配置文件
vim filebeat.yml

===================Filebeat prospectors ===========

filebeat.prospectors:

#
 Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- input_type: log

  #
 Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /data/elk/logfile
#- c:\programdata\elasticsearch\logs\*

#
輸出到logstash
#-------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

3.3.2 啓動

./filebeat -e -c filebeat.yml -d "publish"

3.4 安裝logstash

3.4.1 安裝配置

解壓
tar -zxvf logstash-6.7.0.tar.gz
cd logstash-6.7.0
配置
在logstash文件夾的下bin目錄建立配置文件logstash.conf ,內容以下:
input:對應輸入的配置,其中path是監控的文**件路勁, codec編碼個格式
output:elasticsearch : { hosts => "localhost:9200" }
對應輸出到elasticsearch,hosts是elasticsearch安裝地址

input {
    beats {
        host => "localhost"
        port => 5044 
        codec => json
    }
}

output {

    stdout {
        codec => rubydebug
    }

    elasticsearch {
        hosts => "localhost:9200"
        index="test_index"
    }
}

3.4.2 啓動

測試你的配置文件 是否正確( 解析配置文件並報告任何錯誤。)
./logstash -f logstash.conf --config.test_and_exit

Sending Logstash logs to /data/elk/logstash-6.7.0/logs which is now configured via log4j2.properties
[2019-03-28T17:55:34,114][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/data/elk/logstash-6.7.0/data/queue"}
[2019-03-28T17:55:34,169][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/data/elk/logstash-6.7.0/data/dead_letter_queue"}
[2019-03-28T17:55:34,856][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
Configuration OK
[2019-03-28T17:55:43,327][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

啓動命令(啓用自動配置從新加載,這樣就沒必要每次修改配置文件時都中止並從新啓動Logstash )
./logstash -f logstash.conf --config.reload.automatic

中止服務
ps aux | grep logstash
kill -9 pid

3.5 安裝kibana

3.5.1 安裝配置

解壓
tar –zxvf kibana-6.7.0-linux-x86_64.tar.gz
cd kibana-6.7.0-linux-x86_64/config
配置
vim kibana.yml

server.port: 5601
server.host: "localhost"

#
elasticsearch.username: "elastic"
#elasticsearch.password: "changeme"

elasticsearch.hosts: ["http://localhost:9200"]
kibana.index: ".kibana"

3.5.2 啓動

./bin/kibana

訪問地址:
http://localhost:5601

netstat -anltp|grep 5601查找對應端口
kill -9 pid

4 測試

4.1 監測指定文件,Kibana展現

4.1.1 Logstash監控文件

咱們經過echo往這文件/data/elk/test.log追加內容,來測試整個日誌收集系統是否可行
vim test2.conf

input {
  file {
    type =>"syslog"
     path => ["/data/elk/test.log" ]
  }
  syslog {
    type =>"syslog"
    port =>"5544"
  }
}
output {
  stdout { codec=> rubydebug }
  elasticsearch {hosts => "http:// localhost:9200"}
}

啓動
./logstash -f test2.conf --config.reload.automatic

向監控文件中寫入數據

echo "{ "firstName": "1", "lastName":"McLaughlin", "email": "aaaa" }" >> test.log 

Logstash打印接收到的數據:

{
    "message" => "{ firstName: 1, lastName:McLaughlin, email: aaaa }",
    "@timestamp" => 2019-03-29T02:39:35.229Z,
      "@version" => "1",
          "host" => "node223",
          "path" => "/data/elk/test.log"
}

4.1.2 es 查看接收數據

查看es全部index
curl -X GET 'http://localhost:9200/_cat/indices?v'

health status index                uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .kibana_task_manager ZYN7zZl3QLuB1d-EE_ysHA   1   0          2            0     12.5kb         12.5kb
yellow open   logstash-2019.03.29  rfkkxW7rQDKnl40_Ekaf7g   5   1          2            0     10.8kb         10.8kb
yellow open   test-index           hFBQF-jZSkqajPRYzegfwg   5   1          0            0      1.2kb          1.2kb
green  open   .kibana_1            sEUUnVMxQ0amHbQGcOedOQ   1   0          4            1     19.8kb         19.8kb

查看單個index數據
curl 'localhost:9200/logstash-2019.03.29/_search?pretty=true'

{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "logstash-2019.03.29",
        "_type" : "doc",
        "_id" : "W85Rx2kBAxlZ_OvPIlZf",
        "_score" : 1.0,
        "_source" : {
          "message" : "{ firstName: 1, lastName:McLaughlin, email: aaaa }",
          "@timestamp" : "2019-03-29T02:39:35.229Z",
          "@version" : "1",
          "host" : "node223",
          "path" : "/data/elk/test.log"
        }
      }
    ]
  }
}

4.1.3 用ES鏈接到Kibana

在你開始用Kibana以前,你須要告訴Kibana你想探索哪一個Elasticsearch索引。第一次訪問Kibana是,系統會提示你定義一個索引模式以匹配一個或多個索引的名字。

一、訪問Kibana UI。例如,localhost:56011 或者 http://YOURDOMAIN.com:5601
二、指定一個索引模式來匹配一個或多個你的Elasticsearch索引。當你指定了你的索引模式之後,任何匹配到的索引都將被展現出來。
  (畫外音:*匹配0個或多個字符; 指定索引默認是爲了匹配索引,確切的說是匹配索引名字)
三、點擊「Next Step」以選擇你想要用來執行基於時間比較的包含timestamp字段的索引。若是你的索引沒有基於時間的數據,那麼選擇「I don’t want to use the Time Filter」選項。
四、點擊「Create index pattern」按鈕來添加索引模式。第一個索引模式自動配置爲默認的索引默認,之後當你有多個索引模式的時候,你就能夠選擇將哪個設爲默認。(提示:Management > Index Patterns)


如今,Kibana已經鏈接到你的Elasticsearch數據。Kibana展現了一個只讀的字段列表,這些字段是匹配到的這個索引配置的字段。

4.1.4 Kibana展現

http://localhost:5601

4.2 監控Spring Boot服務日誌

要求日誌格式爲json格式

4.2.1 json日誌格式配置

在Spring Boot工程resources目錄下配置logback.xml文件,旨在將log4j日誌按json格式輸出到指定目錄,logback.xml文件內容以下:

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
   <!--定義日誌文件的存儲地址 勿在 LogBack 的配置中使用相對路徑 
   <property name="LOG_HOME" value="E:/demo/home" />
   -->

   <!--格式化輸出:%d表示日期,%thread表示線程名,%-5level:級別從左顯示5個字符寬度%msg:日誌消息,%n是換行符 -->
   <!--<property name="LOG_PATTERN" value="[  %d{yyyy-MM-dd HH:mm:ss.SSS}] [UUID:%X{requestId}] [%level]|[${HOSTNAME}] [%thread]|[%logger{36}] | %msg|%n " />-->
   <property name="TIME_PATTERN" value="yyyy-MM-dd'T'HH:mm:ss.SSS+08:00" />
   <property name="LOG_PATTERN" value='{"event":"log","timestamp":"%d{${TIME_PATTERN}}","level":"%level","serviceName":"${springAppName:-}","pid": "${PID:-}","host":"${HOSTNAME}","class": "%logger{40}","thread":"%thread","message":"%msg"}%n ' />

   <!-- Console 輸出設置 -->
   <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
      <encoder>
         <pattern>${LOG_PATTERN}</pattern>
         <charset>utf8</charset>
      </encoder>
   </appender>

   <!-- 按照天天生成日誌文件 -->
   <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
      <file>logs/newsinfo-dataservice.log</file>
      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
         <FileNamePattern>logs/newsinfo-dataservice.%d{yyyy-MM-dd}.log</FileNamePattern>
      </rollingPolicy>
      <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
         <pattern>${LOG_PATTERN}</pattern>
      </encoder>
   </appender>

   <!--log4jdbc -->
   <logger name="com.sohu" level="debug" additivity="false">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILE" />    
    </logger>

   <!-- 日誌輸出級別 -->
   <root level="info">
      <appender-ref ref="STDOUT" />
      <appender-ref ref="FILE" />
   </root>
</configuration>

輸出日誌示例以下:

{"event":"log","timestamp":"2019-04-09T19:47:07.908Z","level":"INFO","serviceName":"springAppName_IS_UNDEFINED","pid": "7196","host":"node1","class": "o.s.b.web.servlet.FilterRegistrationBean","thread":"localhost-startStop-1","message":"Mapping filter: 'MyFilter' to urls: [/getUser, /hello]"}

4.2.2 DomeOS啓動Spring Boot服務

啓動服務的同時,配置日誌收集到kafka,而後調用ELK讀取kafka日誌存入ES,進行日誌的全文檢索和實時告警分析。


建立新版本時,如上配置下日誌收集模塊,日誌就自動經過Flume接入Kafka了。

路徑:/code/logs/newsinfo-dataservice.log自動收集 :自動將日誌收集到 KafkaKafka Topic:newsinfo_appdata_service_log
相關文章
相關標籤/搜索