Elasticsearch(9):使用Logstash-input-jdbc同步數據庫中的數

一、數據同步方式html

全量同步與增量同步java

全量同步是指所有將數據同步到es,一般是剛創建es,第一次同步時使用。增量同步是指將後續的更新、插入記錄同步到es。python

二、經常使用的一些ES同步方法mysql

1)、 elasticsearch-jdbc : 嚴格意義上它已經不是第三方插件。已經成爲獨立的第三方工具。不支持5.5.1。。。
2)、elasticsearch-river-mysql插件:   https://github.com/scharron/elasticsearch-river-mysql 
3)、go-mysql-elasticsearch(國內做者siddontang) :  https://github.com/siddontang/go-mysql-elasticsearch
4)、python-mysql-replication:  github地址  https://github.com/noplay/python-mysql-replication
5)、MySQL Binlog:  經過 MySQL binlog 將 MySQL 的數據同步給 ES, 只能使用 row 模式的 binlog。
6)、Logstash-input-jdbc:  github地址  https://github.com/logstash-plugins/logstash-input-jdbcgit

三、Logstash-input-jdbc安裝github

因爲我用的ES版本是5.5.1,elasticsearch-jdbc不支持,只支持2.3.4,這就尷尬了。web

所用這裏用Logstash-input-jdbc來同步數據,logstash-input-jdbc插件是logstash 的一個個插件,使用ruby語言開發。因此要先安裝ruby,也是爲了好使用ruby中的gem安裝插件,下載地址: https://rubyinstaller.org/downloads/sql

下載下來以後,進行安裝數據庫

安裝好以後試下是否安裝成功,打開CMD輸入:json

OK,而後修改gem的源,使用如下命令查看gem源

gem sources -l

刪除默認的源

gem sources --remove https://rubygems.org/

添加新的源

gem sources -a http://gems.ruby-china.org/
gem sources -l

更改爲功,還的修改Gemfile的數據源地址。步驟以下:

gem install bundler
bundle config mirror.https://rubygems.org https://gems.ruby-china.org

而後就是安裝logstash-input-jdbc,在logstash-5.5.1/bin目錄下

執行安裝命令

.\logstash-plugin.bat install logstash-input-jdbc

靜等一下子,成功以後提示以下

四、Logstash-input-jdbc使用

官方文檔地址 

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

首先在bin目錄下新建一個mysql目錄,裏面包含jdbc.conf,jdbc.sql文件,加入mysql的驅動

jdbc.conf配置以下

input {
    stdin {
    }
    jdbc {
      # mysql 數據庫連接,test爲數據庫名
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"
      # 用戶名和密碼
      jdbc_user => "root"
      jdbc_password => "root"
      # 驅動
      jdbc_driver_library => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\mysql-connector-java-5.1.9.jar"
      # 驅動類名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
	  # 執行的sql 文件路徑+名稱
      statement_filepath => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\jdbc.sql"
      # 設置監聽間隔  各字段含義(由左至右)分、時、天、月、年,所有爲*默認含義爲每分鐘都更新
	  schedule => "* * * * *"
      # 索引類型
	  type => "jdbc"
    }
}


filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}


output {
    elasticsearch {
	    # ES的IP地址及端口
        hosts => ["localhost:9200"]
	    # 索引名稱
        index => "article"
	    # 自增ID 須要關聯的數據庫中有有一個id字段,對應索引的id號
        document_id => "%{id}"
    }
    stdout {
	   # JSON格式輸出
        codec => json_lines
    }
}

各數據庫對應的連接以下:

Driver ="path/to/jdbc-drivers/mysql-connector-java-5.1.35-bin.jar"   //驅動程序

Class  ="com.mysql.jdbc.Driver"; 

URL  ="jdbc:mysql://localhost:3306/db_name";                           //鏈接的URL,db_name爲數據庫名

Driver ="path/to/jdbc-drivers/sqljdbc4.jar"

Class  ="com.microsoft.jdbc.sqlserver.SQLServerDriver";

URL   ="jdbc:microsoft:sqlserver://localhost:1433;DatabaseName=db_name";     //db_name爲數據庫名

Driver ="path/to/jdbc-drivers/ojdbc6-12.1.0.2.jar"

Class  ="oracle.jdbc.driver.OracleDriver";

URL   ="jdbc:oracle:thin:@loaclhost:1521:orcl";     //orcl爲數據庫的SID

//鏈接具備DB2客戶端的Provider實例

Driver ="path/to/jdbc-drivers/jt400.jar"

Class  ="com.ibm.db2.jdbc.app.DB2.Driver";

URL   ="jdbc:db2://localhost:5000/db_name";     //db_name爲數據可名

