如何使用 Logstash 關係型數據庫與 ElasticSearch 的數據同步

做者: Alex Marquardt | 地址 how-to-keep-elastichsearch-synchronzied-with-a-relational-database-using-logstashjava

譯者前言

近期的主要工做是在爲公司的 APP 增長搜索功能。由於也遇到了須要把關係型數據庫中的數據同步 ElasticSearch 中的問題,故抽了點時間翻譯了這篇官方的博文。最近,在數據同步方面也有些思考。mysql

本篇文章的關鍵點不在 Logstash 的 JDBC 插件的使用方,而是數據同步會遇到的一些細節問題如何處理。sql

翻譯正文

爲了利用 ElasticSearch 強大的搜索能力,大部分的業務都會在關係型數據庫的基礎上部署 ElasticSearch。這類場景下,保持 ElasticSearch 和關係型數據庫的同步是很是必要的。數據庫

本篇博文將會介紹如何經過 Logstash 實如今 MySQL 和 ElasticSearch 之間數據的高效複製與同步。json

注:文中演示的代碼和方法都通過在 MySQL 中的測試,理論上適應於全部的關係型數據庫。ruby

本文中,組件的相關信息以下:elasticsearch

  • MySQL: 8.0.16.
  • Elasticsearch: 7.1.1
  • Logstash: 7.1.1
  • Java: 1.8.0_162-b12
  • JDBC input plugin: v4.3.13
  • JDBC connector: Connector/J 8.0.16

數據同步概述

本文將會經過 Logstash 的 JDBC input 插件進行 ElasticSearch 和 MySQL 之間的數據同步。從概念上講,JDBC 插件將經過週期性的輪詢以發現上次迭代後的新增和更新的數據。爲了正常工做,幾個條件須要知足:測試

ElasticSearch 中 _id 設置必須來自 MySQL 中 id 字段。它提供了 MySQL 和 ElasticSearch 之間文檔數據的映射關係。若是一條記錄在 MySQL 更新,那麼,ElasticSearch 全部關聯文檔都應該被重寫。要說明的是,重寫 ElasticSearch 中的文檔和更新操做的效率相同。在內部實現上,一個更新操做由刪除一箇舊文檔和建立一個新文檔兩部分組成。spa

當 MySQL 中插入或更新一條記錄時,必須包含一個字段用於保存字段的插入或更新時間。如此一來, Logstash 就能夠實現每次請求只獲取上次輪詢後更新或插入的記錄。Logstash 每次輪詢都會保存從 MySQL 中讀取到的最新的插入或更新時間,該時間大於上次輪詢最新時間。插件

若是知足了上述條件,咱們就能夠配置 Logstash 週期性的從 MySQL 中讀取全部最新更新或插入的記錄,而後寫入到 Elasticsearch 中。

關於 Logstash 的配置代碼,本文稍後會給出。

MySQL 設置

MySQL 庫和表的配置以下:

CREATE DATABASE es_db

USE es_db

DROP TABLE IF EXISTS es_table

CREATE TABLE es_table (
  id BIGINT(20) UNSIGNED NOT NULL,
  PRIMARY KEY (id),
  UNIQUE KEY unique_id (id),
  client_name VARCHAR(32) NOT NULL,
  modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

配置中有幾點須要說明,以下:

  • es_table,MySQL 的數據表,咱們將把它的數據同步到 ElasticSearch 中;
  • id,記錄的惟一標識。注意,id 定義爲主鍵的同時,也定義爲惟一建,能夠保證每一個 id 在表中只出現一次。同步 ElasticSearch 時,將會轉化爲文檔的 _id;
  • client_name,表示用戶定義用來保存數據的字段,爲使博文保持簡潔,咱們只定義了一個字段,更多字段也很容易加入。接下來的演示,咱們會更新該字段,用以說明不只僅新插入記錄會同步到 MySQL,更新記錄一樣會同步到 MySQL;
  • modification_time,用於保存記錄的更新或插入時間,它使得 Logstash 能夠在每次輪詢時只請求上次輪詢後新增更新的記錄;
  • insertion_time,該字段用於一條記錄插入時間,主要是爲演示方便,對同步而言,並不是必須;

MySQL 操做

前面設置完成,咱們能夠經過以下命令插入記錄:

INSERT INTO es_table (id, client_name) VALUES (<id>, <client name>);

使用以下命令更新記錄:

UPDATE es_table SET client_name = <new client name> WHERE id=<id>;

使用以下命令更新插入記錄:

INSERT INTO es_table (id, client_name) VALUES (<id>, <client_name when created>) ON DUPLICATE KEY UPDATE client_name=<client name when updated>

同步代碼

Logstash 的 pipeline 配置代碼以下,它實現了前面描述的功能,從 MySQL 到 ElasticSearch 的數據同步。

input {
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => "<my username>"
    jdbc_password => "<my password>"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *",
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time)) > :sql_last_value AND modification_time < NOW() ORDER BY modification_time desc"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}

