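Last year and early this year we open-sourced MongoShake and RedisShake, which cover migration, synchronization, backup and other needs for MongoDB and Redis respectively. The shake family has now grown further with a DynamoDB migration tool: dynamo-shake. It currently supports migrating from DynamoDB to MongoDB. Later we will consider supporting more channels, such as direct file backup, migration to Kafka, or migration to other databases such as Cassandra and Redis.
Download: not publicly available for now; please contact 燭昭.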
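dynamo-shake supports both full and incremental synchronization. After the process starts it first runs a full sync, and once the full sync finishes it enters the incremental sync phase.
Full sync has two parts: data sync and index sync. Data is synchronized first, and index sync starts once data sync has finished. Index sync covers the default primary key. User-created indexes (GSI) are synchronized when the target MongoDB is a replica set; for sharded clusters GSI sync is not supported for now.
Incremental sync only synchronizes data. Indexes created during the incremental phase are not synchronized.
In addition, DDL operations on the source tables, such as dropping tables, creating tables, or creating indexes, are not supported during either the full or the incremental phase.
Full sync does not support resuming from a breakpoint; incremental sync does. In other words, if incremental sync is interrupted and recovers within a certain period of time, it can resume with incremental sync only. In some cases, however, such as when the interruption lasts too long or the previous checkpoint (see below) is lost, a full sync is triggered again.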
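The restart behaviour described above can be pictured as a small decision function. This is only a sketch under stated assumptions: the function name, the `"full_sync"` label and the 24-hour window (the retention period of DynamoDB Streams records) are illustrative and are not taken from dynamo-shake itself; only the `"incr_sync"` status value appears in the tool's checkpoint data.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumption: the resume window equals the 24-hour retention of DynamoDB Streams.
RESUME_WINDOW = timedelta(hours=24)


def choose_start_phase(status_value: Optional[str],
                       last_checkpoint_time: Optional[datetime],
                       now: datetime) -> str:
    """Decide which phase to start in after a restart (hypothetical helper)."""
    # Full sync cannot be resumed, so anything other than a recorded
    # incremental phase starts over with a full sync.
    if status_value != "incr_sync":
        return "full_sync"
    # The checkpoint is lost: there is no position to resume from.
    if last_checkpoint_time is None:
        return "full_sync"
    # The interruption lasted too long: the stream position may be gone.
    if now - last_checkpoint_time > RESUME_WINDOW:
        return "full_sync"
    return "incr_sync"


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    print(choose_start_phase("incr_sync", now - timedelta(hours=1), now))
    print(choose_start_phase("incr_sync", now - timedelta(hours=30), now))
```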
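All source tables are written to separate collections in a single target database (default: dynamo-shake). For example, if the user has table1 and table2, then after the sync the target has a database named dynamo-shake that contains the collections table1 and table2.
Native DynamoDB wraps every value in a type field, in the form "key: type: value". For example, if the user inserts {hello: 1}, the data returned by the DynamoDB API has the form {"hello": {"N": "1"}}.
All DynamoDB data types: S (string), N (number), B (binary), BOOL (boolean), NULL (null), M (map), L (list), SS (string set), NS (number set), BS (binary set).
We provide two conversion modes, raw and change. raw writes the data exactly as it is returned by the DynamoDB API: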
rszz-4.0-2:PRIMARY> use dynamo-shake
switched to db dynamo-shake
rszz-4.0-2:PRIMARY> db.zhuzhao.find()
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd845"), "aaa" : { "L" : [ { "S" : "aa1" }, { "N" : "1234" } ] }, "hello_world" : { "S" : "f2" } }
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd847"), "aaa" : { "N" : "222" }, "qqq" : { "SS" : [ "h1", "h2" ] }, "hello_world" : { "S" : "yyyyyyyyyyy" }, "test" : { "S" : "aaa" } }
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd849"), "aaa" : { "L" : [ { "N" : "0" }, { "N" : "1" }, { "N" : "2" } ] }, "hello_world" : { "S" : "測試中文" } }
change strips the type fields:
rszz-4.0-2:PRIMARY> use dynamo-shake
switched to db dynamo-shake
rszz-4.0-2:PRIMARY> db.zhuzhao.find()
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd845"), "aaa" : [ "aa1", 1234 ] , "hello_world" : "f2" }
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd847"), "aaa" : 222, "qqq" : [ "h1", "h2" ] , "hello_world" : "yyyyyyyyyyy", "test" : "aaa" }
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd849"), "aaa" : [ 0, 1, 2 ], "hello_world" : "測試中文" }
Users can choose the conversion type that fits their needs.
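To make the difference between raw and change concrete, here is a minimal Python sketch of stripping the type wrappers. It is not dynamo-shake's implementation: the function names are hypothetical, and the number handling (integers become `int`, everything else `float`) is an assumption chosen to reproduce the sample output above.

```python
from typing import Any, Dict


def strip_type(attr: Dict[str, Any]) -> Any:
    """Convert one type-wrapped DynamoDB value, e.g. {"N": "1"}, to a plain value."""
    type_tag, value = next(iter(attr.items()))
    if type_tag == "N":
        # DynamoDB transmits numbers as strings.
        return int(value) if value.lstrip("-").isdigit() else float(value)
    if type_tag == "NULL":
        return None
    if type_tag in ("S", "B", "BOOL"):
        return value
    if type_tag in ("SS", "BS"):
        return list(value)
    if type_tag == "NS":
        return [strip_type({"N": v}) for v in value]
    if type_tag == "L":
        return [strip_type(v) for v in value]
    if type_tag == "M":
        return {k: strip_type(v) for k, v in value.items()}
    raise ValueError(f"unknown DynamoDB type tag: {type_tag}")


def convert_item(item: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Apply the change-style conversion to a whole item."""
    return {key: strip_type(attr) for key, attr in item.items()}


if __name__ == "__main__":
    raw = {"aaa": {"L": [{"S": "aa1"}, {"N": "1234"}]}, "hello_world": {"S": "f2"}}
    print(convert_item(raw))
```

Running it on the first raw document shown above yields the same shape as the corresponding change document, minus the `_id` that MongoDB adds.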
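Incremental resume is implemented with checkpoints. By default, checkpoints are written to the target MongoDB, in a database named dynamo-shake-checkpoint. Each table has its own checkpoint collection, and there is also a status_table collection that records whether the sync is currently in the full or the incremental phase.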
rszz-4.0-2:PRIMARY> use dynamo-shake42-checkpoint switched to db dynamo-shake42-checkpoint rszz-4.0-2:PRIMARY> show collections status_table zz_incr0 zz_incr1 rszz-4.0-2:PRIMARY> rszz-4.0-2:PRIMARY> rszz-4.0-2:PRIMARY> db.status_table.find() { "_id" : ObjectId("5d6e0ef77e592206a8c86bfd"), "key" : "status_key", "status_value" : "incr_sync" } rszz-4.0-2:PRIMARY> db.zz_incr0.find() { "_id" : ObjectId("5d6e0ef17e592206a8c8643a"), "shard_id" : "shardId-00000001567391596311-61ca009c", "father_id" : "shardId-00000001567375527511-6a3ba193", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" } { "_id" : ObjectId("5d6e0ef17e592206a8c8644c"), "shard_id" : "shardId-00000001567406847810-f5b6578b", "father_id" : "shardId-00000001567391596311-61ca009c", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" } { "_id" : ObjectId("5d6e0ef17e592206a8c86456"), "shard_id" : "shardId-00000001567422218995-fe7104bc", "father_id" : "shardId-00000001567406847810-f5b6578b", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" } { "_id" : ObjectId("5d6e0ef17e592206a8c86460"), "shard_id" : "shardId-00000001567438304561-d3dc6f28", "father_id" : "shardId-00000001567422218995-fe7104bc", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" } { "_id" : ObjectId("5d6e0ef17e592206a8c8646a"), "shard_id" : "shardId-00000001567452243581-ed601f96", "father_id" : "shardId-00000001567438304561-d3dc6f28", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" } { "_id" : 
ObjectId("5d6e0ef17e592206a8c86474"), "shard_id" : "shardId-00000001567466737539-cc721900", "father_id" : "shardId-00000001567452243581-ed601f96", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" } { "_id" : ObjectId("5d6e0ef27e592206a8c8647e"), "shard_id" : "shardId-00000001567481807517-935745a3", "father_id" : "shardId-00000001567466737539-cc721900", "seq_num" : "", "status" : "done", "worker_id" : "unknown-worker", "iterator_type" : "LATEST", "shard_it" : "arn:aws:dynamodb:us-east-2:240770237302:table/zz_incr0/stream/2019-08-27T08:23:51.043|1|AAAAAAAAAAGsTOg0+3HY+yzzD1cTzc7TPXi/iBi7sA5Q6SGSoaAJ2gz2deQu5aPRW/flYK0pG9ZUvmCfWqe1A5usMFWfVvd+yubMwWSHfV2IPVs36TaQnqpMvsywll/x7IVlCgmsjr6jStyonbuHlUYwKtUSq8t0tFvAQXtKi0zzS25fQpITy/nIb2y/FLppcbV/iZ+ae1ujgWGRoojhJ0FiYPhmbrR5ZBY2dKwEpok+QeYMfF3cEOkA4iFeuqtboUMgVqBh0zUn87iyTFRd6Xm49PwWZHDqtj/jtpdFn0CPoQPj2ilapjh9lYq/ArXMai5DUHJ7xnmtSITsyzUHakhYyIRXQqF2UWbDK3F7+Bx5d4rub1d4S2yqNUYA2eZ5CySeQz7CgvzaZT391axoqKUjjPpdUsm05zS003cDDwrzxmLnFi0/mtoJdGoO/FX9LXuvk8G3hgsDXBLSyTggRE0YM+feER8hPgjRBqbfubhdjUxR+VazwjcVO3pzt2nIkyKPStPXJZIf4cjCagTxQpC/UPMtcwWNo2gQjM2XSkWpj7DGS2E4738biV3mtKXGUXtMFVecxTL/qXy2qpLgy4dD3AG0Z7pE+eJ9qP5YRE6pxQeDlgbERg==", "update_date" : "" } { "_id" : ObjectId("5d6e1d807e592206a8c9a102"), "shard_id" : "shardId-00000001567497561747-03819eba", "father_id" : "shardId-00000001567481807517-935745a3", "seq_num" : "39136900000000000325557205", "status" : "in processing", "worker_id" : "unknown", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : 
"arn:aws:dynamodb:us-east-2:240770237302:table/zz_incr0/stream/2019-08-27T08:23:51.043|1|AAAAAAAAAAFw/qdbPLjsXMlPalnhh55koia44yz6A1W2uwUyu/MzRUhaaqnI0gPM8ebVgy7dW7dDWLTh/WXYyDNNyXR3Hvk01IfEKDf+FSLMNvh2iELdrO5tRoLtZI2fxxrPZvudRc3KShX0Pvqy2YYwl4nlBR6QezHTWx5H2AU22MGPTx8aMRbjUgPwvgEExRgdzfhG6G9gkc7C71fIc98azwpSm/IW+mV/h/doFndme47k2v8g0GNJvgLSoET7HdJYH3XFdqh4QVDIP4sbz8X1cpN3y8AlT7Muk2/yXOdNeTL6tApuonCrUpJME9/qyBYQVI5dsAHnAWaP2Te3EAvz3ao7oNdnA8O6uz5VF9zFdN1OUHWM40kLUsX4sHve7McEzFLgf4NL1WTAnPN13cFhEm9BS8M7tiJqZ0OzgkbF1AWfq+xg/O6c57/Vvx/G/75DZ8XcWIABgGNkWBET/vLDrgjJQ0PUZJZKNmmbgKKTyHgSl4YOXNEeyH7l6atuc2WaREDjbf7lnQO5No11sz4g3O+AreBcpGVhdZNhGGcrG/wduPYEZfg2hG1sfYiSAM8GipUPMA0PM7JPIJmqCaY90JxRcI1By24tpp9Th35/5rLTGPYJZA==", "update_date" : "" }
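In status_table, "status_value" : "incr_sync" means that the incremental phase has been entered. Every shard in incremental sync records its own checkpoint; for the rules on how shards split, see the official DynamoDB documentation. The fields of an incremental checkpoint collection are described below: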
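_id: MongoDB's built-in primary key id.
shard_id: the id of the shard. Each shard has a unique id.
father_id: the id of the parent shard. A shard may have a parent shard.
seq_num: the sequence number inside the shard that has been processed so far. This is the primary checkpoint information.
status: the current sync stage of the shard. The sample output above shows the values "no need to process", "in processing" and "done".
worker_id: the id of the worker processing the shard. Not in use yet.
iterator_type: how the shard is iterated.
shard_it: the address of the shard iterator. This is the secondary checkpoint information.
update_date: the timestamp of the last checkpoint update.
For indexes, a unique index is created from the default primary key, and a shard key is created from the partition key. The user's own GSI indexes are currently not created.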
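Returning to the checkpoint fields listed above, the sketch below shows one way a checkpoint document could be turned into the parameters of a DynamoDB Streams `GetShardIterator` call. The parameter names (`StreamArn`, `ShardId`, `ShardIteratorType`, `SequenceNumber`) are those of the AWS API; the function name, the skipping of finished shards and the `TRIM_HORIZON` fallback are assumptions for illustration, not dynamo-shake's confirmed behaviour.

```python
from typing import Any, Dict, Optional

# Assumption: shards in these states need no further reading.
SKIP_STATUSES = {"no need to process", "done"}


def build_iterator_request(stream_arn: str,
                           checkpoint: Dict[str, Any]) -> Optional[Dict[str, str]]:
    """Build GetShardIterator parameters from one checkpoint document (hypothetical)."""
    if checkpoint.get("status") in SKIP_STATUSES:
        return None
    request = {"StreamArn": stream_arn, "ShardId": checkpoint["shard_id"]}
    if checkpoint.get("seq_num"):
        # Resume from the recorded position, the primary checkpoint information.
        request["ShardIteratorType"] = checkpoint.get("iterator_type", "AT_SEQUENCE_NUMBER")
        request["SequenceNumber"] = checkpoint["seq_num"]
    else:
        # No position recorded: start from the oldest record still in the shard.
        request["ShardIteratorType"] = "TRIM_HORIZON"
    return request


if __name__ == "__main__":
    doc = {
        "shard_id": "shardId-00000001567497561747-03819eba",
        "seq_num": "39136900000000000325557205",
        "status": "in processing",
        "iterator_type": "AT_SEQUENCE_NUMBER",
    }
    print(build_iterator_request("arn:aws:dynamodb:example", doc))
```

The returned dictionary could be passed as keyword arguments to a Streams client; the ARN in the usage example is a placeholder.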
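This section introduces some of DynamoShake's architecture details.
The figure below shows the basic data sync architecture for a single table (dynamo-shake starts multiple concurrent tableSyncer threads to pull data, and the user can control the concurrency). The fetcher thread pulls data from the source DynamoDB and pushes it into a queue. The parser thread then takes data from the queue and parses it (DynamoDB protocol to BSON). The executor aggregates part of the data and writes it to MongoDB.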
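The parser is controlled by FullDocumentParser. Its main job is to read data from the queue and parse it into BSON. After parsing, documents are written one by one into the executor's queue. The parser runs as a separate thread mainly because parsing is CPU-intensive.
The executor is controlled by FullDocumentConcurrency. It pulls from the queue, aggregates documents into batches (up to 16 MB or 1024 documents per batch), and writes them to the target MongoDB.
The overall incremental architecture is as follows: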
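The Fetcher thread detects changes to the shards in the stream. The Manager sends notifications, or creates a new Dispatcher to handle messages; each shard corresponds to one Dispatcher. The Dispatcher pulls incremental data from the source, parses and packs it through the Batcher, writes it to MongoDB through the executor, and updates the checkpoint at the same time. When resuming from a breakpoint, the Dispatcher starts pulling from the old checkpoint position instead of from the beginning.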
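Both the full and the incremental path end in an executor that writes batches to MongoDB. The aggregation limits mentioned for full sync (16 MB or 1024 documents per batch) can be sketched as a generator. This is an illustrative sketch, not the tool's code: the function name is hypothetical, and the JSON-encoded length is used only as a stand-in for the real BSON size.

```python
import json
from typing import Any, Callable, Dict, Iterable, Iterator, List

Doc = Dict[str, Any]


def json_size(doc: Doc) -> int:
    """Approximate document size; an assumption standing in for the BSON size."""
    return len(json.dumps(doc).encode("utf-8"))


def batch_documents(docs: Iterable[Doc],
                    max_bytes: int = 16 * 1024 * 1024,
                    max_count: int = 1024,
                    size_of: Callable[[Doc], int] = json_size) -> Iterator[List[Doc]]:
    """Group documents into batches bounded by total size and document count."""
    batch: List[Doc] = []
    batch_bytes = 0
    for doc in docs:
        doc_bytes = size_of(doc)
        # Flush before adding a document that would break either limit.
        if batch and (len(batch) >= max_count or batch_bytes + doc_bytes > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(doc)
        batch_bytes += doc_bytes
    if batch:
        yield batch


if __name__ == "__main__":
    sizes = [len(b) for b in batch_documents({"a": i} for i in range(2500))]
    print(sizes)
```

A document larger than the size limit still goes out as a batch of its own, so the generator never drops data.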
Start: ./dynamo-shake -conf=dynamo-shake.conf
Configuration parameters are specified in dynamo-shake.conf. The meaning of each parameter is as follows:
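DynamoFullCheck is a tool for verifying that the data in DynamoDB and MongoDB is consistent. It currently supports full verification only, not incremental; that is, if the sync is still in the incremental phase, the source and target will not match.
DynamoFullCheck only supports a one-way check: it verifies whether the data in DynamoDB is a subset of the data in MongoDB. The reverse direction is not checked.
It also supports sampled checks and checking only the tables you are interested in.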
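The one-way check can be illustrated with plain Python data. In this sketch, which assumes items have already been converted to plain documents and fit in memory, every source item must exist in the target with the same content, while extra target documents are ignored. The function name and the `"missing"` / `"mismatch"` labels are hypothetical.

```python
from typing import Any, Dict, Iterable, List, Sequence, Tuple

Doc = Dict[str, Any]


def one_way_diff(source_items: Iterable[Doc],
                 target_items: Iterable[Doc],
                 key_fields: Sequence[str]) -> List[Tuple[str, tuple]]:
    """Report source items that are absent from, or differ in, the target."""
    # Index the target by primary key so each source item is one lookup.
    target_index = {tuple(doc[k] for k in key_fields): doc for doc in target_items}
    diffs = []
    for item in source_items:
        key = tuple(item[k] for k in key_fields)
        found = target_index.get(key)
        if found is None:
            diffs.append(("missing", key))
        elif {k: v for k, v in found.items() if k != "_id"} != item:
            # "_id" is added by MongoDB and has no counterpart in the source.
            diffs.append(("mismatch", key))
    return diffs


if __name__ == "__main__":
    source = [{"pk": 1, "v": "a"}, {"pk": 2, "v": "b"}]
    target = [{"_id": "x", "pk": 1, "v": "a"}, {"_id": "y", "pk": 3, "v": "c"}]
    print(one_way_diff(source, target, ["pk"]))
```

In the usage example the target document with `pk` 3 is not reported, because the reverse direction is not checked.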
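The check consists of the following parts:
During the exact check, if sampling is enabled, each doc is sampled individually to decide whether it should be checked. The principle is simple: with 30% sampling, for example, a random number between 0 and 100 is generated; if it falls between 0 and 30 the doc is checked, otherwise it is skipped.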
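The per-document sampling decision described above fits in a few lines. This is a minimal sketch; the function name and the optional `rng` parameter are illustrative assumptions, not part of DynamoFullCheck.

```python
import random


def should_check(sample_percent: float, rng: random.Random = random.Random()) -> bool:
    """Return True when the current doc is selected for checking."""
    # rng.random() is in [0, 1), so the scaled draw is in [0, 100).
    return rng.random() * 100 < sample_percent


if __name__ == "__main__":
    rng = random.Random(42)
    checked = sum(should_check(30, rng) for _ in range(10_000))
    print(f"checked {checked} of 10000 docs")
```

With 30% sampling, roughly three in ten documents are selected over a long run; a fixed seed makes a given run repeatable.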
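Because DynamoFullCheck also has to go through the fetch and parse stages when pulling from the source DynamoDB, this part of the code is to some extent reused from DynamoShake. The difference is that inside DynamoFullCheck the fetcher, parser and executor threads each run with a concurrency of 1.
The full-check parameters are somewhat simpler and are passed directly on the command line, for example: ./dynamo-full-check --sourceAccessKeyID=BUIASOISUJPYS5OP3P5Q --sourceSecretAccessKey=TwWV9reJCrZhHKSYfqtTaFHW0qRPvjXb3m8TYHMe --sourceRegion=ap-east-1 -t="10.1.1.1:30441" --sample=300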
Usage:
  dynamo-full-check.darwin [OPTIONS]

Application Options:
  -i, --id=                      target database collection name (default: dynamo-shake)
  -l, --logLevel=
  -s, --sourceAccessKeyID=       dynamodb source access key id
      --sourceSecretAccessKey=   dynamodb source secret access key
      --sourceSessionToken=      dynamodb source session token
      --sourceRegion=            dynamodb source region
      --qpsFull=                 qps of scan command, default is 10000
      --qpsFullBatchNum=         batch number in each scan command, default is 128
  -t, --targetAddress=           mongodb target address
  -d, --diffOutputFile=          diff output file name (default: dynamo-full-check-diff)
  -p, --parallel=                how many threads used to compare, default is 16 (default: 16)
  -e, --sample=                  comparison sample number for each table, 0 means disable (default: 1000)
      --filterCollectionWhite=   only compare the given tables, split by ';'
      --filterCollectionBlack=   do not compare the given tables, split by ';'
  -c, --convertType=             convert type (default: raw)
  -v, --version                  print version

Help Options:
  -h, --help                     Show this help message
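Will we open-source DynamoShake? Well... that has not been decided yet, so stay tuned.
Author: 燭昭
This article is original content from the Yunqi Community (雲棲社區) and may not be reproduced without permission.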