使用AWS Glue進行 ETL 工做

數據湖

數據湖的產生是爲了存儲各類各樣原始數據的大型倉庫。這些數據根據需求,進行存取、處理、分析等。對於存儲部分來講,開源版本常見的就是 hdfs。而各大雲廠商也提供了各自的存儲服務,如 Amazon S3,Azure Blob 等。html

而因爲數據湖中存儲的數據所有爲原始數據,通常須要對數據作ETL(Extract-Transform-Load)。對於大型數據集,經常使用的框架是 Spark、pyspark。在數據作完 ETL 後,再次將清洗後的數據存儲到存儲系統中(如hdfs、s3)。基於這部分清洗後的數據,數據分析師或是機器學習工程師等,可能夠基於這些數據進行數據分析或是訓練模型。在這些過程當中,還有很是重要的一點是:如何對數據進行元數據管理?java

在 AWS 中,Glue 服務不只提供了 ETL 服務,還提供的元數據的管理。下面咱們會使用 S3+Glue +EMR 來展現一個數據湖+ETL+數據分析的一個簡單過程。python

 

準備數據

這次使用的是GDELT數據,地址爲:sql

https://registry.opendata.aws/gdelt/apache

此數據集中,每一個文件名均顯示了此文件的日期。做爲原始數據,咱們首先將2015年的數據放在一個year=2015 的s3目錄下:編程

aws s3 cp s3://xxx/data/20151231.export.csv s3://xxxx/gdelt/year=2015/20151231.export.csvapp

 

使用Glue爬取數據定義

經過glue 建立一個爬網程序,爬取此文件中的數據格式,指定的數據源路徑爲s3://xxxx/gdelt/ 。框架

 

此部分功能及具體介紹可參考aws 官方文檔:機器學習

https://docs.aws.amazon.com/zh_cn/glue/latest/dg/console-crawlers.html工具

 

爬網程序結束後,在Glue 的數據目錄中,便可看到新建立的 gdelt 表:

 

原數據爲csv格式,因爲沒有header,因此列名分別爲col0、col1…、col57。其中因爲s3下的目錄結構爲year=2015,因此爬網程序自動將year 識別爲分區列。

至此,這部分原數據的元數據即保存在了Glue。在作ETL 以前,咱們可使用AWS EMR 先驗證一下它對元數據的管理,。

 

AWS EMR

AWS EMR 是 AWS 提供的大數據集羣,能夠一鍵啓動帶Hive、HBase、Presto、Spark 等經常使用框架的集羣。

啓動AWS EMR,勾選 Hive、Spark,並使用Glue做爲它們表的元數據。EMR 啓動後,登陸到主節點,啓動Hive:

> show tables;

gdelt

Time taken: 0.154 seconds, Fetched: 1 row(s)

 

能夠看到在 hive 中已經能夠看到此表,執行查詢:

> select * from gdelt where year=2015 limit 3;

OK

498318487       20060102        200601  2006    2006.0055       CVL     COMMUNITY                                               CVL                                                                                                   1       53      53      5       1       3.8     3       1       3       -2.42718446601942       1       United States   US      US      38.0    -97.0   US      0                               NULL  NULL            1       United States   US      US      38.0    -97.0   US      20151231        http://www.inlander.com/spokane/after-dolezal/Content?oid=2646896       2015

498318488       20060102        200601  2006    2006.0055       CVL     COMMUNITY                                               CVL                     USA     UNITED STATES   USA                                                           1       51      51      5       1       3.4     3       1       3       -2.42718446601942       1       United States   US      US      38.0    -97.0   US      1       United States   US    US      38.0    -97.0   US      1       United States   US      US      38.0    -97.0   US      20151231        http://www.inlander.com/spokane/after-dolezal/Content?oid=2646896       2015

498318489       20060102        200601  2006    2006.0055       CVL     COMMUNITY                                               CVL                     USA     UNITED STATES   USA                                                           1       53      53      5       1       3.8     3       1       3       -2.42718446601942       1       United States   US      US      38.0    -97.0   US      1       United States   US    US      38.0    -97.0   US      1       United States   US      US      38.0    -97.0   US      20151231        http://www.inlander.com/spokane/after-dolezal/Content?oid=2646896       2015

 

能夠看到原始數據的列很是多,假設咱們所須要的僅有4列:事件ID、國家代碼、日期、以及網址,並基於這些數據作分析。那咱們下一步就是作ETL。

 

GLUE ETL

Glue 服務也提供了 ETL 的工具,能夠編寫基於spark 或是 python 的腳本,提交給 glue etl 執行。在這個例子中,咱們會抽取col0、col5二、col5六、col5七、以及year這些列,並給它們重命名。而後從中抽取僅包含「UK」的記錄,最終以date=current_day 的格式寫入到最終s3 目錄,存儲格式爲parquet。能夠經過 python 或是 scala 語言調用 GLUE 編程接口,在本文中使用的是 scala:

import com.amazonaws.services.glue.ChoiceOption
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.ResolveSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import java.text.SimpleDateFormat
import java.util.Date

object Gdelt_etl {
  def main(sysArgs: Array[String]) {
      
    val sc: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(sc)
    val spark = glueContext.getSparkSession
    
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    
    // db and table
    val dbName = "default"
    val tblName = "gdelt"

    // s3 location for output
    val format = new SimpleDateFormat("yyyy-MM-dd")
    val curdate = format.format(new Date())
    val outputDir = "s3://xxx-xxx-xxx/cleaned-gdelt/date=" + curdate + "/"

    // Read data into DynamicFrame
    val raw_data = glueContext.getCatalogSource(database=dbName, tableName=tblName).getDynamicFrame()

    // Re-Mapping Data
    val cleanedDyF = raw_data.applyMapping(Seq(("col0", "long", "EventID", "string"), ("col52", "string", "CountryCode", "string"), ("col56", "long", "Date", "String"), ("col57", "string", "url", "string"), ("year", "string", "year", "string")))

    // Spark SQL on a Spark DataFrame
    val cleanedDF = cleanedDyF.toDF()
    cleanedDF.createOrReplaceTempView("gdlttable")
    
    // Get Only UK data
    val only_uk_sqlDF = spark.sql("select * from gdlttable where CountryCode = 'UK'")
    
    val cleanedSQLDyF = DynamicFrame(only_uk_sqlDF, glueContext).withName("only_uk_sqlDF")        
    
    // Write it out in Parquet
    glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> outputDir)), format = "parquet").writeDynamicFrame(cleanedSQLDyF)    
    
    Job.commit()
  }
}

 

將此腳本保存爲gdelt.scala 文件,並提交給 GLUE ETL做業執行。等待執行完畢後,咱們能夠在s3看到生成了輸出文件:

> aws s3 ls s3://xxxx-xxx-xxx/cleaned-gdelt/ date=2020-04-12/

part-00000-d25201b8-2d9c-49a0-95c8-f5e8cbb52b5b-c000.snappy.parquet

 

而後咱們再對此/cleaned-gdelt/目錄執行一個新的 GLUE 網爬程序:

 

執行完成後,能夠在GLUE 看到生產了新表,此表結構爲:

 

能夠看到輸入輸出格式均爲parquet,分區鍵爲date,且僅包含了咱們所需的列。

再次進入到 EMR Hive 中,能夠看到新表已出現:

hive> describe cleaned_gdelt;
OK
eventid                 string
countrycode             string
date                    string
url                     string
year                    string
date                    string

# Partition Information
# col_name              data_type               comment
date                    string

 

查詢此表:

hive> select * from cleaned_gdelt limit 10;
OK
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
498318821       UK      20151231        http://wmpoweruser.com/microsoft-denies-lumia-950-xl-withdrawn-due-issues-says-stock-due-strong-demand/ 2015
498319466       UK      20151231        http://www.princegeorgecitizen.com/news/police-say-woman-man-mauled-by-2-dogs-in-home-in-british-columbia-1.2142296     2015
498319777       UK      20151231        http://www.catchnews.com/life-society-news/happy-women-do-not-live-any-longer-than-sad-women-1451420391.html    2015
498319915       UK      20151231        http://www.nationalinterest.org/feature/the-perils-eu-army-14770        2015
…
Time taken: 0.394 seconds, Fetched: 10 row(s)

 

能夠看到出現的結果均的 CountryCode 均爲 UK,達到咱們的目標。

 

自動化

下面是將 GLUE 網爬 + ETL 進行自動化。在GLUE ETL 的工做流程中,建立一個工做流,建立完後以下所示:

 

如圖所示,此工做流的過程爲:

  1. 每晚11點40開始觸發工做流
  2. 觸發 gdelt 的網爬做業,爬取原始數據的元數據
  3. 觸發gdelt的ETL做業
  4. 觸發gdelt-cleaned 網爬程序,爬取清洗後的數據的元數據

 

下面咱們添加一個新文件到原始文件目錄,此新數據爲 year=2016 的數據:

aws s3 cp s3://xxx-xxxx/data/20160101.export.csv s3://xxx-xxx-xxx/gdelt/year=2016/20160101.export.csv

 

而後執行此工做流。

期間咱們能夠看到ETL job 在raw_crawler_done 以後,被正常觸發:

 

做業完成後,在Hive 中便可查詢到 2016 年的數據:

select * from cleaned_gdelt where year=2016 limit 10;
OK
498554334       UK      20160101        http://medicinehatnews.com/news/national-news/2015/12/31/support-overwhelming-for-bc-couple-mauled-by-dogs-on-christmas-day/    2016
498554336       UK      20160101        http://medicinehatnews.com/news/national-news/2015/12/31/support-overwhelming-for-bc-couple-mauled-by-dogs-on-christmas-day/    2016
相關文章
相關標籤/搜索