MaxCompute上如何處理非結構化數據

0. 前言

MaxCompute做爲阿里雲大數據平臺的核心計算組件,擁有強大的計算能力,可以調度大量的節點作並行計算,同時對分佈式計算中的failover,重試等均有一套行之有效的處理管理機制。 而MaxCompute SQL能在簡明的語義上實現各類數據處理邏輯,在集團內外更是廣爲應用,在其上實現與各類數據源的互通,對於打通整個阿里雲的數據生態具備重要意義。基於這一點,最近MaxCompute團隊依託MaxCompute2.0系統架構,引入了非結構化數據處理框架:經過外部表,爲各類數據在MaxCompute上的計算處理提供了入口。這裏以MaxCompute處理存儲在OSS上的數據爲例,介紹這些新功能。html

現階段MaxCompute SQL面對的主要是以cfile列格式,存儲在內部MaxCompute表格中的結構化數據。而對於MaxCompute表外的各類用戶數據(包括文本以及各類非結構化的數據),須要首先經過各類工具導入MaxCompute表,才能在其上面進行計算。這個數據導入的過程,具備較大的侷限性。以OSS爲例子,想要在MaxCompute裏處理OSS上的數據,一般有兩種作法:java

  1. 經過OSS SDK或者其餘工具從OSS下載數據,而後再經過MaxCompute Tunnel將數據導入表裏。git

  2. 寫UDF,在UDF裏直接調用OSS SDK訪問OSS數據。github

但這兩種作法都不夠好。#1須要在MaxCompute系統外部作一次中轉,若是OSS數據量太大,還須要考慮如何併發來加速,沒法充分利用MaxCompute大規模計算的能力;#2一般須要申請UDF網絡訪問權限,還要開發者本身控制做業併發數和數據如何分片的問題。算法

本文介紹了一種外部表的功能 ,支持旨在提供處理除了MaxCompute現有表格之外的其餘數據的能力。在這個框架中,經過一條簡單的DDL語句,便可在MaxCompute上建立一張外部表,創建MaxCompute表與外部數據源的關聯,提供各類數據的接入和輸出能力。建立好的外部表能夠像普通的MaxCompute表同樣使用(大部分場景),充分利用MaxCompute SQL的強大計算功能。sql

這裏的「各類數據」涵蓋兩個維度:網絡

  1. 多樣的數據存儲介質架構

    插件式的框架能夠對接多種數據存儲介質,好比OSS、OTS併發

  2. 多樣的數據格式:MaxCompute表是結構化的數據,而外部表能夠不限於結構化數據框架

    a. 徹底無結構數據;好比圖像,音頻,視頻文件,raw binaries等

    b. 半結構化數據;好比csv,tsv等隱含必定schema的文本文件

    c. 非cfile的結構化數據; 好比orc/parquet文件,甚至hbase/OTS數據

下面經過一個簡單例子,來演示如何在MaxCompute上輕鬆訪問OSS上的數據。

1. 使用內置extractor讀取OSS數據

使用MaxCompute內置的extractor,能夠很是方便的讀取按照約定格式存儲的OSS數據。咱們只須要建立一個外部表,就能以這張表爲源表作查詢。假設有一份csv數據存在OSS上,endpoint爲oss-cn-hangzhou-zmf.aliyuncs.com,bucket爲oss-odps-test,數據文件放在/demo/SampleData/CSV/AmbulanceData/vehicle.csv

1.1 建立外部表

首先須要在RAM中受權MaxCompute訪問OSS的權限。登陸RAM控制檯,建立角色AliyunODPSDefaultRole,並將策略內容設置爲:

{  "Statement": [
    {      "Action": "sts:AssumeRole",      "Effect": "Allow",      "Principal": {        "Service": [          "odps.aliyuncs.com"
        ]
      }
    }
  ],  "Version": "1"}

而後編輯該角色的受權策略,將權限AliyunODPSRolePolicy受權給該角色。

若是以爲這些步驟太麻煩,能夠點擊此處完成一鍵受權

執行一條DDL語句,建立外部表:

-- (1)set odps.sql.planner.mode=lot;set odps.sql.ddl.odps2=true;set odps.sql.preparse.odps2=lot;set odps.task.major.version=2dot0_demo_flighting;CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_csv_external
(
vehicleId int,
recordId int,
patientId int,
calls int,
locationLatitute double,
locationLongtitue double,
recordTime string,
direction string)STORED BY 'com.aliyun.odps.CsvStorageHandler' -- (2)LOCATION 'oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/Demo/SampleData/CSV/AmbulanceData/'; -- (3)(4)(5)

