MaxCompute做爲使用最普遍的大數據平臺,內部存儲的數據以EB量級計算。巨大的數據存儲量以及大規模計算下高性能數據讀寫的需求,對於MaxCompute提出了各類高要求及挑戰。處在大數據時代,數據的來源多種多樣,開源社區通過十幾年的發展,百花齊放,各類各樣的數據格式不斷的出現。 咱們的用戶也在各個場景上,經過各類計算框架,積累了各類不一樣格式的數據。怎樣將MaxCompute強大的計算能力開放給這些使用開源格式存儲沉澱下來的數據,在MaxCompute上挖掘這些數據中的信息,是MaxCompute團隊但願解決的問題。java
MaxCompute 2.0最近推出的非結構化計算框架【公測階段】,旨在從存儲介質和存儲格式兩個維度,打通計算與存儲的通道。 在以前的文章中,咱們已經介紹過怎樣在MaxCompute上對存儲在OSS上的文本,音頻,圖像等格式的數據,以及TableStore(OTS)的KV數據進行計算處理。在這裏,則將介紹對於各類流行的開源數據格式(ORC, PARQUET, SEQUENCEFILE, RCFILE, AVRO, TEXTFILE等等),怎樣將其存儲在OSS上面,並經過非結構化框架在MaxCompute進行處理。sql
本着不重造輪子的原則,對於絕大部分這些開源數據格式的解析工做,在非結構化框架中會直接調用開源社區的實現,而且無縫的與MaxCompute系統作對接。apache
MaxCompute非結構化數據框架經過EXTERNAL TABLE的概念來提供MaxCompute與各類數據的聯通,與讀取OSS數據的使用方法相似,對OSS數據進行寫操做,首先要經過CREATE EXTERNAL TABLE語句建立出一個外部表,而在讀取開源數據格式時,建立外表的DDL語句格式以下:安全
DROP TABLE [IF EXISTS] <external_table>; CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table> (<column schemas>) [PARTITIONED BY (partition column schemas)] [ROW FORMAT SERDE '<serde class>'] STORED AS <file format> LOCATION 'oss://${accessKeyId}:${accessKeySecret}@${endpoint}/${bucket}/${userPath}/'
能夠看到,這個語法與HIVE的語法是至關接近的,而在這個CREATE EXTERNAL TABLE的ddl語句中,有以下幾點要說明:性能優化
STORED AS
的關鍵字,而不是普通非結構化外表用的STORED BY
關鍵字,這也是目前在讀取開源兼容數據時獨有的。<column schemas>
必須與具體OSS上存儲存儲數據的schema相符合。ROW FORMAT SERDE
並不是必選選項,只有在使用一些特殊的格式上,好比TEXTFILE時才須要使用。STORED AS
後面接的是文件格式名字, 好比 ORC/PARQUET/RCFILE/SEQUENCEFILE/TEXTFILE 等等。如今再來看一個具體的例子,假設咱們有一些PARQUET文件存放在一個OSS路徑上,每一個文件都是PARQUET格式,存放着schema爲16列(4列BINGINT, 4列DOUBLE, 8列STRING)的數據,那麼能夠經過以下DDL語句來描述:app
CREATE EXTERNAL TABLE tpch_lineitem_parquet ( l_orderkey bigint, l_partkey bigint, l_suppkey bigint, l_linenumber bigint, l_quantity double, l_extendedprice double, l_discount double, l_tax double, l_returnflag string, l_linestatus string, l_shipdate string, l_commitdate string, l_receiptdate string, l_shipinstruct string, l_shipmode string, l_comment string ) STORED AS PARQUET LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/parquet_data/';
一樣的數據,若是是每行以JSON格式,存儲成OSS上TEXTFILE文件;同時,數據在OSS經過多個目錄組織,這時是可使用MaxCompute分區表和數據關聯,則能夠經過以下DDL語句來描述:框架
CREATE EXTERNAL TABLE tpch_lineitem_textfile ( l_orderkey bigint, l_partkey bigint, l_suppkey bigint, l_linenumber bigint, l_quantity double, l_extendedprice double, l_discount double, l_tax double, l_returnflag string, l_linestatus string, l_shipdate string, l_commitdate string, l_receiptdate string, l_shipinstruct string, l_shipmode string, l_comment string ) PARTITIONED BY (ds string) ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/';
若是OSS表目錄下面的子目錄是以Partition Name方式組織,好比:性能
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170102/' oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170103/' ...
則可使用如下DDL語句ADD PARTITION:大數據
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102"); ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103");
若是OSS分區目錄不是按這種方式組織,或者根本不在表目錄下,好比:優化
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/; oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/; ...
則可使用如下DDL語句ADD PARTITION:
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102") LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/'; ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103") LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/'; ...
對比上面的兩個範例,能夠看出對於不一樣文件類型,只要簡單修改STORED AS
後的格式名。在接下來的例子中,咱們將只集中描述對上面PARQUET數據對應的外表(tpch_lineitem_parquet)的處理,若是要處理不一樣的文件類型,只要在DDL建立外表時指定是PARQUET/ORC/TEXTFILE/RCFILE/TEXTFILE便可,處理數據的語句則是同樣的。
在建立數據外表後,直接對外表就能夠進行與普通MaxCompute表的操做,直接對存儲在OSS上的數據進行處理,好比:
SELECT l_returnflag, l_linestatus, SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price, AVG(l_quantity) AS avg_qty, COUNT(*) AS count_order FROM tpch_lineitem_parquet WHERE l_shipdate <= '1998-09-02' GROUP BY l_returnflag, l_linestatus;
能夠看到,在這裏tpch_lineitem_parquet
這個外表被看成一個普通的內部表同樣使用。惟一不一樣的只是在MaxCompute內部計算引擎將從OSS上去讀取對應的PARQUET數據來進行處理。
可是咱們應該強調的是,在這裏直接使用外表,每次讀取的時候都須要涉及外部OSS的IO操做,而且MaxCompute系統自己針對內部存儲作的許多高性能優化都用不上了,因此性能上會有所損失。 因此若是是須要對數據進行反覆計算以及對計算的高效性比較敏感的場景上,咱們推薦下面這種用法:先將數據導入MaxCompute內部,再進行計算。
注意,上面例子中的tpch_lineitem_textfile表,由於使用了ROW FORMAT + STORED AS,須要手動設置flag(只使用STORED AS,odps.sql.hive.compatible默認爲TRUE),再進行讀取,不然會有報錯。
SELECT * FROM tpch_lineitem_textfile LIMIT 1; FAILED: ODPS-0123131:User defined function exception - Traceback: com.aliyun.odps.udf.UDFException: java.lang.ClassNotFoundException: com.aliyun.odps.hive.wrapper.HiveStorageHandlerWrapper --須要手動設置hive兼容flag set odps.sql.hive.compatible=true; SELECT * FROM tpch_lineitem_textfile LIMIT 1; +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+ | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment | +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+ | 5640000001 | 174458698 | 9458733 | 1 | 14.0 | 23071.58 | 0.08 | 0.06 | N | O | 1998-01-26 | 1997-11-16 | 1998-02-18 | TAKE BACK RETURN | SHIP | cuses nag silently. quick | +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
tpch_lineitem_internal
,而後將OSS上的開源數據導入MaxCompute內部表,以cFile格式存儲在MaxCompute內部:CREATE TABLE tpch_lineitem_internal LIKE tpch_lineitem_parquet; INSERT OVERWRITE TABLE tpch_lineitem_internal SELECT * FROM tpch_lineitem_parquet;
SELECT l_returnflag, l_linestatus, SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price, AVG(l_quantity) AS avg_qty, COUNT(*) AS count_order FROM tpch_lineitem_internal WHERE l_shipdate <= '1998-09-02' GROUP BY l_returnflag, l_linestatus;
經過這樣子將數據先導入系統的狀況下,對一樣數據的計算就會更高效得多。
開源的種種數據格式每每由各類數據處理生態產生,而MaxCompute非結構化數據處理框架經過實現計算與存儲的互聯,但願打通阿里雲核心計算平臺與各類數據的通路。在這個基礎上,各類各樣依賴於不一樣數據格式的應用,將能在MaxCompute計算平臺上實現,後繼咱們會對一些具體的這種應用,好比基因計算等,再作一些具體的case study以及介紹。咱們也歡迎有對開源數據進行處理分析的更多應用,能在MaxCompute強大計算能力的基礎上開花結果。
本文爲雲棲社區原創內容,未經容許不得轉載。