大數據時代,數據實時同步解決方案的思考—最全的數據同步總結

 

一、 早期關係型數據庫之間的數據同步php

1)、全量同步html

好比從oracle數據庫中同步一張表的數據到Mysql中,一般的作法就是 分頁查詢源端的表,而後經過 jdbc的batch 方式插入到目標表,這個地方須要注意的是,分頁查詢時,必定要按照主鍵id來排序分頁,避免重複插入。java

2)、基於數據文件導出和導入的全量同步,這種同步方式通常只適用於同種數據庫之間的同步,若是是不一樣的數據庫,這種方式可能會存在問題。python

3)、基於觸發器的增量同步mysql

增量同步通常是作實時的同步,早期不少數據同步都是基於關係型數據庫的觸發器trigger來作的。git

使用觸發器實時同步數據的步驟:github

A、 基於原表創觸發器,觸發器包含insert,modify,delete 三種類型的操做,數據庫的觸發器分Before和After兩種狀況,一種是在insert,modify,delete 三種類型的操做發生以前觸發(好比記錄日誌操做,通常是Before),一種是在insert,modify,delete 三種類型的操做以後觸發。spring

B、 建立增量表,增量表中的字段和原表中的字段徹底同樣,可是須要多一個操做類型字段(分表表明insert,modify,delete 三種類型的操做),而且須要一個惟一自增ID,表明數據原表中數據操做的順序,這個自增id很是重要,否則數據同步就會錯亂。sql

C、 原表中出現insert,modify,delete 三種類型的操做時,經過觸發器自動產生增量數據,插入增量表中。shell

D、處理增量表中的數據,處理時,必定是按照自增id的順序來處理,這種效率會很是低,沒辦法作批量操做,否則數據會錯亂。  有人可能會說,是否是能夠把insert操做合併在一塊兒,modify合併在一塊兒,delete操做合併在一塊兒,而後批量處理,我給的答案是不行,由於數據的增刪改是有順序的,合併後,就沒有順序了,同一條數據的增刪改順序一旦錯了,那數據同步就確定錯了。

市面上不少數據etl數據交換產品都是基於這種思想來作的。

E、 這種思想使用kettle 很容易就能夠實現,筆者曾經在本身的博客中寫過 kettle的文章,http://www.javashuo.com/article/p-vjidbsky-dy.html

4)、基於時間戳的增量同步

A、首先咱們須要一張臨時temp表,用來存取每次讀取的待同步的數據,也就是把每次從原表中根據時間戳讀取到數據先插入到臨時表中,每次在插入前,先清空臨時表的數據

B、咱們還須要建立一個時間戳配置表,用於存放每次讀取的處理完的數據的最後的時間戳。

C、每次從原表中讀取數據時,先查詢時間戳配置表,而後就知道了查詢原表時的開始時間戳。

D、根據時間戳讀取到原表的數據,插入到臨時表中,而後再將臨時表中的數據插入到目標表中。

E、從緩存表中讀取出數據的最大時間戳,而且更新到時間戳配置表中。緩存表的做用就是使用sql獲取每次讀取到的數據的最大的時間戳,固然這些都是徹底基於sql語句在kettle中來配置,才須要這樣的一張臨時表。

二、    大數據時代下的數據同步

1)、基於數據庫日誌(好比mysql的binlog)的同步

咱們都知道不少數據庫都支持了主從自動同步,尤爲是mysql,能夠支持多主多從的模式。那麼咱們是否是能夠利用這種思想呢,答案固然是確定的,mysql的主從同步的過程是這樣的。

  A、master將改變記錄到二進制日誌(binary log)中(這些記錄叫作二進制日誌事件,binary log events,能夠經過show binlog events進行查看);

  B、slave將master的binary log events拷貝到它的中繼日誌(relay log);

  C、slave重作中繼日誌中的事件,將改變反映它本身的數據。

