Spark譯文(二)

PySpark Usage Guide for Pandas with Apache Arrow(使用Apache Arrow的Pandas PySpark使用指南)

Apache Arrow in Spark(Spark中的Apache Arrow)

·Apache Arrow是一種內存中的列式數據格式,在Spark中用於在JVM和Python進程之間有效地傳輸數據。
·這對於使用Pandas / NumPy數據的Python用戶來講是最有益的。
·它的使用不是自動的,可能須要對配置或代碼進行一些小的更改才能充分利用並確保兼容性。
·本指南將提供有關如何在Spark中使用Arrow的高級描述,並在使用啓用箭頭的數據時突出顯示任何差別。

Ensure PyArrow Installed(確保PyArrow已安裝)

·若是使用pip安裝PySpark,則能夠使用命令pip install pyspark [sql]將PyArrow做爲SQL模塊的額外依賴項引入。
·不然,您必須確保在全部羣集節點上安裝並可用PyArrow。
·當前支持的版本是0.8.0。
·您能夠使用conda-forge通道中的pip或conda進行安裝。
·有關詳細信息,請參閱PyArrow安裝。

Enabling for Conversion to/from Pandas(啓用與Pandas的轉換)

·使用調用toPandas()將Spark DataFrame轉換爲Pandas DataFrame時以及使用createDataFrame(pandas_df)從Pandas DataFrame建立Spark DataFrame時,Arrow可用做優化。
·要在執行這些調用時使用Arrow,用戶須要先將Spark配置'spark.sql.execution.arrow.enabled'設置爲'true'。
·默認狀況下禁用此功能。
·此外,若是在Spark中的實際計算以前發生錯誤,則由'spark.sql.execution.arrow.enabled'啓用的優化能夠自動回退到非Arrow優化實現。
·這能夠經過'spark.sql.execution.arrow.fallback.enabled'來控制。
import numpy as np import pandas as pd # Enable Arrow-based columnar data transfers spark.conf.set("spark.sql.execution.arrow.enabled", "true") # Generate a Pandas DataFrame pdf = pd.DataFrame(np.random.rand(100, 3)) # Create a Spark DataFrame from a Pandas DataFrame using Arrow df = spark.createDataFrame(pdf) # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow result_pdf = df.select("*").toPandas() 
·在Spark repo中的「examples / src / main / python / sql / arrow.py」中找到完整的示例代碼。
·使用上述箭頭優化將產生與未啓用箭頭時相同的結果。
·請注意,即便使用Arrow,toPandas()也會將DataFrame中全部記錄收集到驅動程序中,而且應該在一小部分數據上完成。
·當前不支持全部Spark數據類型,若是列具備不受支持的類型,則可能引起錯誤,請參閱支持的SQL類型。
·若是在createDataFrame()期間發生錯誤,Spark將回退以建立沒有Arrow的DataFrame。

Pandas UDFs (a.k.a. Vectorized UDFs)

·Pandas UDF是用戶定義的函數,由Spark使用Arrow執行傳輸數據和Pandas以處理數據。
·Pandas UDF使用關鍵字pandas_udf做爲裝飾器定義或包裝函數,不須要其餘配置。
·目前,有兩種類型的Pandas UDF:Scalar和Grouped Map。

Scalar

·標量Pandas UDF用於矢量化標量操做。
·它們能夠與select和withColumn等函數一塊兒使用。
·Python函數應該將pandas.Series做爲輸入並返回相同長度的pandas.Series。
·在內部,Spark將執行Pandas UDF,方法是將列拆分爲批次,並將每一個批次的函數做爲數據的子集調用,而後將結果鏈接在一塊兒。
·如下示例顯示如何建立計算2列乘積的標量Pandas UDF。
import pandas as pd from pyspark.sql.functions import col, pandas_udf from pyspark.sql.types import LongType # Declare the function and create the UDF def multiply_func(a, b): return a * b multiply = pandas_udf(multiply_func, returnType=LongType()) # The function for a pandas_udf should be able to execute with local Pandas data x = pd.Series([1, 2, 3]) print(multiply_func(x, x)) # 0 1 # 1 4 # 2 9 # dtype: int64 # Create a Spark DataFrame, 'spark' is an existing SparkSession df = spark.createDataFrame(pd.DataFrame(x, columns=["x"])) # Execute function as a Spark vectorized UDF df.select(multiply(col("x"), col("x"))).show() # +-------------------+ # |multiply_func(x, x)| # +-------------------+ # | 1| # | 4| # | 9| # +-------------------+ 
Find full example code at "examples/src/main/python/sql/arrow.py" in the Spark repo.

Grouped Map(分組圖)

