基於Tablestore Tunnel的數據複製實戰

前言

數據複製主要指經過互聯的網絡在多臺機器上保存相同數據的副本,經過數據複製方案,人們一般但願達到如下目的:1)使數據在地理位置上更接近用戶,進而下降訪問延遲;2)當部分組件出現故障時,系統依舊能夠繼續工做,提升可用性;3)擴展至多臺機器以同時提供數據訪問服務,從而提高讀吞吐量。
若是複製的數據一成不變,那麼數據複製就很是容易,只須要將數據複製到每一個節點,一次性便可搞定,面對持續更改的數據如何正確而有效的完成數據複製是一個不小的挑戰。html

使用DataX進行Tablestore數據複製

表格存儲(Tablestore)是阿里雲自研的NoSQL多模型數據庫,提供海量結構化數據存儲以及快速的查詢和分析服務,表格存儲的分佈式存儲和強大的索引引擎可以提供PB級存儲、千萬TPS以及毫秒級延遲的服務能力。DataX是阿里巴巴集團內被普遍使用的離線數據同步工具,DataX自己做爲數據同步框架,將不一樣數據源的同步抽象爲從源頭數據源讀取數據的Reader插件,以及向目標端寫入數據的Writer插件。
經過使用DataX能夠完成Tablestore表的數據複製,以下圖所示,otsreader插件實現了從Tablestore讀取數據,並能夠經過用戶指定抽取數據範圍可方便的實現數據增量抽取的需求,otsstreamreader插件實現了Tablestore的增量數據導出,而otswriter插件則實現了向Tablestore中寫入數據。經過在DataX中配置Tablestore相關的Reader和Writer插件,便可以完成Tablestore的表數據複製。git

使用通道服務進行Tablestore數據複製github

通道服務(Tunnel Service)是基於表格存儲數據接口之上的全增量一體化服務。通道服務爲您提供了增量、全量、增量加全量三種類型的分佈式數據實時消費通道。經過爲數據表創建數據通道,能夠簡單地實現對錶中歷史存量和新增數據的消費處理。數據庫

藉助於全增量一體的通道服務,咱們能夠輕鬆構建高效、彈性的數據複製解決方案。本文將逐步介紹如何結合通道服務進行Tablestore的數據複製,完整代碼開源在github上的 tablestore-examples中。本次的實戰將基於通道服務的Java SDK來完成,推薦先閱讀下通道服務的相關文檔,包括快速開始等。網絡

1. 配置抽取

配置抽取其實對應的是數據同步所具有的功能,在本次實戰中,咱們將完成指定時間點以前的表數據同步,指定的時間點能夠是如今或者將來的某個時刻。具體的配置以下所示,ots-reader中記錄的是源表的相關配置,ots-writer中記錄的是目的表的相關配置。負載均衡

{
  "ots-reader": {
    "endpoint": "https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com",
    "instanceName": "zhuoran-high",
    "tableName": "testSrcTable",
    "accessId": "",
    "accessKey": "",
    "tunnelName": "testTunnel",
    "endTime": "2019-06-19 17:00:00"
  },
  "ots-writer": {
    "endpoint": "https://zhuoran-search.cn-hangzhou.ots.aliyuncs.com",
    "instanceName": "zhuoran-search",
    "tableName": "testDstTable",
    "accessId": "",
    "accessKey": "",
    "batchWriteCount": 100
  }
}

ots-reader中各參數的說明以下:框架

  • endpoint: Tablestore服務的Endpoint地址,例如https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com。在進行數據複製前,請檢查下連通性(可使用curl命令)。
  • instanceName: Tablestore的實例名。
  • tableName: Tablestore的表名。
  • accessId: 訪問Tablestore的雲帳號accessId。
  • accessKey: 訪問Tablestore的雲帳號accessKey。
  • tunnelName: Tablestore的通道名,配置
  • endTime: 數據同步的截止時間點,對應到Java裏SimpleFormat的格式爲:yyyy-MM-dd HH:mm:ss 。

ots-writer中各參數的說明以下(略去相同的參數):curl

  • batchWriteCount: Tablestore單次批量寫入的條數,最大值爲200。

注:將來會開放更多的功能配置,好比指定時間範圍的數據複製等。分佈式

 

2. 編寫主邏輯

