如何用gpss實現MySQL到Greenplum的增量同步

​數據同步通常分爲兩種方式:全量和增量。增量數據是一類典型的流數據,基於日誌的增量同步幾乎已是全部數據庫的標配,它能夠減小常規ETL工做對系統帶來的影響,並大大下降數據的延遲。做爲Greenplum的流計算引擎,Greenplum Stream Server(gpss)能將不一樣源端的增量數據同步到Greenplum中。爲更好的支持這一應用場景,即將發佈的gpss 1.3.6 對增量同步的功能作了加強。mysql

Greenplum Stream Server(簡稱gpss),是Greenplum的下一代數據加載解決方案,相比於gpfdist,GPSS會提供流數據支持及API接口,有更好的擴展性,支持更豐富的功能,並開放更細粒度的任務控制接口。在即將發佈gpss 1.3.6 中,對增量同步所作的的功能加強包括:sql

  • 能夠根據指定的遞增排序字段,確保最新的消息生效
  • Merge可支持insert,update和delete三種操做

本文將以MySQL爲例,簡要介紹下gpss如何實現向Greenplum的增量同步。數據庫

1測試環境

  • MySQL 8.0.13
  • Maxwell 1.25.0
  • Kafka 2.2.2
  • Greenplum 6.4.0
  • GPSS 1.3.6

咱們要完成的工做是:json

  • 經過Maxwell監聽MySQL中binlog的增量變化(略)
  • 將增量數據以json的格式發送到kafka中(略)
  • 利用gpss解析kafka中的json消息
  • 將變化的數據更新到Greenplum的目標表中

MySQSL和Maxwell的配置和使用,本文將不作深刻介紹,你們能夠自行訪問文章連接閱讀學習,訪問相關文章請點擊文章底部的「閱讀原文」。segmentfault

2 測試數據簡介

測試使用的表在MySQL中定義以下:併發

​create table t_update_delete_0 (k1 decimal,
               k2 text,
               v1 decimal,
               v2 decimal,
               v3 text,
               c1 decimal,
               c2 text);

其中 k1 和 k2 列爲鍵,用來惟一標識一條記錄, v1, v2, v3 爲每次更新的數據。學習

在源端分別對這個表進行了insert,update和delete操做,每一個語句爲單獨的transaction。 測試

Insert語句爲:網站

insert into t_update_delete_0 (k1,k2,v1,v2,v3,c1,c2) 
        values (1,'k_1', 1, 3, 'v_1', 1, 'c1');

Update語句爲:spa

update t_update_delete_0 
      set v1=100,v2=300,v3='v_100' 
      where k1='1' and k2='k_1';

Delete語句爲:

delete from t_update_delete_0 where k1='1' and k2='k_1';

3 Kafka的消息格式

Maxwell能夠將捕獲到binlog解析爲json格式併發送到kafka,不一樣的操做生成的Kafka消息有細微的區別。爲了將這些消息正確的恢復到Greenplum中,咱們先對這三種類型的消息進行簡單的分析。

Insert時生成的消息示例以下:

{
  "database": "test",
  "table": "t_update_delete_0",
  "type": "insert",
  "ts": 1586956781,
  "xid": 1398209,
  "commit": true,
  "data": {
    "k1": 41,
    "k2": "k_41",
    "v1": 818,
    "v2": 2454,
    "v3": "v_818",
    "c1": 41,
    "c2": "c_41"
  }
}

database和table表示源表的表名,ts和xid字段用於表示消息的順序,type和data表示執行的操做及對應的數據。這些是全部消息類型通用的。

Delete生成的消息以下,type爲"delete",同時data中包含了完整的內容。

{
  "database": "test",
  "table": "t_update_delete_0",
  "type": "delete",
  "ts": 1586956781,
  "xid": 1398195,
  "commit": true,
  "data": {
    "k1": 44,
    "k2": "k_44",
    "v1": 744,
    "v2": 2232,
    "v3": "v_744",
    "c1": 44,
​    "c2": "c_44"
  }
}

Update除了包含新數據外,還包含了更新以前的數據(old),這裏咱們只須要新數據就夠了。

