版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何學術交流,可隨時聯繫。sql
groupBy與agg搭配使用,agg(*exprs),這個方法是GroupedData上用於計算聚合數據的方法,*exprs能夠是一個 string到string組成的字典,key是須要聚合的列名,value是用於計算的聚合函數的名稱。聚 合函數能夠的取值有:avg, max, min, sum, count,mean。agg方法返回一個聚合後的 DataFrame對象。app
df = spark.read.csv('/sql/customers.csv',header=True)
df.columns
df.groupBy('Genre').agg({"Age":"mean","Annual Income (k$)":"max","Spending Score (1-100)":"count"}).show()
+------+-----------------------------+-----------------------+------------------+
| Genre|count(Spending Score (1-100))|max(Annual Income (k$))| avg(Age)|
+------+-----------------------------+-----------------------+------------------+
|Female| 112| 99|38.098214285714285|
| Male| 88| 99| 39.80681818181818|
+------+-----------------------------+-----------------------+------------------+
複製代碼
除了使用字符串以字典的方式指定,其實還能夠使用聚合後的列的表達式來作相同的是,這 須要藉助pyspark.sql.functions模塊中的方法。函數
from pyspark.sql.functions import *
df = spark.read.csv('/sql/customers.csv',header=True)
df.columns
df.groupBy('Genre').agg(mean(df.Age)).show()
+------+------------------+
| Genre| avg(Age)|
+------+------------------+
|Female|38.098214285714285|
| Male| 39.80681818181818|
+------+------------------+
複製代碼
apply(udf),使用pandas中的用戶自定義函數做用在GroupedData的每一組數據之上,返 回結果做爲一個DataFrame。udf用戶自定義函數接收pandas.DataFrame做爲參數,返回另 外一個pandas.DataFrame對象。這個方法是pyspark2.3中加入的新方法。經過@pandas_udf表示這是一個pandas的方法,參數爲id long,v double,指定PandasUDFType 爲分組map操做。(測試未經過)oop
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('spark://hadoopmaste:7077').appName('apply').getOrCreate()
df = spark.createDataFrame( [(1, 10.0), (1, 21.0), (2, 34.0), (2, 56.0), (2, 19.0)], ("id",
"v"))
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
v = pdf.v
print(type(v),type(pdf))
return pdf.assign(v=(v - v.mean()) / v.std())
df.groupby("id").apply(normalize).show()
spark.stop()
Pandas中DataFrame的assign方法是新建一個DataFrame而不會改變原來的DataFrame。
複製代碼
avg(*cols),給算給定的數值類型的列的平均值測試
df = spark.read.csv('/sql/customers.csv',header=True)
df.select(df.Age.cast('int').alias('age'),'Genre').groupBy('Genre').avg('age').show()
+------+------------------+
| Genre| avg(age)|
+------+------------------+
|Female|38.098214285714285|
| Male| 39.80681818181818|
+------+------------------+
複製代碼
count(),返回每一個分組中數據的條數大數據
df = spark.read.csv('/sql/customers.csv',header=True)
df.select(df.Age.cast('int').alias('age'),'Genre').groupBy('Genre').count().show()
複製代碼
max(*cols),計算給定列中數值最大的值。ui
df = spark.read.csv('/sql/customers.csv',header=True)
df.select(df.Age.cast('int').alias('age'),'Genre','Annual Income
(k$)').groupBy('Genre').max().show()
複製代碼
mean(*cols)計算對應列的均值,列須要是數值類型spa
spark.read.csv('/sql/customers.csv',header=True)
df.select(df.Age.cast('int').alias('age'),'Genre','Annual Income
(k$)').groupBy('Genre').mean().show()
複製代碼
min(*cols) 計算對應列的最小值,列數值類型須要是數值類型code
spark.read.csv('/sql/customers.csv',header=True)
df.select(df.Age.cast('int').alias('age'),'Genre','Annual Income
(k$)').groupBy('Genre').min().show()
複製代碼
sum(*cols),計算指定列的和,列的類型須要是數值類型。orm
spark.read.csv('/sql/customers.csv',header=True)
df.select(df.Age.cast('int').alias('age'),'Genre','Annual Income (k$)').groupBy('Genre').sum().show()
複製代碼
未完待續
Python技術棧與Spark交叉數據分析雙向整合,讓咱們在大數據融合分析達到了通用,能夠發現Spark SQL 其實很大部分功能和Pandas雷同
秦凱新 於深圳 201812172352