個人Spark SQL單元測試實踐

最近加入一個Spark項目,做爲臨時的開發人員協助進行開發工做。該項目中不存在測試的概念,開發人員按需求進行編碼工做後,直接向生產系統部署,再由需求的提出者在生產系統檢驗程序運行結果的正確性。在這種原始的工做方式下,產品經理和開發人員老是在生產系統驗證本身的需求、代碼。能夠想見,各類直接交給用戶的錯誤致使了一系列的事故和不信任。爲了處理各種線上問題,你們都疲於奔命。當工做進行到後期,每個相關人都已經意氣消沉,經常對工做避之不及。html

爲了改善局面,我嘗試了重構部分代碼,將連篇的SQL分散到不一樣的方法裏,並對單個方法構建單元測試。目的是,在編碼完成後,首先在本地執行單元測試,以實現:python

  1. 部署到生產系統的代碼中無SQL語法錯誤。
  2. 將已出現的bug寫入測試用例,避免反覆出現相同的bug。
  3. 提早發現一些錯誤,減小影響到後續環節的問題。
  4. 經過自動化減小開發和程序問題處理的總時間花費。
  5. 經過流程和結果的改善,減小開發人員的思惟負擔,增長與其餘相關人的互信。

本文將介紹個人Spark單元測試實踐,供你們參考、批評。git

本文中的Spark API是PySpark,測試框架爲pytest。github

對於但願將本文看成單元測試教程使用的讀者,本文會假定讀者已經準備好了開發和測試所須要的環境。若是沒有也沒有關係,文末的參考部分會包含一些配置環境相關的連接。sql

 

本文連接:http://www.javashuo.com/article/p-aomxummh-o.html數據庫

原創內容,轉載請註明windows

概念

定義

單元測試是一種測試方法,它的對象是單個程序單元/組件,目的是驗證軟件的每一個組件都符合設計要求。服務器

單元是軟件中最小的可測試部分。它一般包含一些輸入和單一的輸出。session

本文中的單元就是python函數(function)。app

單元測試一般是程序開發人員的工做。

原則

爲了實現單元測試,函數最好符合一個條件,

  • 對於相同的輸入,函數總有相同的輸出。

這要求函數的輸出結果不依賴內外部狀態。

它的輸出結果的肯定不該該依賴輸入參數外的任何內容,例如,不能夠由於本地測試環境中沒有相應的數據庫就產生「鏈接數據庫異常」致使沒法返回結果。若是是類方法的話,也不能夠依據一個可能被改變的類屬性來決定輸出。

同時,函數內部不能存在「反作用」。它不該該改變除了返回結果之外的任何內容,例如,不能夠改變全局可變狀態。

知足以上條件的函數,能夠被稱爲「純函數」。

代碼實踐

下面是數據和程序部分。

數據

假設咱們的服務對象是一家水果運銷公司,公司在不一樣城市設有倉庫,現有三張表,其中inventory包含水果的總庫存數量信息,inventory_ratio包含水果在不一樣城市的應有比例,

目標是根據總庫存數量和比例算出水果在各地的庫存,寫入到第三張表inventory_city中。三張表的列以下,

1. inventory. Columns: 「item」, 「qty」.
2. inventory_ratio. Columns: 「item」, 「city」, 「ratio」.
3. inventory_city. Columns: 「item」, 「city」, 「qty」.

初版代碼

用最直接的方式實現這一功能,代碼將是,

from pyspark.sql import SparkSession

if __name__ == "__main__":

    spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()

    result = spark.sql('''select t1.item, t2.city,
                                 case when t2.ratio is not null then t1.qty * t2.ratio 
                                      else                           t1.qty
                                 end as qty     
                          from      v_inventory as t1
                          left join v_ratio     as t2 on t1.item = t2.item ''')

    result.write.csv(path="somepath/inventory_city", mode="overwrite")

 

這段代碼能夠實現計算各城市庫存的需求,但測試起來會不太容易。特別是若是將來咱們還要在這個程序中增長其餘邏輯的話,不一樣的邏輯混雜在一塊兒後,測試和修改都會變得麻煩。

因此,在下一步,咱們要將部分代碼封裝到一個函數中。

有反作用的函數

建立一個名爲get_inventory_city的函數,將代碼包含在內,

from pyspark.sql import SparkSession

def get_inventory_city():
    
    spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()
    
    result = spark.sql('''select t1.item, t2.city, case when t2.ratio is not null then t1.qty * t2.ratio else t1.qty end as qty from v_inventory as t1 left join v_ratio as t2 on t1.item = t2.item ''')
    result.write.csv(path="somepath/inventory_city", mode="overwrite") if __name__ == "__main__": get_inventory_city()

顯然,這是一個不太易於測試的函數,由於它,

  • 沒有輸入輸出參數,不能直接根據給定數據檢驗運行結果。
  • 包含對數據庫的讀/寫,這意味着它要依賴外部數據庫。
  • 包含對spark session的獲取/建立,這和計算庫存的邏輯也毫無關係。

