Spark DataFrame ETL教程

前言

ETL是 Extract-Transform-Load的縮寫,也就是抽取-轉換-加載,在數據工做中是很是重要的部分。實際上,ETL就是一個對數據進行批處理的過程,一個ETL程序就是一個批處理腳本,執行時能將一堆數據轉化成咱們須要的形式。
每一個接觸過數據批處理的工程師,都走過ETL的流程,只是沒有意識到而已。按照ETL過程的框架來從新認識數據批處理,有利於咱們更清晰地編寫批處理腳本。
在單機範圍內的數據量下,使用python的pandas包就能夠很是方便地完成數據批處理工做。但當數據量達到1G以上時,pandas處理起來就有些力不從心了,到數據量達到1T以上,只能以分塊的方式存儲在分佈式系統上時,pandas就無能爲力了。在當前的技術背景下,典型的場景就是數據存儲在Hive on HDFS上。要作ETL,就須要新的工具。Hadoop生態下,原生的工具是MapReduce計算模型,一般用Java編寫,比較複雜,每次計算的中間結果也須要進行磁盤存取,很是費時。Spark是一個MPP架構的計算引擎,相比MapReduce,Spark 有DataFrame(又名 Schema RDD), 以表的形式來儲存數據,不管是理解仍是操做,都更爲簡單,還支持Python,在許多須要使用函數做參數的場合,很是好用。html

本教程將介紹如何使用pyspark.sql模塊,操做Spark DataFrame,從Hive中讀取數據,通過一系列轉換,最後存入Hive中。Spark的DataFrame和pandas的DataFrame的概念很類似,只是操做略有不一樣,若是讀者有pandas的使用經驗,很容易就能快速上手。
教程只是爲了方便讀者快速入門,想要更好地開發Spark程序,仍然須要詳細瞭解Spark的API接口,對python環境下,Hive的ETL來講,研究pyspark.sql模塊下的內容就足夠了,能夠參考官方文檔python

環境:Spark的API隨版本不一樣會有較大變化,目前比較流行的版本是1.6和2.2,本文使用Spark 1.6.0,語言爲Python 2.7。默認數據都儲存在Hive中,Hadoop集羣帶有yarn。sql

冒煙測試

學習一門語言或者軟件的第一步,永遠都是冒煙測試。最經典的冒煙測試就是輸出Hello World。但對ETL來講,一個打印"Hello World"的Spark程序是沒什麼用的。因此咱們這裏講講如何打印一張表,這張表有一行數據,列名爲t,值爲"Hello World"。數據庫

Spark的核心是SparkContext,它提供了Spark程序的運行環境。而SqlContext則是由SparkContext產生,提供了對數據庫表的訪問接口。由於這裏數據庫的環境是Hive,一般使用SqlContext的派生類HiveContext。在Spark提供的交互式環境中,會在啓動時自動建立環境,生成SparkContext和HiveContext的實例。在pyspark的交互式環境中,SparkContext實例名爲sc,HiveContext實例名爲sqlContext。apache

交互式操做只在學習和調試時使用,實際工做中仍是要靠命令行執行腳本。在腳本中咱們就須要本身生成SparkContext和HiveContext了。基本操做代碼以下:編程

# -*- coding: UTF-8 -*-
from pyspark import SparkContext,HiveContext
sc = SparkContext(appName="Hello World") #  appName就是這個Spark程序的名字,在DEBUG時有用
hc = HiveContext(sc)
df = hc.createDataFrame([["Hello World"]],['t']) # 建立一個DataFrame,第一個參數是數據,一個二維列表,第二個參數是表頭,一個列表)
first_cell = df.collect()[0][0] # 取第一個單元格的值
df.show() # 將表打印到屏幕上
print(first_cell)

將這段代碼保存成文件hello.py,在終端中進入到該文件所在目錄,輸入命令spark-submit --master yarn hello.py ,而後就能夠看到屏幕上輸出以下,冒煙測試就算完成了。json

+-----------+
|          t|
+-----------+
|Hello World|
+-----------+
Hello World