數據複製的主邏輯主要分爲如下4步,在第一次運行時,會完整的進行全部步驟,而在程序重啓或者斷點續傳場景時,只須要進行第3步和第4步。ide

  1. 建立複製目的表
    經過使用DesribeTable接口,咱們能夠獲取到源表的Schema,藉此能夠建立出目的表,值得注意的是須要把目的表的有效版本誤差設成一個足夠大的值(默認爲86400秒),由於服務端在處理寫請求時會對屬性列的版本號進行檢查,寫入的版本號須要在一個範圍內才能寫入成功,對於源表中的歷史存量數據而言,時間戳每每是比較小的,會被服務端過濾掉,最終致使同步數據的丟失。
sourceClient = new SyncClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),
    config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());
destClient = new SyncClient(config.getWriteConf().getEndpoint(), config.getWriteConf().getAccessId(),
    config.getWriteConf().getAccessKey(), config.getWriteConf().getInstanceName());
if (destClient.listTable().getTableNames().contains(config.getWriteConf().getTableName())) {
      System.out.println("Table is already exist: " + config.getWriteConf().getTableName());
} else {
    DescribeTableResponse describeTableResponse = sourceClient.describeTable(
        new DescribeTableRequest(config.getReadConf().getTableName()));
    describeTableResponse.getTableMeta().setTableName(config.getWriteConf().getTableName());
    describeTableResponse.getTableOptions().setMaxTimeDeviation(Long.MAX_VALUE / 1000000);
    CreateTableRequest createTableRequest = new CreateTableRequest(describeTableResponse.getTableMeta(),
        describeTableResponse.getTableOptions(),
        new ReservedThroughput(describeTableResponse.getReservedThroughputDetails().getCapacityUnit()));
    destClient.createTable(createTableRequest);
    System.out.println("Create table success: " + config.getWriteConf().getTableName());
}
  1. 在源表上建立通道
    使用通道服務的CreateTunnel接口能夠建立通道,此處咱們建立全量加增量類型(TunnelType.BaseAndStream)類型的通道。
sourceTunnelClient = new TunnelClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),
    config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());
List<TunnelInfo> tunnelInfos = sourceTunnelClient.listTunnel(
    new ListTunnelRequest(config.getReadConf().getTableName())).getTunnelInfos();
String tunnelId = null;
TunnelInfo tunnelInfo = getTunnelInfo(config.getReadConf().getTunnelName(), tunnelInfos);
if (tunnelInfo != null) {
    tunnelId = tunnelInfo.getTunnelId();
    System.out.println(String.format("Tunnel is already exist, TunnelName: %s, TunnelId: %s",
        config.getReadConf().getTunnelName(), tunnelId));
} else {
    CreateTunnelResponse createTunnelResponse = sourceTunnelClient.createTunnel(
        new CreateTunnelRequest(config.getReadConf().getTableName(),
            config.getReadConf().getTunnelName(), TunnelType.BaseAndStream));
    System.out.println("Create tunnel success: " + createTunnelResponse.getTunnelId());
}
  1. 啓動定時任務來監測備份進度
    備份進度的監測能夠經過DesribeTunnel接口來完成,DescribeTunnel接口能夠獲取到最新消費到的時間點,經過和配置裏的備份結束時間對比,咱們能夠獲取到當前同步的進度。在到達結束時間後,便可退出備份程序。
backgroundExecutor = Executors.newScheduledThreadPool(2, new ThreadFactory() {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "background-checker-" + counter.getAndIncrement());
    }
});
backgroundExecutor.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DescribeTunnelResponse resp = sourceTunnelClient.describeTunnel(new DescribeTunnelRequest(
            config.getReadConf().getTableName(), config.getReadConf().getTunnelName()
        ));
        // 已同步完成
        if (resp.getTunnelConsumePoint().getTime() > config.getReadConf().getEndTime()) {
            System.out.println("Table copy finished, program exit!");
            // 退出備份程序
            shutdown();
        }
    }
}, 0, 2, TimeUnit.SECONDS);
  1. 啓動數據複製
    啓動通道服務的自動化消費框架,開始自動化的數據同步,其中OtsReaderProcessor中完成的是源表數據的解析和目的表的寫入,處理邏輯將會在後文中介紹。
if (tunnelId != null) {
    sourceWorkerConfig = new TunnelWorkerConfig(
        new OtsReaderProcessor(config.getReadConf(), config.getWriteConf(), destClient));
    sourceWorkerConfig.setHeartbeatIntervalInSec(15);
    sourceWorker = new TunnelWorker(tunnelId, sourceTunnelClient, sourceWorkerConfig);
    sourceWorker.connectAndWorking();
}

 

3. 數據同步邏輯(OtsReaderProcessor)