註釋:

  1. 這個功能是MaxCompute2.0的一部分,目前處於試用狀態。這裏須要提早設置一些開關來臨時打開這個功能(這個開關設置前須要申請試用MaxCompute2.0,文章末尾有介紹),本文後面全部的SQL例子語句運行前都須要設置這些開關,爲了便於閱讀,我只在這裏明確寫出。

  2. com.aliyun.odps.CsvStorageHandler是內置的處理csv格式文件的StorageHandler,它定義瞭如何讀寫csv文件。咱們只須要指明這個名字,相關邏輯已經由系統實現。

  3. LOCATION必須指定一個OSS目錄,默認系統會讀取這個目錄下全部的文件。

  4. 外部表只是在系統中記錄了與OSS目錄的關聯,當DROP這張表時,對應的LOCATION數據不會被刪除。

  5. 前面咱們已經經過RAM將帳號中的OSS資源訪問權限受權給了MaxCompute。在後續的訪問OSS過程當中,MaxCompute將經過STS拿到對OSS資源的臨時權限。須要注意的是,MaxCompute在獲取權限時,是以表的建立者的身份去STS申請的,所以,這裏建立表的帳號和OSS必須是同一個雲帳號,並且必須是主帳號,不能是子帳號。

1.2 查詢外部表

外部表建立成功後,咱們能夠像對普通表同樣使用這個外部表。

假設/demo/SampleData/CSV/AmbulanceData/vehicle.csv數據爲:

1,1,51,1,46.81006,-92.08174,9/14/2014 0:00,S1,2,13,1,46.81006,-92.08174,9/14/2014 0:00,NE1,3,48,1,46.81006,-92.08174,9/14/2014 0:00,NE1,4,30,1,46.81006,-92.08174,9/14/2014 0:00,W1,5,47,1,46.81006,-92.08174,9/14/2014 0:00,S1,6,9,1,46.81006,-92.08174,9/14/2014 0:00,S1,7,53,1,46.81006,-92.08174,9/14/2014 0:00,N1,8,63,1,46.81006,-92.08174,9/14/2014 0:00,SW1,9,4,1,46.81006,-92.08174,9/14/2014 0:00,NE1,10,31,1,46.81006,-92.08174,9/14/2014 0:00,N

執行:

SELECT recordId, patientId, directionFROM ambulance_data_csv_externalWHERE patientId > 25;

這條語句會提交一個做業,調用內置csv extractor,從OSS讀取數據進行處理。

結果:

+------------+------------+-----------+
| recordId   | patientId  | direction |
+------------+------------+-----------+
| 1          | 51         | S         |
| 3          | 48         | NE        |
| 4          | 30         | W         |
| 5          | 47         | S         |
| 7          | 53         | N         |
| 8          | 63         | SW        |
| 10         | 31         | N         |
+------------+------------+-----------+

2. 自定義extractor

當OSS中數據格式比較複雜,內置的extractor沒法知足需求時,須要自定義extractor來讀取OSS文件中的數據。

例若有一個text數據文件,並非csv格式,記錄之間的列經過|分割。/demo/SampleData/CustomTxt/AmbulanceData/vehicle.csv數據爲:

1|1|51|1|46.81006|-92.08174|9/14/2014 0:00|S1|2|13|1|46.81006|-92.08174|9/14/2014 0:00|NE1|3|48|1|46.81006|-92.08174|9/14/2014 0:00|NE1|4|30|1|46.81006|-92.08174|9/14/2014 0:00|W1|5|47|1|46.81006|-92.08174|9/14/2014 0:00|S1|6|9|1|46.81006|-92.08174|9/14/2014 0:00|S1|7|53|1|46.81006|-92.08174|9/14/2014 0:00|N1|8|63|1|46.81006|-92.08174|9/14/2014 0:00|SW1|9|4|1|46.81006|-92.08174|9/14/2014 0:00|NE1|10|31|1|46.81006|-92.08174|9/14/2014 0:00|N

2.1 定義Extractor

這個時候能夠寫一個通用的extractor,將分隔符做爲參數傳進來,能夠處理全部相似格式的text文件。

