Copyright notice: This technical column is a summary and distillation of the author's (Qin Kaixin) day-to-day work. Cases are drawn from real business environments, together with tuning advice and cluster capacity-planning guidance for commercial applications. Please keep following this blog. QQ email: 1120746959@qq.com — feel free to get in touch for any academic exchange.
Python Spark DataFrame Basics
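All of the examples below assume a SparkSession bound to the name spark and a SparkContext bound to sc, as in the pyspark shell. A minimal setup sketch (the application name is an arbitrary choice):
from pyspark.sql import SparkSession
# Build (or reuse) a SparkSession; spark.sparkContext gives the matching SparkContext.
spark = SparkSession.builder.appName('spark-dataframe-basics').getOrCreate()
sc = spark.sparkContext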
df = spark.read.parquet('/sql/users.parquet')
df.show()
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
Python Spark DataFrame Aggregation
CustomerID,Genre,Age,Annual Income (k$),Spending Score (1-100)
0001,Male,19,15,39
0002,Male,21,15,81
0003,Female,20,16,6
0004,Female,23,16,77
0005,Female,31,17,40
0006,Female,22,17,76
df = spark.read.csv('/sql/customers.csv',header=True)
df.printSchema()
df.show()
root
|-- CustomerID: string (nullable = true)
|-- Genre: string (nullable = true)
|-- Age: string (nullable = true)
|-- Annual Income (k$): string (nullable = true)
|-- Spending Score (1-100): string (nullable = true)
+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
| 0001| Male| 19| 15| 39|
| 0002| Male| 21| 15| 81|
| 0003|Female| 20| 16| 6|
| 0004|Female| 23| 16| 77|
| 0005|Female| 31| 17| 40|
| 0006|Female| 22| 17| 76|
| 0007|Female| 35| 18| 6|
| 0008|Female| 23| 18| 94|
| 0009| Male| 64| 19| 3|
| 0010|Female| 30| 19| 72|
| 0011| Male| 67| 19| 14|
| 0012|Female| 35| 19| 99|
| 0013|Female| 58| 20| 15|
| 0014|Female| 24| 20| 77|
| 0015| Male| 37| 20| 13|
| 0016| Male| 22| 20| 79|
| 0017|Female| 35| 21| 35|
| 0018| Male| 20| 21| 66|
| 0019| Male| 52| 23| 29|
| 0020|Female| 35| 23| 98|
+----------+------+---+------------------+----------------------+
df.agg({"Age": "max","Annual Income (k$)":"mean","Spending Score (1-100)":"mean"}).show()
+---------------------------+-----------------------+--------+
|avg(Spending Score (1-100))|avg(Annual Income (k$))|max(Age)|
+---------------------------+-----------------------+--------+
| 50.2| 60.56| 70|
+---------------------------+-----------------------+--------+
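To control the output column names, the same aggregation can also be written with the pyspark.sql.functions API; a small sketch assuming the df above (the alias names are arbitrary):
from pyspark.sql.functions import avg, max as max_
df.agg(max_(df['Age']).alias('max_age'),
       avg(df['Annual Income (k$)']).alias('avg_income'),
       avg(df['Spending Score (1-100)']).alias('avg_score')).show()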
alias(alias) assigns an alias to the DataFrame, which can then be referenced in later expressions, for example in a self-join:
from pyspark.sql.functions import col
df1 = df.alias('cus1')
type(df1)
df2 = df.alias('cus2')
df3 = df1.join(df2,col('cus1.CustomerId')==col('cus2.CustomerId'),'inner')
df3.count()
200
+----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
| 0001| Male| 19| 15| 39| 0001| Male| 19| 15| 39|
| 0002| Male| 21| 15| 81| 0002| Male| 21| 15| 81|
| 0003|Female| 20| 16| 6| 0003|Female| 20| 16| 6|
| 0004|Female| 23| 16| 77| 0004|Female| 23| 16| 77|
| 0005|Female| 31| 17| 40| 0005|Female| 31| 17| 40|
| 0006|Female| 22| 17| 76| 0006|Female| 22| 17| 76|
| 0007|Female| 35| 18| 6| 0007|Female| 35| 18| 6|
| 0008|Female| 23| 18| 94| 0008|Female| 23| 18| 94|
| 0009| Male| 64| 19| 3| 0009| Male| 64| 19| 3|
| 0010|Female| 30| 19| 72| 0010|Female| 30| 19| 72|
+----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
only showing top 10 rows
cache() caches the DataFrame at the storage level given by StorageLevel; the default is MEMORY_AND_DISK.
df = spark.read.csv('/sql/customers.csv',header=True)
a = df.cache()
a.show()
+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
| 0001| Male| 19| 15| 39|
| 0002| Male| 21| 15| 81|
| 0003|Female| 20| 16| 6|
| 0004|Female| 23| 16| 77|
| 0005|Female| 31| 17| 40|
| 0006|Female| 22| 17| 76|
| 0007|Female| 35| 18| 6|
| 0008|Female| 23| 18| 94|
| 0009| Male| 64| 19| 3|
| 0010|Female| 30| 19| 72|
| 0011| Male| 67| 19| 14|
| 0012|Female| 35| 19| 99|
checkpoint(eager=True) checkpoints the DataFrame. Introduced in Spark 2.1, this call truncates the DataFrame's logical plan and persists the data to the checkpoint directory, cutting the upstream lineage.
sc
sc.setCheckpointDir('/datas/checkpoint')
a.checkpoint()
a.show()
coalesce(numPartitions) repartitions the DataFrame; the argument is the target number of partitions.
Note that a DataFrame created via the read methods defaults to one partition per input file, and calling coalesce with a partition count larger than the current one has no effect — coalesce can only reduce the number of partitions.
df = spark.read.csv('/sql/customers.csv',header=True)
df.rdd.getNumPartitions()
1
spark.read.csv('/sql/customers.csv',header=True).coalesce(3).rdd.getNumPartitions()
1
df = spark.range(0,20,2,3)
df.rdd.getNumPartitions()
df.coalesce(2).rdd.getNumPartitions()
2
repartition(numPartitions, *cols), like coalesce(numPartitions), re-partitions the DataFrame, but repartition uses hash partitioning and shuffles data across the whole cluster, which is more expensive. Besides the partition count, repartition can also take the columns to partition by.
df = spark.read.csv('/sql/customers.csv',header=True)
df.rdd.getNumPartitions()
1
df2 = df.repartition(3)
df2.rdd.getNumPartitions()
3
df2.columns
df3 = df2.repartition(6,'Genre')
df3.show(20)
+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
| 0003|Female| 20| 16| 6|
| 0004|Female| 23| 16| 77|
| 0005|Female| 31| 17| 40|
| 0006|Female| 22| 17| 76|
| 0007|Female| 35| 18| 6|
| 0008|Female| 23| 18| 94|
| 0010|Female| 30| 19| 72|
| 0012|Female| 35| 19| 99|
| 0013|Female| 58| 20| 15|
df3.rdd.getNumPartitions()
6
colRegex(colName) returns the columns whose names match the given regular expression.
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
df.select(df.colRegex("`(Col1)?+.+`")).show()
+---+
| a|
+---+
| 1|
| 2|
| 3|
+---+
collect() returns all rows of the DataFrame. Be careful: with large datasets this can easily cause an out-of-memory error on the driver.
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
df.collect()
[Row(Col1='a', a=1), Row(Col1='b', a=2), Row(Col1='c', a=3)]
columns returns all column names of the DataFrame as a list.
df = spark.read.csv('/sql/customers.csv',header=True)
df.columns
['CustomerID', 'Genre', 'Age', 'Annual Income (k$)', 'Spending Score (1-100)']
Converting a Spark SQL DataFrame to a pandas DataFrame
df = spark.read.csv('/sql/customers.csv',header=True)
pdf = df.toPandas()
Some pandas data-handling operations
pdf.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200 entries, 0 to 199
Data columns (total 5 columns):
CustomerID 200 non-null object
Genre 200 non-null object
Age 200 non-null object
Annual Income (k$) 200 non-null object
Spending Score (1-100) 200 non-null object
dtypes: object(5)
memory usage: 7.9+ KB
pdf['Age'] = pdf['Age'].astype('int')
pdf["Annual Income (k$)"]=pdf["Annual Income (k$)"].astype('int')
pdf["Spending Score (1-100)"]=pdf["Spending Score (1-100)"].astype('int')
pdf.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200 entries, 0 to 199
Data columns (total 5 columns):
CustomerID 200 non-null object
Genre 200 non-null object
Age 200 non-null int64
Annual Income (k$) 200 non-null int64
Spending Score (1-100) 200 non-null int64
dtypes: int64(3), object(2)
memory usage: 7.9+ KB
Converting a pandas DataFrame to a Spark SQL DataFrame
df1 = spark.createDataFrame(pdf)
df1.corr("Age","Annual Income (k$)")
df1.corr("Spending Score (1-100)","Annual Income (k$)")
0.009902848094037492
count() returns the number of rows in the DataFrame.
df = spark.read.csv('/sql/customers.csv',header=True)
df.count()
200
createGlobalTempView(name) creates a global temporary view from the DataFrame. Its lifetime is tied to the Spark application: the view stays accessible as long as the application is running, i.e. until SparkContext.stop() is called. Global temporary views are stored in the global_temp database.
df = spark.read.csv('/sql/customers.csv',header=True)
df.createGlobalTempView('TT')   # raises an error if a view named TT already exists
spark.sql('select * from global_temp.TT').show()
+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
| 0001| Male| 19| 15| 39|
| 0002| Male| 21| 15| 81|
| 0003|Female| 20| 16| 6|
| 0004|Female| 23| 16| 77|
| 0005|Female| 31| 17| 40|
| 0006|Female| 22| 17| 76|
| 0007|Female| 35| 18| 6|
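Because the view lives in the application-wide global_temp database, it is also visible from other SparkSessions of the same application; a small sketch, assuming the view TT created above:
spark.newSession().sql('select count(*) from global_temp.TT').show()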
createOrReplaceGlobalTempView(name): the previous method raises an error if a view with the given name already exists; this one replaces an existing view, or creates it if it does not exist.
df = spark.read.csv('/sql/customers.csv',header=True)
df.createOrReplaceGlobalTempView('TT')
spark.sql('select * from global_temp.TT').show()
+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
| 0001| Male| 19| 15| 39|
| 0002| Male| 21| 15| 81|
| 0003|Female| 20| 16| 6|
| 0004|Female| 23| 16| 77|
| 0005|Female| 31| 17| 40|
| 0006|Female| 22| 17| 76|
crossJoin(other) returns the Cartesian product of the two DataFrames. Use it with care: it is very time- and resource-consuming.
df1 = spark.createDataFrame([('regan',27),('ting',24)],schema=['name','age'])
df2 = spark.createDataFrame([('regan',65),('ting',48)],schema=['name','weight'])
df3 = df1.coalesce(3).crossJoin(df2.coalesce(3))
df3.show()
+-----+---+-----+------+
| name|age| name|weight|
+-----+---+-----+------+
|regan| 27|regan| 65|
|regan| 27| ting| 48|
| ting| 24|regan| 65|
| ting| 24| ting| 48|
+-----+---+-----+------+
cube(*cols) builds a multi-dimensional data cube over the current DataFrame.
from pyspark.sql.functions import *
df = spark.read.csv('/sql/customers.csv',header=True)
df.cube('Age','Genre').count().orderBy(desc("count"), asc("Age")).show()
+----+------+-----+
| Age| Genre|count|
+----+------+-----+
|null| null| 200|
|null|Female| 112|
|null| Male| 88|
| 32| null| 11|
| 35| null| 9|
| 19| null| 8|
| 31| null| 8|
| 30| null| 7|
| 31|Female| 7|
| 49| null| 7|
| 19| Male| 6|
| 23|Female| 6|
| 23| null| 6|
| 27| null| 6|
| 32|Female| 6|
| 35|Female| 6|
| 36| null| 6|
| 38| null| 6|
| 40| null| 6|
| 47| null| 6|
+----+------+-----+
only showing top 20 rows
describe(*cols) computes basic statistics for the given columns: count, max, min, mean and standard deviation. Note that every column of this CSV was read as a string, so min and max are compared lexicographically — which is why 'Annual Income (k$)' reports min 101 and max 99 below.
df = spark.read.csv('/sql/customers.csv',header=True)
#df.describe('Age')
df.describe('Age','Genre').show()
+-------+-----------------+------+
|summary| Age| Genre|
+-------+-----------------+------+
| count| 200| 200|
| mean| 38.85| null|
| stddev|13.96900733155888| null|
| min| 18|Female|
| max| 70| Male|
+-------+-----------------+------+
df.describe().show()
+-------+------------------+------+-----------------+------------------+----------------------+
|summary| CustomerID| Genre| Age|Annual Income (k$)|Spending Score (1-100)|
+-------+------------------+------+-----------------+------------------+----------------------+
| count| 200| 200| 200| 200| 200|
| mean| 100.5| null| 38.85| 60.56| 50.2|
| stddev|57.879184513951124| null|13.96900733155888| 26.26472116527124| 25.823521668370173|
| min| 0001|Female| 18| 101| 1|
| max| 0200| Male| 70| 99| 99|
+-------+------------------+------+-----------------+------------------+----------------------+
pdf=df.toPandas()
pdf.describe()
distinct() returns the DataFrame with duplicate rows removed.
df = spark.createDataFrame([(1,1),(1,2),(1,2),(5,5)])
df.count()
4
df.distinct().count()
3
drop(*cols) drops the named columns and returns a new DataFrame.
df = spark.read.csv('/sql/customers.csv',header=True)
df.columns
['CustomerID', 'Genre', 'Age', 'Annual Income (k$)', 'Spending Score (1-100)']
df1 = df.drop('Age')
df1.columns
['CustomerID', 'Genre', 'Annual Income (k$)', 'Spending Score (1-100)']
dropDuplicates(subset=None) removes duplicate rows; subset specifies which columns are considered when detecting duplicates.
from pyspark.sql import Row
df = sc.parallelize([
Row(name='regan', age=27, height=170),
Row(name='regan', age=27, height=170),
Row(name='regan', age=27, height=155)],3).toDF()
df.show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 27| 170|regan|
| 27| 170|regan|
| 27| 155|regan|
+---+------+-----+
df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 27| 155|regan|
| 27| 170|regan|
+---+------+-----+
df.dropDuplicates(subset=['age','name']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 27| 170|regan|
+---+------+-----+
dropna(how='any', thresh=None, subset=None) drops rows containing NA values (null/NaN). The how keyword takes 'any' or 'all'; thresh specifies the minimum number of non-NA values a row must have to be kept (when set, it overrides how); subset restricts the check to the given columns. numpy is imported below to supply NaN values.
import numpy as np
df = spark.createDataFrame([(np.nan,27.,170.),(44.,27.,170.),
(np.nan,np.nan,170.)],schema=['luck','age','weight'])
df.show()
+----+----+------+
|luck| age|weight|
+----+----+------+
| NaN|27.0| 170.0|
|44.0|27.0| 170.0|
| NaN| NaN| 170.0|
+----+----+------+
df.dropna(how='any').show()
+----+----+------+
|luck| age|weight|
+----+----+------+
|44.0|27.0| 170.0|
+----+----+------+
df.dropna(how='all').show()
+----+----+------+
|luck| age|weight|
+----+----+------+
| NaN|27.0| 170.0|
|44.0|27.0| 170.0|
| NaN| NaN| 170.0|
+----+----+------+
df.dropna(thresh=2).show()
+----+----+------+
|luck| age|weight|
+----+----+------+
| NaN|27.0| 170.0|
|44.0|27.0| 170.0|
+----+----+------+
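The subset keyword is not exercised above; a sketch that restricts the NA check to the age column, assuming the same df:
df.dropna(subset=['age']).show()   # drops only the row whose age is NaN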
dtypes returns a list of (column name, data type) tuples.
import numpy as np
df = spark.createDataFrame([(np.nan,27.,170.),(44.,27.,170.),
(np.nan,np.nan,170.)],schema=['luck','age','weight'])
df.dtypes
[('luck', 'double'), ('age', 'double'), ('weight', 'double')]
fillna(value, subset=None) fills NA values in the DataFrame.
import numpy as np
df = spark.createDataFrame([(np.nan,27.,170.),(44.,27.,170.),
(np.nan,np.nan,170.)],schema=['luck','age','weight'])
df.show()
+----+----+------+
|luck| age|weight|
+----+----+------+
| NaN|27.0| 170.0|
|44.0|27.0| 170.0|
| NaN| NaN| 170.0|
+----+----+------+
df.na.fill(0.0).show()
+----+----+------+
|luck| age|weight|
+----+----+------+
| 0.0|27.0| 170.0|
|44.0|27.0| 170.0|
| 0.0| 0.0| 170.0|
+----+----+------+
df.fillna(0.0).show()
+----+----+------+
|luck| age|weight|
+----+----+------+
| 0.0|27.0| 170.0|
|44.0|27.0| 170.0|
| 0.0| 0.0| 170.0|
+----+----+------+
df.na.fill(False).show()
+----+----+------+
|luck| age|weight|
+----+----+------+
| NaN|27.0| 170.0|
|44.0|27.0| 170.0|
| NaN| NaN| 170.0|
+----+----+------+
df.na.fill({'luck':0.0,'age':50.0}).show()
+----+----+------+
|luck| age|weight|
+----+----+------+
| 0.0|27.0| 170.0|
|44.0|27.0| 170.0|
| 0.0|50.0| 170.0|
+----+----+------+
filter(condition) filters rows by the given condition; where is simply an alias for filter.
import numpy as np
df = spark.createDataFrame([(np.nan,27.,170.),(44.,27.,170.),
(np.nan,np.nan,170.)],schema=['luck','age','weight'])
df.filter(df.luck != np.nan).show()
+----+----+------+
|luck| age|weight|
+----+----+------+
|44.0|27.0| 170.0|
+----+----+------+
import numpy as np
df = spark.createDataFrame([(np.nan,27.,170.),(44.,27.,170.),
(np.nan,np.nan,170.)],schema=['luck','age','weight'])
df.filter('luck <> "NaN" ').show()
+----+----+------+
|luck| age|weight|
+----+----+------+
|44.0|27.0| 170.0|
+----+----+------+
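An equivalent filter can be written with the built-in isnan function, which is arguably clearer; a sketch assuming the same df:
from pyspark.sql.functions import isnan
df.filter(~isnan(df.luck)).show()   # same single row as above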
first() returns the first Row of the DataFrame.
import numpy as np
df = spark.createDataFrame([(np.nan,27.,170.),(44.,27.,170.),
(np.nan,np.nan,170.)],schema=['luck','age','weight'])
df.show()
df.first()
Row(luck=nan, age=27.0, weight=170.0)
foreach(f) applies the function f to every Row; under the hood it calls df.rdd.foreach, the foreach defined on the underlying RDD. (The author notes this test did not pass.)
import numpy as np
df = spark.createDataFrame([(np.nan,27.,170.),(44.,27.,170.),
(np.nan,np.nan,170.)],schema=['luck','age','weight'])
def myprint(x):
print(x.age)
df.foreach(lambda x:print(x))
def pprint(x):
for p in x:
print(p.luck)
df.foreachPartition(pprint)
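Because f runs on the executors, anything printed inside it may land in the executor logs rather than the driver console, which is likely why the test above appeared to fail. A sketch that makes the side effect observable on the driver via an accumulator, assuming the same df:
acc = spark.sparkContext.accumulator(0)
def count_row(row):
    acc.add(1)          # runs on the executors, once per Row
df.foreach(count_row)
print(acc.value)        # 3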
groupBy(*cols) groups the DataFrame by the given columns and returns a GroupedData object.
df = spark.read.csv('/sql/customers.csv',header=True)
df.columns
['CustomerID', 'Genre', 'Age', 'Annual Income (k$)', 'Spending Score (1-100)']
df.groupby('Genre').agg({'Age':'mean'}).show()
+------+------------------+
| Genre| avg(Age)|
+------+------------------+
|Female|38.098214285714285|
| Male| 39.80681818181818|
+------+------------------+
head(n=None) returns the first n rows; by default it returns a single Row, and n can be passed as a keyword argument.
df = spark.read.csv('/sql/customers.csv',header=True)
df.head(6)
hint(name, *parameters) is used when joining two DataFrames to suggest how the join should be performed, typically broadcast. As the name says, it merely gives the engine a hint about which join strategy to use.
df1 = spark.createDataFrame([('regan',23),('ting',24)],schema=['name','age'])
df2 = spark.createDataFrame([('regan',130),('ting',90)],schema=['name','weight'])
df3 = df1.join(df2.hint('broadcast'),'name').show()
+-----+---+------+
| name|age|weight|
+-----+---+------+
|regan| 23| 130|
| ting| 24| 90|
+-----+---+------+
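To verify that the hint was honoured, the physical plan can be inspected; a sketch assuming the same df1/df2 (a BroadcastHashJoin node should appear in the output):
df1.join(df2.hint('broadcast'), 'name').explain()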
intersect(other) returns the intersection of the two DataFrames, in the set-theoretic sense.
df1 = spark.createDataFrame([('regan',23),('ting',24)],schema=['name','age'])
df2 = spark.createDataFrame([('regan',23),('ting',90)],schema=['name','age'])
df3 = df1.intersect(df2).show()
+-----+---+
| name|age|
+-----+---+
|regan| 23|
+-----+---+
join(other, on=None, how=None) joins two DataFrames; other is the other DataFrame, on specifies the join key(s), and how specifies the join type: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, or left_anti. The default is inner.
df1 = spark.createDataFrame([('regan',23),('ting',24)],schema=['name','age'])
df2 = spark.createDataFrame([('regan',130),('ting2',90)],schema=['name','weight'])
df1.join(df2,on='name',how='left_outer').show()
+-----+---+------+
| name|age|weight|
+-----+---+------+
|regan| 23| 130|
| ting| 24| null|
+-----+---+------+
df1.join(df2,on='name',how='right_outer').show()
+-----+----+------+
| name| age|weight|
+-----+----+------+
|regan| 23| 130|
|ting2|null| 90|
+-----+----+------+
df1.join(df2,on='name',how='left_semi').show()
+-----+---+
| name|age|
+-----+---+
|regan| 23|
+-----+---+
df1.join(df2,on='name',how='left_anti').show()
+----+---+
|name|age|
+----+---+
|ting| 24|
+----+---+
limit(num) limits the number of rows returned, protecting the driver from an OOM caused by pulling back too much data.
df1 = spark.createDataFrame([('regan',23),('ting',24)],schema=['name','age'])
df1.limit(1).collect()
orderBy(*cols, **kwargs) returns a new DataFrame sorted by the given columns.
df = spark.read.csv('/sql/customers.csv',header=True)
df.orderBy('Age').show(3)
df.orderBy('Age',ascending=False).show(3)
+----------+-----+---+------------------+----------------------+
|CustomerID|Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+-----+---+------------------+----------------------+
| 0034| Male| 18| 33| 92|
| 0066| Male| 18| 48| 59|
| 0092| Male| 18| 59| 41|
+----------+-----+---+------------------+----------------------+
only showing top 3 rows
+----------+-----+---+------------------+----------------------+
|CustomerID|Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+-----+---+------------------+----------------------+
| 0061| Male| 70| 46| 56|
| 0071| Male| 70| 49| 55|
| 0058| Male| 69| 44| 46|
+----------+-----+---+------------------+----------------------+
only showing top 3 rows
df.orderBy(desc("Age")).show(3)
df.orderBy(df.Age.desc()).show(3)
orderBy works like sort:
df.sort(desc("Age")).show(3)
df.sort(df.Age.desc()).show(3)
persist(storageLevel=StorageLevel(True, True, False, False, 1)) sets the DataFrame's storage level; the default is memory and disk.
from pyspark import StorageLevel
df = spark.read.csv('/sql/customers.csv',header=True)
df.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_2)
DataFrame[CustomerID: string, Genre: string, Age: string, Annual Income (k$): string, Spending Score (1-100): string]
randomSplit(weights, seed=None) splits the DataFrame into several DataFrames according to the given weights; the seed keyword fixes the random seed so results can be reproduced.
df = spark.range(0.,30.,2,3)
df.show()
df.describe().show()
dfs = df.randomSplit([1.0,4.0],24)
for df in dfs:
df.show()
rdd returns the RDD underlying the DataFrame. All RDD methods can be called on it; these are lower-level operations, useful for special tasks that the higher-level DataFrame API cannot express.
df = spark.range(0.,30.,2,3)
rdd = df.rdd
rdd.map(lambda x:x.id ** 2).collect()
replace(to_replace, value=None, subset=None) replaces the old values given by to_replace with value; the subset keyword restricts the replacement to the given columns, otherwise the whole DataFrame is considered. The example below replaces 99 with 100.
Note that to_replace and value must have matching types. to_replace may be a bool, int, long, float, string, list or dict; value may be a bool, int, long, float, string, list or None.
df = spark.read.csv('/sql/customers.csv',header=True)
df.columns
df.show()
df2 = df.replace('99','100')
df2.show()
df.replace(['Female','Male'],['F','M'],'Genre').show()
+----------+-----+---+------------------+----------------------+
|CustomerID|Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+-----+---+------------------+----------------------+
| 0001| M| 19| 15| 39|
| 0002| M| 21| 15| 81|
| 0003| F| 20| 16| 6|
| 0004| F| 23| 16| 77|
| 0005| F| 31| 17| 40|
| 0006| F| 22| 17| 76|
| 0007| F| 35| 18| 6|
| 0008| F| 23| 18| 94|
| 0009| M| 64| 19| 3|
| 0010| F| 30| 19| 72|
| 0011| M| 67| 19| 14|
| 0012| F| 35| 19| 99|
| 0013| F| 58| 20| 15|
| 0014| F| 24| 20| 77|
| 0015| M| 37| 20| 13|
| 0016| M| 22| 20| 79|
| 0017| F| 35| 21| 35|
| 0018| M| 20| 21| 66|
| 0019| M| 52| 23| 29|
| 0020| F| 35| 23| 98|
+----------+-----+---+------------------+----------------------+
df.na.replace(['Female','Male'],['F','M'],'Genre').show()
rollup(*cols) rolls the data up by the given columns, so that aggregate functions can then be applied on the rolled-up groups.
from pyspark.sql.functions import *
df = spark.read.csv('/sql/customers.csv',header=True)
df.rollup('Genre','Age').count().orderBy(desc('count'),'Genre').show()
+------+----+-----+
| Genre| Age|count|
+------+----+-----+
| null|null| 200|
|Female|null| 112|
| Male|null| 88|
|Female| 31| 7|
|Female| 23| 6|
|Female| 49| 6|
|Female| 32| 6|
|Female| 35| 6|
| Male| 19| 6|
|Female| 30| 5|
| Male| 32| 5|
| Male| 48| 5|
|Female| 21| 4|
|Female| 47| 4|
|Female| 50| 4|
|Female| 36| 4|
|Female| 29| 4|
|Female| 27| 4|
|Female| 38| 4|
| Male| 59| 4|
+------+----+-----+
sample(withReplacement=None, fraction=None, seed=None) draws a sample from the DataFrame. withReplacement selects sampling with (True) or without (False) replacement, fraction is the sampling fraction, and seed is the random seed; the same seed always yields the same sample, which is useful for reproducing a scenario.
df = spark.read.csv('/sql/customers.csv',header=True)
df.count()
200
df2 = df.sample(withReplacement=True,fraction=0.2,seed=1)
df2.count()
35
sampleBy(col, fractions, seed=None) performs stratified sampling on column col using the per-value fractions given; seed is the random seed for reproducibility.
df = spark.read.csv('/sql/customers.csv',header=True)
df.sampleBy('Genre',{'Male':0.1,'Female':0.15}).groupBy('Genre').count().show()
+------+-----+
| Genre|count|
+------+-----+
|Female| 15|
| Male| 11|
+------+-----+
select(*cols) selects data from the DataFrame using the given expressions and returns a new DataFrame.
df = spark.read.csv('/sql/customers.csv',header=True)
df.select('*').count()
df.select('Age','Genre').show(10)
df.select(df.Age.alias('age')).show(10)
selectExpr(*expr) is a variant of select that accepts SQL expressions and returns a new DataFrame.
df = spark.read.csv('/sql/customers.csv',header=True)
df.selectExpr('Age * 2','sqrt(Age)').show(10)
+---------+-------------------------+
|(Age * 2)|SQRT(CAST(Age AS DOUBLE))|
+---------+-------------------------+
| 38.0| 4.358898943540674|
| 42.0| 4.58257569495584|
| 40.0| 4.47213595499958|
| 46.0| 4.795831523312719|
| 62.0| 5.5677643628300215|
| 44.0| 4.69041575982343|
| 70.0| 5.916079783099616|
| 46.0| 4.795831523312719|
| 128.0| 8.0|
| 60.0| 5.477225575051661|
+---------+-------------------------+
show(n=20, truncate=True, vertical=False) prints the first n rows (20 by default). truncate controls whether values longer than 20 characters are truncated, and vertical prints the rows vertically.
df = spark.read.csv('/sql/customers.csv',header=True)
df.selectExpr('Age * 2','sqrt(Age)').show(10,truncate=False,vertical=False)
sortWithinPartitions(*cols, **kwargs) and sort(*cols, **kwargs) both sort by the given columns, with the keyword arguments controlling ascending or descending order; sortWithinPartitions sorts within each partition only.
df = spark.read.csv('/sql/customers.csv',header=True)
df.sort(['Age','Genre'],ascending=True).show(10)
df.sort(df.Age.desc()).show(10)
from pyspark.sql.functions import *
df.sortWithinPartitions(['Age','Genre'],ascending=False).show(10)
+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
| 0061| Male| 70| 46| 56|
| 0071| Male| 70| 49| 55|
| 0058| Male| 69| 44| 46|
| 0109| Male| 68| 63| 43|
| 0068|Female| 68| 48| 48|
| 0091|Female| 68| 59| 55|
| 0011| Male| 67| 19| 14|
| 0083| Male| 67| 54| 41|
| 0103| Male| 67| 62| 59|
| 0063|Female| 67| 47| 52|
+----------+------+---+------------------+----------------------+
df.sortWithinPartitions(desc('Age')).show(10)
subtract(other) returns the rows that are in this DataFrame but not in the other one, as a new DataFrame.
df1 = spark.createDataFrame([('regan',),('ting',),('yu',)],schema=['name'])
df2 = spark.createDataFrame([('regan',),('ting',),('sha',)],schema=['name'])
df3 = df1.subtract(df2)
df3.show()
summary(*statistics) computes the requested summary statistics. With no arguments it computes count, mean, stddev, min, approximate quartiles (percentiles at 25%, 50%, and 75%), and max; *statistics may be any of count, mean, stddev, min, max, or arbitrary approximate percentiles.
df = spark.read.csv('/sql/customers.csv',header=True)
df.summary().show()
df.summary('min','count','75%').show()
+-------+------------------+------+-----------------+------------------+----------------------+
|summary| CustomerID| Genre| Age|Annual Income (k$)|Spending Score (1-100)|
+-------+------------------+------+-----------------+------------------+----------------------+
| count| 200| 200| 200| 200| 200|
| mean| 100.5| null| 38.85| 60.56| 50.2|
| stddev|57.879184513951124| null|13.96900733155888| 26.26472116527124| 25.823521668370173|
| min| 0001|Female| 18| 101| 1|
| 25%| 50.0| null| 28.0| 40.0| 34.0|
| 50%| 100.0| null| 36.0| 61.0| 50.0|
| 75%| 150.0| null| 49.0| 78.0| 73.0|
| max| 0200| Male| 70| 99| 99|
+-------+------------------+------+-----------------+------------------+----------------------+
+-------+----------+------+----+------------------+----------------------+
|summary|CustomerID| Genre| Age|Annual Income (k$)|Spending Score (1-100)|
+-------+----------+------+----+------------------+----------------------+
| min| 0001|Female| 18| 101| 1|
| count| 200| 200| 200| 200| 200|
| 75%| 150.0| null|49.0| 78.0| 73.0|
+-------+----------+------+----+------------------+----------------------+
take(num) returns the first num Rows as a list; keep num small, otherwise the driver may run out of memory.
df = spark.read.csv('/sql/customers.csv',header=True)
df.take(3)
toDF(*cols) returns a new DataFrame with the columns renamed to the given names.
df = spark.read.csv('/sql/customers.csv',header=True)
df.columns
df1 = df.toDF('id','sex','age','income','score')
df1.columns
df1.show(5)
toJSON(use_unicode=True) converts each Row of the DataFrame to a JSON string, using unicode by default. Note that toJSON returns an RDD, not a DataFrame.
df = spark.read.csv('/sql/customers.csv',header=True)
df.show(5)
df1 = df.toJSON()
df1.collect()
+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
| 0001| Male| 19| 15| 39|
| 0002| Male| 21| 15| 81|
| 0003|Female| 20| 16| 6|
| 0004|Female| 23| 16| 77|
| 0005|Female| 31| 17| 40|
+----------+------+---+------------------+----------------------+
only showing top 5 rows
['{"CustomerID":"0001","Genre":"Male","Age":"19","Annual Income (k$)":"15","Spending Score (1-100)":"39"}',
'{"CustomerID":"0002","Genre":"Male","Age":"21","Annual Income (k$)":"15","Spending Score (1-100)":"81"}',
'{"CustomerID":"0003","Genre":"Female","Age":"20","Annual Income (k$)":"16","Spending Score (1-100)":"6"}',
......]
toLocalIterator() returns the DataFrame's data as a local iterator; with large data this can still cause an OOM. (The author notes this was not verified.)
df = spark.read.csv('/sql/customers.csv',header=True)
results = df.toLocalIterator()
for data in results:
print(data)
toPandas() converts the Spark DataFrame to a pandas DataFrame.
df = spark.read.csv('/sql/customers.csv',header=True)
pan_df = df.toPandas()
pan_df
pan_df.head(10)
union(other) returns the union of the two DataFrames (like SQL UNION ALL, duplicates are kept).
df1 = spark.createDataFrame([('regan',),('ting',),('yu',)],schema=['name'])
df2 = spark.createDataFrame([('regan',),('ting',),('sha',)],schema=['name'])
df1.union(df2).show()
+-----+
| name|
+-----+
|regan|
| ting|
| yu|
|regan|
| ting|
| sha|
+-----+
unionByName(other) unions two DataFrames by column name rather than by position; the column order does not matter as long as the names match.
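A small sketch with two hypothetical frames whose columns are in different orders:
df1 = spark.createDataFrame([(1, 'regan')], schema=['id', 'name'])
df2 = spark.createDataFrame([('ting', 2)], schema=['name', 'id'])
df1.unionByName(df2).show()   # columns are matched by name, so both rows align as (id, name)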
unpersist(blocking=False) removes all persisted data of the DataFrame from the cache.
df1 = spark.createDataFrame([('regan',11),('ting',1),('yu',2)],schema=['name','score'])
df1.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
df1.storageLevel
df1.unpersist()
df1.storageLevel
where(condition) is similar to filter; it selects rows according to the given condition.
df = spark.read.csv('/sql/customers.csv',header=True)
df.where('Age >= 30').show()
withColumn(colName, col) returns a new DataFrame with a new column colName added; if a column with that name already exists, it is replaced.
df = spark.read.csv('/sql/customers.csv',header=True)
df.withColumn('Age',df.Age**2).show(10)
df.withColumn('Age2',df.Age**2).show(10)
+----------+------+------+------------------+----------------------+
|CustomerID| Genre| Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+------+------------------+----------------------+
| 0001| Male| 361.0| 15| 39|
| 0002| Male| 441.0| 15| 81|
| 0003|Female| 400.0| 16| 6|
| 0004|Female| 529.0| 16| 77|
| 0005|Female| 961.0| 17| 40|
| 0006|Female| 484.0| 17| 76|
| 0007|Female|1225.0| 18| 6|
| 0008|Female| 529.0| 18| 94|
| 0009| Male|4096.0| 19| 3|
| 0010|Female| 900.0| 19| 72|
+----------+------+------+------------------+----------------------+
only showing top 10 rows
+----------+------+---+------------------+----------------------+------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)| Age2|
+----------+------+---+------------------+----------------------+------+
| 0001| Male| 19| 15| 39| 361.0|
| 0002| Male| 21| 15| 81| 441.0|
| 0003|Female| 20| 16| 6| 400.0|
| 0004|Female| 23| 16| 77| 529.0|
| 0005|Female| 31| 17| 40| 961.0|
| 0006|Female| 22| 17| 76| 484.0|
| 0007|Female| 35| 18| 6|1225.0|
| 0008|Female| 23| 18| 94| 529.0|
| 0009| Male| 64| 19| 3|4096.0|
| 0010|Female| 30| 19| 72| 900.0|
+----------+------+---+------------------+----------------------+------+
only showing top 10 rows
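Since every column of this CSV is read as a string, withColumn is also the usual way to cast a column to a proper type; a small sketch:
df2 = df.withColumn('Age', df.Age.cast('int'))
df2.printSchema()   # Age is now reported as an integer column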
withColumnRenamed(existing, new) renames an existing column to new; if the column does not exist, the call is a no-op.
df = spark.read.csv('/sql/customers.csv',header=True)
df.withColumnRenamed('Age','age').show(10)
df.withColumnRenamed('Age2','age').show(10)
+----------+------+---+------------------+----------------------+
|CustomerID| Genre|age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
| 0001| Male| 19| 15| 39|
| 0002| Male| 21| 15| 81|
| 0003|Female| 20| 16| 6|
| 0004|Female| 23| 16| 77|
| 0005|Female| 31| 17| 40|
| 0006|Female| 22| 17| 76|
| 0007|Female| 35| 18| 6|
| 0008|Female| 23| 18| 94|
| 0009| Male| 64| 19| 3|
| 0010|Female| 30| 19| 72|
+----------+------+---+------------------+----------------------+
write returns a DataFrameWriter, through which the DataFrame's contents can be saved to external storage.
df = spark.read.csv('/sql/customers.csv',header=True)
df.write
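A sketch of actually saving the DataFrame; the output paths are arbitrary examples:
df.write.mode('overwrite').parquet('/sql/customers_parquet')
df.write.mode('overwrite').csv('/sql/customers_out', header=True)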
group by: groups the query result; each distinct combination of the grouping expressions appears as one row in the result, and aggregate functions can be applied.
grouping sets: runs a group by for every grouping-expression subset listed. group by A,B grouping sets(A,B) is equivalent to group by A union group by B; A and B may themselves be sets, e.g. group by A,B,C grouping sets((A,B),(A,C)).
rollup: creates grouping sets for every prefix level of the listed expressions. group by A,B,C with rollup groups by (A,B,C), then (A,B), then (A), and finally the whole table.
cube: creates grouping sets for every possible combination of the listed expressions: (A,B,C), (A,B), (A,C), (A), (B,C), (B), (C), and finally the whole table. The Scala example below demonstrates each of these:
case class MemberOrderInfo(area:String,memberType:String,product:String,price:Int)
import spark.implicits._
val orders=Seq(
MemberOrderInfo("深圳","鑽石會員","鑽石會員1個月",25),
MemberOrderInfo("深圳","鑽石會員","鑽石會員1個月",25),
MemberOrderInfo("深圳","鑽石會員","鑽石會員3個月",70),
MemberOrderInfo("深圳","鑽石會員","鑽石會員12個月",300),
MemberOrderInfo("深圳","鉑金會員","鉑金會員3個月",60),
MemberOrderInfo("深圳","鉑金會員","鉑金會員3個月",60),
MemberOrderInfo("深圳","鉑金會員","鉑金會員6個月",120),
MemberOrderInfo("深圳","黃金會員","黃金會員1個月",15)
)
Convert the Seq to a DataFrame:
val memberDF:DataFrame =orders.toDF()
Register the DataFrame as a temporary view:
memberDF.createOrReplaceTempView("orderTempTable")
group by
spark.sql("select area,memberType,product,sum(price) as total from orderTempTable group by area,memberType,product").show
+----+----------+--------+-----+
|area|memberType| product|total|
+----+----------+--------+-----+
| 深圳| 鑽石會員| 鑽石會員3個月| 70|
| 深圳| 鑽石會員|鑽石會員12個月| 300|
| 深圳| 鉑金會員| 鉑金會員6個月| 120|
| 深圳| 鉑金會員| 鉑金會員3個月| 120|
| 深圳| 鑽石會員| 鑽石會員1個月| 50|
| 深圳| 黃金會員| 黃金會員1個月| 15|
+----+----------+--------+-----+
spark.sql("select area,memberType,product,sum(price) as total from orderTempTable group by area,memberType,product grouping sets(area,memberType,product)").show
+----+----------+--------+-----+
|area|memberType| product|total|
+----+----------+--------+-----+
|null| null| 鉑金會員3個月| 120|
|null| 鉑金會員| null| 240|
|null| null|鑽石會員12個月| 300|
| 深圳| null| null| 675|
|null| 鑽石會員| null| 420|
|null| null| 鑽石會員1個月| 50|
|null| null| 黃金會員1個月| 15|
|null| null| 鑽石會員3個月| 70|
|null| 黃金會員| null| 15|
|null| null| 鉑金會員6個月| 120|
+----+----------+--------+-----+
spark.sql("select area,memberType,product,sum(price) as total from orderTempTable group by area,memberType,product grouping sets((area,memberType),(memberType,product))").show
+----+----------+--------+-----+
|area|memberType| product|total|
+----+----------+--------+-----+
|null| 鉑金會員| 鉑金會員6個月| 120|
|null| 鑽石會員|鑽石會員12個月| 300|
|null| 鑽石會員| 鑽石會員3個月| 70|
| 深圳| 鑽石會員| null| 420|
|null| 鉑金會員| 鉑金會員3個月| 120|
|null| 黃金會員| 黃金會員1個月| 15|
|null| 鑽石會員| 鑽石會員1個月| 50|
| 深圳| 黃金會員| null| 15|
| 深圳| 鉑金會員| null| 240|
+----+----------+--------+-----+
spark.sql("select area,memberType,product,sum(price) as total from orderTempTable group by area,memberType,product with rollup").show
+----+----------+--------+-----+
|area|memberType| product|total|
+----+----------+--------+-----+
| 深圳| 鑽石會員| 鑽石會員1個月| 50|
| 深圳| 鑽石會員|鑽石會員12個月| 300|
| 深圳| 鉑金會員| 鉑金會員3個月| 120|
| 深圳| 鑽石會員| null| 420|
| 深圳| null| null| 675|
|null| null| null| 675|
| 深圳| 鑽石會員| 鑽石會員3個月| 70|
| 深圳| 黃金會員| 黃金會員1個月| 15|
| 深圳| 黃金會員| null| 15|
| 深圳| 鉑金會員| null| 240|
| 深圳| 鉑金會員| 鉑金會員6個月| 120|
+----+----------+--------+-----+
spark.sql("select area,memberType,product,sum(price) as total from orderTempTable group by area,memberType,product with cube").show
+----+----------+--------+-----+
|area|memberType| product|total|
+----+----------+--------+-----+
| 深圳| null| 黃金會員1個月| 15|
|null| null| 鉑金會員3個月| 120|
| 深圳| null| 鉑金會員6個月| 120|
|null| 鉑金會員| 鉑金會員6個月| 120|
|null| 鉑金會員| null| 240|
| 深圳| 鑽石會員| 鑽石會員1個月| 50|
| 深圳| null| 鑽石會員1個月| 50|
|null| 鑽石會員|鑽石會員12個月| 300|
| 深圳| 鑽石會員|鑽石會員12個月| 300|
| 深圳| 鉑金會員| 鉑金會員3個月| 120|
|null| 鑽石會員| 鑽石會員3個月| 70|
| 深圳| 鑽石會員| null| 420|
|null| null|鑽石會員12個月| 300|
| 深圳| null| null| 675|
|null| 鉑金會員| 鉑金會員3個月| 120|
|null| 鑽石會員| null| 420|
|null| 黃金會員| 黃金會員1個月| 15|
|null| 鑽石會員| 鑽石會員1個月| 50|
|null| null| 鑽石會員1個月| 50|
|null| null| null| 675|
+----+----------+--------+-----+
Copyright notice: This technical column is a summary and distillation of the author's (Qin Kaixin) day-to-day work. Cases are drawn from real business environments, together with tuning advice and cluster capacity-planning guidance for commercial applications. Please keep following this blog. QQ email: 1120746959@qq.com — feel free to get in touch for any academic exchange.
Integrating the Python stack with Spark for two-way, cross-cutting data analysis gives us a common ground for big-data analysis; as you can see, a large part of the Spark SQL API mirrors pandas.
Qin Kaixin, Shenzhen, 201812172352