使用DataX同步MaxCompute數據到TableStore(原OTS)優化指南

概述python

如今愈來愈多的技術架構下會組合使用MaxCompute和TableStore,用MaxCompute做大數據分析,計算的結果會導出到TableStore提供在線訪問。MaxCompute提供海量數據計算的能力,而TableStore提供海量數據高併發低延遲讀寫的能力。git

 MaxCompute內數據導出至TableStore,目前可選的幾種主要途徑包括:github

本身編寫工具:使用MaxCompute SDK經過Tunnel讀取表數據,再經過TableStore SDK再寫入數據。
DataX:本身在服務器上託管執行DataX任務。
使用數據集成服務:其系統底層也是DataX,額外提供了服務化以及分佈式的能力。
其中第二種是咱們最常推薦給用戶作臨時的數據導出使用的,若是沒有須要對數據作特殊處理的需求,咱們通常不推薦第一種途徑。json

DataX在阿里集團內部已經應用了不少年,經歷了屢次雙十一的考驗,是一個穩定、易用、高效的工具。隨着MaxCompute上結果數據愈來愈龐大,數據導出的速率愈來愈被看重,海量的數據須要在基線內完成導出。本篇文章,主要會介紹幾種優化手段,以提升使用DataX來進行MaxCompute向TableStore數據導出的吞吐量。後端

優化過程緩存

咱們會以實際的場景,來演示如何經過一步步的優化,提高數據導出的速度。在數據導出的整個鏈路上,主要有三個環節,一是MaxCompute數據通道的讀,二是DataX的數據交換,三是TableStore的在線寫,這三個環節任意一個成爲瓶頸,都會影響導出的速度。服務器

MaxCompute數據通道的讀的性能比較高,通常不會成爲瓶頸,本文主要是針對後兩個環節來優化。優化的核心指導方針就是:1. 提升併發,2. 下降寫入延遲。接下來列舉的幾種優化手段,也是圍繞這兩點,來不斷進行優化。架構

實驗選擇使用TableStore的測試環境,在MaxCompute上,咱們會建立一張表並準備1億行數據。TableStore的測試環境規模以及DataX Job宿主機的規格都較小,因此整個實驗最終達到的速率是比較小的,主要爲了演示速率如何提高。而在真實的TableStore生產環境上,規模足夠的狀況下,咱們幫助過應用優化到每秒上百M甚至上G的速度,優化手段相同。併發

數據準備
首先在MaxCompute內建立以下表:jvm

md5 string,
  userid string,
  name string,
  comments string,
  attr0 string,
  attr1 string,
  attr2 string,
  attr3 string,
  create_time string,
  udpate_time string
);

其次在表內倒入1億行數據,每行數據約200個字節,其中userid列採用隨機值,計算出的md5值取4個字節做爲md5列,數據樣例以下:

測試數據導入使用的是MaxCompute Tunnel,速度仍是比較可觀的。

數據準備完畢後,在TableStore上建立一張表,使用md5和userid做爲主鍵列:

TableMeta tableMeta = new TableMeta("DataTable");
  tableMeta.addPrimaryKeyColumn("md5", PrimaryKeyType.STRING);
  tableMeta.addPrimaryKeyColumn("userid", PrimaryKeyType.STRING);

  CapacityUnit capacityUnit = new CapacityUnit(0, 0);

  CreateTableRequest request = new CreateTableRequest();
  request.setTableMeta(tableMeta);
  request.setReservedThroughput(capacityUnit);

  ots.createTable(request);

表和數據均準備完畢後,使用以下DataX Job配置類進行一次數據導出:

"job": {
        "setting": {
            "speed": {
                "channel": "1"
            }
        },
        "content": [
            {
                "reader": {
                    "name": "odpsreader",
                    "parameter": {
                        "accessId": "accessid",
                        "accessKey": "accesskey",
                        "project": "aliyun_ots_dev",
                        "table": "data_for_ots",
                        "partition": [],
                        "column": ["md5","userid","name","comments","attr0","attr1","attr2","attr3","create_time","udpate_time"],
                        "packageAuthorizedProject": "",
                        "splitMode": "record",
                        "odpsServer": "****",
                        "tunnelServer": "****"
                    }
                },
                "writer": {
                    "name": "otswriter",
                    "parameter": {
                        "endpoint":"http://data-import-test.cn-hangzhou.ots.aliyuncs.com",
                        "accessId":"accessid",
                        "accessKey":"accesskey",
                        "instanceName":"data-import-test",
                        "table":"DataTable",
                        "primaryKey":[
                            {"name":"md5", "type":"string"},
                            {"name":"userid", "type":"string"}
                        ],
                        "column":[
                            {"name":"name","type":"string"},
                            {"name":"comments","type":"string"},
                            {"name":"attr0","type":"string"},
                            {"name":"attr1","type":"string"},
                            {"name":"attr2","type":"string"},
                            {"name":"attr3","type":"string"},
                            {"name":"create_time","type":"string"},
                            {"name":"update_time","type":"string"}
                        ],
                        "writeMode":"UpdateRow"
                    }
                }
            }
        ]
    }
}

啓動DataX任務,從標準輸出中能夠看到當前數據導出的速度:

2017-02-07 08:41:49.285 [job-0] INFO  StandAloneJobContainerCommunicator - Total 271520 records, 55194052 bytes | Speed 1.05MB/s, 5404 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 4.501s |  All Task WaitReaderTime 47.815s | Percentage 0.00%
2017-02-07 08:41:59.286 [job-0] INFO  StandAloneJobContainerCommunicator - Total 324640 records, 65992457 bytes | Speed 1.03MB/s, 5312 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 5.474s |  All Task WaitReaderTime 55.068s | Percentage 0.00%
2017-02-07 08:42:09.288 [job-0] INFO  StandAloneJobContainerCommunicator - Total 377600 records, 76758462 bytes | Speed 1.03MB/s, 5296 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 6.479s |  All Task WaitReaderTime 62.297s | Percentage 0.00%
2017-02-07 08:42:19.289 [job-0] INFO  StandAloneJobContainerCommunicator - Total 431072 records, 87628377 bytes | Speed 1.04MB/s, 5347 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 7.469s |  All Task WaitReaderTime 69.559s | Percentage 0.00%
2017-02-07 08:42:29.290 [job-0] INFO  StandAloneJobContainerCommunicator - Total 484672 records, 98524462 bytes | Speed 1.04MB/s, 5360 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 8.421s |  All Task WaitReaderTime 76.892s | Percentage 0.00%
2017-02-07 08:42:39.292 [job-0] INFO  StandAloneJobContainerCommunicator - Total 538144 records, 109394175 bytes | Speed 1.04MB/s, 5347 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 9.428s |  All Task WaitReaderTime 83.889s | Percentage 0.00%

能夠看到,當前的速度大約是1MB/s,接下來會演示如何進行優化,一步一步將速度給提高上去。

一:配置合理的DataX基礎參數
第一步是對DataX的幾個基礎參數進行調優,先大體瞭解下一個DataX Job內部,任務的運行結構:

一個DataX Job會切分紅多個Task,每一個Task會按TaskGroup進行分組,一個Task內部會有一組Reader->Channel->Writer。Channel是鏈接Reader和Writer的數據交換通道,全部的數據都會經由Channel進行傳輸。

在DataX內部對每一個Channel會有嚴格的速度控制,默認的速度限制是1MB/s,這也是爲什麼咱們使用默認配置,速度爲1MB/s的緣由。因此第一個須要優化的基礎參數就是單個Channel的速度限制,更改配置以下:

"core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 5242880
                }
            }
        }
    },
    "job": {
        ...
    }
}

咱們把單個Channel的速度上限配置爲5MB。這個值須要針對不一樣的場景進行不一樣的配置,例如對於MaxCompute,單個Channel的速度能夠達到幾十MB,對於TableStore,在列較小較多的場景下,單個Channel的速度是幾MB,而在列較大的場景下,可能速度就會上到幾十MB。

咱們當前默認配置中配置啓動的Job內Channel數爲1,要提升速度,併發必須提升,這個是第二步要作的優化。可是在作第二個優化以前,還須要調整一個基礎參數,那就是DataX Job啓動的JVM的內存大小配置。

