ELK architecture log-processing logic:
1. When Filebeat is installed on a business host, it automatically picks up the domain of the site running on that host and its environment, adds `channel` and `env` fields, and uses the `channel` value as the Kafka topic.
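The article does not show the helper that derives these tags, but given the site-directory layout used throughout (`/data1/logs/<channel>`, with the environment as the second label of the domain), a minimal sketch might look like this; the function name `derive_site_tags` is hypothetical:

```python
import os

def derive_site_tags(site_dir):
    """Derive channel/env tags from a site log directory such as
    /data1/logs/cms.prod.tarscorp.com (layout assumed from the article)."""
    channel = os.path.basename(site_dir.rstrip('/'))  # e.g. cms.prod.tarscorp.com
    # the second label of the domain carries the environment: cms.<env>.tarscorp.com
    env = channel.split('.')[1]
    # the Kafka topic reuses the channel value, as described above
    return {"channel": channel, "env": env, "topic": channel}

tags = derive_site_tags("/data1/logs/cms.prod.tarscorp.com")
print(tags["topic"])  # → cms.prod.tarscorp.com
```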
2. Kafka receives the new fields and topic name from Filebeat and creates the topic automatically, ready for Logstash to consume.
3. Logstash's input and output configuration is generated automatically by a script.
The topic here must always match Filebeat's `channel` value.
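Since a channel/topic mismatch silently breaks the pipeline, it can be worth checking before rollout. A minimal sketch (the `check_channel_topic` helper and the simplified dict layout are assumptions, not part of the article's tooling):

```python
def check_channel_topic(filebeat_cfg):
    """Return True when every input's channel field matches the Kafka output topic."""
    topic = filebeat_cfg["output.kafka"]["topic"]
    for inp in filebeat_cfg["inputs"]:
        for proc in inp.get("processors", []):
            fields = proc.get("add_fields", {}).get("fields", {})
            if "channel" in fields and fields["channel"] != topic:
                return False
    return True

# dict mirroring the filebeat example in this article, structure simplified
cfg = {
    "inputs": [{"processors": [{"add_fields": {"fields": {"channel": "cms.prod.tarscorp.com"}}}]}],
    "output.kafka": {"topic": "cms.prod.tarscorp.com"},
}
print(check_channel_topic(cfg))  # → True
```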
Example:
Filebeat layer:
```yaml
- type: log
  processors:
    - add_fields:
        fields:
          env: "prod"                        ## generated by ansible calling Python, from the network segment
          ip: "10.12.11.27"                  ## generated by ansible calling Python, from the network segment
          apptype: "service"                 ## generated by ansible calling Python, from the domain name
          channel: "cms.prod.tarscorp.com"   ## generated by ansible from the site directory
  enabled: true
  paths:
    - /data1/logs/cms.prod.tarscorp.com/*.log

output.kafka:
  codec.json:
    pretty: true
    escape_html: false
  hosts: ["kafka1.mgt.tarscorp.com:9092", "kafka2.mgt.tarscorp.com:9092", "kafka3.mgt.tarscorp.com:9092"]
  topic: 'cms.prod.tarscorp.com'             ## topic and channel come from the same value
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
```
Kafka layer: nothing special to do — a cluster with automatic topic creation enabled is sufficient.
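For reference, automatic topic creation is controlled by `auto.create.topics.enable` in the broker's `server.properties` (it defaults to true); the partition and replication counts below are illustrative values, not taken from the article:

```properties
# server.properties — let producers create topics on first write
auto.create.topics.enable=true
# defaults applied to auto-created topics (example values)
num.partitions=3
default.replication.factor=2
```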
Logstash layer:
```conf
vim prod-input.conf    ## input side
kafka {
  topics => "cms.prod.tarscorp.com"                  ## the topic in Kafka
  bootstrap_servers => "kafka1.mgt.tarscorp.com:9092,kafka2.mgt.tarscorp.com:9092,kafka3.mgt.tarscorp.com:9092"
  decorate_events => false
  group_id => "logstash-tars"
  # consumer_threads => 5
  client_id => "mgt-elk-logstash1-prod"
  codec => "json"
  add_field => {"topic"=>"cms.prod.tarscorp.com"}    ## mainly so Logstash can branch on the topic
}

vim prod-javasite.conf    ## output side
if [topic] == "cms.prod.tarscorp.com" {
  elasticsearch {
    hosts => ["mgt-elk-esmaster1:9200", "mgt-elk-esmaster2:9200", "mgt-elk-esmaster3:9200"]
    manage_template => false
    index => "prod-javasite-%{+YYYY.MM.dd}"
  }
}
```
Note: our stack here is Java Spring, so all Java sites land under the [prod-javasite-%{+YYYY.MM.dd}] index. Because so many configuration steps are involved, bringing a site online works like this: a distribution host deploys the service first, then ansible deploys Filebeat. Ansible calls a Python script to obtain the server's network segment and site information, and fills in the channel, Topic, apptype, env, and ip fields through templates to generate the configuration. This makes the decisions automatic and reduces the manual steps operations has to perform.
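The network-segment-to-environment decision can be sketched with the standard-library `ipaddress` module; the `SEGMENT_ENV` map below is hypothetical (the real mapping lives in the ansible setup described above), and only the `10.12.11.x`/prod pairing is taken from the Filebeat example:

```python
import ipaddress

# hypothetical segment-to-environment map; adjust to the real inventory
SEGMENT_ENV = {
    "10.12.11.0/24": "prod",
    "10.12.12.0/24": "dev",
}

def env_for_ip(ip):
    """Pick the env tag whose network segment contains the host IP."""
    addr = ipaddress.ip_address(ip)
    for cidr, env in SEGMENT_ENV.items():
        if addr in ipaddress.ip_network(cidr):
            return env
    raise LookupError("no segment matched %s" % ip)

print(env_for_ip("10.12.11.27"))  # → prod, matching the Filebeat example
```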
Example of generating the configuration with the script:
```shell
./add_info.py --env prod --topic cms.prod.tarscorp.com --module javasite
```
vim add_info.py
```python
#!/usr/bin/env python3
import os
import sys
import argparse

parser = argparse.ArgumentParser(description='Logstash configuration file add tool')
parser.add_argument('--env', type=str, required=True, help='environment name')
parser.add_argument('--topic', type=str, required=True, help='Kafka topic')
parser.add_argument('--module', type=str, required=True, help='module name')
args = parser.parse_args()

env_info = args.env
topic_name = args.topic
module_info = args.module
date = "%{+YYYY.MM.dd}"

# The trailing extra "}" re-closes the surrounding input {} block that the
# seek(-2, 2) append below steps back into.
template_input = '''
kafka {
  topics => "%s"
  bootstrap_servers => "kafka1.mgt.tarscorp.com:9092,kafka2.mgt.tarscorp.com:9092,kafka3.mgt.tarscorp.com:9092"
  decorate_events => false
  group_id => "logstash-tars"
  # consumer_threads => 5
  client_id => "mgt-elk-logstash1-%s"  ## derived from --env so each environment gets its own client id
  codec => "json"
  add_field => {"topic"=>"%s"}  ## lets the output config branch on the topic
}
}
''' % (topic_name, env_info, topic_name)

template_output = '''
if [topic] == "%s" {
  elasticsearch {
    hosts => ["mgt-elk-esmaster1:9200", "mgt-elk-esmaster2:9200", "mgt-elk-esmaster3:9200"]
    manage_template => false
    index => "%s-%s-%s"
  }
}
}
''' % (topic_name, env_info, module_info, date)

# New files start with the block opened *and* closed, so the append below can
# step back over the final "}\n" and the template can restore it.
init_input = 'input {\n}\n'
init_output = 'output {\n}\n'

path_home = "/etc/logstash/conf.d/"
input_file = "/etc/logstash/conf.d/%s-input.conf" % env_info
output_file = "/etc/logstash/conf.d/%s-%s.conf" % (env_info, module_info)

if not os.path.exists(path_home):
    print('Please run this script on the logstash host')
    sys.exit(255)

if not os.path.exists(input_file):
    with open(input_file, mode='w', encoding='utf-8') as f:
        f.write(init_input)
if not os.path.exists(output_file):
    with open(output_file, mode='w', encoding='utf-8') as f:
        f.write(init_output)

# Overwrite the closing "}\n" with the new block; the block's own trailing
# "}" re-closes the input {} / output {} section.
with open(input_file, mode='rb+') as f:
    f.seek(-2, 2)
    f.write(template_input.encode('utf-8'))
with open(output_file, mode='rb+') as f:
    f.seek(-2, 2)
    f.write(template_output.encode('utf-8'))
```
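The `seek(-2, 2)` append trick can be tried in isolation. This sketch assumes the input file is initialised with its closing brace already in place (`input {` followed by `}`); the temp-file path is arbitrary:

```python
import os
import tempfile

# a block shaped like the script's template: it reopens the section it is
# appended into and re-closes it with the final extra "}"
block = '''
kafka {
  topics => "cms.prod.tarscorp.com"
}
}
'''

path = os.path.join(tempfile.mkdtemp(), "prod-input.conf")
with open(path, "w", encoding="utf-8") as f:
    f.write("input {\n}\n")          # skeleton: opened and closed

with open(path, "rb+") as f:
    f.seek(-2, 2)                    # step back over the trailing "}\n"
    f.write(block.encode("utf-8"))   # block ends by re-closing input {}

print(open(path, encoding="utf-8").read())
```

After the append the file still contains a balanced `input { ... }` block with the new `kafka {}` stanza inside it, and the same append can be repeated for every new topic.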