站點上線聯動ELK日誌錄入

ELK架構日誌處理邏輯:html

一、業務層Filebeat安裝時會自動獲取主機運行站點域名及環境信息新增channel及env標籤,並將channel的值做爲kafka的topic信息java

二、Kafka收到Filebeat的新增字段及Topic信息,自動建立Topic信息,以等待logstash消費python

三、Logstash根據腳本自動生成input及output配置spring

這裏的topic必定和filebeat的channel一致。json

示範:bootstrap

filebeat層:vim

- type: log
  processors:
  - add_fields:
      fields:
        env: "prod"         ## ansible調用Python根據網段信息自動判斷生成
        ip: "10.12.11.27"   ## ansible調用Python根據網段信息自動判斷生成
        apptype: "service"  ## ansible調用Python根據域名自動判斷生成
        channel: "cms.prod.tarscorp.com"   ##ansible調用Python根據站點目錄生成
  enabled: true
  paths:
    - /data1/logs/cms.prod.tarscorp.com/*.log

output.kafka:

  codec.json:
    pretty: true
    escape_html: false

  hosts: ["kafka1.mgt.tarscorp.com:9092", "kafka2.mgt.tarscorp.com:9092", "kafka3.mgt.tarscorp.com:9092"]

  topic: 'cms.prod.tarscorp.com'        ## topic和channel取自同一數據
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

Kafka層:忽略,集羣+開啓自動建立Topic便可服務器

logstash層:架構

vim prod-input.conf    ##輸入信息

kafka {
      topics => "cms.prod.tarscorp.com"     ## kafka中的topic信息
      bootstrap_servers => "kafka1.mgt.tarscorp.com:9092,kafka2.mgt.tarscorp.com:9092,kafka3.mgt.tarscorp.com:9092"
      decorate_events => false
      group_id => "logstash-tars"
      # consumer_threads => 5
      client_id => "mgt-elk-logstash1-prod"
      codec => "json"
      add_field => {"topic"=>"cms.prod.tarscorp.com"}   ##這裏主要是爲了方便logstash作判斷
   }

vim prod-javasite.conf    ##輸出信息
if [topic] == "cms.prod.tarscorp.com" {
                elasticsearch {
                        hosts => ["mgt-elk-esmaster1:9200", "mgt-elk-esmaster2:9200", "mgt-elk-esmaster3:9200"]
                        manage_template => false
                        index => "prod-javasite-%{+YYYY.MM.dd}"
                }
        }

說明:咱們這裏技術棧爲Java spring,因此全部的Java站點都會放在[prod-javasite-%{+YYYY.MM.dd}]的索引下,因爲配置環節衆多,因此在站點上架時,採用分發機器先部署服務,而後ansible部署filebeat,其中ansible會經過Python腳原本獲取服務器網段及站點信息經過templates來補充channel、Topic、apptype、env、ip標籤生成配置,實現自動判斷,減輕運維參與環節負擔。app

使用腳本生成舉例:

./add_info.py --env prod --topic cms.prod.tarscorp.com --module javasite

vim add_info.py

#!/usr/bin/env python3

import os,sys,argparse

parser = argparse.ArgumentParser(description='Logstash configuration file add tools')

parser.add_argument('--env',type=str,required=True,help='環境信息')
parser.add_argument('--topic',type=str,required=True,help='Topic信息')
parser.add_argument('--module',type=str,required=True,help='模塊信息')

args = parser.parse_args()

env_info = args.env
topic_name = args.topic
module_info = args.module
date = "%{+YYYY.MM.dd}"

template_input = '''
   kafka {
      topics => "%s"
      bootstrap_servers => "kafka1.mgt.tarscorp.com:9092,kafka2.mgt.tarscorp.com:9092,kafka3.mgt.tarscorp.com:9092"
      decorate_events => false
      group_id => "logstash-tars"
      # consumer_threads => 5
      client_id => "mgt-elk-logstash1-dev"
      codec => "json"
      add_field => {"topic"=>"%s"}
   }
}
''' %(topic_name,topic_name)

template_output = '''
    if [topic] == "%s" {
        elasticsearch {
            hosts => ["mgt-elk-esmaster1:9200", "mgt-elk-esmaster2:9200", "mgt-elk-esmaster3:9200"]
            manage_template => false
            index => "%s-%s-%s"
        }
    }
}
''' %(topic_name,env_info,module_info,date)

init_input = '''
input {

'''
init_output = '''
output {

'''

path_home = "/etc/logstash/conf.d/"
input_file = "/etc/logstash/conf.d/%s-input.conf" % (env_info)
output_file = "/etc/logstash/conf.d/%s-%s.conf" % (env_info, module_info)

if os.path.exists(path_home) == False:
    print('請在logstash主機運行該腳本')
    exit(code=255)

if os.path.exists(input_file) == False:
    with open(input_file, mode='w', encoding='utf-8') as f:
        f.write(init_input)

if os.path.exists(output_file) == False:
    with open(output_file, mode='w', encoding='utf-8') as f:
        f.write(init_output)

with open(input_file,mode='rb+') as f:
    f.seek(-2,2)
    f.write(template_input.encode('utf-8'))

with open(output_file,mode='rb+') as f:
    f.seek(-2,2)
    f.write(template_output.encode('utf-8'))
相關文章
相關標籤/搜索