目前DataX啓動的JVM默認的配置是"-Xms1g -Xmx1g",當一個Job內Channel數變多後,內存的佔用會顯著增長,由於DataX做爲數據交換通道,在內存中會緩存較多的數據,例如Channel中會有一個Buffer,做爲臨時的數據交換的緩衝區,而在部分Reader和Writer的中,也會存在一些Buffer。

調整JVM參數的方式有兩種,一種是直接更改datax.py,另外一種是在啓動的時候,加上對應的參數,以下:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" ots.json

一般咱們建議將內存設置爲4G或者8G,這個也能夠根據實際狀況來調整。

在優化完單Channel的限速和JVM的內存參數以後,咱們從新跑一下任務:

2017-02-07 08:44:53.188 [job-0] INFO  StandAloneJobContainerCommunicator - Total 153920 records, 31289079 bytes | Speed 1.67MB/s, 8608 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 2.873s |  All Task WaitReaderTime 12.098s | Percentage 0.00%
2017-02-07 08:45:03.189 [job-0] INFO  StandAloneJobContainerCommunicator - Total 256064 records, 52051995 bytes | Speed 1.98MB/s, 10214 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 4.892s |  All Task WaitReaderTime 17.194s | Percentage 0.00%
2017-02-07 08:45:13.191 [job-0] INFO  StandAloneJobContainerCommunicator - Total 360864 records, 73356370 bytes | Speed 2.03MB/s, 10480 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 9.221s |  All Task WaitReaderTime 19.192s | Percentage 0.00%
2017-02-07 08:45:23.192 [job-0] INFO  StandAloneJobContainerCommunicator - Total 464384 records, 94400221 bytes | Speed 2.01MB/s, 10352 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 11.754s |  All Task WaitReaderTime 22.278s | Percentage 0.00%
2017-02-07 08:45:33.194 [job-0] INFO  StandAloneJobContainerCommunicator - Total 570176 records, 115905214 bytes | Speed 2.05MB/s, 10579 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 14.827s |  All Task WaitReaderTime 25.367s | Percentage 0.00%
2017-02-07 08:45:43.195 [job-0] INFO  StandAloneJobContainerCommunicator - Total 675328 records, 137281049 bytes | Speed 2.04MB/s, 10515 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 18.515s |  All Task WaitReaderTime 27.810s | Percentage 0.00%
2017-02-07 08:45:53.197 [job-0] INFO  StandAloneJobContainerCommunicator - Total 778752 records, 158304152 bytes | Speed 2.00MB/s, 10342 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 20.403s |  All Task WaitReaderTime 32.418s | Percentage 0.00%

當前數據導出的速度已經從1MB提高到2MB。

二:提高DataX Job內Channel併發
在上一點中指出,當前Job內部,只有單個Channel在執行導出任務,而要提高速率,要作的就是提高Channel的併發數。

DataX內部對每一個Channel會作限速,能夠限制每秒byte數,也能夠限制每秒record數。除了對每一個Channel限速,在全局還會有一個速度限制的配置,默認是不限。

提高Channel併發數有三種途徑:

1, 配置全局Byte限速以及單Channel Byte限速,Channel個數 = 全局Byte限速 / 單Channel Byte限速。(下面示例中最終Channel個數爲10)

"core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 1048576
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte" : 10485760
            }
        },
        ...
    }
}

2,配置全局Record限速以及單Channel Record限速,Channel個數 = 全局Record限速 / 單Channel Record限速。(下面示例中最終Channel個數爲3)

"core": {
        "transport": {
            "channel": {
                "speed": {
                    "record": 100
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "record" : 300
            }
        },
        ...
    }
}

3, 全局不限速,直接配置Channel個數。(下面示例中最終Channel個數爲5)

"core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 1048576
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "channel" : 5
            }
        },
        ...
    }
}

第三種方式最簡單直接,可是這樣就缺乏了全局的限速。在選擇Channel個數時,一樣須要注意,Channel個數並非越多越好。Channel個數的增長,帶來的是更多的CPU消耗以及內存消耗。若是Channel併發配置太高致使JVM內存不夠用,會出現的狀況是發生頻繁的Full GC,導出速度會驟降,拔苗助長。

