Table Store: 海量結構化數據實時備份實戰

數據備份簡介

在信息技術與數據管理領域,備份是指將文件系統或數據庫系統中的數據加以複製,一旦發生災難或者錯誤操做時,得以方便而及時地恢復系統的有效數據和正常運做。在實際備份過程當中,最好將重要數據製做三個或三個以上的備份,而且放置在不一樣的場所異地備援,以供往後回存之用。html

備份有兩個不一樣的目的,其主要的目的是在數據丟失後恢復數據,不管數據是被刪除仍是被損壞。備份的第二個目的是根據用戶定義的數據保留策略從較早的時間恢復數據,一般在備份應用程序中配置須要備份多長時間的數據副本。數據庫

因爲備份系統至少會包含一個被認爲值得保存的全部數據的副本,所以對數據存儲的要求可能會很高,組織此存儲空間和管理備份過程多是一項複雜的任務。現在,有許多不一樣類型的數據存儲設備可用於進行備份,還能夠經過許多不一樣的方式來安排這些設備以提供地理冗餘,數據安全性和可移植性。安全

在將數據發送到其存儲位置以前,會選擇,提取和操做它們,目前已經有許多不一樣的技術來優化備份過程,其中包括處理打開的文件(open files)和實時數據源的優化,以及壓縮,加密和重複數據刪除等。每一個備份方案都應包括演習過程,以驗證正在備份的數據的可靠性,更重要的是要認識到任何備份方案中涉及的限制和人爲因素。併發

Table Store備份需求分析

對於存儲系統而言,數據的安全可靠永遠是第一位的,要保障數據儘量高的可靠性,須要從兩個方面保障:app

  • 存儲系統自己的數據可靠性:表格存儲(Table Store)是阿里雲自研的面向海量結構化數據存儲的Serverless NoSQL多模型數據庫,提供了99.9999999%的數據可靠性保證,在業界屬於很是很是高的標準了。
  • 誤操做後能恢復數據:誤操做永遠都沒法避免,要作的是當誤操做發生的時候能儘快恢復,那麼就須要有備份數據存在。對於備份,有兩種方案,一個是部署同城或異地災備,這種代價高費用高,更多的用於社會基礎信息或金融信息。另外一種是將數據備份到另外一個價格更低廉的系統,當萬一出現誤操做的時候,能夠有辦法恢復就行。通常能夠選擇文件存儲系統,好比阿里雲OSS。

Table Store備份恢復方案簡介

下圖爲Table Store備份恢復的邏輯結構圖,基於全增量一體的通道服務咱們能夠很容易的構建一整套的數據備份和數據恢復方案,同時具有了實時增量備份能力和秒級別的恢復能力。只要提早配置好備份和恢復的計劃,整個備份恢復系統能夠作到徹底的自動化進行。框架

Table Store備份恢復方案實戰

目前表格存儲雖然未推出官方的備份和恢復功能,可是筆者會step-by-step的帶你們基於表格存儲的通道服務設計屬於本身的專屬備份恢復方案,實戰步驟會分爲備份和恢復兩部分。less

1. 備份函數

  • 預準備階段:須要肯定待備份的數據源和備份的目的源,在這次的實戰中,分別對應着TableStore的表和OSS的Bucket。
  • 肯定備份計劃和策略
  • 使用通道服務SDK編寫代碼
  • 執行備份策略並監測備份過程

2. 恢復性能

  • 執行文件恢復

肯定備份計劃和策略

備份策略須要肯定備分內容、備份時間和備份方式,目前常見的備份策略主要有如下幾類。優化

  • 全量備份(Full Backup): 把硬盤或數據庫內的全部文件、文件夾或數據進行一次性的複製。
  • 增量備份(Incremental Backup): 對上一次的全量備份或增量備份後更新的數據進行的備份。
  • 差別備份(Differential Backup): 提供運行全量備份後變動文件的備份。
  • 選擇式備份:對系統數據的一部分進行的備份。
  • 冷備份:系統處於停機或維護狀態下的備份。這種狀況下,備份的數據與系統該時段的數據應該徹底一致。
  • 熱備份:系統處於正常運轉狀態下的備份。這種狀況下,因爲系統中的數據可能隨時在更新,備份的數據相對於系統的真實數據可能會有必定的滯後。