Driver ="path/to/jdbc-drivers/postgresql-9.4.1201.jdbc4.jar"

Class  ="org.postgresql.Driver";            //鏈接數據庫的方法

URL   ="jdbc:postgresql://localhost/db_name";      //db_name爲數據可名

jdbc.sql配置以下:

select * from person

就一條查詢語句對應的表數據以下:

注意:這裏的jdbc.sql和jdbc.conf文件編碼都必須是ANSI

先啓動ES,而後經過sense建立article索引

UT http://localhost:9200/article

而後經過如下命令啓動logstash

.\logstash.bat -f  .\mysql\jdbc.conf

過一會他就會自動的往ES裏添加數據,輸出的日誌以下:

執行了SQL查詢。查看下article索引會發現多出來了不少文檔

咱們在數據庫增長一條數據,看他是否自動同步到ES中

靜等一會,發現logstash的日誌

查詢了一篇,ES中的數據會多出剛剛插入的那條

下面使用 增量 來新增數據,須要在jdbc.conf配置文件中作以下修改:

input {
    stdin {
    }
    jdbc {
      # mysql 數據庫連接,test爲數據庫名
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"
      # 用戶名和密碼
      jdbc_user => "root"
      jdbc_password => "root"
      # 驅動
      jdbc_driver_library => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\mysql-connector-java-5.1.9.jar"
      # 驅動類名
      jdbc_driver_class => "com.mysql.jdbc.Driver"

      #處理中文亂碼問題
      codec => plain { charset => "UTF-8"}
      #使用其它字段追蹤,而不是用時間
      use_column_value => true
      #追蹤的字段
      tracking_column => id
      record_last_run => true
     #上一個sql_last_value值的存放文件路徑, 必需要在文件中指定字段的初始值
     last_run_metadata_path => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\station_parameter.txt"
     #開啓分頁查詢
     jdbc_paging_enabled => true
     jdbc_page_size => 300
	  
      # 執行的sql 文件路徑+名稱
      statement_filepath => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\jdbc.sql"
      # 設置監聽間隔  各字段含義(由左至右)分、時、天、月、年,所有爲*默認含義爲每分鐘都更新
	  schedule => "* * * * *"
      # 索引類型
	  type => "jdbc"

    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    elasticsearch {
	    # ES的IP地址及端口
        hosts => ["localhost:9200"]
		# 索引名稱
        index => "article"
		# 自增ID
        document_id => "%{id}"
    }
    stdout {
	    # JSON格式輸出
        codec => json_lines
    }
}

參數介紹:

//是否記錄上次執行結果, 若是爲真,將會把上次執行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中
record_last_run => true

//是否須要記錄某個column 的值,若是 record_last_run 爲真,能夠自定義咱們須要 track 的 column 名稱,此時該參數就要爲 true. 不然默認 track 的是 timestamp 的值.
use_column_value => true

//若是 use_column_value 爲真,需配置此參數. track 的數據庫 column 名,該 column 必須是遞增的.好比:ID.
tracking_column => MY_ID

//指定文件,來記錄上次執行到的 tracking_column 字段的值
//好比上次數據庫有 10000 條記錄,查詢完後該文件中就會有數字 10000 這樣的記錄,下次執行 SQL 查詢能夠從 10001 條處開始.
//咱們只須要在 SQL 語句中 WHERE MY_ID > :last_sql_value 便可. 其中 :last_sql_value 取得就是該文件中的值(10000).
last_run_metadata_path => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\station_parameter.txt"


//是否清除 last_run_metadata_path 的記錄,若是爲真那麼每次都至關於從頭開始查詢全部的數據庫記錄
clean_run => false

//是否將 column 名稱轉小寫
lowercase_column_names => false

//存放須要執行的 SQL 語句的文件位置
statement_filepath => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\jdbc.sql"

這裏使用webmagic爬蟲來爬取數據,導入到數據庫中,先運行爬蟲,爬取一些數據

這裏爬取到了277條,而後啓動logstash,經過logstash導入到ES中去

打開mysql目錄下的station_parameter.txt文件

這個文件裏記錄上次執行到的 tracking_column 字段的值,好比上次數據庫有 10000 條記錄,查詢完後該文件中就會有數字 10000 這樣的記錄,下次執行 SQL 查詢能夠從 10001 條處開始,咱們只須要在 SQL 語句中 WHERE MY_ID > :last_sql_value 便可. 其中 :last_sql_value 取得就是該文件中的值。

而後開啓爬蟲,爬取數據,往數據庫裏插,logstash會自動的識別到更新,而後導入到ES中!!

相關文章
相關標籤/搜索