一、 早期關係型數據庫之間的數據同步php
1)、全量同步html
好比從oracle數據庫中同步一張表的數據到Mysql中,一般的作法就是 分頁查詢源端的表,而後經過 jdbc的batch 方式插入到目標表,這個地方須要注意的是,分頁查詢時,必定要按照主鍵id來排序分頁,避免重複插入。java
2)、基於數據文件導出和導入的全量同步,這種同步方式通常只適用於同種數據庫之間的同步,若是是不一樣的數據庫,這種方式可能會存在問題。python
3)、基於觸發器的增量同步mysql
增量同步通常是作實時的同步,早期不少數據同步都是基於關係型數據庫的觸發器trigger來作的。git
使用觸發器實時同步數據的步驟:github
A、 基於原表創觸發器,觸發器包含insert,modify,delete 三種類型的操做,數據庫的觸發器分Before和After兩種狀況,一種是在insert,modify,delete 三種類型的操做發生以前觸發(好比記錄日誌操做,通常是Before),一種是在insert,modify,delete 三種類型的操做以後觸發。spring
B、 建立增量表,增量表中的字段和原表中的字段徹底同樣,可是須要多一個操做類型字段(分表表明insert,modify,delete 三種類型的操做),而且須要一個惟一自增ID,表明數據原表中數據操做的順序,這個自增id很是重要,否則數據同步就會錯亂。sql
C、 原表中出現insert,modify,delete 三種類型的操做時,經過觸發器自動產生增量數據,插入增量表中。shell
D、處理增量表中的數據,處理時,必定是按照自增id的順序來處理,這種效率會很是低,沒辦法作批量操做,否則數據會錯亂。 有人可能會說,是否是能夠把insert操做合併在一塊兒,modify合併在一塊兒,delete操做合併在一塊兒,而後批量處理,我給的答案是不行,由於數據的增刪改是有順序的,合併後,就沒有順序了,同一條數據的增刪改順序一旦錯了,那數據同步就確定錯了。
市面上不少數據etl數據交換產品都是基於這種思想來作的。
E、 這種思想使用kettle 很容易就能夠實現,筆者曾經在本身的博客中寫過 kettle的文章,http://www.javashuo.com/article/p-vjidbsky-dy.html
4)、基於時間戳的增量同步
A、首先咱們須要一張臨時temp表,用來存取每次讀取的待同步的數據,也就是把每次從原表中根據時間戳讀取到數據先插入到臨時表中,每次在插入前,先清空臨時表的數據
B、咱們還須要建立一個時間戳配置表,用於存放每次讀取的處理完的數據的最後的時間戳。
C、每次從原表中讀取數據時,先查詢時間戳配置表,而後就知道了查詢原表時的開始時間戳。
D、根據時間戳讀取到原表的數據,插入到臨時表中,而後再將臨時表中的數據插入到目標表中。
E、從緩存表中讀取出數據的最大時間戳,而且更新到時間戳配置表中。緩存表的做用就是使用sql獲取每次讀取到的數據的最大的時間戳,固然這些都是徹底基於sql語句在kettle中來配置,才須要這樣的一張臨時表。
二、 大數據時代下的數據同步
1)、基於數據庫日誌(好比mysql的binlog)的同步
咱們都知道不少數據庫都支持了主從自動同步,尤爲是mysql,能夠支持多主多從的模式。那麼咱們是否是能夠利用這種思想呢,答案固然是確定的,mysql的主從同步的過程是這樣的。
A、master將改變記錄到二進制日誌(binary log)中(這些記錄叫作二進制日誌事件,binary log events,能夠經過show binlog events進行查看);
B、slave將master的binary log events拷貝到它的中繼日誌(relay log);
C、slave重作中繼日誌中的事件,將改變反映它本身的數據。
阿里巴巴開源的canal就完美的使用這種方式,canal 假裝了一個Slave 去喝Master進行同步。
A、 canal模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議
B、 mysql master收到dump請求,開始推送binary log給slave(也就是canal)
C、 canal解析binary log對象(原始爲byte流)
另外canal 在設計時,特別設計了 client-server 模式,交互協議使用 protobuf 3.0 , client 端可採用不一樣語言實現不一樣的消費邏輯。
canal java 客戶端: https://github.com/alibaba/canal/wiki/ClientExample
canal c# 客戶端: https://github.com/dotnetcore/CanalSharp
canal go客戶端: https://github.com/CanalClient/canal-go
canal php客戶端: https://github.com/xingwenge/canal-php、
github的地址:https://github.com/alibaba/canal/
另外canal 1.1.1版本以後, 默認支持將canal server接收到的binlog數據直接投遞到MQ https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
D、在使用canal時,mysql須要開啓binlog,而且binlog-format必須爲row,能夠在mysql的my.cnf文件中增長以下配置
log-bin=E:/mysql5.5/bin_log/mysql-bin.log
binlog-format=ROW
server-id=12三、
E、 部署canal的服務端,配置canal.properties文件,而後 啓動 bin/startup.sh 或bin/startup.bat
#設置要監聽的mysql服務器的地址和端口
canal.instance.master.address = 127.0.0.1:3306
#設置一個可訪問mysql的用戶名和密碼並具備相應的權限,本示例用戶名、密碼都爲canal
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
#鏈接的數據庫
canal.instance.defaultDatabaseName =test
#訂閱實例中全部的數據庫和表
canal.instance.filter.regex = .*\\..*
#鏈接canal的端口
canal.port= 11111
#監聽到的數據變動發送的隊列
canal.destinations= example
F、 客戶端開發,在maven中引入canal的依賴
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.21</version>
</dependency>
代碼示例:
package com.example; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; public class CanalClientExample { public static void main(String[] args) { while (true) { //鏈接canal CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal"); connector.connect(); //訂閱 監控的 數據庫.表 connector.subscribe("demo_db.user_tab"); //一次取10條 Message msg = connector.getWithoutAck(10); long batchId = msg.getId(); int size = msg.getEntries().size(); if (batchId < 0 || size == 0) { System.out.println("沒有消息,休眠5秒"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else { // CanalEntry.RowChange row = null; for (CanalEntry.Entry entry : msg.getEntries()) { try { row = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDatasList = row.getRowDatasList(); for (CanalEntry.RowData rowdata : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList(); Map<String, Object> dataMap = transforListToMap(afterColumnsList); if (row.getEventType() == CanalEntry.EventType.INSERT) { //具體業務操做 System.out.println(dataMap); } else if (row.getEventType() == CanalEntry.EventType.UPDATE) { //具體業務操做 System.out.println(dataMap); } else if (row.getEventType() == CanalEntry.EventType.DELETE) { List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { if ("id".equals(column.getName())) { //具體業務操做 System.out.println("刪除的id:" + column.getValue()); } } } else { System.out.println("其餘操做類型不作處理"); } } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } //確認消息 connector.ack(batchId); } } } public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) { Map map = new HashMap(); if (afterColumnsList != null && afterColumnsList.size() > 0) { for (CanalEntry.Column column : afterColumnsList) { map.put(column.getName(), column.getValue()); } } return map; } }
2)、基於BulkLoad的數據同步,好比從hive同步數據到hbase
咱們有兩種方式能夠實現,
A、 使用spark任務,經過HQl讀取數據,而後再經過hbase的Api插入到hbase中。
可是這種作法,效率很低,並且大批量的數據同時插入Hbase,對Hbase的性能影響很大。
在大數據量的狀況下,使用BulkLoad能夠快速導入,BulkLoad主要是借用了hbase的存儲設計思想,由於hbase本質是存儲在hdfs上的一個文件夾,而後底層是以一個個的Hfile存在的。HFile的形式存在。Hfile的路徑格式通常是這樣的:
/hbase/data/default(默認是這個,若是hbase的表沒有指定命名空間的話,若是指定了,這個就是命名空間的名字)/<tbl_name>/<region_id>/<cf>/<hfile_id>
B、 BulkLoad實現的原理就是按照HFile格式存儲數據到HDFS上,生成Hfile可使用hadoop的MapReduce來實現。若是不是hive中的數據,好比外部的數據,那麼咱們能夠將外部的數據生成文件,而後上傳到hdfs中,組裝RowKey,而後將封裝後的數據在回寫到HDFS上,以HFile的形式存儲到HDFS指定的目錄中。
固然咱們也能夠不事先生成hfile,可使用spark任務直接從hive中讀取數據轉換成RDD,而後使用HbaseContext的自動生成Hfile文件,部分關鍵代碼以下:
… //將DataFrame轉換bulkload須要的RDD格式 val rddnew = datahiveDF.rdd.map(row => { val rowKey = row.getAs[String](rowKeyField) fields.map(field => { val fieldValue = row.getAs[String](field) (Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue)))) }) }).flatMap(array => { (array) }) … //使用HBaseContext的bulkload生成HFile文件 hbaseContext.bulkLoad[Put](rddnew.map(record => { val put = new Put(record._1) record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) put }), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload") val conn = ConnectionFactory.createConnection(hBaseConf) val hbTableName = TableName.valueOf(hBaseTempTable.getBytes()) val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn)) val realTable = conn.getTable(hbTableName) HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator) // bulk load start val loader = new LoadIncrementalHFiles(hBaseConf) val admin = conn.getAdmin() loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator) sc.stop() } … def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = { val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList() import scala.collection.JavaConversions._ for (cells <- put.getFamilyCellMap.entrySet().iterator()) { val family = cells.getKey for (value <- cells.getValue) { val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value)) ret.+=((kfq, CellUtil.cloneValue(value))) } } ret.iterator } } …
C、pg_bulkload的使用
這是一個支持pg庫(PostgreSQL)批量導入的插件工具,它的思想也是經過外部文件加載的方式,這個工具筆者沒有親自去用過,詳細的介紹能夠參考:https://my.oschina.net/u/3317105/blog/852785 pg_bulkload項目的地址:http://pgfoundry.org/projects/pgbulkload/
3)、基於sqoop的全量導入
Sqoop 是hadoop生態中的一個工具,專門用於外部數據導入進入到hdfs中,外部數據導出時,支持不少常見的關係型數據庫,也是在大數據中經常使用的一個數據導出導入的交換工具。
Sqoop從外部導入數據的流程圖以下:
Sqoop將hdfs中的數據導出的流程以下:
本質都是用了大數據的數據分佈式處理來快速的導入和導出數據。
4)、HBase中建表,而後Hive中建一個外部表,這樣當Hive中寫入數據後,HBase中也會同時更新,可是須要注意
A、hbase中的空cell在hive中會補null
B、hive和hbase中不匹配的字段會補null
咱們能夠在hbase的shell 交互模式下,建立一張hbse表
create 'bokeyuan','zhangyongqing'
使用這個命令,咱們能夠建立一張叫bokeyuan的表,而且裏面有一個列族zhangyongqing,hbase建立表時,能夠不用指定字段,可是須要指定表名以及列族
咱們可使用的hbase的put命令插入一些數據
put 'bokeyuan','001','zhangyongqing:name','robot'
put 'bokeyuan','001','zhangyongqing:age','20'
put 'bokeyuan','002','zhangyongqing:name','spring'
put 'bokeyuan','002','zhangyongqing:age','18'
能夠經過hbase的scan 全表掃描的方式查看咱們插入的數據
scan ' bokeyuan'
咱們繼續建立一張hive外部表
create external table bokeyuan (id int, name string, age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,user:name,user:age")
TBLPROPERTIES("hbase.table.name" = " bokeyuan");
外部表建立好了後,咱們可使用HQL語句來查詢hive中的數據了
select * from classes;
OK
1 robot 20
2 spring 18
Debezium是一個開源項目,爲捕獲數據更改(change data capture,CDC)提供了一個低延遲的流式處理平臺。你能夠安裝而且配置Debezium去監控你的數據庫,而後你的應用就能夠消費對數據庫的每個行級別(row-level)的更改。只有已提交的更改纔是可見的,因此你的應用不用擔憂事務(transaction)或者更改被回滾(roll back)。Debezium爲全部的數據庫更改事件提供了一個統一的模型,因此你的應用不用擔憂每一種數據庫管理系統的錯綜複雜性。另外,因爲Debezium用持久化的、有副本備份的日誌來記錄數據庫數據變化的歷史,所以,你的應用能夠隨時中止再重啓,而不會錯過它中止運行時發生的事件,保證了全部的事件都能被正確地、徹底地處理掉。
該項目的GitHub地址爲:https://github.com/debezium/debezium 這是一個開源的項目。
原本監控數據庫,而且在數據變更的時候得到通知其實一直是一件很複雜的事情。關係型數據庫的觸發器能夠作到,可是隻對特定的數據庫有效,並且一般只能更新數據庫內的狀態(沒法和外部的進程通訊)。一些數據庫提供了監控數據變更的API或者框架,可是沒有一個標準,每種數據庫的實現方式都是不一樣的,而且須要大量特定的知識和理解特定的代碼才能運用。確保以相同的順序查看和處理全部更改,同時最小化影響數據庫仍然很是具備挑戰性。
Debezium正好提供了模塊爲你作這些複雜的工做。一些模塊是通用的,而且可以適用多種數據庫管理系統,但在功能和性能方面仍有一些限制。另外一些模塊是爲特定的數據庫管理系統定製的,因此他們一般能夠更多地利用數據庫系統自己的特性來提供更多功能,Debezium提供了對MongoDB,mysql,pg,sqlserver的支持。
Debezium是一個捕獲數據更改(CDC)平臺,而且利用Kafka和Kafka Connect實現了本身的持久性、可靠性和容錯性。每個部署在Kafka Connect分佈式的、可擴展的、容錯性的服務中的connector監控一個上游數據庫服務器,捕獲全部的數據庫更改,而後記錄到一個或者多個Kafka topic(一般一個數據庫表對應一個kafka topic)。Kafka確保全部這些數據更改事件都可以多副本而且整體上有序(Kafka只能保證一個topic的單個分區內有序),這樣,更多的客戶端能夠獨立消費一樣的數據更改事件而對上游數據庫系統形成的影響降到很小(若是N個應用都直接去監控數據庫更改,對數據庫的壓力爲N,而用debezium彙報數據庫更改事件到kafka,全部的應用都去消費kafka中的消息,能夠把對數據庫的壓力降到1)。另外,客戶端能夠隨時中止消費,而後重啓,從上次中止消費的地方接着消費。每一個客戶端能夠自行決定他們是否須要exactly-once或者at-least-once消息交付語義保證,而且全部的數據庫或者表的更改事件是按照上游數據庫發生的順序被交付的。
對於不須要或者不想要這種容錯級別、性能、可擴展性、可靠性的應用,他們可使用內嵌的Debezium connector引擎來直接在應用內部運行connector。這種應用仍須要消費數據庫更改事件,但更但願connector直接傳遞給它,而不是持久化到Kafka裏。
更詳細的介紹能夠參考:https://www.jianshu.com/p/f86219b1ab98
bireme 的github 地址 https://github.com/HashDataInc/bireme
bireme 的介紹:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md
另外Maxwell也是能夠實現MySQL到Kafka的消息中間件,消息格式採用Json:
Download:
https://github.com/zendesk/maxwell/releases/download/v1.22.5/maxwell-1.22.5.tar.gz
Source:
https://github.com/zendesk/maxwell
datax 是阿里開源的etl 工具,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各類異構數據源之間高效的數據同步功能,採用java+python進行開發,核心是java語言實現。
github地址:https://github.com/alibaba/DataX
A、設計架構:
數據交換經過DataX進行中轉,任何數據源只要和DataX鏈接上便可以和已實現的任意數據源同步
B、框架
DataX做業運行起來以後, Job監控並等待多個TaskGroup模塊任務完成,等待全部TaskGroup任務完成後Job成功退出。不然,異常退出,進程退出值非0
舉例來講,用戶提交了一個DataX做業,而且配置了20個併發,目的是將一個100張分表的mysql數據同步到odps裏面。 DataX的調度決策思路是:
優點:
總之,Job拆分爲Task,分別在框架提供的容器中執行,插件只須要實現Job和Task兩部分邏輯。
物理執行有三種運行模式:
整體來講,當JobContainer和TaskGroupContainer運行在同一個進程內的時候就是單機模式,在不一樣進程執行就是分佈式模式。
若是須要開發插件,能夠看zhege這個插件開發指南: https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
數據源支持狀況:
類型 | 數據源 | Reader(讀) | Writer(寫) | 文檔 |
---|---|---|---|---|
RDBMS 關係型數據庫 | MySQL | √ | √ | 讀 、寫 |
Oracle | √ | √ | 讀 、寫 | |
SQLServer | √ | √ | 讀 、寫 | |
PostgreSQL | √ | √ | 讀 、寫 | |
DRDS | √ | √ | 讀 、寫 | |
通用RDBMS(支持全部關係型數據庫) | √ | √ | 讀 、寫 | |
阿里雲數倉數據存儲 | ODPS | √ | √ | 讀 、寫 |
ADS | √ | 寫 | ||
OSS | √ | √ | 讀 、寫 | |
OCS | √ | √ | 讀 、寫 | |
NoSQL數據存儲 | OTS | √ | √ | 讀 、寫 |
Hbase0.94 | √ | √ | 讀 、寫 | |
Hbase1.1 | √ | √ | 讀 、寫 | |
Phoenix4.x | √ | √ | 讀 、寫 | |
Phoenix5.x | √ | √ | 讀 、寫 | |
MongoDB | √ | √ | 讀 、寫 | |
Hive | √ | √ | 讀 、寫 | |
無結構化數據存儲 | TxtFile | √ | √ | 讀 、寫 |
FTP | √ | √ | 讀 、寫 | |
HDFS | √ | √ | 讀 、寫 | |
Elasticsearch | √ | 寫 | ||
時間序列數據庫 | OpenTSDB | √ | 讀 | |
TSDB | √ | 寫 |
OGG 通常主要用於Oracle數據庫。即Oracle GoldenGate是Oracle的同步工具 ,能夠實現兩個Oracle數據庫之間的數據的同步,也能夠實現Oracle數據同步到Kafka,相關的配置操做能夠參考以下:
http://www.javashuo.com/article/p-faeyfdkn-b.html
https://www.jianshu.com/p/446ed2f267fa
http://blog.itpub.net/15412087/viewspace-2154644/
Databus是一個實時的、可靠的、支持事務的、保持一致性的數據變動抓取系統。 2011年在LinkedIn正式進入生產系統,2013年開源。
Databus經過挖掘數據庫日誌的方式,將數據庫變動實時、可靠的從數據庫拉取出來,業務能夠經過定製化client實時獲取變動。
Databus的傳輸層端到端延遲是微秒級的,每臺服務器每秒能夠處理數千次數據吞吐變動事件,同時還支持無限回溯能力和豐富的變動訂閱功能。
github:https://github.com/linkedin/databus
databus架構設計:
對比項 |
|
Databus |
canal |
結論 |
---|---|---|---|---|
支持的數據庫 |
|
mysql, oracle |
mysql(聽說內部版本支持oracle) |
Databus目前支持的數據源更多 |
業務開發 |
|
業務只須要實現事件處理接口 |
事件處理外,須要處理ack/rollback, 反序列化異常等 |
Databus開發接口用戶友好度更高 |
服務模型 |
relay |
relay能夠同時服務多個client |
一個server instance只能服務一個client (受限於server端保存拉取位點) |
Databus服務模式更靈活 |
|
client |
client能夠拉取多個relay的變動, 訪問的relay能夠指定拉取某些表某些分片的變動 |
client只能從一個server拉取變動, 並且只能是拉取全量的變動 |
|
可擴展性 |
|
client能夠線性擴展,處理能力也能線性擴展 (Databus可識別pk,自動作數據分片) |
client沒法擴展 |
Databus擴展性更好 |
可用性 |
client ha |
client支持cluster模式,每一個client處理一部分數據, 某個client掛掉,其餘client自動接管對應分片數據 |
主備client模式,主client消費, 若是主client掛掉,備client可自動接管 |
Databus實時熱備方案更成熟 |
|
relay/server ha |
多個relay可鏈接到同一個數據庫, client能夠配置多個relay,relay故障啓動切換 |
主備relay模式,relay經過zk進行failover |
canal主備模式對數據庫影響更小 |
|
故障對上游 數據庫的影響 |
client故障,bootstrap會繼續拉取變動, client恢復後直接從bootstrap拉取歷史變動 |
client故障會阻塞server拉取變動, client恢復會致使server瞬時從數據庫拉取大量變動 |
Databus自己的故障對數據庫影響幾乎爲0 |
系統狀態監控 |
|
程序經過http接口將運行狀態暴露給外部 |
暫無 |
Databus程序可監控性更好 |
開發語言 |
|
java,核心代碼16w,測試代碼6w |
java,4.2w核心代碼,6k測試代碼 |
Databus項目更成熟,固然學習成本也更大 |
總結:
一、databus活躍度不高,datax和canal 相對比較活躍。
二、datax 通常比較適合於全量數據同步,對全量數據同步效率很高(任務能夠拆分,併發同步,因此效率高),對於增量數據同步支持的不太好(能夠依靠時間戳+定時調度來實現,可是不能作到實時,延遲較大)。
三、canal 、databus 等因爲是經過日誌抓取的方式進行同步,因此對增量同步支持的比較好。
四、以上這些工具都缺乏一個監控和任務配置調度管理的平臺來進行支撐。