{
  "database": "test",
  "table": "t_update_delete_0",
  "type": "update",
  "ts": 1586956707,
  "xid": 1281915,
  "commit": true,
  "data": {
    "k1": 99,
    "k2": "k_99",
    "v1": 798,
    "v2": 2394,
    "v3": "v_798",
    "c1": 99,
    "c2": "c_99"
  },
  "old": {
    "v1": 800,
    "v2": 2400,
    "v3": "v_800"
  }
}

根據生成的消息,咱們須要執行以下操做:

  • 根據ts和xid對數據進行排序
  • 根據k1和k2進行匹配
  • 對type爲delete的列執行刪除操做
  • 對其它type類型執行Merge(upsert)操做

4 執行gpss的Kafka JOB

Greenplum中的定義包含了排序的字段,用來區分消息更新的前後順序,定義以下:

create table t_update_delete_0 (k1 decimal,
               k2 text,
               v1 decimal,
               v2 decimal,
               v3 text,
               c1 decimal,
               c2 text,
               ts decimal,
               xid decimal,
               del_mark boolean);

根據數據同步的需求,gpss須要的yaml配置文件以下:

DATABASE: test
USER: gpadmin
HOST: mdw
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: kafkahost:9092
        TOPIC: test
      VALUE:
        COLUMNS:
          - NAME: c1
            TYPE: json
        FORMAT: json
      ERROR_LIMIT: 100
   OUTPUT:
      MODE: MERGE
      MATCH_COLUMNS:
        - k1
        - k2
      UPDATE_COLUMNS:
        - v1
        - v2
        - v3
      ORDER_COLUMNS:
        - ts
        - xid
      DELETE_CONDITION: del_mark
      TABLE: t_update_delete_0
      MAPPING:
         k1 : (c1->'data'->>'k1')::decimal
         k2 : (c1->'data'->>'k2')::text
         v1 : (c1->'data'->>'v1')::decimal
         v2 : (c1->'data'->>'v2')::decimal
         v3 : (c1->'data'->>'v3')::text
         c1 : (c1->'data'->>'c1')::decimal
         c2 : (c1->'data'->>'c2')::text
         ts : (c1->>'ts')::decimal
         xid: (c1->>'xid')::decimal
         del_mark: (c1->>'type')::text = 'delete'
   COMMIT:
      MINIMAL_INTERVAL: 2000

幾個主要的配置含義以下:

  • ORDER_COLUMNS:遞增排序的字段,每一個batch中,gpss會使用`ORDER_COLUMNS`最大的消息內容對目標表進行操做。
  • DELETE_CONDITION:軟刪除標記,gpss會刪除包含`DELETE_CONDITION`字段的匹配記錄
  • MATCH_COLUMNS:記錄的標識,也就是鍵(candidate key)
  • UPDATE_COLUMNS:須要更新的列

歸納下來,gpss執行的步驟爲:

  1. 在一個batch中,針對MATCH_COLUMNS相同的全部記錄,先根據ORDER_COLUMNS去重
  2. 目標表中存在MATCH_COLUMNS匹配的記錄時,根據UPDATE_CONDITION或者DELETE_CONDITION執行更新或者刪除操做
  3. 目標表中不存在時匹配記錄時,執行插入操做。

(因爲有去重操做,爲保證不丟失數據,在UPDATE時,Kafka的消息中須要包含整行的數據,而不單單是更新部分的數據。)

配置文件準備好後,咱們經過gpkafka來執行加載:

gpkafka load mysql.yaml

gpkafka便會從kafka中拉取對應的消息,按照設定的操做將Kafka中的增量數據同步到目標表中。

5 小結

本文簡單介紹瞭如何用gpss從MySQL進行增量同步,其它數據庫(例如Oracle,SQL Server等)也均可以利用相似的方案實現同步。不一樣的消息類型須要不一樣的處理邏輯,gpss的配置文件中有不少能夠進行後處理的部分,更詳細的內容能夠參考官方文檔:https://gpdb.docs.pivotal.io/...。因爲源端系統的多樣性,gpss的增量複製仍有不少須要完善的地方。在gpss後續版本咱們會持續加強相關功能,例如一對多(一個topic到多個目標表)的同步,自動依據topic offset排序等;歡迎你們使用,反饋,指導。也歡迎你們前往askGP(ask.greenplum.cn)交流。

得到Greenplum更多幹貨內容,歡迎前往Greenplum中文社區網站

image

相關文章
相關標籤/搜索