《Spark The Definitive Guide》Chapter 4:結構化API預覽

前言

《Spark 權威指南》學習計劃java

Chapter 4:結構化API預覽

這章開頭就談及會深刻講解一下 Spark 的結構化 API(Structured APIs),具體又分爲三種核心類型的分佈式集合API——Datasets、DataFrames、SQL tables and views,這些APIs用來處理各類數據——非結構化的日誌、半結構化的csv文件和高度結構化的Parquet文件。python

Datasets 和 DataFrames

這個是分佈式的形如數據庫中表的集合,有行和列,每一個列的行數是一致的(行對應的值的缺失能夠用null代替),而且每一個列都有數據類型信息。並且在Spark中Datasets 和 DataFrames 說白了就是對RDD的另外一種封裝,因此一樣是表明着不可變、延遲計算的計劃(懶加載)。當使在DataFrame上執行Action操做時,Spark 執行實際的 Transformation 操做並返回結果。git

而SQL tables和views基本上和DataFrames 相同,只是針對它們執行的是SQL語句,而不是DataFrame 相應API。Chapter 10會說起這些問題github

Schemas 模式

這個就定義了DataFrame的列名、數據類型以及是否容許缺失值和空值,也就是聲明什麼位置上存儲什麼數據。在Chapter 5中會詳細談及算法

Columns 列

你能夠簡單看做DataFrame中的列,它能夠是一個簡單類型(integer、string),也能夠是複雜類型(array、map),或者是nullsql

Rows 行

Row就是DataFrame中的一行記錄,如圖顯示的Row類型數據庫

Row 類型是Spark 對DataFrame優化的數據格式的內部表示,能夠執行高效計算。它不使用JVM 類型,就沒有了GC 垃圾收集和對象實例化成本

對比 Datasets 和 DataFrames

DataFrame早期是叫SchemaRDD,spark 1.3以後改進爲df,可見df就是加上了Schema的RDD,但又進一步提高了執行效率,減小了數據讀取(針對一些特別的數據格式,像ORC、RCFile、Parquet,能夠根據數據文件中附帶的統計信息來進行選擇性讀取)以及優化了執行計劃(就是前面談過的邏輯計劃的優化)。apache

spark 1.6引入了DataSet,DataFrame=DataSet[Row]也能看出DataFrame是Row 類型的DataSet,DataSet能夠當作一條記錄的df的特例,主要區別是Dataset每個record存儲的是一個強類型值而不是一個Row,其次DataSet徹底是面向對象的使用相關RDD算子編程,還有就是DataSet只支持Java、Scala。編程

相互轉換:bash

  • DataFrame->DataSet:df.as[ElementType]
  • DataSet->DataFrame:ds.toDF(colName)

RDD和DataFrame的一點區別

RDD:
	java/scala->運行在jvm上
	python->運行在python運行環境上
	(因此不一樣語言運行spark程序效率可能不一樣)
DataFrame:
	java/scala/python->轉換爲logical plan,運行效率是同樣的
	DataFrame相比RDD還有更高級的API
複製代碼

這是由於Spark 在內部進行了優化,使用了一種叫Catalyst的引擎,會在計劃和處理計算任務的過程當中維護本身的數據類型信息。Spark 類型會映射到Spark維護的不一樣語言的API,在Scala、Java、Python、R上都存在一個查找表,因此不管使用哪一種語言的結構化API,都會嚴格地使用Spark 類型,因此性能同樣,效率一致。在Chapter 5中會說起

Spark Types(Spark內置類型)

很好理解就是SPark將一般講的哪些Integer、String給包裝成了內置類型IntegerType、StringType,只要記住都在org.apache.spark.sql.types包下,以下是scala中申明字節類型

# scala寫法
import org.apache.spark.sql.types._
val b = ByteType
# java寫法
import org.apache.spark.sql.types.DataTypes;
ByteType x = DataTypes.ByteType;
# python寫法
from pyspark.sql.types import *
b = ByteType()
複製代碼

深度截圖_選擇區域_20190525141604.png
看書看到這裏你可能會不清楚這個Spark Type有啥用,其實在大體瀏覽過一邊會發如今RDD轉DataFrame時會用到這個Spark Type,記着就是了。想了解的話能夠參見: Chapter 5——Schemas模式

結構化API執行流程預覽

總的來講分以下幾步:

  1. 寫出正確的 DataFrame/Dataset/SQL 代碼
  2. 若是代碼沒錯,spark會把它轉化爲邏輯執行計劃
  3. spark 將邏輯計劃進行優化,並轉化爲物理執行計劃
  4. spark在集羣上執行這個物理計劃(基於系列RDD操做)

20190521215220419_1184038790.png

理解這個有助於編碼和調試bug,Catalyst Optimizer(優化器)Catalyst是上面說過的Spark SQL引擎,具體的流程以下

邏輯計劃

20190521221542470_1092846121.png
如圖,是將代碼轉化爲邏輯執行計劃的流程,這只是抽象的轉化不涉及Executor(執行器)和Driver(驅動程序)。首先,把你寫的代碼轉換成 unresolved logical plan(未解決的邏輯計劃),由於代碼中可能會有不存在的表名、列名。而後經過catalog來解析(resolve)分析器中的列和表,這裏的catalog存儲了全部的表和DataFrame的信息。若是代碼中有不存在的表名、列名,分析器會拒絕 unresolved logical plan,反之成爲 resolved logical plan(解決的邏輯計劃)交付給 Catalyst Optimizer 進行優化。Catalyst Optimizer 是一組規則的集合,經過謂詞下推(pushing down predicates)或者投影(selections)來嘗試優化邏輯計劃。

物理計劃

20190521222849896_391396741.png

如圖,成功轉化爲邏輯執行計劃後就開始轉化爲物理計劃了,spark會嘗試生成多個不一樣的物理計劃,而後經過一個代價模型(cost model)來比較開銷成本,從中選出最優的一個物理計劃在集羣上運行。書上給了一個代價比較的例子:經過查看所給表的物理屬性,好比表的大小、分區的大小。 最終物理計劃的結果是一些列RDDs和transformations(個人理解是轉換操做,對應轉換算子)。

執行

在選擇了物理計劃時,spark經過底層(lower-level)編程接口也運行了這些基於RDDs的代碼,運行時還會進一步優化,生成本地的java字節碼,能夠在執行階段移除整個tasks和stages(沒搞懂這說的是啥),最終返回結果給你


收錄於此:josonle/Spark-The-Definitive-Guide-Learning

更多推薦: Coding Now

學習記錄的一些筆記,以及所看得一些電子書eBooks、視頻資源和日常收納的一些本身認爲比較好的博客、網站、工具。涉及大數據幾大組件、Python機器學習和數據分析、Linux、操做系統、算法、網絡等

相關文章
相關標籤/搜索