/**
 * Text extractor that extract schematized records from formatted plain-text(csv, tsv etc.)
 **/public class TextExtractor extends Extractor {  private InputStreamSet inputs;  private String columnDelimiter;  private DataAttributes attributes;  private BufferedReader currentReader;  private boolean firstRead = true;  public TextExtractor() {    // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes)
    this.columnDelimiter = ",";
  }  // no particular usage for execution context in this example
  @Override
  public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) {    this.inputs = inputs; // (1)
    this.attributes = attributes;    // check if "delimiter" attribute is supplied via SQL query
    String columnDelimiter = this.attributes.getValueByKey("delimiter"); // (2)
    if ( columnDelimiter != null)
    {      this.columnDelimiter = columnDelimiter;
    }    // note: more properties can be inited from attributes if needed
  }  @Override
  public Record extract() throws IOException {
    String line = readNextLine();    if (line == null) {      return null; // (5)
    }    return textLineToRecord(line); // (3)(4)
  }  @Override
  public void close(){    // no-op
  }
}
  1. inputs是一個InputStreamSet,每次調用next()返回一個InputStream,這個InputStream能夠讀取一個OSS文件的全部內容。

  2. delimiter經過DDL語句傳參

  3. textLineToRecord將一行數據按照delimiter分割爲多個列,完整實現能夠參考: https://github.com/aliyun/aliyun-odps-java-sdk/blob/master/odps-sdk-impl/odps-udf-example/src/main/java/com/aliyun/odps/udf/example/text/TextExtractor.java

  4. extactor()調用返回一條Record,表明外部表中的一條記錄

  5. 返回null來表示這個表中已經沒有記錄可讀了

2.2 定義StorageHandler

StorageHandler做爲external table自定義邏輯的統一入口。

package com.aliyun.odps.udf.example.text;

public class TextStorageHandler extends OdpsStorageHandler {  @Override
  public Class<? extends Extractor> getExtractorClass() {    return TextExtractor.class;
  }  @Override
  public Class<? extends Outputer> getOutputerClass() {    return TextOutputer.class;
  }
}

2.3 編譯打包

將自定義代碼編譯打包,並上傳到MaxCompute。

add jar odps-udf-example.jar;

2.4 建立external表

與使用內置extractor相似,咱們一樣須要創建一個外部表,不一樣的是此次須要指定外部表訪問數據的時候,使用自定義的StorageHandler。

CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_txt_external
(
vehicleId int,
recordId int,
patientId int,
calls int,
locationLatitute double,
locationLongtitue double,
recordTime string,
direction string)STORED BY 'com.aliyun.odps.udf.example.text.TextStorageHandler' -- (1)WITH SERDEPROPERTIES(  'delimiter'='|'  --(2)LOCATION 'oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/'USING 'odps-udf-example.jar'; --(3)
  1. STORED BY指定自定義StorageHandler的類名

  2. SERDEPROPERITES能夠指定參數,這些參數會經過DataAttributes傳遞到Extractor代碼中

  3. 同時須要指定類定義所在的jar包

2.5 查詢外部表

執行:

SELECT recordId, patientId, directionFROM ambulance_data_txt_externalWHERE patientId > 25;

這條語句會提交一個做業,調用自定義的extractor,從OSS讀取數據進行處理。

結果:

+------------+------------+-----------+
| recordId   | patientId  | direction |
+------------+------------+-----------+
| 1          | 51         | S         |
| 3          | 48         | NE        |
| 4          | 30         | W         |
| 5          | 47         | S         |
| 7          | 53         | N         |
| 8          | 63         | SW        |
| 10         | 31         | N         |
+------------+------------+-----------+

3. 經過自定義extractor讀取外部非結構化數據

在前面咱們看到了經過內置與自定義的extractor能夠輕鬆處理存儲在OSS上的csv等文本數據。接下來咱們以語音數據(wav格式文件)爲例,來看看怎樣經過自定義的extractor來訪問處理OSS上的非文本文件。

這裏咱們從最終執行的SQL query開始,介紹以MaxCompute SQL爲入口,處理存放在OSS上的語音文件的使用方法:

CREATE EXTERNAL TABLE IF NOT EXISTS speech_sentence_snr_external
(
sentence_snr double,id string)STORED BY 'com.aliyun.odps.udf.example.speech.SpeechStorageHandler'WITH SERDEPROPERTIES (    'mlfFileName'='sm_random_5_utterance.text.label' ,    'speechSampleRateInKHz' = '16')
LOCATION 'oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/'USING 'odps-udf-example.jar,sm_random_5_utterance.text.label';
SELECT sentence_snr, id
    FROM speech_sentence_snr_externalWHERE sentence_snr > 10.0;

這裏咱們依然創建的外部表,而且經過外部表的schema定義了咱們但願經過外部表從語音文件中抽取出來的信息:

  1. 一個語音文件中的語句信噪比(SNR):sentence_snr

  2. 對應語音文件的名字:id

建立了外部表後,經過標準的select語句進行查詢,則會觸發extractor運行計算。

從這咱們能夠更直接的感覺到,在讀取處理OSS數據時,除了以前介紹過的對文本文件作簡單的反序列化處理,還能夠經過在自定義的extractor中實現更復雜的數據處理抽取邏輯:在這個例子中,咱們經過自定義的com.aliyun.odps.udf.example.speech. SpeechStorageHandler 中封裝的extractor, 實現了對語音文件計算平均有效語句信噪比的功能,並將抽取出來的結構化數據直接進行SQL運算(WHERE sentence_snr > 10), 最終返回全部信噪比大於10的語音文件以及對應的信噪比值。

在OSS地址oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/上,存儲了原始的多個wav格式的語音文件,MaxCompute 框架將讀取該地址上的全部文件,並在必要的時候進行文件級別的分片,自動將文件分配給多個計算節點處理。每一個計算節點上的extractor則負責處理經過InputStreamSet分配給該節點的文件集。具體的處理邏輯則與用戶單機程序相仿,用戶不用關心分佈計算中的種種細節,按照類單機方式實現其用戶算法便可。

這裏簡單介紹一下定製化的SpeechSentenceSnrExtractor主體邏輯。首先咱們在setup接口中讀取參數,進行初始化,而且導入語音處理模型(經過resource引入):

  public SpeechSentenceSnrExtractor(){    this.utteranceLabels = new HashMap<String, UtteranceLabel>();
  }  @Override
  public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes){    this.inputs = inputs;    this.attributes = attributes;    this.mlfFileName = this.attributes.getValueByKey(MLF_FILE_ATTRIBUTE_KEY);
    String sampleRateInKHzStr = this.attributes.getValueByKey(SPEECH_SAMPLE_RATE_KEY);    this.sampleRateInKHz = Double.parseDouble(sampleRateInKHzStr);    try {      // read the speech model file from resource and load the model into memory
      BufferedInputStream inputStream = ctx.readResourceFileAsStream(mlfFileName);
      loadMlfLabelsFromResource(inputStream);
      inputStream.close();
    } catch (IOException e) {      throw new RuntimeException("reading model from mlf failed with exception " + e.getMessage());
    }
  }

extractor()接口中,實現了對語音文件的具體讀取和處理邏輯,對讀取的數據根據語音模型進行信噪比的計算,而且將結果填充成[snr, id]格式的Record。
這個例子中對實現進行了簡化,同時也沒有包括涉及語音處理的算法邏輯,具體實現可參見:
https://github.com/aliyun/aliyun-odps-java-sdk/blob/master/odps-sdk-impl/odps-udf-example/src/main/java/com/aliyun/odps/udf/example/speech/SpeechSentenceSnrExtractor.java

  @Override  public Record extract() throws IOException {
    SourceInputStream inputStream = inputs.next();    if (inputStream == null){      return null;
    }    // process one wav file to extract one output record [snr, id]
    String fileName = inputStream.getFileName();
    fileName = fileName.substring(fileName.lastIndexOf('/') + 1);
    logger.info("Processing wav file " + fileName);    String id = fileName.substring(0, fileName.lastIndexOf('.'));    // read speech file into memory buffer
    long fileSize = inputStream.getFileSize();    byte[] buffer = new byte[(int)fileSize];    int readSize = inputStream.readToEnd(buffer);
    inputStream.close();    // compute the avg sentence snr
    double snr = computeSnr(id, buffer, readSize);    // construct output record [snr, id]
    Column[] outputColumns = this.attributes.getRecordColumns();
    ArrayRecord record = new ArrayRecord(outputColumns);
    record.setDouble(0, snr);
    record.setString(1, id);    return record;
  }  private void loadMlfLabelsFromResource(BufferedInputStream fileInputStream)
          throws IOException {    //  skipped here
  }  // compute the snr of the speech sentence, assuming the input buffer contains the entire content of a wav file
  private double computeSnr(String id, byte[] buffer, int validBufferLen){    // computing the snr value for the wav file (supplied as byte buffer array), skipped here
  }

