Spark SQL編程指南(Python)

前言
 
Spark SQL容許咱們在Spark環境中使用SQL或者Hive SQL執行關係型查詢。它的核心是一個特殊類型的Spark RDD:SchemaRDD。
 
SchemaRDD相似於傳統關係型數據庫的一張表,由兩部分組成:
 
Rows:數據行對象
Schema:數據行模式:列名、列數據類型、列能否爲空等
 
Schema能夠經過四種方式被建立:
 
(1)Existing RDD
(2)Parquet File
(3)JSON Dataset
(4)By running Hive SQL
 
考慮到Parquet File還沒有在平臺開始使用,所以暫時僅討論其它三項。
 
注意: Spark SQL is currently an alpha component.
 
SQLContext(HiveContext)
 
Spark SQL的入口點爲SQLContext,SQLContext的初始化依賴於SparkContext,代碼示例以下:
 
 
SQLContext目前僅僅使用一個簡單的SQL解析器,功能有限,並且目前不少的數據倉庫是創建在Hive之上的,所以Spark爲咱們提供了另外一個選擇:HiveContext。
 
HiveContext使用相對比較完善的HiveQL解析器,可使用HiveUDF,能夠訪問現有Hive數據倉庫中的數據,且適配SQLContext的全部數據源,推薦使用。
 
HiveContext初始化過程類似,以下:
 
 
數據源
 
Spark SQL(SchemaRDD)的數據源能夠簡單理解爲就是普通的Spark RDD,全部能夠應用於Spark RDD的操做都可以應用於SchemaRDD;此外,SchemaRDD還能夠「註冊」爲一張臨時表,而後經過SQL(Hive SQL)分析其中的數據(實際就是Spark RDD關聯的數據)。
 
SchemaRDD
 
SchemaRDD的數據源實際就是Spark RDD,可是Spark RDD與SchemaRDD仍是有區別的,Spark RDD相對於SchemaRDD而言缺失「Schema」,所以Spark提供兩種方式完成Spark RDD到SchemaRDD的轉換,實際就是爲Spark RDD應用「Schema」。
 
(1)使用反射推斷Schema
 
若是一個Spark RDD的數據類型爲Row,則Spark能夠經過反射推斷出該Spark RDD的Schema,並將其轉換爲一個SchemaRDD。
 
Spark使用反射推斷某個Spark RDD的Schema時,僅僅使用這個Spark RDD的第一條數據(Row),所以必須保證這條數據的完整性。
 
Row的構建過程須要一個鍵值對列表,
 
Row(id = 1, name = "a", age = 28)
 
這個鍵值對列表已經明肯定義出數據行的列名、列值,推斷僅做用於列類型。
 
代碼示例
 
 
處理邏輯能夠分爲如下幾步:
 
a. 建立一個字符串列表datas,用於模擬數據源;
b. 對datas執行「parallelize」操做,將其轉換爲Spark RDD source,數據類型爲字符串;
c. 將Spark RDD source中的每一條數據進行切片(split)後轉換爲Spark RDD rows,數據類型爲Row;
 
至此Spark RDD rows已經具有轉換爲SchemaRDD的條件:它的數據類型爲Row。
 
d. 使用HiveContext推斷rows的Schema,將其轉換爲SchemaRDD people;
 
經過people.printSchema(),咱們能夠查看推斷Schema的結果:
 
 
e. 將SchemaRDD people註冊爲一張臨時表「people」;
 
f. 執行SQL查詢語句:select * from people where age > 28 and age < 30,並將查詢結果保存至Spark RDD results,經過results.printSchema()的輸出結果:
 
 
能夠看出Spark RDD results實際也是SchemaRDD,所以咱們能夠繼續將其註冊爲一張臨時表;
 
g. 將SchemaRDD results註冊爲一張臨時表「people」,並執行SQL查詢語句:select name from people2,並將查詢結果保存至Spark RDD results2,經過f咱們能夠知道results2實際也是SchemaRDD,results2.printSchema()的輸出結果:
 
 
SchemaRDD results2的數據類型爲Row,受到查詢語句(select name)的影響,其僅包含一列數據,列名爲name。
 
h. SchemaRDD也能夠執行全部Spark RDD的操做,這裏咱們經過map將results2中的name值轉換爲大寫形式,最終的輸出結果:
 
 
上述示例說明如下三點:
 
a. 咱們能夠將一個數據類型爲Row的Spark RDD轉換爲一個SchemaRDD;
 
b. SchemaRDD能夠註冊爲一張臨時表執行SQL查詢語句,其查詢結果也是一個SchemaRDD;
 
c. SchemaRDD能夠執行全部Spark RDD的操做。
 