在這次的實戰中,筆者對於備份計劃和策略的選擇以下(這裏能夠根據本身的需求進行自定義設計,而後利用通道服務的SDK來完成相應的需求)。

  • 備分內容:TableStore 數據表
  • 備份時間:

    • 全量備份按期執行(時間可調,默認爲一週)。
    • 增量備份根據配置按期執行,表格存儲的通道服務能夠保障數據的嚴格有序性,每一個增量文件是流式append的,可隨時被消費。
  • 備份方式:全量備份+增量備份,熱備份。

使用通道服務SDK編寫代碼

這部分會結合代碼片斷的形式講解,詳細的代碼後續會開源出去(連接會更新在本文中),盡請期待。在進行實戰以前,推薦先閱讀通道服務Java SDK的 使用文檔

  1. 建立一個全量+增量類型的Tunnel,這裏可使用SDK或者官網控制檯進行建立。  
private static void createTunnel(TunnelClient client, String tunnelName) {
    CreateTunnelRequest request = new CreateTunnelRequest(TableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = client.createTunnel(request);
    System.out.println("RequestId: " + resp.getRequestId());
    System.out.println("TunnelId: " + resp.getTunnelId());
}
  1. 瞭解通道服務自動化數據處理框架 
    在通道服務的快速開始裏,咱們能夠看到用戶只須要傳入處理數據的process函數和shutdown函數便可完成自動化的數據處理。在process函數的入參中會帶有List,而StreamRecord包裝的正是TableStore裏每一行的數據,包括Record的類型,主鍵列,屬性列,用戶寫入Record的時間戳等。
  2. 設計TableStore每一行數據的持久化格式 
    本次實戰中咱們使用CSV文件格式,固然你也能夠用pb或者其它格式,CSV的優勢是可讀性比較強,每一行數據會對應CSV文件的一行,持久化的格式以下圖所示,CSV文件會有四列, TimeStamp列是數據寫入TableStore的時間戳(全量時都爲0,增量備份爲具體的時間戳),RecordType是數據行的操做類型(PUT, UPDATE和DELETE),PrimaryKey爲主鍵列的JSON字符串表示,RecordColumns爲屬性列的JSON字符串表示。

轉換部分的核心代碼參見以下代碼片斷,筆者處理這部分用的是univocity-parsers(CSV)和Gson庫,有幾個地方須要特別注意下:1). Gson反序列化Long類型會轉爲Number類型,可能會形成進度丟失,有若干解決辦法,筆者採用的是將其轉爲String類型序列化。2). 對於binary類型的數據的特殊處理,筆者進行了base64的編解碼。3). 能夠直接流式寫入OSS中,減小本地持久化的消耗。

this.gson = new GsonBuilder().registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter())
           .setLongSerializationPolicy(LongSerializationPolicy.STRING).create();
// ByteArrayOutputStream到ByteArrayInputStream會有一次array.copy, 可考慮用管道或者NIO channel.
public void streamRecordsToOSS(List<StreamRecord> records, String bucketName, String filename, boolean isNewFile) {
   if (records.size() == 0) {
       LOG.info("No stream records, skip it!");
       return;
   }
   try {
       CsvWriterSettings settings = new CsvWriterSettings();
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       CsvWriter writer = new CsvWriter(out, settings);
       if (isNewFile) {
           LOG.info("Write csv header, filename {}", filename);
           List<String> headers = Arrays.asList(RECORD_TIMESTAMP, RECORD_TYPE, PRIMARY_KEY, RECORD_COLUMNS);
           writer.writeHeaders(headers);
           System.out.println(writer.getRecordCount());
       }
       List<String[]> totalRows = new ArrayList<String[]>();
       LOG.info("Write stream records, num: {}", records.size());
       for (StreamRecord record : records) {
           String timestamp = String.valueOf(record.getSequenceInfo().getTimestamp());
           String recordType = record.getRecordType().name();
           String primaryKey = gson.toJson(
               TunnelPrimaryKeyColumn.genColumns(record.getPrimaryKey().getPrimaryKeyColumns()));
           String columns = gson.toJson(TunnelRecordColumn.genColumns(record.getColumns()));
           totalRows.add(new String[] {timestamp, recordType, primaryKey, columns});
       }
       writer.writeStringRowsAndClose(totalRows);

       // write to oss file
       ossClient.putObject(bucketName, filename, new ByteArrayInputStream(out.toByteArray()));
   } catch (Exception e) {
       e.printStackTrace();
   }
}