阿里巴巴開源的canal就完美的使用這種方式,canal 假裝了一個Slave 去喝Master進行同步。

A、 canal模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議

 

B、 mysql master收到dump請求,開始推送binary log給slave(也就是canal)

C、 canal解析binary log對象(原始爲byte流)

另外canal 在設計時,特別設計了 client-server 模式,交互協議使用 protobuf 3.0 , client 端可採用不一樣語言實現不一樣的消費邏輯。

canal java 客戶端: https://github.com/alibaba/canal/wiki/ClientExample

canal c# 客戶端: https://github.com/dotnetcore/CanalSharp

canal go客戶端: https://github.com/CanalClient/canal-go

canal php客戶端: https://github.com/xingwenge/canal-php、

github的地址:https://github.com/alibaba/canal/

另外canal 1.1.1版本以後, 默認支持將canal server接收到的binlog數據直接投遞到MQ   https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

D、在使用canal時,mysql須要開啓binlog,而且binlog-format必須爲row,能夠在mysql的my.cnf文件中增長以下配置

log-bin=E:/mysql5.5/bin_log/mysql-bin.log

binlog-format=ROW

server-id=12三、

E、 部署canal的服務端,配置canal.properties文件,而後 啓動 bin/startup.sh 或bin/startup.bat

#設置要監聽的mysql服務器的地址和端口

canal.instance.master.address = 127.0.0.1:3306

#設置一個可訪問mysql的用戶名和密碼並具備相應的權限,本示例用戶名、密碼都爲canal

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

#鏈接的數據庫

canal.instance.defaultDatabaseName =test

#訂閱實例中全部的數據庫和表

canal.instance.filter.regex = .*\\..*

#鏈接canal的端口

canal.port= 11111

#監聽到的數據變動發送的隊列

canal.destinations= example

F、 客戶端開發,在maven中引入canal的依賴

   <dependency>
         <groupId>com.alibaba.otter</groupId>
          <artifactId>canal.client</artifactId>
          <version>1.0.21</version>
      </dependency>

代碼示例:

 

package com.example;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

 
public class CanalClientExample {

    public static void main(String[] args) {
        while (true) {
            //鏈接canal
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal");
            connector.connect();
            //訂閱 監控的 數據庫.表
            connector.subscribe("demo_db.user_tab");
            //一次取10條
            Message msg = connector.getWithoutAck(10);

            long batchId = msg.getId();
            int size = msg.getEntries().size();
            if (batchId < 0 || size == 0) {
                System.out.println("沒有消息,休眠5秒");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                //
                CanalEntry.RowChange row = null;
                for (CanalEntry.Entry entry : msg.getEntries()) {
                    try {
                        row = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        List<CanalEntry.RowData> rowDatasList = row.getRowDatasList();
                        for (CanalEntry.RowData rowdata : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList();
                            Map<String, Object> dataMap = transforListToMap(afterColumnsList);
                            if (row.getEventType() == CanalEntry.EventType.INSERT) {
                                //具體業務操做
                                System.out.println(dataMap);
                            } else if (row.getEventType() == CanalEntry.EventType.UPDATE) {
                                //具體業務操做
                                System.out.println(dataMap);
                            } else if (row.getEventType() == CanalEntry.EventType.DELETE) {
                                List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    if ("id".equals(column.getName())) {
                                        //具體業務操做
                                        System.out.println("刪除的id:" + column.getValue());
                                    }
                                }
                            } else {
                                System.out.println("其餘操做類型不作處理");
                            }

                        }

                    } catch (InvalidProtocolBufferException e) {
                        e.printStackTrace();
                    }
                }
                //確認消息
                connector.ack(batchId);
            }


        }
    }

    public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) {
        Map map = new HashMap();
        if (afterColumnsList != null && afterColumnsList.size() > 0) {
            for (CanalEntry.Column column : afterColumnsList) {
                map.put(column.getName(), column.getValue());
            }
        }
        return map;
    }


}

  

