1.前言
記錄使用logstash從sqlserver同步數據到ES中遇到的幾點問題。使用的版本是es6.8.3+logstash6.8.3html
2.logstash配置文件
2.1input
input { jdbc { jdbc_driver_library => "/usr/local/logstash-6.8.3/logstashconfs/sqljdbc4.jar"#sqlserver的驅動jar包 jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" jdbc_connection_string => "jdbc:sqlserver://192.168.1.101:1433;databaseName=test;" jdbc_user => "sa" jdbc_password => "123456" jdbc_default_timezone => "Asia/Shanghai" jdbc_paging_enabled => "true"#分頁 record_last_run => true#記錄上一次運行的值 use_column_value => true#使用數據庫中的字段追蹤 tracking_column => "update_time"#追蹤的字段名稱 tracking_column_type => "timestamp"#追蹤的字段類型 last_run_metadata_path => "/usr/local/logstash-6.8.3/logstashconfs/sync-logs/consumer_statistics_update_time"#上一次運行的值存儲的文件地址 clean_run => false#使用數據庫中的字段追蹤 statement => "SELECT * FROM v_test WHERE update_time>:sql_last_value and update_time<GETDATE() "#sql語句 schedule => "*/5 * * * * *"#每5s執行一次 } }
statement
因爲要查的數據是表關聯的數據,一開始想的是創建多個jdbc,把數據存到es的不一樣的索引中,利用父子文檔進行關聯查詢,git
後來發現這種辦法效率差,並且影響ES的性能,因此解決辦法就是在sqlserver中創建好多表聯查好的視圖,es6
這裏的statement
中的v_test就是建立好的視圖.sql
因爲設置了Logstash 增量更新, 必需要使用 update_time>:sql_last_value and update_time<GETDATE()
這種限制條件,這樣才能夠保證數據不丟失也不重複數據庫
具體緣由見:如何使用 Logstash 實現關係型數據庫與 ElasticSearch 之間的數據同步json
schedule
網上的不少教程都說最小間隔是1min,其實是能夠作到秒級的.ruby
schedule => "*/5 * * * * *"
只要在前面再加一個* 單位就是秒,這裏就是每5s執行一次app
2.2filter
filter { if ![test]{ruby{code =>'event.set("test","")'}} mutate{ convert => { "id" => "integer" } remove_field => ["@timestamp"] remove_field => ["@version"] } }
這裏主要是對從sqlserver數據庫查出來的數據進行一些處理,我這裏刪去了大多數的內容,僅保留一些表明性的.curl
if ![test]{ruby{code =>'event.set("test","")'}}
這個的意思是 test字段爲null時,使用ruby這個語言進行處理,code =>''
這裏面就是寫代碼的elasticsearch
event.set("test","")
意思就是 設置test字段的內容爲""
固然咱們也能夠先event.get("test")
,獲取test字段的內容,而後在進行一系列處理後,再event.set
,這樣就能夠保存處理後的字段的值
ruby語言的具體語法能夠參考這個:Ruby教程
convert => { "id" => "integer" }
這個的意思就是將id字段的類型轉化爲integer,若是某個字段是時間類型能夠轉化爲timestamp類型
2.3output
output { elasticsearch { hosts => ["htkj101:9200","htkj102:9200","htkj103:9200"] index => "consumer_statistics"#索引名稱 document_id => "%{id}"#索引的id document_type => "consumer_statistics"#索引的type,這個在6.x版本之後就已經被廢棄,能夠忽略這個 template_name => "consumer_statistics"#索引模板的名稱 } }
document_id => "%{id}"
文檔的id就是導入數據的id,這樣設置能夠實現冪等性
template_name => "consumer_statistics"
索引模板的名稱consumer_statistics
,ES會調用模板名稱爲consumer_statistics
建立索引.
固然前提是你得先建立好這個模板
3.索引模板的建立
-
指令
curl -H "Content-Type: application/json" -XPUT http://htkj101:9200/_template/consumer_statistics -d '在這裏輸入你建立的模板'
-
模板
{ "template": "consumer_statistics", "order": 2, "version": 60001, "index_patterns": ["consumer_statistics"], "settings": { "index": { "refresh_interval": "5s", "max_result_window": "2147483647"#設置from+size的最大值 } }, "mappings": { "_default_": { "dynamic_templates": [{ "message_field": { "path_match": "message", "mapping": { "norms": false, "type": "text" }, "match_mapping_type": "string" } }, { "string_fields": { "mapping": { "norms": false, "type": "text", "fields": { "keyword": { "ignore_above": 1024,#設置不被索引的字段長度 "type": "keyword" } } }, "match_mapping_type": "string", "match": "*" } }], "properties": { "@timestamp": { "type": "date" }, "geoip": { "dynamic": true, "properties": { "ip": { "type": "ip" }, "latitude": { "type": "half_float" }, "location": { "type": "geo_point" }, "longitude": { "type": "half_float" } } }, "@version": { "type": "keyword" } } } }, "aliases": {} }
"max_result_window": "2147483647"
在業務處理的過程當中每每須要分頁,ES的JAVA-API是經過from,size來設置分頁數量和每頁的數量,
在默認的狀況下from+size必需要小於10000,可是若是實際需求大於10000,則必須在這裏設置
我這裏設置的是max_result_window
的最大值,實際狀況中不須要設置如此之大,
由於ES會在內存中進行排序,若是一次返回的結果過大,可能會致使服務宕機.
"ignore_above": 1024
這裏默認是256,意思是若是某一個字段的內容超過256字節的話,那麼將不會被索引.
也就是說從ES中是可以看到這條數據的存在,可是若是你指定查詢條件,是查不出來的.
舉個例子,如今ES中有id,test兩個字段,一共100條數據
test字段中只有一條數據超過了256字節,如今我查詢test字段中包含"1"的數據,
即便這個超過256字節的數據含有1,可是也不會被查詢到.
爲了可以讓他被索引到,這裏將256改爲1024,即只有超過1024字節纔會不被索引.
- 完整命令
curl -H "Content-Type: application/json" -XPUT http://htkj101:9200/_template/consumer_statistics -d ' {"template":"consumer_statistics","order":2,"version":60001,"index_patterns":["consumer_statistics"],"settings":{"index":{"refresh_interval":"5s","max_result_window":"2147483647"}},"mappings":{"_default_":{"dynamic_templates":[{"message_field":{"path_match":"message","mapping":{"norms":false,"type":"text"},"match_mapping_type":"string"}},{"string_fields":{"mapping":{"norms":false,"type":"text","fields":{"keyword":{"ignore_above":1024,"type":"keyword"}}},"match_mapping_type":"string","match":"*"}}],"properties":{"@timestamp":{"type":"date"},"geoip":{"dynamic":true,"properties":{"ip":{"type":"ip"},"latitude":{"type":"half_float"},"location":{"type":"geo_point"},"longitude":{"type":"half_float"}}},"@version":{"type":"keyword"}}}},"aliases":{}}'
在建立模板的過程當中,發現老是建立失敗,後來發現弄成這樣的兩行,就不會出錯了.