(2)經過編碼指定Schema
 
使用反射推斷Schema的方式要求咱們必須可以構建一個數據類型爲Row的Spark RDD,而後再將其轉換爲SchemaRDD;某些狀況下咱們可能須要更爲靈活的方式控制SchemaRDD構建過程,這正是經過編碼指定Schema的意義所在。
 
經過編碼指定Schema分爲三步:
 
a. 構建一個數據類型爲tuple或list的Spark RDD;
b. 構建Schema,須要匹配a中的tuple或list;
c.將b中的Schema應用於a中的Spark RDD。
 
代碼示例
 
 
代碼處理邏輯正好對應着上述三步,最終的輸出結果:
 
 
其中須要注意id、age的數據類型被聲明爲IntegerType,所以數據源(字符串)中的數據須要作強制類型轉換處理。
 
JSON Datasets
 
Spark可以自動推斷出Json數據集的「數據模式」(Schema),並將它加載爲一個SchemaRDD實例。這種「自動」的行爲是經過下述兩種方法實現的:
 
jsonFile:從一個文件目錄中加載數據,這個目錄中的文件的每一行均爲一個JSON字符串(若是JSON字符串「跨行」,則可能致使解析錯誤);
 
jsonRDD:從一個已經存在的RDD中加載數據,這個RDD中的每個元素均爲一個JSON字符串;
 
代碼示例
 
 
能夠得出如下兩點:
 
a. 若是數據輸入是JSON字符串的文本文件,咱們能夠直接使用jsonFile構建Spark RDD,實際就是SchemaRDD;
 
b. 若是某個Spark RDD的數據類型是字符串,且字符串均是JSON格式的字符串形式,則可使用jsonRDD將其轉換爲一個SchemaRDD。
 
Hive Tables
 
Hive Tables已是「表」,所以咱們無需建立或轉換,直接使用SQL查詢便可。
 
官方代碼示例
 
 
Hive UDF(Register Function)
 
Spark SQL使用HiveContext時能夠支持Hive UDF,這裏的UFD包含Hive自己內建的UDF,也包括咱們本身擴展的UDF(實測Spark-1.2.0-cdh5.3.2版本下沒法正常使用本身擴展的UDF(Permanent Function),已經過擴展源碼修復)。
 
這裏重點介紹Spark SQL的Register Function,也就是說能夠動態建立函數用於SQL查詢,其實際做用相似於Hive UDF。
 
代碼示例
 
 
代碼的處理邏輯與前大致相似,即首先經過編碼建立SchemaRDD people,而後將其註冊爲一張表(注意這裏使用了另外一種方式:HiveContext registerRDDAsTable),最後執行查詢語句並打印結果。
 
特別的是查詢語句中使用到了一個名爲「myfunc」的自定義SQL函數,而這個函數並非預先存在的(如Hive UDF),它是在咱們應用的運行期間被動態建立並註冊的,註冊過程使用到了HiveContext registerFunction。
 
對於Python而言,自定義函數的建立過程實際可分爲兩步:
 
(1)定義Python Function;
(2)將(1)中定義好的Python Function註冊爲SQL函數,註冊時的命名可與Function的名稱不一樣。
 
也可使用Lambda表達式將定義Function與註冊過程同時完成,如上述示例。
 
咱們自定義的SQL函數能夠與Hive UDF共同使用,以下示例:
 
 
 
其中func.iptolocationbysina是Hive UDF(Permanent Function),mychange是自定義SQL函數。
 
從上面的兩個示例能夠看出,自定義SQL函數遠比Hive UDF靈活。Hive UDF的建立過程比較複雜,須要使用Java語言完成編碼並部署爲jar,且在使用函數以前須要以temporaty function或permanent function的形式存在,每一次Hive UDF的更新都須要從新編碼並更新jar;而自定義SQL函數是運行期間動態建立的,而使用Python編碼時Function的建立及更新很是簡便,推薦使用。
 
總結
 
Spark SQL爲咱們提供了強大的數據分析能力,主要體如今如下三個方面:
 
(1)Spark RDD能夠經過反射推斷Schema或編碼指定Schema的方式轉換爲SchemaRDD,將SchemaRDD建立爲「數據表」以後,容許咱們以SQL語句的形式分析數據,節約大量編碼工做量;
(2)Spark SQL容許咱們在應用運行期間根據需求動態建立自定義SQL函數,擴充SQL的數據處理能力;
(3)SchemaRDD能夠執行全部Spark RDD的操做,若是SQL沒法表述咱們的計算邏輯時,咱們能夠經過Spark RDD豐富的API完成。
相關文章
相關標籤/搜索