Spark SQL inferSchema實現原理探微(Python)

使用Spark SQL的基礎是「註冊」(Register)若干表,表的一個重要組成部分就是模式,Spark SQL提供兩種選項供用戶選擇:
 
(1)applySchema
 
 
applySchema的方式須要用戶編碼顯示指定模式,優勢:數據類型明確,缺點:多表時有必定的代碼工做量。
 
(2)inferSchema
 
 
inferSchema的方式無需用戶編碼顯示指定模式,而是系統自動推斷模式,代碼比較簡潔,但既然是推斷,就可能出現推斷錯誤(即與用戶指望的數據類型不匹配的狀況),因此咱們須要對其推斷過程有清晰的認識,才能在實際應用中更好的應用。
 
本文僅僅針對Python(spark-1.5.1)進行介紹,推斷過程是依賴SQLContext(HiveContext是SQLContext的子類) inferSchema實現的:
 
 
SQLContext inferSchema已經在1.3版本中被棄用,取而代之的是createDataFrame,inferSchema仍然能夠在1.5.1版本中被使用,其實際執行過程就是SQLContext createDataFrame,這裏須要注意一個參數samplingRation,它的默認值爲None,後續會討論它的具體做用。
 
 
這裏咱們僅僅考慮從RDD推斷數據類型的狀況,也就是isinstance(data, RDD)爲True的狀況,代碼執行流程轉入SQLContext _createFromRDD:
 
 
從上述的代碼調用邏輯能夠看出,schema爲None,代碼執行流程轉入SQLContext _inferSchema:
 
 
SQLContext _inferSchema的主要流程大體分爲三步:
 
第一步:獲取RDD的第一行記錄first,並且要求first不能爲空值(注意不是None);
第二步:若是first的類型爲「dict」,會輸出一條警告信息:推斷模式時建議RDD的元素類型爲Row(pyspark.sql.Row),dict已被棄用;
第三步:若是samplingRatio爲None,則直接使用first(也就是RDD的第一條記錄)推斷模式;若是samplingRation不爲None,則根據該值「篩選」數據推斷模式。
 
咱們將着重介紹第三步的實現邏輯。
 
1. samplingRatio is None
 
 
 
_infer_schema使用一行記錄row(也就是RDD的第一行記錄)推斷模式,大體分爲四個步驟:
 
(1)若是記錄row的數據類型爲dict;
 
 
 
由此咱們能夠得出items實際就是一個鍵值對列表,其中鍵值對也能夠理解爲(列名,列值);之因此要進行排序操做(sorted)是爲了保證列名順序的一致性(dict.items()並不負責返回的列表元素順序)。
 
(2)若是記錄row的數據類型爲tuple或list,能夠細分爲三種狀況:
 
a. row的數據類型爲Row,模擬處理過程:
 
 
 
b. row的數據類型爲namedtuple,模擬處理過程:
 
 
 
c. row的數據類型爲其它(tuple or tuple),模擬處理過程:
 
 
 
(3)若是記錄row的數據類型爲object;
 
 
 
由(1)、(2)、(3)能夠看出,它們最終的邏輯是一致的,就是將記錄row轉換爲一個鍵值對列表;若是(1)、(2)、(3)均不匹配,則認爲沒法推斷,拋出異常便可。
 
(4)建立模式(StructType)
 
items中的每個鍵值對會對應着造成一個StructField,StructField用於描述一個列的模式,它接收三個參數:列名、列類型、能否包含None;列名就是「鍵」,列類型則須要根據「值」推斷(_infer_type),這裏默認設置能夠包含None。
 
迭代items中的這些鍵值對會造成一個StructField列表,最後經過StructType建立模式。
 
這是根據RDD的一行記錄建立模式的過程,這其中尚未涉及具體的數據類型是如何被推斷的,咱們還須要看一下_infer_type:
 
 
_infer_type就是根據傳入的obj來推斷類型的,返回值爲類型實例,須要處理如下六種狀況:
 
(1)若是obj爲None,則類型爲NullType;
(2)真的沒有理解,不解釋;
(3)嘗試根據type(obj)直接從_type_mappings中獲取對應的類型信息dataType,_type_mappings就是一個字典,預先保留着一些Python類型與Spark SQL數據類型的對應關係,以下:
 
 
若是dataType不爲None,則直接返回相應類型的實例便可;須要特殊處理的是DecimalType,考慮到實際數據中可能存在precision和scale不一致的狀況,這裏統一處理爲precision:38,scale:18;若是dataType爲None,則代表obj爲複合數據類型(數組、字典、結構體)。
 
(4)若是obj的數據類型爲dict,咱們須要分別推斷它的鍵類型(遞歸調用_infer_type)、值類型(遞歸調用_infer_type),而後構造MapType實例並返回;
 
推斷鍵、值類型時,僅僅選取某一個鍵值對:它的鍵、值均不爲None,若是存在多個這樣的鍵值對,則選取是隨機的,取決於dict.items();若是找不到這樣的鍵值對,則認爲鍵、值的類型均爲NullType。
 
(5)若是obj的數據類型爲list或array,則選取其中某一個不爲None的元素推斷其類型(遞歸調用_infer_type);若是找不到不爲None的元素,則認爲元素類型爲NullType;最後構造ArrayType實例並返回;
 
(6)若是(1)、(2)、(3)、(4)、(5)均沒法完成推斷,則咱們認爲obj可能(僅僅是可能)是一個結構體類型(StructType),使用_infer_schema推斷其類型;
 
2. samplingRatio is not None
 
samplingRatio爲None時,則僅僅選取RDD的第一行記錄參與推斷,這就對這一行記錄的「質量」提出很高的要求,某些狀況下它沒法表明全局,此時咱們能夠經過顯示設置samplingRatio,「篩選」足夠多的數據參與推斷過程。
 
若是samplingRatio的值小於0.99,則使用RDD sample API根據samplingRatio「篩選」部分數據(rdd)參與推斷;不然整個RDD(rdd)的全部記錄參與推斷。
 
推斷過程能夠簡單理解爲兩步:
 
(1)對於RDD中的每一行記錄經過方法_infer_schema推斷出一個類型(map);
(2)將這些類型進行聚合(reduce)。
 
咱們着重看一下聚合的實現邏輯:
 
 
聚合的實現邏輯由方法_merge_type完成,須要處理六種狀況:
 
(1)若是a是NullType的實例,則返回b的類型;
(2)若是a不是NullType的實例,b是NullType的實例,則返回a的類型;
(3)若是a和b的類型不相同,則拋出異常;
 
如下處理過程基於a和b的類型相同。
 
(4)若是a的類型爲StructType(結構體),則以a中的各個元素爲模板合併類型(遞歸調用_merge_type),並追加b-a(差集)的元素(類型);
(5)若是a的類型爲ArrayType(數組),則合併(遞歸調用_merge_type)二者的元素類型便可;
(6)若是a的類型爲MapType(字典),則須要分別合併二者的鍵類型(遞歸調用_merge_type)、值類型(遞歸調用_merge_type)。
 
我的以爲目前的類型聚合邏輯過於簡單,實際使用意義不大。
相關文章
相關標籤/搜索