使用Pandas_UDF快速改造Pandas代碼

1. Pandas_UDF介紹

PySpark和Pandas之間改進性能和互操做性的其核心思想是將Apache Arrow做爲序列化格式,以減小PySpark和Pandas之間的開銷。html

Pandas_UDF是在PySpark2.3中新引入的API,由Spark使用Arrow傳輸數據,使用Pandas處理數據。Pandas_UDF是使用關鍵字pandas_udf做爲裝飾器或包裝函數來定義的,不須要額外的配置。目前,有兩種類型的Pandas_UDF,分別是Scalar(標量映射)和Grouped Map(分組映射)。python

1.1 Scalar

Scalar Pandas UDF用於向量化標量操做。經常與select和withColumn等函數一塊兒使用。其中調用的Python函數須要使用pandas.Series做爲輸入並返回一個具備相同長度的pandas.Series。具體執行流程是,Spark將列分紅批,並將每一個批做爲數據的子集進行函數的調用,進而執行panda UDF,最後將結果鏈接在一塊兒。sql

下面的示例展現如何建立一個scalar panda UDF,計算兩列的乘積:apache

import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# 聲明函數並建立UDF
def multiply_func(a, b): return a * b multiply = pandas_udf(multiply_func, returnType=LongType()) x = pd.Series([1, 2, 3]) df = spark.createDataFrame(pd.DataFrame(x, columns=["x"])) # Execute function as a Spark vectorized UDF df.select(multiply(col("x"), col("x"))).show() # +-------------------+ # |multiply_func(x, x)| # +-------------------+ # | 1| # | 4| # | 9| # +-------------------+

1.2 Grouped Map

Grouped map(分組映射)panda udf與groupBy().apply()一塊兒使用,後者實現了「split-apply-combine」模式。「split-apply-combine」包括三個步驟:api

  1. 使用DataFrame.groupBy將數據分紅多個組。
  2. 對每一個分組應用一個函數。函數的輸入和輸出都是pandas.DataFrame。輸入數據包含每一個組的全部行和列。
  3. 將結果合併到一個新的DataFrame中。

要使用groupBy().apply(),須要定義如下內容:app

  • 定義每一個分組的Python計算函數,這裏可使用pandas包或者Python自帶方法。
  • 一個StructType對象或字符串,它定義輸出DataFrame的格式,包括輸出特徵以及特徵類型。

須要注意的是,StructType對象中的Dataframe特徵順序須要與分組中的Python計算函數返回特徵順序保持一致。分佈式

此外,在應用該函數以前,分組中的全部數據都會加載到內存,這可能致使內存不足拋出異常。ide

下面的例子展現瞭如何使用groupby().apply()從組中的每一個值中減去平均:函數

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

1.3 Grouped Aggregate

Grouped aggregate Panda UDF相似於Spark聚合函數。Grouped aggregate Panda UDF經常與groupBy().agg()和pyspark.sql.window一塊兒使用。它定義了來自一個或多個的聚合。級數到標量值,其中每一個pandas.Series表示組或窗口中的一列。性能

須要注意的是,這種類型的UDF不支持部分聚合,組或窗口的全部數據都將加載到內存中。此外,目前只支持Grouped aggregate Pandas UDFs的無界窗口。

下面的例子展現瞭如何使用這種類型的UDF來計算groupBy和窗口操做的平均值:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

2. 快速使用Pandas_UDF

須要注意的是schema變量裏的字段名稱爲pandas_dfs() 返回的spark dataframe中的字段,字段對應的格式爲符合spark的格式。

這裏,因爲pandas_dfs()功能只是選擇若干特徵,因此沒有涉及到字段變化,具體的字段格式在進入pandas_dfs()以前已經過printSchema()打印。若是在pandas_dfs()中使用了pandas的reset_index()方法,且保存index,那麼須要在schema變量中第一個字段處添加'index'字段及格式(下段代碼註釋內容)。

import pandas as pd
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.appName("demo3").config("spark.some.config.option", "some-value").getOrCreate()
df3 = spark.createDataFrame(
[(18862669710, '/未知類型', 'IM傳文件', 'QQ接收文件', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582),
(18862669710, '/未知類型', 'IM傳文件', 'QQ接收文件', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582),
(18862228190, '/移動終端', '移動終端應用', '移動騰訊視頻', 292.0, '2018-03-08 21:45:45', 178111558212, 1781115582),
(18862669710, '/未知類型', '訪問網站', '搜索引擎', 52.0, '2018-03-08 21:45:46', 178111558222, 1781115582)],
('online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class'))