執行備份策略並監測備份過程

  1. 運行通道服務的自動化數據框架,掛載上一步中設計的備份策略代碼。
public class TunnelBackup {
    private final ConfigHelper config;
    private final SyncClient syncClient;
    private final CsvHelper csvHelper;
    private final OSSClient ossClient;

    public TunnelBackup(ConfigHelper config) {
        this.config = config;
        syncClient = new SyncClient(config.getEndpoint(), config.getAccessId(), config.getAccessKey(),
            config.getInstanceName());
        ossClient = new OSSClient(config.getOssEndpoint(), config.getAccessId(), config.getAccessKey());
        csvHelper = new CsvHelper(syncClient, ossClient);
    }

    public void working() {
        TunnelClient client = new TunnelClient(config.getEndpoint(), config.getAccessId(), config.getAccessKey(),
            config.getInstanceName());
        OtsReaderConfig readerConfig = new OtsReaderConfig();
        TunnelWorkerConfig workerConfig = new TunnelWorkerConfig(
            new OtsReaderProcessor(csvHelper, config.getOssBucket(), readerConfig));
        TunnelWorker worker = new TunnelWorker(config.getTunnelId(), client, workerConfig);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            worker.shutdown();
            client.shutdown();
        }
    }

    public static void main(String[] args) {
        TunnelBackup tunnelBackup = new TunnelBackup(new ConfigHelper());
        tunnelBackup.working();
    }
}
  1. 監測備份過程
    備份過程的監控能夠經過表格存儲控制檯或者 DescribeTunnel 接口完成,DescribeTunnel接口能夠實時獲取當前Tunnel下每個Channel的Client(能夠自定義), 消費總行數和消費位點等信息。好比用戶有下午2點-3點的數據須要同步,若DescribeTunnel獲取到的消費位點爲下午2點半,則表示備份到一半了,若獲取到的消費位點爲下午3點,則表明2點-3點的數據已經備份完畢了。

執行文件恢復

在數據備份完成後,咱們能夠根據需求進行全量的恢復和部分數據的恢復,來下降RTO。在恢復數據時,能夠有一些措施來優化恢復的性能:

  • 從OSS下載時,可使用流式下載或者分片下載的方式,必要時能夠增長併發。
  • 寫入TableStore時,可使用併發BatchWrite的方式。
    下圖爲Restore的實例代碼(略去若干細節),首先根據需求算出須要下載的文件名(和備份策略對應),而後用OSS的流式下載讀取相應的文件,最後用併發的BatchWrite方式寫入TableStore的恢復表中。
public class TunnelRestore {
    private ConfigHelper config;
    private final SyncClient syncClient;
    private final CsvHelper csvHelper;
    private final OSSClient ossClient;

    public TunnelRestore(ConfigHelper config) {
        this.config = config;
        syncClient = new SyncClient(config.getEndpoint(), config.getAccessId(), config.getAccessKey(),
            config.getInstanceName());
        ossClient = new OSSClient(config.getOssEndpoint(), config.getAccessId(), config.getAccessKey());
        csvHelper = new CsvHelper(syncClient, ossClient);
    }

    public void restore(String filename, String tableName) {
        csvHelper.parseStreamRecordsFromCSV(filename, tableName);
    }

    public static void main(String[] args) {
        TunnelRestore restore = new TunnelRestore(new ConfigHelper());
        restore.restore("FullData-1551767131130.csv", "testRestore");
    }
}

總結

本文首先介紹了Table Store備份的需求,接着介紹了使用表格存儲的通道服務進行數據備份和恢復的原理,最後結合實際的代碼片斷講述瞭如何一步步的構建Tabel Store數據備份恢復方案。

參考連接

  1. 4 Steps to Create Your Backup Plan
  2. https://en.wikipedia.org/wiki/Backup



本文做者:琸然

閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索