4. 安裝JDK八、MySQL5.6驅動以及Logstash -6.0.0
ECS中分別安裝JDK八、MySQL5.6驅動以及Logstash -6.0.0。以下圖:html
安裝Logstash input、output插件,此案例數據輸入是MySQL,輸出是ES,so相應的插件應該是logstash-input-jdbc和logstash-output-elasticsearch。java
安裝插件的命令分別是(在Logstash主目錄下運行):
./bin/logstash-plugin install logstash-input-jdbc
./bin/logstash-plugin install logstash-output-elasticsearchmysql
5. MySQL中建立數據庫、測試的數據表
以下圖所示sql
建表語句(其中updatetime用於記錄數據更新時間戳):數據庫
create table jm_es_employee ( id varchar(10), first_name varchar(20), last_name varchar(20), age int(10), about varchar(100), interests varchar(100), updatetime timestamp null default current_timestamp on update current_timestamp );
6. 配置Logstash做業文件
ECS中建立Logstash做業配置文件,文件名爲logstash-mysql-es.conf。json
配置文件內容:curl
input{ jdbc { jdbc_driver_library => "mysql-connector-java-5.1.44-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://rm-***.mysql.rds.aliyuncs.com:3306/db_name" jdbc_user => "db_user" jdbc_password => "db_password" jdbc_paging_enabled => "true" jdbc_page_size => "1000" jdbc_default_timezone =>"Asia/Shanghai" schedule => "* * * * *" statement => "select * from jm_es_employee where updatetime > :sql_last_value" use_column_value => true tracking_column => "updatetime" last_run_metadata_path => "./logstash_jdbc_last_run" } } output{ elasticsearch { hosts => "es-cn-***.elasticsearch.aliyuncs.com:9200" user => "elastic" password => "es_password" index => "employee" document_id => "%{id}" } stdout { codec => json_lines } }
其中紅色字體部分要作相應的替換,input中的 schedule參數用於配置數據刷新頻率,schedule => " *"表示每分鐘刷新一次,這也是MySQL數據同步的最小頻率。Logstash支持豐富的參數配置,詳情請參考Elasitc官網文檔。elasticsearch
7. 同步數據
ECS中指定參數啓動Logstash服務,執行命令:ide
logstash -f logstash-mysql-es.conf
以後每分鐘會去MySQL中刷新數據測試
RDS中寫入幾條測試數據,腳本以下:
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('001','John','Smith', 25, 'I love to go rock climbing','[ "sports", "music" ]'); INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('002','Jane','Smith', 32, 'I like to collect rock albums','[ "music" ]'); INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('003','Douglas','Fir', 35, 'I like to build cabinets','[ "forestry" ]');
因爲以前在Logstash配置文件中,output部分既配置了輸出到ES,同時也輸出到控制檯。因此當檢測到MySQL中有更新時,數據會輸出到控制檯中,以下圖:
此時說明MySQL中的數據更新已經被Logstash推送到ES服務。經過在ECS執行命令檢查ES服務中的索引是否被建立。執行命令:
curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/_cat/indices?v'
紅框內的employee即咱們在配置文件中指定的索引名,說明ES中的索引已經被成功建立。
8. 結果驗證
經過關鍵字檢索ES服務,驗證寫入Mysql的數據是否被成功索引到ES並被檢索到,執行命令經過關鍵字「Smith 「來檢索數據:
curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/employee/_search?q=last_name:Smith&pretty'
至此,MySQL中的數據已經被成功索引到Elasticsearch,並也能夠被準實時的檢索到。