執行一開始的SQL語句,可得到計算結果:

--------------------------------------------------------------
| sentence_snr |                     id                      |
--------------------------------------------------------------|   34.4703    |          J310209090013_H02_K03_042          |
--------------------------------------------------------------|   31.3905    | tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0 |
--------------------------------------------------------------|   35.4774    | tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0  |
--------------------------------------------------------------|   16.0462    | tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0  |
--------------------------------------------------------------|   14.5568    |   tsh_148_3013_5_13_47_3d5008d792408f81_0   |
--------------------------------------------------------------

能夠看到,經過自定義extractor,咱們在SQL語句上便可分佈式地處理多個OSS上語音數據文件。一樣的,用相似的方法,咱們能夠方便的利用MaxCompute的大規模計算能力,完成對圖像,視頻等各類類型非結構化數據的處理。

4. 數據的分區

在前面的例子中,一個外部表關聯的數據經過LOCATION上指定的OSS「目錄」來實現,而在處理的時候,MaxCompute對讀取「目錄」下面的全部數據,**包括子目錄中的全部文件**。在數據量比較大,尤爲是對於隨着時間不斷積累的數據目錄,這樣子的全目錄掃描,可能帶來沒必要要的額外IO以及數據處理時間。 解決這個問題一般有兩種作法:

  • 比較直接的減小訪問數據量的方法是用戶本身負責對數據存放地址作好規劃,同時考慮使用多個EXTERNAL TABLE來描述不一樣部分的數據,讓每一個EXTERNALTABLE的LOCATION指向數據的一個子集。

  • 另外一個方面,EXTERNAL TABLE與內部表同樣,**支持分區表的功能**,用戶能夠利用這個功能來對數據作系統化的管理。這個章節主要介紹一下EXTERNAL TABLE的分區功能。

4.1 分區數據在OSS上的標準組織方式和路徑格式

與MaxCompute內部表不一樣,對於存放在外部存儲上(如OSS)上面的數據,MaxCompute不擁有數據的管理權,因此用戶若是但願系統的使用分區表功能的話,在OSS上的數據文件的存放路徑應該符合必定的格式,具體說來就是路徑爲以下格式:

partitionKey1=value1\partitionKey2=value2\...

舉一個例子,假設用戶天天的LOG文件存放在OSS上面,而後但願能在經過MaxCompute處理的時候,可以按照粒度爲「天」來訪問一部分數據。簡單假設這些LOG文件就是CSV的格式(複雜自定義格式用法也相似),那麼可使用以下的**分區外部表**來定義數據:

CREATE EXTERNAL TABLE log_table_external (
    click STRING,
    ip STRING,    url STRING,
  )
  PARTITIONED BY (    year STRING,    month STRING,    day STRING
  )  STORED BY 'com.aliyun.odps.CsvStorageHandler'
  LOCATION 'oss://<ak_id>:<ak_key>@oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/log_data/';

能夠看到,這裏和前面的例子不同的主要就是在定義EXTERNAL TABLE的時候,咱們經過PARTITIONED BY的語法,來指定該外部表爲分區表,這裏舉了一個三層分區的例子,分區的key分別是 year, month 和 day。爲了讓這樣的分區能生效,在OSS上面存儲數據的時候須要遵循上面提到的路徑格式。這裏舉一個有效的路徑存儲layout的例子:

osscmd ls oss://oss-odps-test/log_data/2017-01-14 08:03:35 128MB Standard oss://oss-odps-test/log_data/year=2016/month=06/day=01/logfile2017-01-14 08:04:12 127MB Standard oss://oss-odps-test/log_data/year=2016/month=06/day=01/logfile.12017-01-14 08:05:02 118MB Standard oss://oss-odps-test/log_data/year=2016/month=06/day=02/logfile2017-01-14 08:06:45 123MB Standard oss://oss-odps-test/log_data/year=2016/month=07/day=10/logfile2017-01-14 08:07:11 115MB Standard oss://oss-odps-test/log_data/year=2016/month=08/day=08/logfile...