使用通道服務,咱們須要編寫數據的Process邏輯和Shutdown邏輯,數據同步中的核心在於解析數據並將其寫入到目的表中,處理數據的完整代碼以下所示,主要邏輯仍是比較清晰的,首先會檢查數據的時間戳是否在合理的時間範圍內,而後將StreamRecord轉化爲BatchWrite裏對應的行,最後將數據串行寫入到目的表中。

public void process(ProcessRecordsInput input) {
    System.out.println(String.format("Begin process %d records.", input.getRecords().size()));
    BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
    int count = 0;
    for (StreamRecord record : input.getRecords()) {
        if (record.getSequenceInfo().getTimestamp() / 1000 > readConf.getEndTime()) {
            System.out.println(String.format("skip record timestamp %d larger than endTime %d",
                record.getSequenceInfo().getTimestamp() / 1000, readConf.getEndTime()));
            continue;
        }
        count++;
        switch (record.getRecordType()) {
            case PUT:
                RowPutChange putChange = new RowPutChange(writeConf.getTableName(), record.getPrimaryKey());
                putChange.addColumns(getColumns(record));
                batchWriteRowRequest.addRowChange(putChange);
                break;
            case UPDATE:
                RowUpdateChange updateChange = new RowUpdateChange(writeConf.getTableName(),
                    record.getPrimaryKey());
                for (RecordColumn column : record.getColumns()) {
                    switch (column.getColumnType()) {
                        case PUT:
                            updateChange.put(column.getColumn());
                            break;
                        case DELETE_ONE_VERSION:
                            updateChange.deleteColumn(column.getColumn().getName(),
                                column.getColumn().getTimestamp());
                            break;
                        case DELETE_ALL_VERSION:
                            updateChange.deleteColumns(column.getColumn().getName());
                            break;
                        default:
                            break;
                    }
                }
                batchWriteRowRequest.addRowChange(updateChange);
                break;
            case DELETE:
                RowDeleteChange deleteChange = new RowDeleteChange(writeConf.getTableName(),
                    record.getPrimaryKey());
                batchWriteRowRequest.addRowChange(deleteChange);
                break;
            default:
                break;
        }

        if (count == writeConf.getBatchWriteCount()) {
            System.out.println("BatchWriteRow: " + count);
            writeClient.batchWriteRow(batchWriteRowRequest);
            batchWriteRowRequest = new BatchWriteRowRequest();
            count = 0;
        }
    }

    // 寫最後一次的數據。
    if (!batchWriteRowRequest.isEmpty()) {
        System.out.println("BatchWriteRow: " + count);
        writeClient.batchWriteRow(batchWriteRowRequest);
    }
}

 

4. 技術註解

  1. 如何保障備份性能?
    備份過程分爲全量(存量)和增量階段,對於全量階段,通道服務會自動將全表的數據在邏輯上劃分紅接近指定大小的若干分片,全量階段的數據同步的總體並行度和分片數相關,可以有效的保障吞吐量。而對於增量階段,爲了保障數據的有序性,單分區內的數據咱們須要串行處理數據,增量階段的性能和分區數成正比關係(增量同步性能白皮書),若是須要提速(增長分區)能夠聯繫表格存儲技術支持。
  2. 如何作到數據同步的水平擴展?
    運行多個TunnelWorker(客戶端)對同一個Tunnel進行消費時(TunnelId相同), 在TunnelWorker執行Heartbeat時,通道服務端會自動的對Channel(分區)資源進行重分配,讓活躍的Channel儘量的均攤到每個TunnelWorker上,達到資源負載均衡的目的。同時,在水平擴展性方面,用戶能夠很容易的經過增長TunnelWorker的數量來完成,TunnelWorker能夠在同一個機器或者不一樣機器上。更多的原理能夠參見數據消費框架原理介紹
  3. 如何作到數據的最終一致性?
    數據的一致性創建在通道服務的保序協議基礎上,經過全量和增量數據同步的冪等性能夠保障備份數據的最終一致。
  4. 如何完成斷點續傳功能?
    通道服務的客戶端會按期將已同步(消費)完成的數據的時間位點按期發送到服務端進行持久化,在發生Failover或者重啓程序後,下一次的數據消費會從記錄的checkpoint開始數據處理,不會形成數據的丟失。

 

將來展望

在本次的實戰中,咱們結合通道服務完成一個簡潔而有效的數據複製方案,實現了指定時間點的表數據複製。藉助於本次的實戰樣例代碼,用戶僅須要配置源表和目的表的相關參數,便可以高效的完成的表數據的複製和數據的遷移。
在將來的演進中,通道服務還將支持建立指定時間段的通道,這樣能夠更加靈活的制定數據備份的計劃,也能夠完成持續備份和按時間點恢復等更加豐富的功能。


原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索