指令解釋:spark-submit就是spark的執行程序,master yarn是spark-submit的參數,指定yarn做爲計算調度的中心。最後hello.py就是咱們的ETL程序。api

Extract 抽取

ETL的第一步就是從數據源抽取數據,在Spark中就是從Hive裏讀取數據。緩存

Hive雖然實質上是個MapReduce接口的封裝,但從上層抽象模型來看,有最基本的Schema、Table和Column,還有一套類SQL語法,能夠說就是一個典型的關係數據庫模型,所以在ETL過程當中,咱們徹底能夠把Hive當成一個關係數據庫來看待。網絡

抽取的經常使用方法由兩種,一種是直接讀取Hive表,一種是經過Hive QL讀取。
都須要以HiveContext的實例做爲入口,結果返回一個Spark DataFrame,爲了檢查結果,可使用show方法查看DataFrame的數據。

假設咱們有一個名爲test 的庫,裏面有一張表爲t1,數據結構以下:

a b c
1 2 3
4 5 6
7 8 9

直接讀取Hive表

HiveContext對讀取操做提供統一的接口- DataFrameReader,HiveContext的實例的read屬性就能夠獲取到這個接口。
固然,這個接口也能用來讀取Hive的數據,read.table就可獲取到表的數據,代碼以下

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="extract")
hc = HiveContext(sc) # 生成HiveContext實例
t =hc.read.table("test.t1")
t.show()

Hive QL讀取

實質是讓HiveContext將HiveQL傳給Hive,讓Hive執行後,將查詢結果封裝成Spark DataFrame返回。在處理過程比較簡單,或者須要大量設置別名時,比較有用(由於Spark批量設置別名不太方便),但不推薦寫太過複雜的Hive QL,由於Hive 執行Hive QL的實質是把Hive QL轉成MapReduce執行,在計算效率上是不如Spark的。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="extract")
hc = HiveContext(sc)
hc.sql("use test") 
t = hc.sql("select * from t1")
t.show()

Load 加載

爲何不先講Trasform呢?由於Trasform的操做不少,先講Load有助於快速上手完成一個初級的ETL程序。
相似於讀取,HiveContext也提供了統一的寫接口,名爲DataFrameWriter.調用write屬性便可獲取。

寫入的具體方式也不少,不過爲了快速上手,只講最關鍵的一些東西。

mode 寫入方式

若是表已經存在,該如何操做。

  • append 追加: 在尾部追加數據
  • overwrite 覆寫: 覆蓋原有數據
  • error 錯誤: 拋出異常
  • ignore忽略 : 自動跳過

由於Hive on HDFS的關係,更新表最快的方式是全表覆寫。對於須要更新原有的ETL,通常都是全表重寫,只須要追加的,就能夠用追加。

format 文件格式

在Hive on HDFS中,數據實質上是以文件的形式保存的。不一樣的文件格式,在壓縮容量、支持數據類型和查詢速度上都有所不一樣。textfile,avro,sequence,parquet,json等。目前我經常使用的格式是text和parquet,若是不設置文件格式,默認會使用Hive表的文件格式,若是Hive表不存在,則使用Hive表的默認格式textfile

加載新表

瞭解了上面的操做以後,咱們就能夠開始寫加載部分的代碼了,只須要使用一個saveAsTable方法就好了,很是簡單。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="load")
hc = HiveContext(sc)
hc.sql("use test") 
t1 = hc.sql("select a as a1,b as b1,c as c1 from t1")
t1.write.saveAsTable("test.t2",format="parquet",mode="overwrite") # 將t1的三個列更名後存成t2表
t2.read.table("test.t2")
t2.show()

轉換

轉換是ETL過程當中最複雜的部分,去掉抽取和加載,剩下的全都是轉換,包含的內容是很是多的,常見的有篩選、聚合、多列合併或計算,列賦值,根據不一樣的須要有不一樣的處理方法。因爲Spark的轉換操做較爲囉嗦,因此推薦把部分簡單的操做經過Hive QL的方式,在抽取步驟中交由Hive完成,這樣有助於精簡代碼,提升可讀性,下降維度難度。
下面就講一講Spark DataFrame 轉換部分的基本概念和操做。

