把 mysql 的數據遷移到 es 有不少方式,好比直接用 es 官方推薦的 logstash 工具,或者監聽 mysql 的 binlog 進行同步,能夠結合一些開源的工具好比阿里的 canal。java
這裏打算詳細介紹另外一個也是不錯的同步方案,這個方案基於 kafka 的鏈接器。流程能夠歸納爲:mysql
Kafka Connect有兩個核心概念:Source和Sink。 Source負責導入數據到Kafka,Sink負責從Kafka導出數據,它們都被稱爲Connector,也就是鏈接器。在本例中,mysql的鏈接器是source,es的鏈接器是sink。git
這些鏈接器自己已經開源,咱們之間拿來用便可。不須要再造輪子。github
我下面全部的操做都是在本身的mac上進行的。sql
首先咱們準備兩個鏈接器,分別是 kafka-connect-elasticsearch
和 kafka-connect-elasticsearch
, 你能夠經過源碼編譯他們生成jar包,源碼地址:數據庫
kafka-connect-elasticsearchbootstrap
kafka-connect-mysqlsegmentfault
我我的不是很推薦這種源碼的編譯方式,由於真的好麻煩。除非想研究源碼。elasticsearch
我是直接下載 confluent 平臺的工具包,裏面有編譯號的jar包能夠直接拿來用,下載地址:工具
我下載的是 confluent-5.3.1 版本, 相關的jar包在 confluent-5.3.1/share/java 目錄下
咱們把編譯好的或者下載的jar包拷貝到kafka的libs目錄下。拷貝的時候要注意,除了 kafka-connect-elasticsearch-5.3.1.jar 和 kafka-connect-jdbc-5.3.1.jar,相關的依賴包也要一塊兒拷貝過來,好比es這個jar包目錄下的http相關的,jersey相關的等,不然會報各類 java.lang.NoClassDefFoundError
的錯誤。
另外mysql-connector-java-5.1.22.jar也要放進去。
數據庫和es我都是在本地啓動的,這個過程具體就不說了,網上有不少參考的。
我建立了一個名爲test的數據庫,裏面有一個名爲login的表。
這部分是最關鍵的,我實際操做的時候這裏也是最耗時的。
首先配置jdbc的鏈接器。
咱們從confluent工具包裏拷貝一個配置文件的模板(confluent-5.3.1/share目錄下),自帶的只有sqllite的配置文件,拷貝一份到kafka的config目錄下,更名爲sink-quickstart-mysql.properties,文件內容以下:
# tasks to create: name=mysql-login-connector connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=1 connection.url=jdbc:mysql://localhost:3306/test?user=root&password=11111111 mode=timestamp+incrementing timestamp.column.name=login_time incrementing.column.name=id topic.prefix=mysql. table.whitelist=login
connection.url指定要鏈接的數據庫,這個根據本身的狀況修改。mode指示咱們想要如何查詢數據。在本例中我選擇incrementing遞增模式和timestamp 時間戳模式混合的模式, 並設置incrementing.column.name遞增列的列名和時間戳所在的列名。
混合模式仍是比較推薦的,它能儘可能的保證數據同步不丟失數據。具體的緣由你們能夠查閱相關資料,這裏就不詳述了。
topic.prefix是衆多表名以前的topic的前綴,table.whitelist是白名單,表示要監聽的表,可使組合多個表。兩個組合在一塊兒就是該表的變動topic,好比在這個示例中,最終的topic就是mysql.login。
connector.class是具體的鏈接器處理類,這個不用改。
其它的配置基本不用改。
接下來就是ES的配置了。一樣也是拷貝 quickstart-elasticsearch.properties 文件到kafka的config目錄下,而後修改,我本身的環境內容以下:
name=elasticsearch-sink connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector tasks.max=1 topics=mysql.login key.ignore=true connection.url=http://localhost:9200 type.name=mysqldata
topics的名字和上面mysql設定的要保持一致,同時這個也是ES數據導入的索引。從裏也能夠看出,ES的鏈接器一個實例只能監聽一張表。
type.name須要關注下,我使用的ES版本是7.1,咱們知道在7.x的版本中已經只有一個固定的type(_doc)了,使用低版本的鏈接器在同步的時候會報錯誤,我這裏使用的5.3.1版本已經兼容了。繼續看下面的章節就知道了。
關於es鏈接器和es的兼容性問題,有興趣的能夠看看下面這個issue:
https://github.com/confluenti...
固然首先啓動zk和kafka。
而後咱們啓動mysql的鏈接器,
./bin/connect-standalone.sh config/connect-standalone.properties config/source-quickstart-mysql.properties &
接着手動往login表插入幾條記錄,正常狀況下這些變動已經發到kafka對應的topic上去了。爲了驗證,咱們在控制檯啓動一個消費者從mysql.login主題讀取數據:
./bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic mysql.login --from-beginning
能夠看到剛纔插入的數據。
把數據從 MySQL 移動到 Kafka 裏就算完成了,接下來把數據從 Kafka 寫到 ElasticSearch 裏。
首先啓動ES和kibana,固然後者不是必須的,只是方便咱們在IDE環境裏測試ES。你也能夠經過控制檯給ES發送HTTP的指令。
先把以前啓動的mysql鏈接器進程結束(由於會佔用端口),再啓動 ES 鏈接器,
./bin/connect-standalone.sh config/connect-standalone.properties config/quickstart-elasticsearch.properties &
若是正常的話,ES這邊應該已經有數據了。打開kibana的開發工具,在console裏執行
GET _cat/indices
這是獲取節點上全部的索引,你應該能看到,
green open mysql.login 1WqRjkbfTlmXj8eKBPvAtw 1 1 4 0 12kb 7.8kb
說明索引已經正常建立了。而後咱們查詢下,
GET mysql.login/_search?pretty=true
結果以下,
{ "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 4, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "mysql.login", "_type" : "mysqldata", "_id" : "mysql.login+0+0", "_score" : 1.0, "_source" : { "id" : 1, "username" : "lucas1", "login_time" : 1575870785000 } }, { "_index" : "mysql.login", "_type" : "mysqldata", "_id" : "mysql.login+0+1", "_score" : 1.0, "_source" : { "id" : 2, "username" : "lucas2", "login_time" : 1575870813000 } }, { "_index" : "mysql.login", "_type" : "mysqldata", "_id" : "mysql.login+0+2", "_score" : 1.0, "_source" : { "id" : 3, "username" : "lucas3", "login_time" : 1575874031000 } }, { "_index" : "mysql.login", "_type" : "mysqldata", "_id" : "mysql.login+0+3", "_score" : 1.0, "_source" : { "id" : 4, "username" : "lucas4", "login_time" : 1575874757000 } } ] } }
參考:
1.《kafka權威指南》
關注公衆號:思無邪了嗎
csdn博客: https://blog.csdn.net/pony_ma...