output {
  # stdout { codec => "rubydebug" }
  elasticsearch {
    index => "rdbms_sync_idx"
    document_id => "%{[%metedata][_id]}"
  }
}

關於 Pipeline 配置的幾點說明,以下:

  • tracking_column

此處配置爲 "unix_ts_in_secs"。它被用於追蹤最新的記錄,並被保存在 .logstash_jdbc_last_run 文件中,下一次輪詢將以這個邊界位置爲準進行記錄獲取。SELECT 語句中,可經過 :sql_last_value 訪問該配置字段的值。

  • unix_ts_in_secs

由 SELECT 語句生成,是 "modification_time" 的 UNIX TIMESTAMP。它被前面討論的 "track_column" 引用。使用 UNIX TIMESTAMP,而非其餘時間形式,能夠減小複雜性,防止時區致使的時間不一致問題。

  • sql_last_value

內建的配置參數,指定每次輪詢的開始位置。在 input 配置中,可被 SELECT 語句引用。在每次輪詢開始前,從 .logstash_jdbc_last_run 中讀取,此案例中,即爲 "unix_ts_in_secs" 的最近值。如此即可保證每次輪詢只獲取最新插入和更新的記錄。

  • schedule

經過 cron 語法指定輪詢的執行週期,例子中,"/5 " 表示每 5 秒輪詢一次。

  • modification_time < NOW()

SELECT 語句查詢條件的一部分,當前解釋不清,具體狀況待下面的章節再做介紹。

  • filter

該配置指定將 MySQL 中的 id 複製到 metadata 字段 _id 中,用以確保 ElasticSearch 中的文檔寫入正確的 _id。而之因此使用 metadata,由於它是臨時的,不會使文檔中產生新的字段。同時,咱們也會把不但願寫入 Elasticsearch 的字段 id 和 @version 移除。

  • output

在 output 輸出段的配置,咱們指定了文檔應該被輸出到 ElasticSearch,而且設置輸出文檔 _id 爲 filter 段建立的 metadata 的 _id。若是須要調試,註釋部分的 rubydebug 能夠實現。

SELECT 語句的正確性分析

接下來,咱們將開始解釋爲何 SELECT 語句中包含 modification_time < NOW() 是很是重要的。爲了解釋這個問題,咱們將引入兩個反例演示說明,爲何下面介紹的兩種最直觀的方法是錯誤的。還有,爲何 modification_time < Now() 能夠克服這些問題。

直觀場景一

當 where 子句中僅僅包含 UNIX_TIMESTAMP(modification_time) > :sql_last_value,而沒有 modification < Now() 的狀況下,工做是否正常。這個場景下,SELECT 語句是以下形式:

statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value) ORDER BY modification_time ASC"

粗略一看,彷佛沒發現什麼問題,應該能夠正常工做。但其實,這裏有一些邊界狀況,可能致使一些文檔的丟失。舉個例子,假設 MySQL 每秒插入兩個文檔,Logstash 每 5 秒執行一次。以下圖所示,時間範圍 T0 至 T10,數據記錄 R1 至 R22。

Logstash 的第一次輪詢發生在 T5 時刻,讀取記錄 R1 至 R11,即圖中青色區域。此時,sql_last_value 即爲 T5,這個時間是從 R11 中獲取到的。

若是,當 Logstash 完成從 MySQL 讀取數據後,一樣在 T5 時刻,又有一條記錄插入到 MySQL 中。 而下一次的輪詢只會拉取到大於 T5 的記錄,這意味着 R12 將會丟失。如圖所示,青色和灰色區域分別表示當次和上次輪詢獲取到的記錄。

注意,這類場景下的 R12 將永遠不會再被寫入到 ElasticSearch。

直觀場景二

爲了解決這個問題,或許有人會想,若是把 where 子句中的大於(>)改成大於等於(>=)是否可行。SELECT 語句以下:

statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) >= :sql_last_value) ORDER BY modification_time ASC"

這種方式其實也不理想。這種狀況下,某些文檔可能會被兩次讀取,重複寫入到 ElasticSearch 中。雖然這不影響結果的正確性,但卻作了多餘的工做。以下圖所示,Logstash 的首次輪詢和場景一相同,青色區域表示已經讀取的記錄。