須要再次強調的,由於用戶本身離線準備了數據,即經過osscmd或者其餘OSS工具自行上載到OSS存儲服務上,因此數據路徑格式是由用戶決定的,若是想要EXTERNAL TABLE的分區表功能正常工做的話,咱們推薦用戶上載數據的時候遵循如上路徑格式。在這個前提下,經過ALTER TABLE ADD PARTITIONDDL語句,就能夠把這些分區信息引入MaxCompute。在這裏對應的DDL語句是:

ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '06', day = '01')ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '06', day = '02')ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '07', day = '10')ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '08', day = '08')
...

事實上這些操做都和標準的MaxCompute內部表操做同樣,對分區表概念不熟悉的能夠參考文檔。在數據準備好而且PARTITION信息引入系統以後,就能夠經過SQL語句來對OSS上的外表數據的分區進行操做了。

好比若是隻想分析2016年6月1號當天,有多少不一樣的IP出如今LOG裏面,能夠經過以下語句實現:

SELECT count(distinct(ip)) FROM log_table_external 
WHERE year = '2016' AND month = '06' AND day = '01';

這種狀況下, 對log_table_external這個外表對應的目錄,將只訪問log_data/year=2016/month=06/day=01子目錄下的文件(logfilelogfile.1), 而**不會對整個**log_data/**目錄做全量數據掃描,避免大量無用的IO操做。**

一樣若是隻但願對2016年下半年的數據作分析, 則能夠經過

SELECT count(distinct(ip)) FROM log_table_external 
WHERE year = '2016' AND month > '06';

來**只訪問OSS上面存儲的下半年的LOG數據**.

4.2 分區數據在OSS上的自定義路徑

若是用戶已經有事先存在OSS上面的歷史數據,可是又不是根據partitionKey1=value1\partitionKey2=value2\...的路徑格式來組織存放的,那也是能夠經過MaxCompute的分區方式來進行訪問計算的。雖然*一般狀況下不這樣推薦*,可是在非結構化數據處理這方面,MaxCompute一樣提供了經過自定義路徑來引入partition的方法。

好比假設用戶的數據路徑上只有簡單的分區值(而無分區key信息),也就是數據的layout爲:

osscmd ls oss://oss-odps-test/log_data_customized/2017-01-14 08:03:35 128MB Standard oss://oss-odps-test/log_data_customized/2016/06/01/logfile2017-01-14 08:04:12 127MB Standard oss://oss-odps-test/log_data_customized/2016/06/01/logfile.12017-01-14 08:05:02 118MB Standard oss://oss-odps-test/log_data_customized/2016/06/02/logfile2017-01-14 08:06:45 123MB Standard oss://oss-odps-test/log_data_customized/2016/07/10/logfile2017-01-14 08:07:11 115MB Standard oss://oss-odps-test/log_data_customized/2016/08/08/logfile...

那麼要綁定不一樣的子目錄到不一樣的分區上,能夠經過相似以下自定義分區路徑的DDL語句實現:

ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '06', day = '01')
LOCATION 'oss://<ak_id>:<ak_key>@oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/log_data_customized/2016/06/01/';

在ADD PARTITION的時候增長了LOCATION信息,從而實現自定義分區數據路徑後,即便數據存放不符合推薦的partitionKey1=value1\partitionKey2=value2\...格式,也能正確的實現對子目錄數據的分區訪問了。

4.3 徹底自定義的非分區數據子集訪問

最後在一些特殊狀況下,用戶可能會有訪問某個OSS路徑上的任意文件子集的需求,而這個文件子集中的文件在路徑格式上*沒有明顯的規律性*。在這方面MaxCompute非結構化數據處理框架也提供了相應的支持,可是在這裏不展開作介紹了。若是對於這些高階的特殊用法,有詳細的具體需求和場景描述的話,能夠聯繫MaxCompute技術團隊。

5. 結語

MaxCompute上增添處理非結構化數據的能力,可以有效的利用MaxCompute框架上成熟的大規模計算資源,處理來自各類數據源上的數據,從而真正實現數據計算互通。隨着MaxCompute 2.0框架的逐步上線,這些新功能將爲集團內外用戶提供更多的計算價值。目前對於OSS數據的讀取計算功能,在集團內一些急需大規模非結構化數據處理能力的團隊中已經使用。MaxCompute團隊將進一步完善相關功能,而且提供對更多數據源的支持,例如TableStore(OTS)等。後繼咱們也會在ATA上作更多的介紹。

原文連接:http://click.aliyun.com/m/14000/

相關文章
相關標籤/搜索