2)、基於BulkLoad的數據同步,好比從hive同步數據到hbase

 

咱們有兩種方式能夠實現,

A、 使用spark任務,經過HQl讀取數據,而後再經過hbase的Api插入到hbase中。

可是這種作法,效率很低,並且大批量的數據同時插入Hbase,對Hbase的性能影響很大。

在大數據量的狀況下,使用BulkLoad能夠快速導入,BulkLoad主要是借用了hbase的存儲設計思想,由於hbase本質是存儲在hdfs上的一個文件夾,而後底層是以一個個的Hfile存在的。HFile的形式存在。Hfile的路徑格式通常是這樣的:

/hbase/data/default(默認是這個,若是hbase的表沒有指定命名空間的話,若是指定了,這個就是命名空間的名字)/<tbl_name>/<region_id>/<cf>/<hfile_id>

B、 BulkLoad實現的原理就是按照HFile格式存儲數據到HDFS上,生成Hfile可使用hadoop的MapReduce來實現。若是不是hive中的數據,好比外部的數據,那麼咱們能夠將外部的數據生成文件,而後上傳到hdfs中,組裝RowKey,而後將封裝後的數據在回寫到HDFS上,以HFile的形式存儲到HDFS指定的目錄中。

 

 

固然咱們也能夠不事先生成hfile,可使用spark任務直接從hive中讀取數據轉換成RDD,而後使用HbaseContext的自動生成Hfile文件,部分關鍵代碼以下:

…
//將DataFrame轉換bulkload須要的RDD格式
    val rddnew = datahiveDF.rdd.map(row => {
      val rowKey = row.getAs[String](rowKeyField)
 
      fields.map(field => {
        val fieldValue = row.getAs[String](field)
        (Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
      })
    }).flatMap(array => {
      (array)
    })
…
//使用HBaseContext的bulkload生成HFile文件
    hbaseContext.bulkLoad[Put](rddnew.map(record => {
      val put = new Put(record._1)
      record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
      put
    }), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload")
 
    val conn = ConnectionFactory.createConnection(hBaseConf)
    val hbTableName = TableName.valueOf(hBaseTempTable.getBytes())
    val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))
    val realTable = conn.getTable(hbTableName)
    HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)
 
    // bulk load start
    val loader = new LoadIncrementalHFiles(hBaseConf)
    val admin = conn.getAdmin()
    loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator)
 
    sc.stop()
  }
…
  def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
    val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
    import scala.collection.JavaConversions._
    for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
      val family = cells.getKey
      for (value <- cells.getValue) {
        val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
        ret.+=((kfq, CellUtil.cloneValue(value)))
      }
    }
    ret.iterator
  }
}

…

  

C、pg_bulkload的使用

這是一個支持pg庫(PostgreSQL)批量導入的插件工具,它的思想也是經過外部文件加載的方式,這個工具筆者沒有親自去用過,詳細的介紹能夠參考:https://my.oschina.net/u/3317105/blog/852785   pg_bulkload項目的地址:http://pgfoundry.org/projects/pgbulkload/

3)、基於sqoop的全量導入

Sqoop 是hadoop生態中的一個工具,專門用於外部數據導入進入到hdfs中,外部數據導出時,支持不少常見的關係型數據庫,也是在大數據中經常使用的一個數據導出導入的交換工具。

 

 

Sqoop從外部導入數據的流程圖以下:

Sqoop將hdfs中的數據導出的流程以下:

本質都是用了大數據的數據分佈式處理來快速的導入和導出數據。

 

4)、HBase中建表,而後Hive中建一個外部表,這樣當Hive中寫入數據後,HBase中也會同時更新,可是須要注意

A、hbase中的空cell在hive中會補null

B、hive和hbase中不匹配的字段會補null

咱們能夠在hbase的shell 交互模式下,建立一張hbse表

