Apache Hudi是一個開源的數據湖框架,旨在簡化增量數據處理和數據管道開發。藉助Hudi能夠在Amazon S三、Aliyun OSS數據湖中進行記錄級別管理插入/更新/刪除。AWS EMR集羣已支持Hudi組件,而且能夠與AWS Glue Data Catalog無縫集成。此特性可以使得直接在Athena或Redshift Spectrum查詢Hudi數據集。html
對於企業使用AWS雲的一種常見數據流如圖1所示,即將數據實時複製到S3。
git
本篇文章將介紹如何使用Oracle GoldenGate
來捕獲變動事件並利用Hudi格式寫入S3數據湖。github
Oracle GG可使用多個處理程序和格式輸出,請查看此處獲取更多信息。sql
本篇文章中不關心處理程序,咱們假設使用Avro Operation格式,這種格式較爲冗長,但有着普遍應用,由於其平衡了數據完整性和性能。如圖2所示,此格式包含每一個記錄的before
和after
版本。shell
即便完整且易於生成,此格式也不適合用Athena或Spectrum進行分析,從使用角度也沒法替代源數據。此外你可能須要對歷史數據進行分區處理以便快速檢索。數據庫
本文咱們將介紹如何利用Apache Hudi框架作到這一點,以構建易於分析的目標數據集。express
咱們不詳細介紹如何將avro格式文件放入Replica S3
桶中,整個數據體系結構以下所示apache
Hudi代碼運行在EMR集羣中,從Replica S3
桶中讀取avro數據,並將目標數據集存儲到Target S3
桶中。架構
EMR軟件配置以下oracle
硬件配置以下
因爲插入/更新
始終保留最後一條記錄,所以Hudi做業很是具備彈性, 所以能夠利用Spot Instance(搶佔式實例)
大大下降成本。
除此以外,還須要設置
配置完後須要確保EMR集羣有讀寫權限。
若是你須要一些樣例數據,能夠點擊此處獲取。當設置好桶後,啓動EMR集羣並將這些樣例數據導入Replica
桶。
爲構建按時間劃分的數據集,必須肯定不可變的日期類型字段。參照示例數據集(銷售訂單),咱們假設訂單日期永遠不會改變,所以咱們將DAT_ORDER
字段做爲寫入Hudi數據集的分區字段。
分區方式是YYYY/MM/DD,經過該方式,全部數據將被組織在嵌套的子文件夾中。Hudi框架將提供此分區信息,並將一個特定字段添加到關聯的Hive/Glue表中。當查詢時,該字段上的過濾條件將轉換爲超高效的分區修剪掃描條件。
實際上這是咱們必須對數據集作的惟一強假設,全部其餘信息都在avro文件中(字段名稱,字段類型,PK等)。
除此元數據外,GoldenGate一般還會添加一些其餘信息,例如表名稱,操做時間戳,操做類型(插入/更新/刪除)和自定義標記。你能夠利用這些字段來構造通用邏輯並構建靈活的遷移平臺。
啓動spark-shell
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
啓動後能夠運行以下代碼:
val ggDeltaFiles = "s3://" + sourceBucket + "/" + sourceSubFolder + "/" + sourceSystem + "/" + inputTableName + "/"; val rootDataframe:DataFrame = spark.read.format("avro").load(ggDeltaFiles); // extract PK fields name from first line val pkFields: Seq[String] = rootDataframe.select("primary_keys").limit(1).collect()(0).getSeq(0); // take into account the "after." fields only val columnsPre:Array[String] = rootDataframe.select("after.*").columns; // exclude "_isMissing" fields added by Oracle GoldenGate // The second part of the expression will safely preserve all native "**_isMissing" fields val columnsPost:Array[String] = columnsPre.filter { x => (!x.endsWith("_isMissing")) || (!x.endsWith("_isMissing_isMissing") && (columnsPre.filter(y => (y.equals(x + "_isMissing")) ).nonEmpty))}; val columnsFinal:ArrayBuffer[String] = new ArrayBuffer[String](); columnsFinal += "op_ts"; columnsFinal += "pos"; // add the "after." prefix columnsPost.foreach(x => (columnsFinal += "after." + x)); // prepare the target dataframe with the partition additional column val preparedDataframe = rootDataframe.select("opTypeFieldName", columnsFinal.toArray:_*). withColumn("HUDI_PART_DATE", date_format(to_date(col("DAT_ORDER"), "yyyy-MM-dd"),"yyyy/MM/dd")). filter(col(opTypeFieldName).isin(admittedValues.toList: _*)); // write data preparedDataframe.write.format("org.apache.hudi"). options(hudiOptions). option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, pkFields.mkString(",")). mode(SaveMode.Append). save(hudiTablePath);
上述簡化了部分代碼,能夠在此處找到完整的代碼。
輸出的S3對象結果以下所示
同時Glue數據目錄將使該表可用於經過外部模式在Athena或Spectrum中進行查詢分析,外部表具備咱們用於分區的hudi_part_date附加字段。