向量化編程

對於平常用Java來作數據批處理的工程師來講,可能更習慣用for循環來逐條處理數據。但這樣作在操做上是很不方便的,也不太利於閱讀理解。在科學計算的語境下,數據老是以DataFrame的形式儲存,也就是一張表。數據處理操做一般是對這張表的某些行或者某些列來進行處理。好比,「令t1表的a列中數字大於2的值的,所有都等於2」,或者「給t1表新加一常數列d,值爲99」,這樣的操做在向量化編程的語境下,就是一個調用API接口的操做,比for循環容易被理解。
能夠類比pandas。在pandas中,也主要是經過向量化編程的方式來處理數據,雖然提供了迭代器的接口,能夠一行行地讀取數據,但通常以表做爲修改對象的操做,主要是以API接口來完成,不推薦使用迭代器來作行級修改。一來操做不方便,二來運算速度未必能比優化過的API接口快。
Spark是分佈式執行的,數據分散在各個機器上,背後有一套調度系統來控制數據計算負載。若是用for循環來處理,就是把負載都加在了執行腳本的機器上,通常來講執行腳本的機器都是不儲存數據的master,實際上這一過程就會致使須要把數據從slave傳到master上,無謂地增長了網絡負擔。因此,在Spark腳本里,嚴禁使用原生的python for循環來處理SparkData Frame,即便要用,也應該使用Spark提供的API接口。

基本操做對象

在Spark DataFrame語境下,操做對象主要有三個:DataFrame,Row,Column。

  • DataFrame: DataFrame就是一張表,有表頭和若干行數據。這張表是一個有序、可迭代的集合。
  • Row:DataFrame 集合中的元素就是Row。每一個Row儲存一行數據,有相同的屬性,這些屬性和表頭同名。DataFrame沒有API接口能夠直接獲取到某個Row,但能夠經過Colect方法獲取到Row對象的list,再從中獲取指定的Row。
  • Column:Column與數據的實際結構無關,是一個操做上的概念。在實際的轉換操做中,絕大多數都是對若干列進行數學運算、拼接、映射等等。取DataFrame中的一列,獲得的就是一個Column對象。

事實上,最經常使用的主要是DataFrame和Column,Row不多用到。其中,DataFrame是核心,一個ETl過程,實質就是從抽取一個DataFrame開始,通過一系列的DataFrame變換,獲得一個與目標一致的DataFrame,而後寫入到目標數據庫中去。Column在其中扮演着中間點的角色,好比取DataFrame的多個列,拼接合成一個新列,而後把這個新列加到本來的DataFrame中去。

基本操做分類

上面提到了,DataFrame是核心操做對象。其實在Spark中,真正意義上的核心操做對象是RDD,一個有序的,分佈式儲存在內存中的操做對象。DataFrame就是一個特殊的RDD——Schema RDD。全部的DataFrame操做,均可以歸類爲兩種基本操做:轉化(Transformation)和行動(action)。轉換操做是不會觸發Spark的實際計算的,即便轉換過程當中出現了錯誤,在執行到這一行代碼時,也不會報錯。直到執行了行動操做以後,纔會真正讓Spark執行計算,這時候纔會拋出在轉化過程當中出現的錯誤。這在DEBU時,尤爲是交互式編程環境下,可能會致使問題代碼定位錯誤,須要特別注意。

  • Transform:典型的轉換操做有讀(read),篩選(filter)、拼接(union)等等,只要這個過程只改變DataFrame的形態,而不須要實際取出DataFrame的數據進行計算,都屬於轉換。理論上來講,ETL過程當中的Transfrom過程,主幹流程只會有轉換操做,不會有Action操做。
  • Action:典型的動做操做有計數(count),打印表(show),寫(write)等,這些操做都須要真正地取出數據,就會觸發Spark的計算。

篩選

