ELK補充之logstash

ELK補充之logstash

經過logtsash收集tomcat和java日誌

server.xml
<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
               prefix="tomcat_access_log" suffix=".log"
               pattern="{&quot;clientip&quot;:&quot;%h&quot;,&quot;ClientUser&quot;:&quot;%l&quot;,&quot;authenticated&quot;:&quot;%u&quot;,&quot;AccessTime&quot;:&quot;%t&quot;,&quot;method&quot;:&quot;%r&quot;,&quot;status&quot;:&quot;%s&quot;,&quot;SendBytes&quot;:&quot;%b&quot;,&quot;Query?string&quot;:&quot;%q&quot;,&quot;partner&quot;:&quot;%{Referer}i&quot;,&quot;AgentVersion&quot;:&quot;%{User-Agent}i&quot;}"/>

/etc/init.d/tomcat stop
rm -rf /apps/tomcat/logs/*
/etc/init.d/tomcat start
tail -f tomcat_access_log.2019-08-19.log 
{"clientip":"192.168.10.1","ClientUser":"-","authenticated":"-","AccessTime":"[19/Aug/2019:01:42:03 +0000]","method":"GET /testapp/ HTTP/1.1","status":"304","SendBytes":"-","Query?string":"","partner":"-","AgentVersion":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"}

驗證日誌是否json格式

https://www.json.cn/html

如何獲取日誌行中的IP

[root@tomcat1 ~]# cat 1
#!/usr/bin/env python
#coding:utf-8

data={"clientip":"192.168.10.1","ClientUser":"-","authenticated":"-","AccessTime":"[19/Aug/2019:01:42:03 +0000]","method":"GET /testapp/ HTTP/1.1","status":"304","SendBytes":"-","Query?string":"","partner":"-","AgentVersion":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"}
ip=data["clientip"]
print ip
[root@tomcat1 ~]# python 1
192.168.10.1

在tomcat服務器安裝logstash收集tomcat和系統日誌

須要部署tomcat並安裝配置logstashjava

vim /etc/logstash/conf.d/tomcat-es.conf
input{
  file{
    path => "/apps/tomcat/logs/tomcat_access_log.*.log"
    type => "tomcat-accesslog"
    start_position => "beginning"
    stat_interval => "3"
    codec => "json"
  }
}

output{
  if [type] == "tomcat_access_log" {
    elasticsearch {
      hosts => ["192.168.10.100:9200"]
      index => "192.168.10.230-tomcat-accesslog-%{+YYYY.MM.dd}"
    }
  }
}
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/tomcat-es.conf
systemctl start logstash

[root@tomcat1 conf.d]# systemctl status logstash
● logstash.service - logstash
   Loaded: loaded (/etc/systemd/system/logstash.service; disabled; vendor preset: disabled)
   Active: failed (Result: start-limit) since Mon 2019-08-19 10:31:22 CST; 3min 4s ago
  Process: 1292 ExecStart=/usr/share/logstash/bin/logstash --path.settings /etc/logstash (code=exited, status=1/FAILURE)
 Main PID: 1292 (code=exited, status=1/FAILURE)

Aug 19 10:31:22 tomcat1 systemd[1]: Unit logstash.service entered failed state.
Aug 19 10:31:22 tomcat1 systemd[1]: logstash.service failed.
Aug 19 10:31:22 tomcat1 systemd[1]: logstash.service holdoff time over, scheduling restart.
Aug 19 10:31:22 tomcat1 systemd[1]: Stopped logstash.
Aug 19 10:31:22 tomcat1 systemd[1]: start request repeated too quickly for logstash.service
Aug 19 10:31:22 tomcat1 systemd[1]: Failed to start logstash.
Aug 19 10:31:22 tomcat1 systemd[1]: Unit logstash.service entered failed state.
Aug 19 10:31:22 tomcat1 systemd[1]: logstash.service failed.

vim /var/log/messages
could not find java; set JAVA_HOME or ensure java is in PATH

ln /apps/jdk/bin/java /usr/bin/java -sv

[root@tomcat1 conf.d]# systemctl status logstash
● logstash.service - logstash
   Loaded: loaded (/etc/systemd/system/logstash.service; disabled; vendor preset: disabled)
   Active: active (running) since Mon 2019-08-19 10:34:30 CST; 3s ago
 Main PID: 1311 (java)
   CGroup: /user.slice/user-0.slice/session-1.scope/system.slice/logstash.service
           └─1311 /bin/java -Xms300m -Xmx300m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:C...

Aug 19 10:34:30 tomcat1 systemd[1]: Started logstash.

建立一個索引

logstash
logstash

logstash
logstash

logstash
logstash

收集java日誌

https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.htmlpython

在elasticsearch服務器部署logstash

cat /etc/logstash/conf.d/java-es.conf
input {
        stdin {
        codec => multiline {
        pattern => "^\[" #當遇到[開頭的行時候將多行進行合併
        negate => true  #true爲匹配成功進行操做,false爲不成功進行操做
        what => "previous"  #與上面的行合併,若是是下面的行合併就是next
        }}
}
filter { #日誌過濾,若是全部的日誌都過濾就寫這裏,若是隻針對某一個過濾就寫在input裏面的日誌輸入裏面
}
output {
        stdout {
        codec => rubydebug
  }
}

測試是否正常啓動

/usr/share/logstash/bin/logstash -f java-es.conf 
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[WARN ] 2019-08-19 05:56:58.580 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2019-08-19 05:56:58.604 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"6.5.4"}
[INFO ] 2019-08-19 05:57:04.238 [Converge PipelineAction::Create<main>] pipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[INFO ] 2019-08-19 05:57:04.875 [Converge PipelineAction::Create<main>] pipeline - Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x44c5ad22 run>"}
The stdin plugin is now waiting for input:
[INFO ] 2019-08-19 05:57:05.267 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[INFO ] 2019-08-19 05:57:05.689 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9601}

測試標準輸入和標準輸出
輸出
1234
1234[1234]
[233
輸出
{
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "tomcat1",
    "@timestamp" => 2019-08-19T05:58:13.616Z,
       "message" => "1234\n1234[1234]"
}

5678[90]12
[333[444]66
{
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "tomcat1",
    "@timestamp" => 2019-08-19T05:59:08.507Z,
       "message" => "[233\n5678[90]12"
}

[
{
      "@version" => "1",
          "host" => "tomcat1",
    "@timestamp" => 2019-08-19T05:59:42.467Z,
       "message" => "[333[444]66"
}

將輸出改成elasticsearch

cat java-es.conf 
input {
  file {
    path => "/tmp/cluster-e.log"
    type => "javalog"
    start_position => "beginning"
    codec => multiline {
    pattern => "^\["
    negate => true
    what => "previous"
  }}
}

output {
  if [type] == "javalog" {
  elasticsearch {
    hosts =>  ["192.168.10.100:9200"]
    index => "192.168.10.230-javalog-%{+YYYY.MM.dd}"
  }}
}

添加索引

logstash
logstash

logstash
logstash

生成數據

cp cluster-e.log 1
cat 1 >> cluster-e.log

kibana界面查看數據

kibana
kibana

收集TCP/UDP日誌

經過logstash的tcp/udp插件收集日誌,一般用於在向elasticsearch日誌補錄丟失的部分日誌,能夠將丟失的日誌經過一個TCP端口直接寫入到elasticsearch服務器。nginx

logstash配置文件,先進行收集測試

cat tcp-es.conf 
input {
  tcp {
    port => 2333
    type => "tcplog"
    mode => "server"  
  }
}

output {
  stdout {
    codec => "rubydebug"
  }
}

驗證端口啓動成功

/usr/share/logstash/bin/logstash -f  /etc/logstash/conf.d/tcp.conf
ss -tnl|grep 2333

在其餘服務器安裝nc命令:

NetCat簡稱nc,在網絡工具中有「瑞士軍刀」美譽,其功能實用,是一個簡單、可靠的網絡工具,可經過TCP或UDP協議傳輸讀寫數據,另外還具備不少其餘功能。web

yum instll nc –y
echo "nc test tcplog"|nc 192.168.10.230 2333
驗證logstash是否接收到數據
{
          "host" => "192.168.10.230",
       "message" => "nc test tcplog",
          "type" => "tcplog",
    "@timestamp" => 2019-08-19T06:28:56.464Z,
          "port" => 48222,
      "@version" => "1"
}

經過nc命令發送一個文件

nc 192.168.10.230 2333 < /etc/passwd
驗證logstash是否接收到數據
{
          "host" => "192.168.10.230",
       "message" => "ftp:x:14:50:FTP User:/var/ftp:/sbin/nologin",
          "type" => "tcplog",
    "@timestamp" => 2019-08-19T06:30:39.686Z,
          "port" => 48224,
      "@version" => "1"
}

將輸出改成elasticsearch

cat tcp-es.conf 
input {
  tcp {
    port => 2333
    type => "tcplog"
    mode => "server"
  }
}

output {
  elasticsearch {
    hosts => ["192.168.10.100:9200"]
    index =>  "192.168.10.230-tcplog-%{+YYYY.MM.dd}"
  }
}

systemctl  restart logstash

在kibana界面添加索引

導入數據 nc 192.168.10.230 2333 < /etc/passwd
kibanaredis

kibana
kibana

驗證數據

kibana
kibana

經過rsyslog收集haproxy日誌

在centos 6及以前的版本叫作syslog,centos 7開始叫作rsyslog,根據官方的介紹,rsyslog(2013年版本)能夠達到每秒轉發百萬條日誌的級別,官方網址:http://www.rsyslog.com/shell

vim /etc/haproxy/haproxy.cfg
log         127.0.0.1 local2
listen stats
 mode http
 bind 0.0.0.0:8888
 stats enable
 log global
 stats uri     /hastatus
 stats auth    admin:admin
vim /etc/rsyslog.conf 
# Provides UDP syslog reception
$ModLoad imudp
$UDPServerRun 514
 # Provides TCP syslog reception
$ModLoad imtcp
$InputTCPServerRun 514

local2.* @@192.168.10.230:514

從新啓動haproxy和rsyslog服務

systemctl restart haproxy rsyslog
vim rsyslog-es.conf
input{
      syslog {
        type => "rsyslog"
        port => "514"
}}

output{
  elasticsearch {
    hosts => ["192.168.10.100:9200"]
    index =>  "l92.168.10.230-rsyslog-%{+YYYY.MM.dd}"
  }
}

systemctl restart logstashapache

創建索引

kibana
kibana

kibana驗證數據

kibana
kibana

logstash收集日誌並寫入redis

用一臺或多臺服務器按照部署redis服務,專門用於日誌緩存使用,用於web服務器產生大量日誌的場景,例以下面的服務器內存即將被使用完畢,查看是由於redis服務保存了大量的數據沒有被讀取而佔用了大量的內存空間。
日誌文件>>logstash>>redis>>logstash>>elasticsearch>>kibanajson

安裝redis

yum install redis -y

配置redis

grep "^[a-Z]" /etc/redis.conf  
bind 0.0.0.0
port 6379
daemonize yes #後臺啓動
save ""
requirepass password #設置redis訪問密碼
rdbcompression no  #是否壓縮
rdbchecksum no  #是否校驗

驗證redis

[root@Final conf.d]# redis-cli 
127.0.0.1:6379> KEYS *
(error) NOAUTH Authentication required.
127.0.0.1:6379> keys *
(error) NOAUTH Authentication required.
127.0.0.1:6379> auth password
OK
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> keys *
(empty list or set)
127.0.0.1:6379[1]> keys *
1) "redis-nginx-accesslog"

配置logstash將日誌寫入至redis

[root@logstash1 conf.d]# cat redis-es.conf 
input {
  file {
    path => "/usr/local/nginx/logs/access_json.log"
    type => "nginx-accesslog"
    start_position => "beginning"
    stat_interval => "3"
    codec => "json"
  }
}

output {
  if [type] == "nginx-accesslog" {
    redis {
      host => "192.168.10.254"
      port => 6379
      password => password
      key => "redis-nginx-accesslog"
      db => 1
      data_type => list
  }}
}

systemctl restart logstash

配置其餘logstash服務器從redis讀取數據

[root@tomcat1 conf.d]# cat redis-es.conf 
input {
  redis {
    data_type => "list"
    key => "redis-nginx-accesslog"
    host => "192.168.10.254"
    port => "6379"
    db => "1"
    password => "password"
    codec => "json"
  }
}

output {
  if [type] == "nginx-accesslog" {
    elasticsearch {
      hosts => ["192.168.10.100:9200"]
      index => "redis-nginx-accesslog-%{+YYYY.MM.dd}"
}}
}

systemctl restart logstash

驗證redis是否有數據

127.0.0.1:6379[1]> keys *
1) "redis-nginx-accesslog"
127.0.0.1:6379[1]> LLEN redis-nginx-accesslog     若是值大於0說明logstash服務有問題
(integer) 2
127.0.0.1:6379[1]> rpop redis-nginx-accesslog
"{\"status\":\"sstatus\",\"type\":\"nginx-accesslog\",\"host\":\"192.168.10.102\",\"upstreamhost\":\"-\",\"upstreamtime\":\"-\",\"xff\":\"shttp_x_forwarded_for\",\"clientip\":\"192.168.10.254\",\"@timestamp\":\"2019-08-20T01:43:46.000Z\",\"aomain\":\"192.168.10.102\",\"size\":6,\"path\":\"/usr/local/nginx/logs/access_json.log\",\"@version\":\"1\",\"ei\":\"suri\",\"referer\":\"-\",\"tcp_xff\":\"\",\"responsetime\":0.0,\"http_host\":\"192.168.10.102\",\"http_user_agent\":\"http_user_agent\"}"
127.0.0.1:6379[1]> LLEN redis-nginx-accesslog
(integer) 0

systemctl restart logstash

添加日誌索引

redis
redis

redis
redis

驗證日誌

redis
redis

logstash收集日誌並寫入kafka

配置logstash將日誌寫入至kafka

[root@logstash1 conf.d]# cat kafka-es.conf 
input {
  file {
    path => "/usr/local/nginx/logs/access_json.log"
    type => "nginx-accesslog"
    start_position => "beginning"
    stat_interval => "3"
    codec => "json"
  }
}

output {
  if [type] == "nginx-accesslog" {
   kafka {
    bootstrap_servers => "192.168.10.211:9092"
    topic_id => "kafka-nginx-accesslog"
    codec => "json"
}}}

systemctl restart logstash

配置其餘logstash服務器從kafka讀取數據

cat kafka-es.conf 
input {
  kafka {
    bootstrap_servers => "192.168.10.211:9092"
    topics => "kafka-nginx-accesslog"
    codec => "json"
}
}

output {
  if [type] == "nginx-accesslog" {
    elasticsearch {
      hosts => ["192.168.10.100:9200"]
      index => "kafka-nginx-accesslog-%{+YYYY.MM.dd}"
      codec => "json"
}}
# stdout {
#  codec => "rubydebug"
#}
}

systemctl restart logstash

生成日誌

while true;do curl http://192.168.10.102;sleep 1;done

添加日誌索引

kafka
kafka

kafka
kafka

驗證日誌

kafka
kafka
相關文章
相關標籤/搜索