如下配置可以實現從 SQL Server 數據庫中查詢數據,並增量式的把數據庫記錄導入到 ES 中。html
1. 查詢的 SQL 語句在 statement_filepath => "/etc/logstash/statement_file.d/my_info.sql" 參數指定的文件中給出。java
2. 字段的轉換由 add_field 參數完成。sql
input { jdbc { jdbc_driver_library => "/etc/logstash/driver.d/sqljdbc_2.0/enu/sqljdbc4.jar" jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" jdbc_connection_string => "jdbc:sqlserver://localhost:1433;databaseName=test_db" jdbc_user => "sa" jdbc_password => "123" # schedule => 分 時 天 月 年 # schedule => * 22 * * * //will execute at 22:00 every day schedule => "* * * * *" clean_run => false use_column_value => true tracking_column => BUG_ID record_last_run => true last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info" lowercase_column_names => false statement_filepath => "/etc/logstash/statement_file.d/my_info.sql" type => "my_info" add_field => {"[基本信息][客戶名稱]" => "%{客戶名稱}" "[基本信息][BUG_ID]" => "%{BUG_ID}" "[基本信息][責任部門]" => "%{責任部門}" "[基本信息][發現版本]" => "%{發現版本}" "[基本信息][發現日期]" => "%{發現日期}" "[基本信息][關閉日期]" => "%{關閉日期}"
}
}
其中,數據庫查詢操做 SQL 以下(my_info.sql):數據庫
SELECT 客戶名稱, BUG_ID, ISNULL(VIP_Level,'') AS VIP_Level, ISNULL(責任部門,'') AS 責任部門, ISNULL(發現版本,'') AS 發現版本, ISNULL(發現日期,'') AS 發現日期, ISNULL(關閉日期,發現日期) AS 關閉日期, ISNULL( CASE TD記錄人備註 WHEN 'NULL' THEN '' ELSE TD記錄人備註 END,'' ) AS TD記錄人備註, From test_bug_db.dbo.BugInfor WHERE BUG_ID > :sql_last_value
若是要了解其它數據庫,能夠參考個人 http://www.cnblogs.com/licongyu/p/5535833.htmlsqlserver
jdbc_driver_library => "/etc/logstash/driver.d/sqljdbc_2.0/enu/sqljdbc4.jar" //jdbc sql server 驅動,各個數據庫都有對應的驅動,需本身下載 jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" //jdbc class 不一樣數據庫有不一樣的 class 配置 jdbc_connection_string => "jdbc:sqlserver://200.200.0.18:1433;databaseName=test_db" //配置數據庫鏈接 ip 和端口,以及數據庫 jdbc_user => //配置數據庫用戶名 jdbc_password => //配置數據庫密碼
# schedule => 分 時 天 月 年 # schedule => * 22 * * * //will execute at 22:00 every day schedule => "* * * * *"
//是否記錄上次執行結果, 若是爲真,將會把上次執行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中 record_last_run => true //是否須要記錄某個column 的值,若是 record_last_run 爲真,能夠自定義咱們須要 track 的 column 名稱,此時該參數就要爲 true. 不然默認 track 的是 timestamp 的值. use_column_value => true //若是 use_column_value 爲真,需配置此參數. track 的數據庫 column 名,該 column 必須是遞增的.好比:ID. tracking_column => MY_ID //指定文件,來記錄上次執行到的 tracking_column 字段的值
//好比上次數據庫有 10000 條記錄,查詢完後該文件中就會有數字 10000 這樣的記錄,下次執行 SQL 查詢能夠從 10001 條處開始.
//咱們只須要在 SQL 語句中 WHERE MY_ID > :last_sql_value 便可. 其中 :last_sql_value 取得就是該文件中的值(10000). last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info" //是否清除 last_run_metadata_path 的記錄,若是爲真那麼每次都至關於從頭開始查詢全部的數據庫記錄 clean_run => false //是否將 column 名稱轉小寫 lowercase_column_names => false //存放須要執行的 SQL 語句的文件位置 statement_filepath => "/etc/logstash/statement_file.d/my_info.sql"