create 'bokeyuan','zhangyongqing'

使用這個命令,咱們能夠建立一張叫bokeyuan的表,而且裏面有一個列族zhangyongqing,hbase建立表時,能夠不用指定字段,可是須要指定表名以及列族

咱們可使用的hbase的put命令插入一些數據

put 'bokeyuan','001','zhangyongqing:name','robot'

put 'bokeyuan','001','zhangyongqing:age','20'

put 'bokeyuan','002','zhangyongqing:name','spring'

put 'bokeyuan','002','zhangyongqing:age','18'

能夠經過hbase的scan 全表掃描的方式查看咱們插入的數據

scan ' bokeyuan'

咱們繼續建立一張hive外部表

create external table bokeyuan (id int, name string, age int) 

STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 

WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,user:name,user:age") 

TBLPROPERTIES("hbase.table.name" = " bokeyuan");

外部表建立好了後,咱們可使用HQL語句來查詢hive中的數據了

select * from classes;

OK

1 robot 20

2 spring 18

5)、Debezium+bireme:Debezium for PostgreSQL to Kafka  Debezium也是一個經過監控數據庫的日誌變化,經過對行級日誌的處理來達到數據同步,並且Debezium 能夠經過把數據放入到kafka,這樣就能夠經過消費kafka的數據來達到數據同步的目的。並且還能夠給多個地方進行消費使用。

 

Debezium是一個開源項目,爲捕獲數據更改(change data capture,CDC)提供了一個低延遲的流式處理平臺。你能夠安裝而且配置Debezium去監控你的數據庫,而後你的應用就能夠消費對數據庫的每個行級別(row-level)的更改。只有已提交的更改纔是可見的,因此你的應用不用擔憂事務(transaction)或者更改被回滾(roll back)。Debezium爲全部的數據庫更改事件提供了一個統一的模型,因此你的應用不用擔憂每一種數據庫管理系統的錯綜複雜性。另外,因爲Debezium用持久化的、有副本備份的日誌來記錄數據庫數據變化的歷史,所以,你的應用能夠隨時中止再重啓,而不會錯過它中止運行時發生的事件,保證了全部的事件都能被正確地、徹底地處理掉。

 

 

 

 

該項目的GitHub地址爲:https://github.com/debezium/debezium   這是一個開源的項目。

 

 

 

 

  原本監控數據庫,而且在數據變更的時候得到通知其實一直是一件很複雜的事情。關係型數據庫的觸發器能夠作到,可是隻對特定的數據庫有效,並且一般只能更新數據庫內的狀態(沒法和外部的進程通訊)。一些數據庫提供了監控數據變更的API或者框架,可是沒有一個標準,每種數據庫的實現方式都是不一樣的,而且須要大量特定的知識和理解特定的代碼才能運用。確保以相同的順序查看和處理全部更改,同時最小化影響數據庫仍然很是具備挑戰性。

 

       Debezium正好提供了模塊爲你作這些複雜的工做。一些模塊是通用的,而且可以適用多種數據庫管理系統,但在功能和性能方面仍有一些限制。另外一些模塊是爲特定的數據庫管理系統定製的,因此他們一般能夠更多地利用數據庫系統自己的特性來提供更多功能,Debezium提供了對MongoDB,mysql,pg,sqlserver的支持。