·分組映射Pandas UDF與groupBy()。apply()一塊兒使用,它實現了「split-apply-combine」模式。
·Split-apply-combine包含三個步驟:
·使用DataFrame.groupBy將數據拆分爲組。
·在每一個組上應用一個功能。
·該函數的輸入和輸出都是pandas.DataFrame。
·輸入數據包含每一個組的全部行和列。
·將結果合併到一個新的DataFrame中。
·要使用groupBy()。apply(),用戶須要定義如下內容:
·一個Python函數,用於定義每一個組的計算。
·StructType對象或定義輸出DataFrame架構的字符串。
·若是指定爲字符串,則返回的pandas.DataFrame的列標籤必須與定義的輸出模式中的字段名稱匹配,或者若是不是字符串,則必須按字段匹配字段數據類型,例如,
·整數指數。
·有關如何在構造pandas.DataFrame時標記列的信息,請參閱pandas.DataFrame。
·請注意,在應用函數以前,組的全部數據都將加載到內存中。
·這可能致使內存不足異常,尤爲是在組大小偏斜的狀況下。
·maxRecordsPerBatch的配置不適用於組,而且由用戶決定分組數據是否適合可用內存。
·如下示例顯示如何使用groupby()。apply()從組中的每一個值中減去均值。
from pyspark.sql.functions import pandas_udf, PandasUDFType df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) def subtract_mean(pdf): # pdf is a pandas.DataFrame v = pdf.v return pdf.assign(v=v - v.mean()) df.groupby("id").apply(subtract_mean).show() # +---+----+ # | id| v| # +---+----+ # | 1|-0.5| # | 1| 0.5| # | 2|-3.0| # | 2|-1.0| # | 2| 4.0| # +---+----+ 
·在Spark repo中的「examples / src / main / python / sql / arrow.py」中找到完整的示例代碼。
·有關詳細用法,請參閱pyspark.sql.functions.pandas_udf和pyspark.sql.GroupedData.apply

Grouped Aggregate(分組聚合)

·分組聚合Pandas UDF相似於Spark聚合函數。
·分組聚合Pandas UDF與groupBy()。agg()和pyspark.sql.Window一塊兒使用。
·它定義了從一個或多個pandas.Series到標量值的聚合,其中每一個pandas.Series表示組或窗口中的列。
·請注意,此類型的UDF不支持部分聚合,組或窗口的全部數據都將加載到內存中。
·此外,目前只有Grouped聚合Pandas UDF支持無界窗口。
·如下示例顯示如何使用此類型的UDF來計算groupBy和窗口操做的平均值:
from pyspark.sql.functions import pandas_udf, PandasUDFType from pyspark.sql import Window df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) @pandas_udf("double", PandasUDFType.GROUPED_AGG) def mean_udf(v): return v.mean() df.groupby("id").agg(mean_udf(df['v'])).show() # +---+-----------+ # | id|mean_udf(v)| # +---+-----------+ # | 1| 1.5| # | 2| 6.0| # +---+-----------+ w = Window \ .partitionBy('id') \ .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) df.withColumn('mean_v', mean_udf(df['v']).over(w)).show() # +---+----+------+ # | id| v|mean_v| # +---+----+------+ # | 1| 1.0| 1.5| # | 1| 2.0| 1.5| # | 2| 3.0| 6.0| # | 2| 5.0| 6.0| # | 2|10.0| 6.0| # +---+----+------+ 
·在Spark repo中的「examples / src / main / python / sql / arrow.py」中找到完整的示例代碼。
·有關詳細用法,請參閱pyspark.sql.functions.pandas_udf

Usage Notes(使用說明)

Supported SQL Types(支持的SQL類型)

·目前,基於箭頭的轉換支持全部Spark SQL數據類型,但MapType,TimestampType的ArrayType和嵌套的StructType除外。
·僅當安裝的PyArrow等於或高於0.10.0時才支持BinaryType。

Setting Arrow Batch Size(設置箭頭批量大小)

·Spark中的數據分區將轉換爲箭頭記錄批次,這可能會暫時致使JVM中的高內存使用量。
·爲避免可能的內存不足異常,能夠經過將conf「spark.sql.execution.arrow.maxRecordsPerBatch」設置爲一個整數來調整箭頭記錄批次的大小,該整數將肯定每一個批次的最大行數。
·默認值爲每批10,000條記錄。
·若是列數很大,則應相應地調整該值。
·使用此限制,每一個數據分區將被製成一個或多個記錄批次以進行處理。

Timestamp with Time Zone Semantics

·Spark在內部將時間戳存儲爲UTC值,而且在沒有指定時區的狀況下引入的時間戳數據將以本地時間轉換爲UTC,並具備微秒分辨率。
·在Spark中導出或顯示時間戳數據時,會話時區用於本地化時間戳值。
·會話時區使用配置'spark.sql.session.timeZone'設置,若是未設置,將默認爲JVM系統本地時區。
·Pandas使用具備納秒分辨率的datetime64類型,datetime64 [ns],而且每列具備可選的時區。
·當時間戳數據從Spark傳輸到Pandas時,它將轉換爲納秒,每列將轉換爲Spark會話時區,而後本地化到該時區,這將刪除時區並將值顯示爲本地時間。
·使用timestamp列調用toPandas()或pandas_udf時會發生這種狀況。
·當時間戳數據從Pandas傳輸到Spark時,它將轉換爲UTC微秒。
·當使用Pandas DataFrame調用createDataFrame或從pandas_udf返回時間戳時,會發生這種狀況。
·這些轉換是自動完成的,以確保Spark具備預期格式的數據,所以沒必要本身進行任何這些轉換。
·任何納秒值都將被截斷。
·請注意,標準UDF(非Pandas)會將時間戳數據做爲Python日期時間對象加載,這與Pandas時間戳不一樣。
·在pandas_udfs中使用時間戳時,建議使用Pandas時間序列功能以得到最佳性能,有關詳細信息,請參閱此處。
相關文章
相關標籤/搜索