1. Since this project uses log4j2, the Kafka appender is configured directly in the log4j2 configuration:
<Kafka name="Kafka" topic="XX_log">
    <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss}||%p||%c{1}||XX_web||%m%n"/>
    <Property name="bootstrap.servers">127.0.0.1:9092</Property>
    <Property name="timeout.ms">500</Property>
</Kafka>
The PatternLayout joins the fields with || so that Logstash can split them later. The timeout.ms property is added so that an outage of the logging system does not significantly impact the business system. Kafka can of course run as a cluster, in which case bootstrap.servers takes multiple addresses separated by ",". XX_web identifies the current business platform. Each log call on the business side then produces one delimited line per event, as sketched below.
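A minimal sketch of the producing side (the class and the message are illustrative, not from the project):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class OrderService {
    private static final Logger logger = LogManager.getLogger(OrderService.class);

    public void placeOrder(String orderId) {
        // With the PatternLayout above, the Kafka appender sends a message like:
        // 2017-01-01 12:00:00||INFO||OrderService||XX_web||order placed: 1001
        logger.info("order placed: {}", orderId);
    }
}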
2. Setting up the Kafka cluster is not covered in detail here; the official documentation is thorough. Each broker points at the ZooKeeper ensemble:
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
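The XX_log topic can be created explicitly with the Kafka CLI of that era (a sketch; the replication factor and partition count are illustrative, though 5 partitions line up with the 5 consumer threads configured below):

# create the topic the appender writes to (illustrative settings)
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 \
    --replication-factor 2 --partitions 5 --topic XX_log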
3. Create the Logstash dynamic template:
{ "template": "*", "settings": { "index.refresh_interval": "5s", "number_of_replicas": "0", "number_of_shards": "3" }, "mappings": { "_default_": { "_all": { "enabled": false }, "dynamic_templates": [ { "message_field": { "match": "message", "match_mapping_type": "string", "mapping": { "type": "string", "index": "analyzed" } } }, { "string_fields": { "match": "*", "match_mapping_type": "string", "mapping": { "type": "string", "index": "not_analyzed" } } } ], "properties": { "dateTime": { "type": "date", "format": "yyy-MM-dd HH:mm:ss" }, "@version": { "type": "integer", "index": "not_analyzed" }, "context": { "type": "string", "index": "analyzed" }, "level": { "type": "string", "index": "not_analyzed" }, "class": { "type": "string", "index": "not_analyzed" }, "server": { "type": "string", "index": "not_analyzed" } } } } }
4. Configure Logstash:
input {
  kafka {
    zk_connect => "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
    group_id => "logstash"
    topic_id => "XX_log"
    reset_beginning => false
    consumer_threads => 5
    decorate_events => true
  }
}

filter {
  mutate {
    # split the "||"-delimited message into its five fields
    split => ["message", "||"]
    add_field => { "dateTime" => "%{[message][0]}" }
    add_field => { "level" => "%{[message][1]}" }
    add_field => { "class" => "%{[message][2]}" }
    add_field => { "server" => "%{[message][3]}" }
    add_field => { "context" => "%{[message][4]}" }
    remove_field => ["message"]
  }
  date {
    # parse the extracted timestamp into @timestamp
    match => ["dateTime", "yyyy-MM-dd HH:mm:ss"]
  }
}

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "XX_log-%{+YYYY-MM}"
    codec => "json"
    manage_template => true
    template_overwrite => true
    flush_size => 50000
    idle_flush_time => 10
    workers => 2
    template => "E:\logstash\template\template_log.json"
  }
}
Logstash reads the log messages from the Kafka cluster, and index => "XX_log-%{+YYYY-MM}" stores them in ES indices partitioned by year and month. After the filter runs, each event looks roughly like the example below.
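For illustration (values are made up, and the exact @timestamp depends on the date filter's timezone handling), a Kafka message of 2017-01-01 12:00:00||INFO||OrderService||XX_web||order placed: 1001 becomes an event along these lines:

{
  "@timestamp": "2017-01-01T12:00:00.000Z",
  "@version": "1",
  "dateTime": "2017-01-01 12:00:00",
  "level": "INFO",
  "class": "OrderService",
  "server": "XX_web",
  "context": "order placed: 1001"
}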
5. Setting up the ZooKeeper cluster is not covered in detail here either; there is plenty of material online.
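For reference, a minimal zoo.cfg sketch for one member of a three-node ensemble (ports and paths are illustrative; if all three instances run on one machine, each needs its own dataDir and clientPort, e.g. 2181/2182/2183 as in the zookeeper.connect above):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890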
6. Set up the ES cluster. This is fairly simple; it works without tuning many settings.
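A minimal elasticsearch.yml sketch for ES 2.4 (cluster name, node name, and peer addresses are illustrative):

cluster.name: xx-log-cluster
node.name: es-node-1
network.host: 127.0.0.1
# list the other nodes so they can discover each other
discovery.zen.ping.unicast.hosts: ["127.0.0.1:9300", "127.0.0.1:9301"]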
7. Configure Kibana:
# The port the Kibana server runs on.
server.port: 5601
# The host to bind the server to.
server.host: "115.28.240.113"
# The Elasticsearch (cluster) address.
elasticsearch.url: "http://127.0.0.1:9200"
kibana.index: "kibana"
8. Versions: JDK 1.7, ES 2.4, Logstash 2.4, Kafka 2.10, Kibana 4.6.4.