filter(cond):篩選出知足條件cond的行。cond能夠填字符串,格式和SQL中的where子句同樣,也能夠填Bool類型的Column對象,好比 df['a']>1。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
t1 = df.filter("a > 1 and c < 9")
t1.show() # 輸出 4,5,6 這一行
t2 = df.filter( (df['b']<5) & (df['c']<8)) # 可使用&或|對兩個bool列進行邏輯運算,但必需要用圓括號括起,限定運算順序。
t2.show() # 輸出 1,2,3 這一行

賦值,加列

withColumn(col_name,col):col_name是列名,col是列值,必須是一個Column對象。
賦值和加列操做是相同的,col_name存在,就是賦值,不然就是加列。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
t1 = df.withColumn("c",df['c']+1)
t1.show() # c的值全都增長了1
t2 = df.withColumn("d",df['a']+1)
t2.show() # 增長了新一列d

刪除列

drop(col_name):col_name爲列名。該方法會返回一個刪除col_name列的DataFrame

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
t = df.drop("c")
t.show() # 只有 a,b兩列

給列取名

alias(col_name):一般和select配合使用,請看下面的例子

選取列

select(*cols):cols爲列名或列對象。
賦值和刪除操做,每次只能改加減一列數據,若是想要批量地改變,尤爲是調整列順序的時候,就很是有用了。在ETL中,當須要計算的列不少時,一般就是逐個計算出不一樣的列對象,最後用select把它們排好順序。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
a1 = (df['a']+1).alias("a1") # 新增一個列對象,取名爲a1
t = df.select("a",a1,"b") # 若是用字符串,必須是df中存在的列名。
t.show() # 顯示a, a_1,b 三列

生成Column對象

在賦值的例子裏,Column對象是由原DataFrame的Column通過簡單的數學運算或邏輯運算獲得的,但若是咱們想生成一些更特殊的Column呢?好比常數列或者本身定義複雜的規則。
Spark提供了pyspark.sql.functions,含有豐富的接口,其中就有咱們須要的東西。篇幅有限,只能介紹一些經常使用的,更多的仍是須要去看官方文檔。

常數列

lit(value):value數必須是必須爲pyspark.sql.types支持的類型,好比int,double,string,datetime等

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import lit
from datetime import datetime
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
t = df.withColumn("constant",lit(datetime(2018,1,1,2,3,4,999)))
t.show(truncate=False)

取整

round、floor:和Python的標準函數用法一致,只是數字換成列名

條件分支

when(cond,value):符合cond就取value,value能夠是常數也能夠是一個列對象,連續能夠接when構成多分支
otherwise(value):接在when後使用,全部不知足when的行都會取value,若不接這一項,則取Null。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import when
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
t = df.withColumn("when",when(df['a']==1,"a=1").when(df['b']==5,df['b']%5).otherwise("other"))
t.show() # 生成when列,值分別爲 a=1,0,other

日期和時間

current_date():當前日期,返回一個date列
current_timestamp():當前時刻,返回一個timestamp列
date_add(start, days):日期正向偏移,start爲開始時間,必須是Column或字符串對象,指向一個date或timestamp列,days爲偏移天數。
date_sub(start, days):相似date_add,可是負向偏移。
date_format(date, format): 日期格式化,date爲要格式化的時間,必須是Column或字符串對象,指向一個date或timestamp列,days爲偏移天數,format爲格式化的字符串,具體參考Hive QL的date_format函數。
datediff(end, start):計算天數差

自定義規則

udf(f, returnType=StringType): 自定義處理函數,f爲自定義的處理函數,returnType爲f的返回類型,必須爲pyspark.sql.types支持的類型,若是不填,會默認自動轉化爲String類型。udf會返回一個函數,能夠當作列函數使用。
這在處理邏輯很是複雜時頗有用。好比對身份證號進行校驗計算,而後取出有效的身份證號的第1,4,10位,這個複雜流程很難用Spark提供的API拼接起來,只能本身寫。
做爲教程,就不寫太複雜的函數了。
自定義函數f的傳入參數爲列的值。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
def f(a,b,c):
    r=0
    if a==1:
        r=1
    elif b==5:
        r=2
    return r

col_match = udf(f,IntegerType())
t = df.withColumn("col_match",col_match("a","b","c"))
t.show() # 生成col_match列,值分別爲 a=1,2,0