Debezium是一個捕獲數據更改(CDC)平臺,而且利用Kafka和Kafka Connect實現了本身的持久性、可靠性和容錯性。每個部署在Kafka Connect分佈式的、可擴展的、容錯性的服務中的connector監控一個上游數據庫服務器,捕獲全部的數據庫更改,而後記錄到一個或者多個Kafka topic(一般一個數據庫表對應一個kafka topic)。Kafka確保全部這些數據更改事件都可以多副本而且整體上有序(Kafka只能保證一個topic的單個分區內有序),這樣,更多的客戶端能夠獨立消費一樣的數據更改事件而對上游數據庫系統形成的影響降到很小(若是N個應用都直接去監控數據庫更改,對數據庫的壓力爲N,而用debezium彙報數據庫更改事件到kafka,全部的應用都去消費kafka中的消息,能夠把對數據庫的壓力降到1)。另外,客戶端能夠隨時中止消費,而後重啓,從上次中止消費的地方接着消費。每一個客戶端能夠自行決定他們是否須要exactly-once或者at-least-once消息交付語義保證,而且全部的數據庫或者表的更改事件是按照上游數據庫發生的順序被交付的。

       對於不須要或者不想要這種容錯級別、性能、可擴展性、可靠性的應用,他們可使用內嵌的Debezium connector引擎來直接在應用內部運行connector。這種應用仍須要消費數據庫更改事件,但更但願connector直接傳遞給它,而不是持久化到Kafka裏。

更詳細的介紹能夠參考:https://www.jianshu.com/p/f86219b1ab98

 

bireme 的github 地址  https://github.com/HashDataInc/bireme

bireme 的介紹:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md

另外Maxwell也是能夠實現MySQL到Kafka的消息中間件,消息格式採用Json:

Download:
https://github.com/zendesk/maxwell/releases/download/v1.22.5/maxwell-1.22.5.tar.gz 
Source:
https://github.com/zendesk/maxwell 

 

6)、datax

datax 是阿里開源的etl 工具,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各類異構數據源之間高效的數據同步功能,採用java+python進行開發,核心是java語言實現。

github地址:https://github.com/alibaba/DataX    

A、設計架構:

 

數據交換經過DataX進行中轉,任何數據源只要和DataX鏈接上便可以和已實現的任意數據源同步

B、框架

 

 

 

 

核心模塊介紹:

  1. DataX完成單個數據同步的做業,咱們稱之爲Job,DataX接受到一個Job以後,將啓動一個進程來完成整個做業同步過程。DataX Job模塊是單個做業的中樞管理節點,承擔了數據清理、子任務切分(將單一做業計算轉化爲多個子Task)、TaskGroup管理等功能。
  2. DataXJob啓動後,會根據不一樣的源端切分策略,將Job切分紅多個小的Task(子任務),以便於併發執行。Task即是DataX做業的最小單元,每個Task都會負責一部分數據的同步工做。
  3. 切分多個Task以後,DataX Job會調用Scheduler模塊,根據配置的併發數據量,將拆分紅的Task從新組合,組裝成TaskGroup(任務組)。每個TaskGroup負責以必定的併發運行完畢分配好的全部Task,默認單個任務組的併發數量爲5。
  4. 每個Task都由TaskGroup負責啓動,Task啓動後,會固定啓動Reader—>Channel—>Writer的線程來完成任務同步工做。
  5.  

    DataX做業運行起來以後, Job監控並等待多個TaskGroup模塊任務完成,等待全部TaskGroup任務完成後Job成功退出。不然,異常退出,進程退出值非0

DataX調度流程:

舉例來講,用戶提交了一個DataX做業,而且配置了20個併發,目的是將一個100張分表的mysql數據同步到odps裏面。 DataX的調度決策思路是:

  1. DataXJob根據分庫分表切分紅了100個Task。
  2. 根據20個併發,DataX計算共須要分配4個TaskGroup。
  3. 4個TaskGroup平分切分好的100個Task,每個TaskGroup負責以5個併發共計運行25個Task。

 

優點:

  • 每種插件都有本身的數據轉換策略,放置數據失真;
  • 提供做業全鏈路的流量以及數據量運行時監控,包括做業自己狀態、數據流量、數據速度、執行進度等。
  • 因爲各類緣由致使傳輸報錯的髒數據,DataX能夠實現精確的過濾、識別、採集、展現,爲用戶提過多種髒數據處理模式;
  • 精確的速度控制
  • 健壯的容錯機制,包括線程內部重試、線程級別重試;

 

