mysql數據同步es的實現原理通常有兩種:php
經過sql語句定時查詢進行同步。html
使用binlog進行同步java
這裏主要介紹logstash-jdbc這種方法主要是由於這種方法的是官方項目,更新維護的比較頻繁,使用起來也比較放心,固然它也有必定的弊端。廢話很少開始正題。node
本文是在docker環境下構建的環境,首先天然須要安裝docker,筆者的版本爲18.09.2。由於一直開發使用的是laradock的環境,這裏依舊是使用這個環境爲基礎添加的es和logstash。由於mysql沒有做任何變更,關於mysql的安裝編排自行參考laradock文檔。其實包括es和logstash的編排都很簡單。mysql
es的docker文件Dockerfilegit
FROM elasticsearch:7.3.0 EXPOSE 9200 9300
docker-compose.yml文件github
elasticsearch: build: ./elasticsearch volumes: - elasticsearch:/usr/share/elasticsearch/data environment: - cluster.name=laradock-cluster - node.name=laradock-node - bootstrap.memory_lock=true - "ES_JAVA_OPTS=-Xms512m -Xmx512m" - cluster.initial_master_nodes=laradock-node ulimits: memlock: soft: -1 hard: -1 ports: - "${ELASTICSEARCH_HOST_HTTP_PORT}:9200" - "${ELASTICSEARCH_HOST_TRANSPORT_PORT}:9300" depends_on: - php-fpm networks: - frontend - backend
docker-compose up -d elasticsearch mysql
啓動鏡像,能夠測試一下發現es和mysql應該已經啓動了。sql
既然是使用logstash-jdbc插件來實現同步,那咱們的重點就是在編排logstash上了。docker
logstash-jdbc是經過java鏈接mysql的,因此咱們首先須要一個jdbc的jar文件,能夠從官網下載,獲得一個jar文件,將它copy進docker的編排目錄裏,固然也能夠編排時使用docker下載。數據庫
筆者的logstash的編排目錄
dockerfile文件
FROM logstash:7.3.0 USER root RUN rm -f /usr/share/logstash/pipeline/logstash.conf RUN curl -L -o /usr/share/logstash/lib/mysql-connector-java-5.1.47.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.47/mysql-connector-java-5.1.47.jar ADD ./pipeline/ /usr/share/logstash/pipeline/ ADD ./config /usr/share/logstash/config/ ADD mysql-connector-java-8.0.18.jar /usr/share/logstash/logstash-core/lib/jars/mysql/mysql-connector-java-8.0.18.jar RUN logstash-plugin install logstash-input-jdbc
同步任務配置文件mysql/mysql.conf
input { jdbc { # 這裏是jdbc鏈接mysql的語句,第二個mysql是由於這個docker項目內部訪問須要網絡橋接緣由,你能夠自行修改 jdbc_connection_string => "jdbc:mysql://mysql:3306/koudai" jdbc_user => "root" jdbc_password => "root" # 驅動; /usr/share/logstash/config/mysql/ 爲logstash插件啓動是查找jar文件的默認目錄 jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-8.0.18.jar" # 驅動類名 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" jdbc_default_timezone => "Asia/Shanghai" # sql文件名 statement_filepath => "/usr/share/logstash/config/mysql/task.sql" # 監聽間隔[分、時、天、月、年] schedule => "* * * * *" type => "user" # 是否記錄上次執行結果, 若是爲真,將會把上次執行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中 record_last_run => true # 是否須要記錄某個column 的值,若是record_last_run爲真,能夠自定義咱們須要 track 的 column 名稱,此時該參數就要爲 true. 不然默認 track 的是 timestamp 的值. use_column_value => true # 若是 use_column_value 爲真,需配置此參數. track 的數據庫 column 名,該 column 必須是遞增的. 通常是mysql主鍵 tracking_column => "lastmodifiedTime" tracking_column_type => "timestamp" last_run_metadata_path => "./last_record/logstash_article_last_time" # 是否清除 last_run_metadata_path 的記錄,若是爲真那麼每次都至關於從頭開始查詢全部的數據庫記錄 clean_run => false # 是否將 字段(column) 名稱轉小寫 lowercase_column_names => false } } output { elasticsearch { # 同理是由於這個docker項目內部訪問須要網絡橋接緣由 hosts => ["http://elasticsearch:9200"] index => "user" document_id => "%{uid}" } }
同步mysql
select * from kdgx_partner_charge_user
更多使用能夠參考這裏
自此mysql中的新增數據就能夠同步到es了。