Tunnel是MaxCompute提供的離線批量數據通道服務,主要提供大批量離線數據上傳和下載,
僅提供每次批量大於等於64MB數據的場景,小批量流式數據場景請使用DataHub實時數據通道以得到更好的性能和體驗。java
import java.io.IOException; import java.util.Date; import com.aliyun.odps.Column; import com.aliyun.odps.Odps; import com.aliyun.odps.PartitionSpec; import com.aliyun.odps.TableSchema; import com.aliyun.odps.account.Account; import com.aliyun.odps.account.AliyunAccount; import com.aliyun.odps.data.Record; import com.aliyun.odps.data.RecordWriter; import com.aliyun.odps.tunnel.TableTunnel; import com.aliyun.odps.tunnel.TunnelException; import com.aliyun.odps.tunnel.TableTunnel.UploadSession; public class UploadSample { private static String accessId = "<your access id>"; private static String accessKey = "<your access Key>"; private static String odpsUrl = "http://service.odps.aliyun.com/api"; private static String project = "<your project>"; private static String table = "<your table name>"; private static String partition = "<your partition spec>"; public static void main(String args[]) { // 準備工做,僅需作一次 Account account = new AliyunAccount(accessId, accessKey); Odps odps = new Odps(account); odps.setEndpoint(odpsUrl); odps.setDefaultProject(project); TableTunnel tunnel = new TableTunnel(odps); try { // 肯定寫入分區 PartitionSpec partitionSpec = new PartitionSpec(partition); // 在服務端建立一個在本表本分區上有效期24小時的session,24小時內該session能夠共計上傳20000個Block數據 // 建立Session的時耗爲秒級,會在服務端使用部分資源、建立臨時目錄等,操做較重,所以強烈建議同一個分區數據儘量複用Session上傳。 UploadSession uploadSession = tunnel.createUploadSession(project, table, partitionSpec); System.out.println("Session Status is : " + uploadSession.getStatus().toString()); TableSchema schema = uploadSession.getSchema(); // 準備數據後打開Writer開始寫入數據,準備數據後寫入一個Block,每一個Block僅能成功上傳一次,不可重複上傳,CloseWriter成功表明該Block上傳完成,失敗能夠從新上傳該Block,同一個Session下最多容許20000個BlockId,即0-19999,若超出請CommitSession而且再建立一個新Session使用,以此類推。 // 單個Block內寫入數據過少會產生大量小文件 嚴重影響計算性能, 強烈建議每次寫入64MB以上數據(100GB之內數據都可寫入同一Block) // 可經過數據的平均大小與記錄數量大體計算總量即 64MB < 平均記錄大小*記錄數 < 100GB // maxBlockID服務端限制爲20000,用戶能夠根據本身業務需求,每一個Session使用必定數量的block例如100個,可是建議每一個Session內使用block越多越好,由於建立Session是一個很重的操做 // 若是建立一個Session後僅僅上傳少許數據,不只會形成小文件、空目錄等問題,還會嚴重影響上傳總體性能(建立Session花費秒級,真正上傳可能僅僅用了十幾毫秒) int maxBlockID = 20000; for (int blockId = 0; blockId < maxBlockID; blockId++) { // 準備好至少64MB以上數據,準備完成後方可寫入 // 例如:讀取若干文件或者從數據庫中讀取數據 try { // 在該Block上建立一個Writer,writer建立後任意一段時間內,若某連續2分鐘沒有寫入4KB以上數據,則會超時斷開鏈接 // 所以建議在建立writer前在內存中準備能夠直接進行寫入的數據 RecordWriter recordWriter = uploadSession.openRecordWriter(blockId); // 將讀取到的全部數據轉換爲Tunnel Record格式並切入 int recordNumber = 1000000; for (int index = 0; i < recordNumber; i++) { // 將第index條原始數據轉化爲odps record Record record = uploadSession.newRecord(); for (int i = 0; i < schema.getColumns().size(); i++) { Column column = schema.getColumn(i); switch (column.getType()) { case BIGINT: record.setBigint(i, 1L); break; case BOOLEAN: record.setBoolean(i, true); break; case DATETIME: record.setDatetime(i, new Date()); break; case DOUBLE: record.setDouble(i, 0.0); break; case STRING: record.setString(i, "sample"); break; default: throw new RuntimeException("Unknown column type: " + column.getType()); } } // Write本條數據至服務端,每寫入4KB數據會進行一次網絡傳輸 // 若120s沒有網絡傳輸服務端將會關閉鏈接,屆時該Writer將不可用,須要從新寫入 recordWriter.write(record); } // close成功即表明該block上傳成功,可是在整個Session Commit前,這些數據是在odps 臨時目錄中不可見的 recordWriter.close(); } catch (TunnelException e) { // 建議重試必定次數 e.printStackTrace(); System.out.println("write failed:" + e.getMessage()); } catch (IOException e) { // 建議重試必定次數 e.printStackTrace(); System.out.println("write failed:" + e.getMessage()); } } // 提交全部Block,uploadSession.getBlockList()能夠自行指定須要提交的Block,Commit成功後數據纔會正式寫入Odps分區,Commit失敗建議重試10次 for (int retry = 0; retry < 10; ++retry) { try { // 秒級操做,正式提交數據 uploadSession.commit(uploadSession.getBlockList()); break; } catch (TunnelException e) { System.out.println("uploadSession commit failed:" + e.getMessage()); } catch (IOException e) { System.out.println("uploadSession commit failed:" + e.getMessage()); } } System.out.println("upload success!"); } catch (TunnelException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
構造器舉例說明:數據庫
PartitionSpec(String spec):經過字符串構造此類對象。api
參數:緩存
spec: 分區定義字符串,好比: pt='1',ds='2'。
所以程序中應該這樣配置:private static String partition = "pt='XXX',ds='XXX'";網絡
Tunnel是MaxCompute的數據通道,用戶能夠經過Tunnel向MaxCompute中上傳或者下載數據。目前Tunnel僅支持表(不包括視圖View)數據的上傳下載。session
同一個UploadSession裏的blockId不能重複。也就是說,對於同一個UploadSession,用一個blockId打開RecordWriter,寫入一批數據後,調用close,
而後再commit完成後,寫入成功後不能夠從新再用該blockId打開另外一個RecordWriter寫入數據。 Block默認最多20000個,即0-19999。併發
一個block大小上限 100GB,強烈建議大於64M的數據,每個Block對應一個文件,小於64MB的文件統稱爲小文件,小文件過多將會影響使用性能。
使用新版BufferedWriter能夠更簡單的進行上傳功能避免小文件等問題 Tunnel-SDK-BufferedWriter分佈式
每一個Session在服務端的生命週期爲24小時,建立後24小時內都可使用,也能夠跨進程/線程共享使用,可是必須保證同一個BlockId沒有重複使用,分佈式上傳能夠按照以下步驟:
建立Session->數據量估算->分配Block(例如線程1使用0-100,線程2使用100-200)->準備數據->上傳數據->Commit全部寫入成功的Block。工具
每一個Session在建立時會生成兩個文件目錄,若是大量建立而不使用,會致使臨時目錄增多,大量堆積時可能形成系統負擔,請必定避免此類行爲,儘可能共享利用session。性能
上傳數據時,Writer每寫入8KB數據會觸發一次網絡動做,若是120秒內沒有網絡動做,服務端將主動關閉鏈接,屆時Writer將不可用,請從新打開一個新的Writer寫入。
建議使用 [Tunnel-SDK-BufferedWriter]接口上傳數據,該接口對用戶屏蔽了blockId的細節,而且內部帶有數據緩存區,會自動進行失敗重試。
下載數據時,Reader也有相似機制,若長時間沒有網絡IO會被斷開鏈接,建議Read過程連續進行中間不穿插其餘系統的接口。
MaxCompute Tunnel目前提供Java版的SDK。
支持。
MaxCompute Tunnel用於批量上傳,不適合流式上傳,流式上傳可使用[DataHub高速流式數據通道],毫秒級延時寫入。
是的,Tunnel不會自動建立分區。
dship是一個工具,經過MaxCompute Tunnel來上傳下載。
追加的模式。
路由功能指的是Tunnel SDK經過設置MaxCompute獲取Tunnel Endpoint的功能。所以,SDK能夠只設置MaxCompute的endpoint來正常工做。
沒有一個絕對最優的答案,要綜合考慮網絡狀況,實時性要求,數據如何使用以及集羣小文件等因素。通常,若是數量較大是持續上傳的模式,能夠在64M - 256M,
若是是天天傳一次的批量模式,能夠設大一些到1G左右
通常是endpoint錯誤,請檢查Endpoint配置,簡單的判斷方法是經過telnet等方法檢測網絡連通性。
該project開啓了數據保護功能,用戶操做這是從一個項目的數據導向另外一個項目,這須要該project的owner操做。
Tunnel對請求的併發進行了控制,默認上傳和下載的併發Quota爲2000,任何相關的請求發出到結束過程當中均會佔用一個Quota單位。若出現相似錯誤,有以下幾種建議的解決方案:
1 sleep一下再重試;
2 將project的tunnel併發quota調大,須要聯繫管理員評估流量壓力;
3 報告project owner調查誰佔用了大量併發quota,控制一下。