從插件視角看框架

  • Job:是DataX用來描述從一個源頭到目的的同步做業,是DataX數據同步的最小業務單元;
  • Task:爲最大化而把Job拆分獲得最小的執行單元,進行併發執行;
  • TaskGroup:一組Task集合,在同一個TaskGroupContainer執行下的Task集合稱爲TaskGroup;
  • JobContainer:Job執行器,負責Job全局拆分、調度、前置語句和後置語句等工做的工做單元。相似Yarn中的JobTracker;
  • TaskGroupContainer:TaskGroup執行器,負責執行一組Task的工做單元,相似Yarn中的TAskTacker。

    總之,Job拆分爲Task,分別在框架提供的容器中執行,插件只須要實現Job和Task兩部分邏輯。

    物理執行有三種運行模式:

  • Standalone:單進程運行,沒有外部依賴;
  • Local:單進程運行,統計信息,錯誤信息彙報到集中存儲;
  • Distrubuted:分佈式多線程運行,依賴DataX Service服務;

    整體來講,當JobContainer和TaskGroupContainer運行在同一個進程內的時候就是單機模式,在不一樣進程執行就是分佈式模式。

若是須要開發插件,能夠看zhege這個插件開發指南:   https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md 

數據源支持狀況:

類型 數據源 Reader(讀) Writer(寫) 文檔
RDBMS 關係型數據庫 MySQL  、
            Oracle         √         √      、
  SQLServer  、
  PostgreSQL  、
  DRDS  、
  通用RDBMS(支持全部關係型數據庫)  、
阿里雲數倉數據存儲 ODPS  、
  ADS  
  OSS  、
  OCS  、
NoSQL數據存儲 OTS  、
  Hbase0.94  、
  Hbase1.1  、
  Phoenix4.x  、
  Phoenix5.x  、
  MongoDB  、
  Hive  、
無結構化數據存儲 TxtFile  、
  FTP  、
  HDFS  、
  Elasticsearch  
時間序列數據庫 OpenTSDB  
  TSDB  

7)、OGG

OGG 通常主要用於Oracle數據庫。即Oracle GoldenGate是Oracle的同步工具 ,能夠實現兩個Oracle數據庫之間的數據的同步,也能夠實現Oracle數據同步到Kafka,相關的配置操做能夠參考以下:

 

http://www.javashuo.com/article/p-faeyfdkn-b.html

https://www.jianshu.com/p/446ed2f267fa

http://blog.itpub.net/15412087/viewspace-2154644/

8)、databus

Databus是一個實時的、可靠的、支持事務的、保持一致性的數據變動抓取系統。 2011年在LinkedIn正式進入生產系統,2013年開源。

Databus經過挖掘數據庫日誌的方式,將數據庫變動實時、可靠的從數據庫拉取出來,業務能夠經過定製化client實時獲取變動。

Databus的傳輸層端到端延遲是微秒級的,每臺服務器每秒能夠處理數千次數據吞吐變動事件,同時還支持無限回溯能力和豐富的變動訂閱功能。

github:https://github.com/linkedin/databus

