以前公司小王在工做中清理elasticSearch 索引,不當心使用腳本清空了最近使用的重要索引,致使開發沒法準確的進行定位生產問題,形成了很大困擾。git
當時咱們的生產環境中是這樣配置日誌系統的:服務器 -> filebeat -> kakfa -> logstash -> elasticsearch -> kibana 頁面。其中在緩存配置中咱們配置了kafka集羣。github
es 數據丟失以後,咱們考慮使用拉去kafka 集羣中的存儲的消息,讓它從新消費一邊,更換消費組,從頭開始進行消費 配置 auto_offset_reset => "earliest" 設定。json
具體實踐以下:bootstrap
以下配置是我測試經過的緩存
input { kafka { bootstrap_servers => ["192.168.39.23:9082,192.168.39.24:9082,192.168.39.25:9082"] topics_pattern => ["MY_REPORT_TOPIC_.*"] client_id => "logstash-1" group_id => "logstash_to_elastic2" # 這裏的組要更新一下,原先是logstash_to_elastic auto_offset_reset => "earliest" # 這裏設定消費模式爲從頭開始消費,也就是從最初開始消費 consumer_threads => 1 decorate_events => true codec => "json" } } output { elasticsearch { hosts => "http://192.168.39.27:9200" index => "logtype-%{[logType]}" document_id => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}" } }
重點說下讀取kafka的配置:服務器
bootstrap_servers:kafka集羣地址
topics_pattern:我這裏是通配的前綴爲」MY_REPORT_TOPIC_」的topic,注意後面跟的是.*,不要忘了這個點,正則的語法,通配全部字符用的
client_id:自定義的客戶端id
group_id:自定義組id,兩個消費者同時消費同一組會形成其中一個沒法消費,可是隻要建立多個partition就能夠了,即組內消費者數量和分區數相同就均可以消費了,這樣一個組內全部消費者可以分擔負載
auto_offset_reset:
因爲我有些歷史數據要消費,因此設置爲earliest從頭讀的網絡
consumer_threads:通常最佳配置是同一個組內consumer個數(或線程數)等於topic的分區數,若是大於分區數會造logstash進程閒置,不然一個進程訪問多個分區。若是有多個Logstash實例,那就讓實例個數 * consumer_threads等於分區數便可
數據寫入到Kafka時沒有重複,但後續流程可能由於網絡抖動、傳輸失敗等致使重試形成數據重複。如何解決呢?不須要依賴業務數據就能夠去重。去重的原理也很簡單,利用es document id便可。elasticsearch
對於es,若是寫入數據時沒有指定document id,就會隨機生成一個uuid,若是指定了,就使用指定的值。對於須要去重的場景,咱們指定document id便可。測試
在output elasticsearch中能夠經過document_id字段指定document id,咱們須要構造出一個」uuid」能唯一標識kafka中的一條數據,這個很是簡單:<topic>+<partition>+<offset>
,這三個值的組合就能夠惟一標識kafka集羣中的一條數據。ui
input kafka插件也已經幫咱們把消息對應的元數據信息記錄到了@metadata(Logstash的元數據字段,不會輸出到output裏面去)字段裏面:
所以就有了上面的配置:
document_id => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}"
decorate_events:只有當decorate_events選項配置爲true的時候,上面的[@metadata](https://github.com/metadata)纔會記錄那些元數據,不然不會記錄。而該配置項的默認值是false,即不記錄