使用logstash同步至ES的幾個坑

1.前言

記錄使用logstash從sqlserver同步數據到ES中遇到的幾點問題。使用的版本是es6.8.3+logstash6.8.3html

2.logstash配置文件

2.1input

input {
    jdbc {
        jdbc_driver_library => "/usr/local/logstash-6.8.3/logstashconfs/sqljdbc4.jar"#sqlserver的驅動jar包
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://192.168.1.101:1433;databaseName=test;"
        jdbc_user => "sa"
        jdbc_password => "123456"
        jdbc_default_timezone => "Asia/Shanghai"
		jdbc_paging_enabled => "true"#分頁
		record_last_run => true#記錄上一次運行的值
		use_column_value => true#使用數據庫中的字段追蹤
		tracking_column => "update_time"#追蹤的字段名稱
		tracking_column_type => "timestamp"#追蹤的字段類型
		last_run_metadata_path => "/usr/local/logstash-6.8.3/logstashconfs/sync-logs/consumer_statistics_update_time"#上一次運行的值存儲的文件地址
		clean_run => false#使用數據庫中的字段追蹤
		statement => "SELECT * FROM v_test WHERE update_time>:sql_last_value and update_time<GETDATE() "#sql語句
		schedule => "*/5 * * * * *"#每5s執行一次
    }
}
  • statement

因爲要查的數據是表關聯的數據,一開始想的是創建多個jdbc,把數據存到es的不一樣的索引中,利用父子文檔進行關聯查詢,git

後來發現這種辦法效率差,並且影響ES的性能,因此解決辦法就是在sqlserver中創建好多表聯查好的視圖,es6

這裏的statement 中的v_test就是建立好的視圖.sql

因爲設置了Logstash 增量更新, 必需要使用 update_time>:sql_last_value and update_time<GETDATE()這種限制條件,這樣才能夠保證數據不丟失也不重複數據庫

具體緣由見:如何使用 Logstash 實現關係型數據庫與 ElasticSearch 之間的數據同步json

  • schedule

網上的不少教程都說最小間隔是1min,其實是能夠作到秒級的.ruby

schedule => "*/5 * * * * *"只要在前面再加一個* 單位就是秒,這裏就是每5s執行一次app

2.2filter

filter {
	if ![test]{ruby{code =>'event.set("test","")'}}	
	mutate{
		convert => { "id" => "integer" }
		remove_field => ["@timestamp"]
		remove_field => ["@version"]
	}
}

這裏主要是對從sqlserver數據庫查出來的數據進行一些處理,我這裏刪去了大多數的內容,僅保留一些表明性的.curl

  • if ![test]{ruby{code =>'event.set("test","")'}}

這個的意思是 test字段爲null時,使用ruby這個語言進行處理,code =>'' 這裏面就是寫代碼的elasticsearch

event.set("test","")意思就是 設置test字段的內容爲""

固然咱們也能夠先event.get("test"),獲取test字段的內容,而後在進行一系列處理後,再event.set,這樣就能夠保存處理後的字段的值

ruby語言的具體語法能夠參考這個:Ruby教程

  • convert => { "id" => "integer" }

這個的意思就是將id字段的類型轉化爲integer,若是某個字段是時間類型能夠轉化爲timestamp類型

2.3output

output {
		elasticsearch {
			hosts => ["htkj101:9200","htkj102:9200","htkj103:9200"]
			index => "consumer_statistics"#索引名稱
			document_id => "%{id}"#索引的id
			document_type => "consumer_statistics"#索引的type,這個在6.x版本之後就已經被廢棄,能夠忽略這個
			template_name => "consumer_statistics"#索引模板的名稱
		}
}
  • document_id => "%{id}"

文檔的id就是導入數據的id,這樣設置能夠實現冪等性

  • template_name => "consumer_statistics"

索引模板的名稱consumer_statistics,ES會調用模板名稱爲consumer_statistics建立索引.

固然前提是你得先建立好這個模板

3.索引模板的建立

  • 指令

curl -H "Content-Type: application/json" -XPUT http://htkj101:9200/_template/consumer_statistics -d '在這裏輸入你建立的模板'
  • 模板

{
	"template": "consumer_statistics",
	"order": 2,
	"version": 60001,
	"index_patterns": ["consumer_statistics"],
	"settings": {
		"index": {
			"refresh_interval": "5s",
			"max_result_window": "2147483647"#設置from+size的最大值
		}
	},
	"mappings": {
		"_default_": {
			"dynamic_templates": [{
				"message_field": {
					"path_match": "message",
					"mapping": {
						"norms": false,
						"type": "text"
					},
					"match_mapping_type": "string"
				}
			}, {
				"string_fields": {
					"mapping": {
						"norms": false,
						"type": "text",
						"fields": {
							"keyword": {
								"ignore_above": 1024,#設置不被索引的字段長度
								"type": "keyword"
							}
						}
					},
					"match_mapping_type": "string",
					"match": "*"
				}
			}],
			"properties": {
				"@timestamp": {
					"type": "date"
				},
				"geoip": {
					"dynamic": true,
					"properties": {
						"ip": {
							"type": "ip"
						},
						"latitude": {
							"type": "half_float"
						},
						"location": {
							"type": "geo_point"
						},
						"longitude": {
							"type": "half_float"
						}
					}
				},
				"@version": {
					"type": "keyword"
				}
			}
		}
	},
	"aliases": {}
}
  • "max_result_window": "2147483647"

在業務處理的過程當中每每須要分頁,ES的JAVA-API是經過from,size來設置分頁數量和每頁的數量,

在默認的狀況下from+size必需要小於10000,可是若是實際需求大於10000,則必須在這裏設置

我這裏設置的是max_result_window的最大值,實際狀況中不須要設置如此之大,

由於ES會在內存中進行排序,若是一次返回的結果過大,可能會致使服務宕機.

  • "ignore_above": 1024

這裏默認是256,意思是若是某一個字段的內容超過256字節的話,那麼將不會被索引.

也就是說從ES中是可以看到這條數據的存在,可是若是你指定查詢條件,是查不出來的.

舉個例子,如今ES中有id,test兩個字段,一共100條數據

test字段中只有一條數據超過了256字節,如今我查詢test字段中包含"1"的數據,

即便這個超過256字節的數據含有1,可是也不會被查詢到.

爲了可以讓他被索引到,這裏將256改爲1024,即只有超過1024字節纔會不被索引.

  • 完整命令
curl -H "Content-Type: application/json" -XPUT http://htkj101:9200/_template/consumer_statistics -d '
{"template":"consumer_statistics","order":2,"version":60001,"index_patterns":["consumer_statistics"],"settings":{"index":{"refresh_interval":"5s","max_result_window":"2147483647"}},"mappings":{"_default_":{"dynamic_templates":[{"message_field":{"path_match":"message","mapping":{"norms":false,"type":"text"},"match_mapping_type":"string"}},{"string_fields":{"mapping":{"norms":false,"type":"text","fields":{"keyword":{"ignore_above":1024,"type":"keyword"}}},"match_mapping_type":"string","match":"*"}}],"properties":{"@timestamp":{"type":"date"},"geoip":{"dynamic":true,"properties":{"ip":{"type":"ip"},"latitude":{"type":"half_float"},"location":{"type":"geo_point"},"longitude":{"type":"half_float"}}},"@version":{"type":"keyword"}}}},"aliases":{}}'

在建立模板的過程當中,發現老是建立失敗,後來發現弄成這樣的兩行,就不會出錯了.

相關文章
相關標籤/搜索