databus架構設計:

 

 

 

 

  • 來源獨立:Databus支持多種數據來源的變動抓取,包括Oracle和MySQL。
  • 可擴展、高度可用:Databus能擴展到支持數千消費者和事務數據來源,同時保持高度可用性。
  • 事務按序提交:Databus能保持來源數據庫中的事務完整性,並按照事務分組和來源的提交順尋交付變動事件。
  • 低延遲、支持多種訂閱機制:數據源變動完成後,Databus能在微秒級內將事務提交給消費者。同時,消費者使用Databus中的服務器端過濾功能,能夠只獲取本身須要的特定數據。
  • 無限回溯:這是Databus最具創新性的組件之一,對消費者支持無限回溯能力。當消費者須要產生數據的完整拷貝時(好比新的搜索索引),它不會對數據庫產生任何額外負擔,就能夠達成目的。當消費者的數據大大落後於來源數據庫時,也可使用該功能。
    • Databus Relay中繼的功能主要包括:
    1. 從Databus來源讀取變動行,並在內存緩存內將其序列化爲Databus變動事件
    2. 監聽來自Databus客戶端(包括Bootstrap Producer)的請求,並傳輸新的Databus數據變動事件
    • Databus客戶端的功能主要包括:
    1. 檢查Relay上新的數據變動事件,並執行特定業務邏輯的回調
    2. 若是落後Relay太多,向Bootstrap Server發起查詢
    3. 新Databus客戶端會向Bootstrap Server發起bootstrap啓動查詢,而後切換到向中繼發起查詢,以完成最新的數據變動事件
    4. 單一客戶端能夠處理整個Databus數據流,或者能夠成爲消費者集羣的一部分,其中每一個消費者只處理一部分流數據
    • Databus Bootstrap Producer的功能有:
    1. 檢查中繼上的新數據變動事件
    2. 將變動存儲在MySQL數據庫中
    3. MySQL數據庫供Bootstrap和客戶端使用
    • Databus Bootstrap Server的主要功能,監聽來自Databus客戶端的請求,並返回長期回溯數據變動事件。
    • 更多能夠參考 databus社區wiki主頁:https://github.com/linkedin/Databus/wiki
    • Databus和canal的功能對比:

 

對比項

 

Databus

canal

結論

支持的數據庫

 

mysql, oracle

mysql(聽說內部版本支持oracle)

Databus目前支持的數據源更多

業務開發

 

業務只須要實現事件處理接口

事件處理外,須要處理ack/rollback,

反序列化異常等

Databus開發接口用戶友好度更高

服務模型

 relay

relay能夠同時服務多個client

一個server instance只能服務一個client

(受限於server端保存拉取位點)

Databus服務模式更靈活

 

client

client能夠拉取多個relay的變動,

訪問的relay能夠指定拉取某些表某些分片的變動

client只能從一個server拉取變動,

並且只能是拉取全量的變動

可擴展性

 

client能夠線性擴展,處理能力也能線性擴展

(Databus可識別pk,自動作數據分片)

client沒法擴展

Databus擴展性更好

可用性

client ha

client支持cluster模式,每一個client處理一部分數據,

某個client掛掉,其餘client自動接管對應分片數據

主備client模式,主client消費,

若是主client掛掉,備client可自動接管

Databus實時熱備方案更成熟

 

relay/server ha

多個relay可鏈接到同一個數據庫,

client能夠配置多個relay,relay故障啓動切換

主備relay模式,relay經過zk進行failover

canal主備模式對數據庫影響更小

 

故障對上游

數據庫的影響

client故障,bootstrap會繼續拉取變動,

client恢復後直接從bootstrap拉取歷史變動

client故障會阻塞server拉取變動,

client恢復會致使server瞬時從數據庫拉取大量變動

Databus自己的故障對數據庫影響幾乎爲0

系統狀態監控

 

程序經過http接口將運行狀態暴露給外部

暫無

Databus程序可監控性更好

開發語言

 

java,核心代碼16w,測試代碼6w

java,4.2w核心代碼,6k測試代碼

Databus項目更成熟,固然學習成本也更大

總結:

一、databus活躍度不高,datax和canal 相對比較活躍。

二、datax 通常比較適合於全量數據同步,對全量數據同步效率很高(任務能夠拆分,併發同步,因此效率高),對於增量數據同步支持的不太好(能夠依靠時間戳+定時調度來實現,可是不能作到實時,延遲較大)。

三、canal 、databus 等因爲是經過日誌抓取的方式進行同步,因此對增量同步支持的比較好。

四、以上這些工具都缺乏一個監控和任務配置調度管理的平臺來進行支撐。

相關文章
相關標籤/搜索