def compute(x):
result = x[['online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class',
'start_time', 'end_time']]
return result


schema = StructType([
# StructField("index", DoubleType()),
StructField("online_account", LongType()),
StructField("terminal_type", StringType()),
StructField("action_type", StringType()),
StructField("app", StringType()),
StructField("access_seconds", DoubleType()),
StructField("datetime", StringType()),
StructField("outid", LongType()),
StructField("class", LongType()),
StructField("end_time", TimestampType()),
StructField("start_time", TimestampType()),

])


@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
print('ok')
mid = df.groupby(['online_account']).apply(lambda x: compute(x))
result = pd.DataFrame(mid)
# result.reset_index(inplace=True, drop=False)
return result


df3 = df3.withColumn("end_time", df3['datetime'].cast(TimestampType()))
df3 = df3.withColumn('end_time_convert_seconds', df3['end_time'].cast('long').cast('int'))
time_diff = df3.end_time_convert_seconds - df3.access_seconds
df3 = df3.withColumn('start_time', time_diff.cast('int').cast(TimestampType()))
df3 = df3.drop('end_time_convert_seconds')
df3.printSchema()
aa = df3.groupby(['online_account']).apply(g)
aa.show()

3. 優化Pandas_UDF代碼

在上一小節中,咱們是經過Spark方法進行特徵的處理,而後對處理好的數據應用@pandas_udf裝飾器調用自定義函數。但這樣看起來有些凌亂,所以能夠把這些Spark操做都寫入pandas_udf方法中。

注意:上小節中存在一個字段沒有正確對應的bug,而pandas_udf方法返回的特徵順序要與schema中的字段順序保持一致!

import pandas as pd
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.appName("demo3").config("spark.some.config.option", "some-value").getOrCreate()
df3 = spark.createDataFrame(
    [(18862669710, '/未知類型', 'IM傳文件', 'QQ接收文件', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582),
     (18862669710, '/未知類型', 'IM傳文件', 'QQ接收文件', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582),
     (18862228190, '/移動終端', '移動終端應用', '移動騰訊視頻', 292.0, '2018-03-08 21:45:45', 178111558212, 1781115582),
     (18862669710, '/未知類型', '訪問網站', '搜索引擎', 52.0, '2018-03-08 21:45:46', 178111558222, 1781115582)],
    ('online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class'))


def compute(x):
    x['end_time'] = pd.to_datetime(x['datetime'], errors='coerce', format='%Y-%m-%d')
    x['end_time_convert_seconds'] = pd.to_timedelta(x['end_time']).dt.total_seconds().astype(int)
    x['start_time'] = pd.to_datetime(x['end_time_convert_seconds'] - x['access_seconds'], unit='s')
    x = x.sort_values(by=['start_time'], ascending=True)
    result = x[['online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class',
                'start_time', 'end_time']]
    return result


schema = StructType([
    StructField("online_account", LongType()),
    StructField("terminal_type", StringType()),
    StructField("action_type", StringType()),
    StructField("app", StringType()),
    StructField("access_seconds", DoubleType()),
    StructField("datetime", StringType()),
    StructField("outid", LongType()),
    StructField("class", LongType()),
    StructField("start_time", TimestampType()),
    StructField("end_time", TimestampType()),

])


@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    print('ok')
    mid = df.groupby(['online_account']).apply(lambda x: compute(x))
    result = pd.DataFrame(mid)
    return result


df3.printSchema()
aa = df3.groupby(['online_account']).apply(g)
aa.show()

4. Pandas_UDF與toPandas的區別

  • @pandas_udf 建立一個向量化的用戶定義函數(UDF),利用了panda的矢量化特性,是udf的一種更快的替代方案,所以適用於分佈式數據集。
  • toPandas將分佈式spark數據集轉換爲pandas數據集,對pandas數據集進行本地化,而且全部數據都駐留在驅動程序內存中,所以此方法僅在預期生成的pandas DataFrame較小的狀況下使用。

換句話說,@pandas_udf使用panda API來處理分佈式數據集,而toPandas()將分佈式數據集轉換爲本地數據,而後使用pandas進行處理。

5. 參考文獻

[1] PySpark Usage Guide for Pandas with Apache Arrow

[2] pyspark.sql.functions.pandas_udf

相關文章
相關標籤/搜索