咱們把這些函數中的多餘的東西稱爲反作用。反作用和函數的核心邏輯糾纏在一塊兒,使單元測試變得困難,也不利於代碼的模塊化。

咱們必須另外管理反作用,只在函數內部保留純邏輯

無反作用的函數

按照上文中提到的原則,從新設計函數,能夠獲得,

from pyspark.sql import SparkSession, DataFrame

def get_inventory_city(spark: SparkSession, inventory: DataFrame, ratio: DataFrame):

    inventory.createOrReplaceTempView('v_inventory')
    ratio.createOrReplaceTempView('v_ratio')

    result = spark.sql('''select t1.item, t2.city,
                                 case when t2.ratio is not null then t1.qty * t2.ratio 
                                      else                           t1.qty
                                 end as qty     
                          from      v_inventory as t1
                          left join v_ratio     as t2 on t1.item = t2.item ''')

    return result

if __name__ == "__main__":

    spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()

    inventory = spark.sql('''select * from inventory''')
    ratio     = spark.sql('''select * from inventory_ratio''')

    result = get_inventory_city(spark, inventory, ratio)

    result.write.csv(path="somepath/inventory_city", mode="overwrite")

修改後的函數get_inventory_city有3個輸入參數和1個返回參數,函數內部已經再也不包含對spark session和數據庫表的處理,這意味着對於肯定的輸入值,它總會輸出不變的結果。

這比以前的設計更加理想,由於函數只包含純邏輯,因此調用者使用它時不會再受到反作用的干擾,這使得函數的可測試性和可組合性獲得了提升。

測試代碼

建立一個test_data目錄,將csv格式的測試數據保存到裏面。測試數據的來源能夠是手工模擬製做,也能夠是生產環境導出。

而後建立測試文件,添加代碼,

from inventory import get_inventory_city
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()

def test_get_inventory_city():

    #導入測試數據
    inventory = spark.read.format("csv").option("header", "true").load("./test_data/inventory.csv")
    ratio     = spark.read.format("csv").option("header", "true").load("./test_data/inventory_ratio.csv")

    #執行函數
    result = get_inventory_city(spark, inventory, ratio)

    #驗證拆分後的總數量等於拆分前的總數量
    result.createOrReplaceTempView('v_result')
    inventory.createOrReplaceTempView('v_inventory')

    qty_before_split = spark.sql('''select sum(qty) as qty from v_inventory''')
    qty_after_split  = spark.sql('''select sum(qty) as qty from v_result''')

    assert qty_before_split.take(1)[0]['qty'] == qty_after_split.take(1)[0]['qty']

執行測試,能夠看到如下輸出內容

============================= test session starts =============================
platform win32 -- Python 3.6.8, pytest-4.3.1, py-1.8.0, pluggy-0.9.0
rootdir: C:\Users\zhaozhe42\PycharmProjects\spark_unit\unit, inifile:collected 1 item

test_get_inventory_city.py .2019-03-21 14:16:24 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
                                             [100%]
========================= 1 passed in 18.06 seconds ==========================

這樣一個單元測試例子就完成了。

相比把程序放到服務器測試,單元測試的運行速度更快,開發者不用再擔憂測試會對生產做業和用戶形成影響,也能夠更早發如今編碼期間犯下的錯誤。它也能夠成爲自動化測試的基礎。

待解決的問題

目前我已經能夠在項目中構建初步的單元測試,但依然面臨着一些問題。

運行時間

上面這個簡單的測試示例在個人聯想T470筆記本上須要花費18.06秒執行完成,而實際項目中的程序的複雜度要更高,執行時間也更長。執行時間過長一件糟糕的事情,由於單元測試的執行花費越大,就會越被開發者拒斥。面對顯示器等待單元測試執行完成的時間是難捱的。雖然相比於把程序丟到生產系統中執行,這種單元測試模式已經能夠節約很多時間,但還不夠好。

接下來可能會嘗試的解決辦法:提高電腦配置/改變測試數據的導入方式。

有效範圍

在生產實踐中構建純函數是一件不太容易的事情,它對開發者的設計和編碼能力有至關的要求。

單元測試雖然能幫助發現一些問題和肯定問題代碼範圍,但它彷佛並不能揭示錯誤的緣由。只靠單元測試,不能徹底證實代碼的正確性。

筆者水平有限,目前寫出的代碼中仍有不少單元測試力所不能及的地方。可能須要在實踐中對它們進行改進,或者引入其它測試手段做爲補充。

參考

一些參考內容。

配置

Getting Started with PySpark on Windows

win10下安裝pyspark

PyCharm中的pytest

pycharm 配置spark 2.2.0

閱讀

函數響應式領域建模

ABAP單元測試最佳實踐

相關文章
相關標籤/搜索