Logstash 的第二次輪詢將會讀取全部大於等於 T5 的記錄。以下圖所示,注意 R11,即紫色區域,將會被再次發送到 ElasticSearch 中。

這兩種場景的實現效果都不理想。場景一會致使數據丟失,這是沒法容忍的。場景二,存在重複讀取寫入的問題,雖然對數據正確性沒有影響,但執行了多餘的 IO。

終極方案

前面的兩場方案都不可行,咱們須要繼續尋找其餘解決方案。其實也很簡單,經過指定 (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()),咱們就能夠保證每條記錄有且只發送一次。

以下圖所示,Logstash 輪詢發生在 T5 時刻。由於指定了 modification_time < NOW(),文檔只會讀取到 T4 時刻,而且 sql_last_value 的值也將會被設置爲 T4。

開始下一次的輪詢,當前時間 T10。

因爲設置了 UNIX_TIMESTAMP(modification_time) > :sql_last_value,而且當前 sql_last_value 爲 T4,所以,本次的輪詢將從 T5 開始。而 modification_time < NOW() 決定了只有時間小於等於 T9 的記錄纔會被讀取。最後,sql_last_value 也將被設置爲 T9。

如此,MySQL 中的每一個記錄就能夠作到都能被精確讀取了一次,如此就能夠避免每次輪詢可能致使的當前時間間隔內數據丟失或重複讀取的問題。

系統測試

簡單的測試能夠幫助咱們驗證配置是否如咱們所願。咱們能夠寫入一些數據至數據庫,以下:

INSERT INTO es_table (id, client_name) VALUES (1, 'Jim Carrey');
INSERT INTO es_table (id, client_name) VALUES (2, 'Mike Myers');
INSERT INTO es_table (id, client_name) VALUES (3, 'Bryan Adams');

一旦 JDBC 輸入插件觸發執行,將會從 MySQL 中讀取全部記錄,並寫入到 ElasticSearch 中。咱們能夠經過查詢語句查看 ElasticSearch 中的文檔。

GET rdbms_sync_idx/_search

執行結果以下:

"hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "rdbms_sync_idx",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "insertion_time" : "2019-06-18T12:58:56.000Z",
          "@timestamp" : "2019-06-18T13:04:27.436Z",
          "modification_time" : "2019-06-18T12:58:56.000Z",
          "client_name" : "Jim Carrey"
        }
      },
Etc …

更新 id=1 的文檔,以下:

UPDATE es_table SET client_name = 'Jimbo Kerry' WHERE id=1;

經過 _id = 1,能夠實現文檔的正確更新。經過執行以下命令查看文檔:

GET rdbms_sync_idx/_doc/1

響應結果以下:

{
  "_index" : "rdbms_sync_idx",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 3,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "insertion_time" : "2019-06-18T12:58:56.000Z",
    "@timestamp" : "2019-06-18T13:09:30.300Z",
    "modification_time" : "2019-06-18T13:09:28.000Z",
    "client_name" : "Jimbo Kerry"
  }
}

文檔 _version 被設置爲 2,而且 modification_time 和 insertion_time 已經不同了,client_name 已經正確更新。而 @timestamp,不是咱們須要關注的,它是 Logstash 默認添加的。

更新添加 upsert 執行語句以下:

INSERT INTO es_table (id, client_name) VALUES (4, 'Bob is new') ON DUPLICATE KEY UPDATE client_name='Bob exists already';

和以前同樣,咱們能夠經過查看 ElasticSearch 中相應文檔,即可驗證同步的正確性。

文檔刪除

不知道你是否已經發現,若是一個文檔從 MySQL 中刪除,該操做並不會同步到 ElasticSearch 中。列舉一些可供咱們考慮的方案,以下:

MySQL 中的記錄可經過包含 is_deleted 字段用以代表該條記錄是否有效。一旦發生更新,is_deleted 也會同步更新到 ElasticSearch 中。若是經過這種方式,在執行 MySQL 或 ElasticSearch 查詢時,咱們須要重寫查詢語句來過濾掉 is_deleted 爲 true 的記錄。同時,須要一些後臺進程將 MySQL 和 ElasticSearch 中的這些文檔刪除。

另外一個可選方案,應用系統負責 MySQL 和 ElasticSearch 中數據的刪除,即應用系統在刪除 MySQL 中數據的同時,也要負責將 ElasticSearch 中相應的文檔刪除。

總結

本文介紹瞭如何經過 Logstash 進行關係型數據庫和 ElasticSearch 之間的數據同步。文中以 MySQL 爲例,但理論上,演示的方法和代碼也應該一樣適應於其餘的關係型數據庫。

相關文章
相關標籤/搜索