排序

Spark支持多字段,升降序排序。
可使用orderBy和sort,由於操做比較簡單也符合直覺,這裏略去例子,詳情能夠看文檔。

聚合

Spark 支持直接聚合,也支持分組聚合。聚合的表達方式很是多,這裏僅選取經常使用的。

直接聚合

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import sum
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,1,3],[4,1,2],[7,2,9]],['a','b','c'])
t = df.agg(sum("a"))
print(t.collect()[0][0]) # 打印 12

分組聚合

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import sum,max
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,1,3],[4,1,2],[7,2,9]],['a','b','c'])
t = df.groupBy("b").agg(sum("a"),max("c"))
t.show()

輸出:

+---+------+------+
|  b|sum(a)|max(c)|
+---+------+------+
|  1|     5|     3|
|  2|     7|     9|
+---+------+------+

窗口函數

有一類分析需求,是須要分組計算,但保持數據的粒度不變的。好比經過成績表,按班計算的學生的成績排名,加一列到本來的成績表中,整個表的每一行仍然表示一名學生。這種分析需求稱爲窗口分析,好比說每一個班,就是一個窗口,在這個窗口中,計算出班級成績排名,再併到原表中。
這種分析,首先要建立一個窗口,而後再使用窗口函數來進行計算。Spark提供了豐富的窗口函數,能夠知足各種分析需求。

建立窗口

使用pyspark.sql.Window對象能夠建立一個窗口,最簡單的窗口能夠什麼都沒有,但通常不推薦這樣作。可使用partitionBy進行分組,使用orderBy進行排序,好比

from pyspark.sql import Window
window = Window.partitionBy("a").orderBy("b")

窗口函數使用示例

rank():根據窗口中partitionBy進行分組,以orderBy排序

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import rank,desc
from pyspark.sql import Window
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
score = [
    ['a','a_1',90],
    ['a','a_2',80],
    ['a','a_3',85],
    ['b','b_1',70],
    ['b','b_2',80],
    ['b','b_3',75],
    ['c','c_1',90]
]
df = hc.createDataFrame(score,['class','student','score'])
class_window = Window.partitionBy("class").orderBy(desc("score")) #降序排列
class_rank = rank().over(class_window) 
class_row_number = row_number().over(class_window) #窗口函數(xxx).over(window),就是通常的用法
t = df.withColumn("rank",class_rank)
t.show()

按班級,分數從高到低,生成排名

+-----+-------+-----+----+
|class|student|score|rank|
+-----+-------+-----+----+
|    a|    a_1|   90|   1|
|    a|    a_3|   85|   2|
|    a|    a_2|   80|   3|
|    b|    b_2|   80|   1|
|    b|    b_3|   75|   2|
|    b|    b_1|   70|   3|
|    c|    c_1|   90|   1|
+-----+-------+-----+----+

緩存

在實際業務中,經常會碰到這種需求:須要把一個計算結果,稍加不一樣的改動,分別存爲不一樣的表。好比,ETL中爲了保證出錯後能重試,就會要求除了保存轉換計算結果以外,還要備份一份到備份表裏。備份表一般是按天分區的,每一個區存當天的轉換計算結果。而應用表則不分區,只存最新一天的計算結果。
在完成這一需求時,若是是先保存應用表,而後再添加分區列後添加到分區表,就會觸發兩次完整的計算流程,時間翻倍。而若是有緩存,就不同了。咱們能夠在計算到最終結果時,緩存一下這張表,而後把它保存爲應用表,再添加分區列保存爲分區表。那麼,實際計算中,到緩存操做爲止的計算,只會觸發一次,實際消耗時間爲1次到最終結果的計算+1次加分區列,遠小於2次計算的時間。當某些中間結果須要反覆使用時,緩存能夠給咱們帶來極大的效率提高。固然,相應地,內存也會佔用更多,仍是應該根據具體狀況決定如何取捨。緩存的方法很簡單,只要讓DataFrame對象執行cache方法就好了:df.cache()

相關文章
相關標籤/搜索