使用kafka鏈接器遷移mysql數據到ElasticSearch

概述

把 mysql 的數據遷移到 es 有不少方式,好比直接用 es 官方推薦的 logstash 工具,或者監聽 mysql 的 binlog 進行同步,能夠結合一些開源的工具好比阿里的 canal。java

這裏打算詳細介紹另外一個也是不錯的同步方案,這個方案基於 kafka 的鏈接器。流程能夠歸納爲:mysql

  1. mysql鏈接器監聽數據變動,把變動數據發送到 kafka topic。
  2. ES 監聽器監聽kafka topic 消費,寫入 ES。

Kafka Connect有兩個核心概念:Source和Sink。 Source負責導入數據到Kafka,Sink負責從Kafka導出數據,它們都被稱爲Connector,也就是鏈接器。在本例中,mysql的鏈接器是source,es的鏈接器是sink。git

這些鏈接器自己已經開源,咱們之間拿來用便可。不須要再造輪子。github

過程詳解

準備鏈接器工具

我下面全部的操做都是在本身的mac上進行的。sql

首先咱們準備兩個鏈接器,分別是 kafka-connect-elasticsearchkafka-connect-elasticsearch, 你能夠經過源碼編譯他們生成jar包,源碼地址:數據庫

kafka-connect-elasticsearchbootstrap

kafka-connect-mysqlsegmentfault

我我的不是很推薦這種源碼的編譯方式,由於真的好麻煩。除非想研究源碼。elasticsearch

我是直接下載 confluent 平臺的工具包,裏面有編譯號的jar包能夠直接拿來用,下載地址:工具

confluent 工具包

我下載的是 confluent-5.3.1 版本, 相關的jar包在 confluent-5.3.1/share/java 目錄下

咱們把編譯好的或者下載的jar包拷貝到kafka的libs目錄下。拷貝的時候要注意,除了 kafka-connect-elasticsearch-5.3.1.jar 和 kafka-connect-jdbc-5.3.1.jar,相關的依賴包也要一塊兒拷貝過來,好比es這個jar包目錄下的http相關的,jersey相關的等,不然會報各類 java.lang.NoClassDefFoundError 的錯誤。

另外mysql-connector-java-5.1.22.jar也要放進去。

數據庫和ES環境準備

數據庫和es我都是在本地啓動的,這個過程具體就不說了,網上有不少參考的。

我建立了一個名爲test的數據庫,裏面有一個名爲login的表。

配置鏈接器

這部分是最關鍵的,我實際操做的時候這裏也是最耗時的。

首先配置jdbc的鏈接器。

咱們從confluent工具包裏拷貝一個配置文件的模板(confluent-5.3.1/share目錄下),自帶的只有sqllite的配置文件,拷貝一份到kafka的config目錄下,更名爲sink-quickstart-mysql.properties,文件內容以下:

# tasks to create:
name=mysql-login-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/test?user=root&password=11111111
mode=timestamp+incrementing
timestamp.column.name=login_time
incrementing.column.name=id
topic.prefix=mysql.
table.whitelist=login

connection.url指定要鏈接的數據庫,這個根據本身的狀況修改。mode指示咱們想要如何查詢數據。在本例中我選擇incrementing遞增模式和timestamp 時間戳模式混合的模式, 並設置incrementing.column.name遞增列的列名和時間戳所在的列名。

混合模式仍是比較推薦的,它能儘可能的保證數據同步不丟失數據。具體的緣由你們能夠查閱相關資料,這裏就不詳述了。

topic.prefix是衆多表名以前的topic的前綴,table.whitelist是白名單,表示要監聽的表,可使組合多個表。兩個組合在一塊兒就是該表的變動topic,好比在這個示例中,最終的topic就是mysql.login。

connector.class是具體的鏈接器處理類,這個不用改。

其它的配置基本不用改。

接下來就是ES的配置了。一樣也是拷貝 quickstart-elasticsearch.properties 文件到kafka的config目錄下,而後修改,我本身的環境內容以下:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=mysql.login
key.ignore=true
connection.url=http://localhost:9200
type.name=mysqldata

topics的名字和上面mysql設定的要保持一致,同時這個也是ES數據導入的索引。從裏也能夠看出,ES的鏈接器一個實例只能監聽一張表。

type.name須要關注下,我使用的ES版本是7.1,咱們知道在7.x的版本中已經只有一個固定的type(_doc)了,使用低版本的鏈接器在同步的時候會報錯誤,我這裏使用的5.3.1版本已經兼容了。繼續看下面的章節就知道了。

關於es鏈接器和es的兼容性問題,有興趣的能夠看看下面這個issue:

https://github.com/confluenti...

啓動測試

固然首先啓動zk和kafka。

而後咱們啓動mysql的鏈接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/source-quickstart-mysql.properties &

接着手動往login表插入幾條記錄,正常狀況下這些變動已經發到kafka對應的topic上去了。爲了驗證,咱們在控制檯啓動一個消費者從mysql.login主題讀取數據:

./bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic mysql.login --from-beginning

能夠看到剛纔插入的數據。

把數據從 MySQL 移動到 Kafka 裏就算完成了,接下來把數據從 Kafka 寫到 ElasticSearch 裏。

首先啓動ES和kibana,固然後者不是必須的,只是方便咱們在IDE環境裏測試ES。你也能夠經過控制檯給ES發送HTTP的指令。

先把以前啓動的mysql鏈接器進程結束(由於會佔用端口),再啓動 ES 鏈接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/quickstart-elasticsearch.properties &

若是正常的話,ES這邊應該已經有數據了。打開kibana的開發工具,在console裏執行

GET _cat/indices

這是獲取節點上全部的索引,你應該能看到,

green open mysql.login                  1WqRjkbfTlmXj8eKBPvAtw 1 1      4    0    12kb   7.8kb

說明索引已經正常建立了。而後咱們查詢下,

GET mysql.login/_search?pretty=true

結果以下,

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "mysql.login",
        "_type" : "mysqldata",
        "_id" : "mysql.login+0+0",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "username" : "lucas1",
          "login_time" : 1575870785000
        }
      },
      {
        "_index" : "mysql.login",
        "_type" : "mysqldata",
        "_id" : "mysql.login+0+1",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "username" : "lucas2",
          "login_time" : 1575870813000
        }
      },
      {
        "_index" : "mysql.login",
        "_type" : "mysqldata",
        "_id" : "mysql.login+0+2",
        "_score" : 1.0,
        "_source" : {
          "id" : 3,
          "username" : "lucas3",
          "login_time" : 1575874031000
        }
      },
      {
        "_index" : "mysql.login",
        "_type" : "mysqldata",
        "_id" : "mysql.login+0+3",
        "_score" : 1.0,
        "_source" : {
          "id" : 4,
          "username" : "lucas4",
          "login_time" : 1575874757000
        }
      }
    ]
  }
}

參考:

1.《kafka權威指南》

  1. https://www.jianshu.com/p/46b...

關注公衆號:思無邪了嗎

我的博客:http://www.machengyu.net

csdn博客: https://blog.csdn.net/pony_ma...

思否: https://segmentfault.com/u/ma...

相關文章
相關標籤/搜索