能夠在DataX的輸出日誌中,找到本次任務的Channel的數:

2017-02-07 13:27:45.016 [job-0] INFO  JobContainer - DataX Reader.Job [odpsreader] splits to [15] tasks.
2017-02-07 13:27:45.017 [job-0] INFO  OtsWriterMasterProxy - Begin split and MandatoryNumber : 15
2017-02-07 13:27:45.025 [job-0] INFO  OtsWriterMasterProxy - End split.
2017-02-07 13:27:45.025 [job-0] INFO  JobContainer - DataX Writer.Job [otswriter] splits to [15] tasks.

在咱們此次實驗中,咱們把Channel數直接配置爲10,再進行一次導出:

2017-02-07 08:58:24.366 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2465984 records, 501286700 bytes | Speed 9.19MB/s, 47414 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 362.875s |  All Task WaitReaderTime 378.978s | Percentage 0.00%
2017-02-07 08:58:34.368 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2941792 records, 598009404 bytes | Speed 9.22MB/s, 47580 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 459.910s |  All Task WaitReaderTime 379.002s | Percentage 0.00%
2017-02-07 08:58:44.369 [job-0] INFO  StandAloneJobContainerCommunicator - Total 3436064 records, 698484741 bytes | Speed 9.58MB/s, 49427 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 556.324s |  All Task WaitReaderTime 379.026s | Percentage 0.00%
2017-02-07 08:58:54.371 [job-0] INFO  StandAloneJobContainerCommunicator - Total 3905856 records, 793982836 bytes | Speed 9.11MB/s, 46979 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 652.749s |  All Task WaitReaderTime 379.050s | Percentage 0.00%
2017-02-07 08:59:04.372 [job-0] INFO  StandAloneJobContainerCommunicator - Total 4384512 records, 891284760 bytes | Speed 9.28MB/s, 47865 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 749.464s |  All Task WaitReaderTime 379.074s | Percentage 0.00%
2017-02-07 08:59:14.373 [job-0] INFO  StandAloneJobContainerCommunicator - Total 4875136 records, 991017582 bytes | Speed 9.51MB/s, 49062 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 846.522s |  All Task WaitReaderTime 379.098s | Percentage 0.00%

能夠看到在Channel數從1提高到10以後,速度從2MB/s提高到了9MB/s。此時若再提升Channel數到15,速度已經不見漲,而從服務端監控看,每批次導入的寫入延遲確在漲,說明當前瓶頸在TableStore寫入端。

三:對TableStore表進行預分區,並進一步提高DataX Channel併發

在上面幾個優化作完後,DataX數據交換這一環節已經不是瓶頸,當前瓶頸在TableStore端的寫入能力上。TableStore是分佈式的存儲,一張大表會被切分紅不少的分區,分區會分散到後端的各個物理機上提供服務。一張新建立的表,默認分區數爲1,當這張表愈來愈大,TableStore會將其分裂,此時分裂是自動完成的。分區的個數,必定程度上與能提供的服務能力相關。某些業務場景,新建表後,就須要對錶進行大規模的數據導入,此時默認的單個分區確定是不夠用的,固然能夠等數據量慢慢漲上來後等表自動分裂,可是這個週期會比較長。此時,咱們推薦的作法是在建立表的時候進行預分區。

不過目前咱們尚未對外開放經過SDK來進行預分區的功能,因此若是須要對錶進行預分區,能夠先經過工單來聯繫咱們幫助進行預分區。

咱們新建一張表,並將表預分4個分區,partition key爲md5列,採用md5列的主要緣由是在其上數據的分區基本是均勻的。若是數據在partition key分佈不均勻,則即便作了預分區,導入性能也不會獲得明顯的提高。以相同的Job配置,再跑一下導出任務:

