mysql準實時同步數據到Elasticsearch

4. 安裝JDK八、MySQL5.6驅動以及Logstash -6.0.0

ECS中分別安裝JDK八、MySQL5.6驅動以及Logstash -6.0.0。以下圖:html

_

安裝Logstash input、output插件,此案例數據輸入是MySQL,輸出是ES,so相應的插件應該是logstash-input-jdbc和logstash-output-elasticsearch。java

安裝插件的命令分別是(在Logstash主目錄下運行):
./bin/logstash-plugin install logstash-input-jdbc
./bin/logstash-plugin install logstash-output-elasticsearch
_mysql

5. MySQL中建立數據庫、測試的數據表

以下圖所示sql

_
建表語句(其中updatetime用於記錄數據更新時間戳):數據庫

create table jm_es_employee (         
id varchar(10),     
first_name varchar(20),     
last_name varchar(20),     
age int(10),     
about varchar(100),     
interests varchar(100),     
updatetime timestamp null default current_timestamp on update current_timestamp );
6. 配置Logstash做業文件

ECS中建立Logstash做業配置文件,文件名爲logstash-mysql-es.conf。json

配置文件內容:curl

input{
     jdbc {
         jdbc_driver_library => "mysql-connector-java-5.1.44-bin.jar"
         jdbc_driver_class => "com.mysql.jdbc.Driver"
         jdbc_connection_string => "jdbc:mysql://rm-***.mysql.rds.aliyuncs.com:3306/db_name"
         jdbc_user => "db_user"
         jdbc_password => "db_password"
         jdbc_paging_enabled => "true"
         jdbc_page_size => "1000"
         jdbc_default_timezone =>"Asia/Shanghai"
         schedule => "* * * * *"
         statement => "select * from jm_es_employee where updatetime > :sql_last_value"
         use_column_value => true
         tracking_column => "updatetime"
         last_run_metadata_path => "./logstash_jdbc_last_run"
       } 
} 
output{
      elasticsearch {
         hosts => "es-cn-***.elasticsearch.aliyuncs.com:9200"
         user => "elastic"
         password => "es_password"
         index => "employee"
         document_id => "%{id}"
      }
      stdout {
         codec => json_lines
     }
 }

其中紅色字體部分要作相應的替換,input中的 schedule參數用於配置數據刷新頻率,schedule => " *"表示每分鐘刷新一次,這也是MySQL數據同步的最小頻率。Logstash支持豐富的參數配置,詳情請參考Elasitc官網文檔elasticsearch

7. 同步數據

ECS中指定參數啓動Logstash服務,執行命令:ide

logstash -f logstash-mysql-es.conf

_

以後每分鐘會去MySQL中刷新數據測試

_

RDS中寫入幾條測試數據,腳本以下:

INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('001','John','Smith', 25, 'I love to go rock climbing','[ "sports", "music" ]'); 
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('002','Jane','Smith', 32, 'I like to collect rock albums','[ "music" ]'); 
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('003','Douglas','Fir', 35, 'I like to build cabinets','[ "forestry" ]');

因爲以前在Logstash配置文件中,output部分既配置了輸出到ES,同時也輸出到控制檯。因此當檢測到MySQL中有更新時,數據會輸出到控制檯中,以下圖:

_

此時說明MySQL中的數據更新已經被Logstash推送到ES服務。經過在ECS執行命令檢查ES服務中的索引是否被建立。執行命令:

curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/_cat/indices?v'

_

紅框內的employee即咱們在配置文件中指定的索引名,說明ES中的索引已經被成功建立。

8. 結果驗證

經過關鍵字檢索ES服務,驗證寫入Mysql的數據是否被成功索引到ES並被檢索到,執行命令經過關鍵字「Smith 「來檢索數據:

curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/employee/_search?q=last_name:Smith&pretty'

_

至此,MySQL中的數據已經被成功索引到Elasticsearch,並也能夠被準實時的檢索到。

相關文章
相關標籤/搜索