Unit tests in PySpark using Python's mock library

Testing is a fundamental part of software development. It is often neglected by data developers, but it matters. In this article I will show how to test a piece of PySpark code using Python's unittest.mock library. I will write from a data scientist's perspective, which means this article will not go deep into certain software engineering details.

Link to this article: http://www.javashuo.com/article/p-rknyxokw-c.html

English original: Stop mocking me! Unit tests in PySpark using Python's mock library

What are unit tests and mocks?

A unit test is a way of testing a piece of code to make sure it works as expected. Python's unittest.mock library lets you replace parts of your code with mock objects and make assertions about how those mock objects are used. A "mock" does exactly what the name suggests: it mimics the attributes of the objects/variables in your code.
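
For example, here is a minimal sketch of what that looks like (the names fake_db and fetch_rows are made up for illustration):

from unittest import mock

# A Mock object accepts any attribute access or call and records how it was used
fake_db = mock.Mock()
fake_db.fetch_rows.return_value = [("shoes", 0.9), ("scarves", 0.2)]

rows = fake_db.fetch_rows("SELECT ...")  # returns the canned value above
fake_db.fetch_rows.assert_called_once_with("SELECT ...")  # passes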

The end goal: testing spark.sql(query)

The simplest way to create a dataframe in PySpark is the following:

df = spark.sql("SELECT * FROM table")

Although it is simple, it should still be tested.

Setting up the code and the problem

Suppose we work for an e-commerce clothing company, and our goal is to create a product similarity table, filter the data on certain conditions, and write the result to HDFS.

Suppose we have the following tables:

1. Products. Columns: "item_id", "category_id".
2. Product_similarity (unfiltered). Columns: "item_id_1", "item_id_2", "similarity_score".

(Assume the similarity scores in Product_similarity are between 0 and 1, where closer to 1 means more similar.)

Looking at a pair of products and their similarity score is simple:

SELECT
  s.item_id_1,
  s.item_id_2,
  s.similarity_score
FROM product_similarity s
WHERE s.item_id_1 != s.item_id_2

The WHERE clause removes items that are compared against themselves; otherwise every product would come back with a score of 1, which is meaningless!

What if we want a table that shows similarity between products in the same category? What if we don't care how similar shoes are to scarves, but do want to compare different shoes with shoes and scarves with scarves? That is a bit more complicated and requires joining the "products" and "product_similarity" tables.

The query becomes:

SELECT
  s.item_id_1,
  s.item_id_2,
  s.similarity_score
FROM product_similarity s
INNER JOIN products p
ON s.item_id_1 = p.item_id
INNER JOIN products q
ON s.item_id_2 = q.item_id
WHERE s.item_id_1 != s.item_id_2
AND p.category_id = q.category_id

We might also want to know the N most similar other items for each product, in which case the query is:

SELECT
    item_id_1,
    item_id_2,
    similarity_score
FROM (
    SELECT
        s.item_id_1,
        s.item_id_2,
        s.similarity_score,
        ROW_NUMBER() OVER(PARTITION BY item_id_1 ORDER BY similarity_score DESC) as row_num
    FROM product_similarity s
    INNER JOIN products p
    ON s.item_id_1 = p.item_id
    INNER JOIN products q
    ON s.item_id_2 = q.item_id
    WHERE s.item_id_1 != s.item_id_2
    AND p.category_id = q.category_id
)
WHERE row_num <= 10

(assuming N = 10)

Now, what if we want the choice between comparing across categories and comparing within a category to be an option? We can do this with a boolean variable called same_category, which controls the value of a string variable same_category_q that is substituted into the query (via .format()). If same_category is True, same_category_q holds the INNER JOIN clause; otherwise it is empty. The query looks like this:

'''
SELECT
  s.item_id_1,
  s.item_id_2,
  s.similarity_score
FROM product_similarity s
{same_category_q}
'''.format(same_category_q='') # Depends on value of same_category boolean

(Translator's note: on Python 3.6+ you can use f-strings instead of .format().)
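
For example, the query above could be written with an f-string roughly like this (a sketch equivalent to the .format() version):

same_category_q = ''  # depends on the same_category boolean, as above
query = f'''
SELECT
  s.item_id_1,
  s.item_id_2,
  s.similarity_score
FROM product_similarity s
{same_category_q}
'''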

Let's make it a bit cleaner by wrapping it in a function:

def make_query(same_category, table_paths): 

    if same_category is True:
        same_category_q = '''
INNER JOIN {product_table} p
ON s.item_id_1 = p.item_id
INNER JOIN {product_table} q
ON s.item_id_2 = q.item_id
WHERE item_id_1 != item_id_2
AND p.category_id = q.category_id
'''.format(product_table=table_paths["products"]["table"])
    else:
        same_category_q = ''

    return same_category_q

So far, so good. The function returns same_category_q, so we can test that it actually returns the value we need.

Recalling that our goal is to write the dataframe to HDFS, here is the function we will want to test:

def create_new_table(spark, table_paths, params, same_category_q):

    similarity_table = table_paths["product_similarity"]["table"]

    created_table = spark.sql(create_table_query.format(similarity_table=similarity_table,
                                                        same_category_q=same_category_q,
                                                        num_items=params["num_items"]))

    # Write table to some path
    created_table.coalesce(1).write.save(table_paths["created_table"]["path"],
                                         format="orc", mode="Overwrite")

Adding the first part of the query and a main method completes our script, giving us:

import pyspark
from pyspark.sql import SparkSession

create_table_query = '''
SELECT
    item_id_1,
    item_id_2
FROM (
    SELECT
        item_id_1,
        item_id_2,
        ROW_NUMBER() OVER(PARTITION BY item_id_1 ORDER BY similarity_score DESC) as row_num
    FROM {similarity_table} s
    {same_category_q}
    )
WHERE row_num <= {num_items}
'''

def create_new_table(spark, table_paths, params, same_category_q):

    similarity_table = table_paths["product_similarity"]["table"]

    created_table = spark.sql(create_table_query.format(similarity_table=similarity_table,
                                                        same_category_q=same_category_q,
                                                        num_items=params["num_items"]))

    # Write table to some path
    created_table.coalesce(1).write.save(table_paths["created_table"]["path"],
                                          format="orc", mode="Overwrite")


def make_query(same_category, table_paths): 

    if same_category is True:
        same_category_q = '''
INNER JOIN {product_table} p
ON s.item_id_1 = p.item_id
INNER JOIN {product_table} q
ON s.item_id_2 = q.item_id
WHERE item_id_1 != item_id_2
AND p.category_id = q.category_id
'''.format(product_table=table_paths["product_table"]["table"])
    else:
        same_category_q = ''

    return same_category_q
  
if __name__ == "__main__":
   
    spark = (SparkSession
             .builder
             .appName("testing_tutorial")
             .enableHiveSupport()
             .getOrCreate())

    same_category = True # or False
    table_paths = foo # Assume paths are in some JSON 
    params = bar
    
    same_category_q = make_query(same_category, table_paths)
    create_new_table(spark, table_paths, params, same_category_q)
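
For reference, since foo and bar above are placeholders, a table_paths/params structure consistent with how the functions index them might look like this (the values are purely illustrative):

table_paths = {
    "product_table": {"table": "products"},
    "product_similarity": {"table": "product_similarity"},
    "created_table": {"path": "hdfs:///some/output/path"},  # illustrative path
}
params = {"num_items": 10}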

The idea here is that we need to create a test function for each function in the script, typically named test_name_of_function(). We verify through assertions whether each function behaves as expected.

Testing the query - make_query

First, let's test make_query. make_query takes two input arguments: a boolean and some table paths. It returns a different same_category_q based on the boolean same_category. What we have is something like a set of if-then statements:

1. If same_category is True, then same_category_q = "INNER JOIN …"
2. If same_category is False, then same_category_q = "" (empty)

What we are going to do is mock the arguments to make_query, pass them to the function, and then test whether we get the expected output. Since test_paths is a dictionary, we don't need to mock it. The test script is below, with explanations in the comments:

def test_make_query_true(mocker):

    # Create some fake table paths
    test_paths = {
        "product_table": {
            "table": "products",
        },
        "similarity_table": {
            "table": "product_similarity"
        }
    }

    # Call the function with our paths and "True"
    same_category_q = make_query(True, test_paths)
    # We want same_category_q to be non-empty
    assert same_category_q != ''

def test_make_query_false(mocker):

    # As above, create some fake paths
    test_paths = {
        "product_table": {
            "table": "products",
        },
        "similarity_table": {
            "table": "product_similarity"
        }
    }

    same_category_q = make_query(False, test_paths)
    # This time, we want same_category_q to be empty
    assert same_category_q == ''

It's that simple!

Testing the table creation

Next, we need to test the behavior of create_new_table. Stepping through the function, we can see that it does several things, and there are several places to assert and mock. Note that whenever the code has a chain like df.write.save.something.anotherthing, we need to mock each operation and its output; a short sketch after the list below illustrates this.

  1. The function takes spark as an argument, which needs to be mocked.
  2. created_table is created by calling spark.sql(create_table_query.format(**some_args)). We need to assert that spark.sql() was called exactly once, and we also need to mock its output.
  3. Coalesce created_table, making sure coalesce() was called with an argument of 1. Mock the output.
  4. Write the coalesced table; we need to mock .write and mock the output of calling it.
  5. Save the coalesced table to a path, making sure it was called with the correct arguments.
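
As a quick illustration of the last two points, a Mock auto-creates child mocks for attribute access and calls, so each link of a chained call can be asserted separately. A minimal sketch (mock_df is an illustrative name, not part of the test below):

from unittest import mock

mock_df = mock.Mock()
# Attribute access and calls on a Mock auto-create child mocks,
# so every step of the chain is itself a Mock we can inspect:
mock_df.coalesce(1).write.save("some/path", format="orc", mode="Overwrite")

mock_df.coalesce.assert_called_once_with(1)
mock_df.coalesce.return_value.write.save.assert_called_once_with(
    "some/path", format="orc", mode="Overwrite")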

As before, the test script is as follows:

def test_create_new_table(mocker):

    # Mock all our variables
    mock_spark = mock.Mock()
    mock_category_q = mock.Mock()
    mock_created_table = mock.Mock()
    mock_created_table_coalesced = mock.Mock()
    # Calling spark.sql with create_table_query returns created_table - we need to mock it
    mock_spark.sql.side_effect = [mock_created_table]
    # Mock the output of calling .coalesce on created_table
    mock_created_table.coalesce.return_value = mock_created_table_coalesced
    # Mock the .write as well
    mock_write = mock.Mock()
    # Mock the output of calling .write on the coalesced created table
    mock_created_table_coalesced.write = mock_write

    test_paths = {
        "product_table": {
            "table": "products",
        },
        "similarity_table": {
            "table": "product_similarity"
        },
        "created_table": {
          "path": "path_to_table",
        }
    }
    test_params = {
        "num_items": 10,
    }

    # Call our function with our mocks
    create_new_table(mock_spark, test_paths, test_params, mock_category_q)
    # We only want spark.sql to have been called once, so assert that
    assert 1 == mock_spark.sql.call_count
    # Assert that we did in fact call created_table.coalesce(1)
    mock_created_table.coalesce.assert_called_with(1)
    # Assert that the table save path was passed in properly
    mock_write.save.assert_called_with(test_paths["created_table"]["path"],
                                       format="orc", mode="Overwrite")

Finally, save everything in one file. You will either need to import the functions from the module where they live, or keep everything in the same script.
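
If the functions live in their own module, the top of the test file needs imports along these lines (my_script is a hypothetical module name standing in for wherever you saved the functions):

from unittest import mock  # provides mock.Mock() used throughout the tests

from my_script import make_query, create_new_table  # hypothetical module name

(The mocker argument in the test signatures is the fixture from the pytest-mock plugin, which appears as mock-1.10.0 in the output below; these tests happen to use unittest.mock directly, so the argument goes unused.)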

To run the tests, navigate to your folder on the command line (cd xxx), then run:

python -m pytest final_test.py

You should see output similar to the following:

serena@Comp-205:~/workspace$ python -m pytest testing_tutorial.py
============================= test session starts ==============================
platform linux -- Python 3.6.4, pytest-3.3.2, py-1.5.2, pluggy-0.6.0
rootdir: /home/serena/workspace/Personal, inifile:
plugins: mock-1.10.0
collected 3 items

testing_tutorial.py ...                                                  [100%]

=========================== 3 passed in 0.01 seconds ===========================

Conclusion

That's all there is to it. I hope you found it helpful. When I was trying to figure out how to mock, I wish I had come across an article like this one.

Now go do it, and as Stewie says, (don't) stop mocking me (functions)!