2017-02-08 13:48:18.692 [job-0] INFO  StandAloneJobContainerCommunicator - Total 11395424 records, 2316456451 bytes | Speed 18.79MB/s, 96940 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 666.003s |  All Task WaitReaderTime 336.048s | Percentage 0.00%
2017-02-08 13:48:28.693 [job-0] INFO  StandAloneJobContainerCommunicator - Total 12340192 records, 2508508780 bytes | Speed 18.32MB/s, 94476 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 716.743s |  All Task WaitReaderTime 349.424s | Percentage 0.00%
2017-02-08 13:48:38.694 [job-0] INFO  StandAloneJobContainerCommunicator - Total 13197472 records, 2682776109 bytes | Speed 16.62MB/s, 85728 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 776.487s |  All Task WaitReaderTime 359.132s | Percentage 0.00%
2017-02-08 13:48:48.695 [job-0] INFO  StandAloneJobContainerCommunicator - Total 14085856 records, 2863367678 bytes | Speed 17.22MB/s, 88838 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 826.191s |  All Task WaitReaderTime 378.034s | Percentage 0.00%
2017-02-08 13:48:58.696 [job-0] INFO  StandAloneJobContainerCommunicator - Total 15063328 records, 3062065378 bytes | Speed 18.95MB/s, 97747 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 867.363s |  All Task WaitReaderTime 401.640s | Percentage 0.00%
2017-02-08 13:49:08.697 [job-0] INFO  StandAloneJobContainerCommunicator - Total 15908736 records, 3233917750 bytes | Speed 16.39MB/s, 84540 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 921.193s |  All Task WaitReaderTime 418.862s | Percentage 0.00%

此時速度從9MB/s提高到18MB/s左右,在TableStore服務端可以提升更多的服務能力後,咱們嘗試再將Channel的併發從10提升到15:

2017-02-08 13:51:54.546 [job-0] INFO  StandAloneJobContainerCommunicator - Total 8194848 records, 1665844036 bytes | Speed 20.97MB/s, 108160 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 884.016s |  All Task WaitReaderTime 263.742s | Percentage 0.00%
2017-02-08 13:52:04.547 [job-0] INFO  StandAloneJobContainerCommunicator - Total 9351040 records, 1900875263 bytes | Speed 22.41MB/s, 115619 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 1,007.206s |  All Task WaitReaderTime 263.789s | Percentage 0.00%
2017-02-08 13:52:14.548 [job-0] INFO  StandAloneJobContainerCommunicator - Total 10460064 records, 2126318844 bytes | Speed 21.50MB/s, 110902 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 1,140.113s |  All Task WaitReaderTime 263.824s | Percentage 0.00%
2017-02-08 13:52:24.549 [job-0] INFO  StandAloneJobContainerCommunicator - Total 11662112 records, 2370669233 bytes | Speed 23.30MB/s, 120204 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 1,269.070s |  All Task WaitReaderTime 263.863s | Percentage 0.00%
2017-02-08 13:52:34.550 [job-0] INFO  StandAloneJobContainerCommunicator - Total 12874240 records, 2617069638 bytes | Speed 23.50MB/s, 121212 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 1,396.991s |  All Task WaitReaderTime 263.913s | Percentage 0.00%

此時速度又進一步提高,從18MB/s提高到22MB/s左右。

四:提升每次批量寫行數

咱們構建的場景,每行大約是200字節左右大小。DataX的OTSWriter寫入插件底層是使用的TableStore SDK提供的BatchWrite接口進行數據寫入,默認一次請求寫入100行數據,也就是說一次請求只會導入約20KB大小的數據。每次寫過來的數據包都比較小,很是的不經濟。

當前TableStore的BatchWrite的限制比較不靈活,會限制行數和數據大小,其中行數默認上限是200行。在每行都比較小的場景下,200行一次批量寫入是很是不經濟的,在咱們的此次實驗中,咱們將上限改成1000行,並將DataX TableStore寫入插件內部一次批量寫入的行數也改成1000行,來驗證將每次寫入的包變大後,對寫入效率的提高。任務配置更改以下(配置項爲job.content.writer.parameter.batchWriteCount):

"job": {
        "content": [
            {
                "reader": {
                    ...
                },
                "writer": {
                    "name": "otswriter",
                    "parameter": {
                        "batchWriteCount":1000,
                        ...
                    }
                }
            }
        ]
    }
}

再次執行任務,速度以下:

2017-02-08 13:55:16.924 [job-0] INFO  StandAloneJobContainerCommunicator - Total 11413216 records, 2320073926 bytes | Speed 29.44MB/s, 151849 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 72.662s |  All Task WaitReaderTime 1,030.787s | Percentage 0.00%
2017-02-08 13:55:36.925 [job-0] INFO  StandAloneJobContainerCommunicator - Total 14462240 records, 2939879188 bytes | Speed 29.55MB/s, 152451 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 85.228s |  All Task WaitReaderTime 1,297.655s | Percentage 0.00%
2017-02-08 13:55:46.927 [job-0] INFO  StandAloneJobContainerCommunicator - Total 15979552 records, 3248317815 bytes | Speed 29.41MB/s, 151731 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 89.841s |  All Task WaitReaderTime 1,432.022s | Percentage 0.00%
2017-02-08 13:55:56.928 [job-0] INFO  StandAloneJobContainerCommunicator - Total 17488864 records, 3555129299 bytes | Speed 29.26MB/s, 150931 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 100.300s |  All Task WaitReaderTime 1,558.120s | Percentage 0.00%
2017-02-08 13:56:06.929 [job-0] INFO  StandAloneJobContainerCommunicator - Total 19018240 records, 3866017412 bytes | Speed 29.65MB/s, 152937 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 106.391s |  All Task WaitReaderTime 1,691.072s | Percentage 0.00%

速度再次提高,從22MB/s提高到29MB/s。TableStore後續會優化對BatchWrite的行數限制,對於行比較小的場景採用一個比較友好的策略。

五:MaxCompute表分區,提升DataX Job併發
以上優化策略都是在單個DataX Job的場景下進行的優化,單個DataX Job只可以運行在單臺服務器上,沒有辦法分佈式的執行。D2上的託管服務器,通常是千兆網卡,也就是說最多提供100MB/s的速度。若想要進一步的速度提高,則必須採用多個DataX Job分佈在多臺服務器上執行才行。

DataX內的ODPSReader,能夠經過配置一次導出整張表或者表的某個Partition。咱們能夠利用Partition,來將一張表拆分紅多個Job分散導出,可是要求表必須是多分區的。
在咱們的實驗中,建立的MaxCompute表並非多分區的,咱們從新建立一張多分區的表:

md5 string,
    userid string,
    name string,
    comments string,
    attr0 string,
    attr1 string,
    attr2 string,
    attr3 string,
    create_time string,
    udpate_time string
)
PARTITIONED BY (
    partid string
)

增長一列爲partid,做爲分區,咱們經過一個SQL將原表的數據導入到新表,並自動均勻的分散到partid:

attr0, attr1, attr2, attr3, create_time, udpate_time, SUBSTR(md5, 1, 1) from data_for_ots;

以上SQL會將partid的值取自md5列的第一個字符,md5是一個十六進制的值,字符的取值範圍是:0-f,這樣咱們就將原表切成了一個帶16個分區的表。咱們但願在每一個分區內,數據都是均勻的,爲了不長尾,這也是爲何要設計一個md5列的緣由。

在將一張表拆成多個分區後,咱們就能夠選擇在不一樣的服務器上,爲每一個分區啓動一個任務,配置以下(job.content.reader.parameter.partition):

"job": {
          "content": [
              {
                  "reader": {
                      "name": "odpsreader",
                      "parameter": {
                          ...
                          "partition": ["partid=0"],
                          ...
                      }
                  },
                  "writer": {
                      ...
                  }
              }
          ]
      }
  }

因爲測試集羣規模的緣由,咱們不演示多個Job併發後的速度提高。在TableStore服務端能力不是瓶頸的狀況下,經過擴展DataX Job的併發,速度是能線性提高的。

END

總結下上面的幾個優化點:

對DataX的幾個基本參數進行調整,包括:Channel數、單個Channel的限速以及JVM的內存參數。
建立TableStore表的時候儘可能採起預分區,在設計partition key的時候儘可能保證在每一個partition key上導入數據的分佈均勻。
若是導入TableStore的數據行都比較小,則須要考慮提升單批次的導入行數。
若單個DataX Job已成瓶頸,則須要考慮將任務拆成多個DataX Job並行執行。
但願以上經驗對各位有用,歡迎交流。


原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索