社區在Spark 1.3中開始引入了DataFrames,使得Apache Spark更加容易被使用。受R和Python中的data frames激發,Spark中的DataFrames提供了一些API,這些API在外部看起來像是操作單機的數據一樣,而數據科學家對這些API非常地熟悉。統計是日常數據科學的一個重要組成部分。在即將發佈的Spark 1.4中改進支持統計函數和數學函數(statistical and mathematical functions)。
這篇文章中將介紹一些非常重要的函數,包括:
1、隨機數據生成(Random data generation);
2、總結和描述性統計(Summary and descriptive statistics);
3、樣本協方差和相關性(Sample covariance and correlation);
4、交叉分類彙總表(又稱列聯表)(Cross tabulation);
5、頻繁項(Frequent items);
6、數學函數(Mathematical functions)。
下面的例子全部是使用Python語言實現,在Scala和Java中存在類似的API。
文章目錄
隨機數據生成在測試現有的算法和實現隨機算法中非常重要,比如隨機投影。在sql.functions
函數裏面提供了生成包含i.i.uniform(rand)
和標準的normal(randn)
。
In [
1
]:
from
pyspark.sql.functions
import
rand, randn
In [
2
]:
# Create a DataFrame with one int column and 10 rows.
In [
3
]: df
=
sqlContext.
range
(
0
,
10
)
In [
4
]: df.show()
+
-
-
+
|
id
|
+
-
-
+
|
0
|
|
1
|
|
2
|
|
3
|
|
4
|
|
5
|
|
6
|
|
7
|
|
8
|
|
9
|
+
-
-
+
In [
4
]:
# Generate two other columns using uniform distribution and normal distribution.
In [
5
]: df.select(
"id"
, rand(seed
=
10
).alias(
"uniform"
), randn(seed
=
27
).alias(
"normal"
)).show()
+
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
|
id
| uniform| normal|
+
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
|
0
|
0.7224977951905031
|
-
0.1875348803463305
|
|
1
|
0.2953174992603351
|
-
0.26525647952450265
|
|
2
|
0.4536856090041318
|
-
0.7195024130068081
|
|
3
|
0.9970412477032209
|
0.5181478766595276
|
|
4
|
0.19657711634539565
|
0.7316273979766378
|
|
5
|
0.48533720635534006
|
0.07724879367590629
|
|
6
|
0.7369825278894753
|
-
0.5462256961278941
|
|
7
|
0.5241113627472694
|
-
0.2542275002421211
|
|
8
|
0.2977697066654349
|
-
0.5752237580095868
|
|
9
|
0.5060159582230856
|
1.0900096472044518
|
+
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
|
我們在導入數據之後的第一個操作是想獲取一些數據,來看看他到底是不是我們所要的。對於數字列,瞭解這些數據的描述性統計可以幫助我們理解我們數據的分佈。describe
函數返回的是一個DataFrame,而這個DataFrame中包含了每個數字列的很多信息,比如不爲空的實體總數、平均值、標準差以及最大最小值。
In [
1
]:
from
pyspark.sql.functions
import
rand, randn
In [
2
]:
# A slightly different way to generate the two random columns
In [
3
]: df
=
sqlContext.
range
(
0
,
10
).withColumn(
'uniform'
, rand(seed
=
10
)).withColumn(
'normal'
, randn(seed
=
27
))
In [
4
]: df.describe().show()
+
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
|summary|
id
| uniform| normal|
+
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
| count|
10
|
10
|
10
|
| mean|
4.5
|
0.5215336029384192
|
-
0.01309370117407197
|
| stddev|
2.8722813232690143
|
0.229328162820653
|
0.5756058014772729
|
|
min
|
0
|
0.19657711634539565
|
-
0.7195024130068081
|
|
max
|
9
|
0.9970412477032209
|
1.0900096472044518
|
+
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
|
如果返回的DataFrame含有大量的列,你可以返回其中的一部分列:
In [
4
]: df.describe(
'uniform'
,
'normal'
).show()
+
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
|summary| uniform| normal|
+
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
| count|
10
|
10
|
| mean|
0.5215336029384192
|
-
0.01309370117407197
|
| stddev|
0.229328162820653
|
0.5756058014772729
|
|
min
|
0.19657711634539565
|
-
0.7195024130068081
|
|
max
|
0.9970412477032209
|
1.0900096472044518
|
+
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
|
當然,雖然describe在那些快速探索性數據分析中可以很好的工作,你還可以控制描述性統計的展示以及那些使用DataFrame中簡單選擇的列(這句話好彆扭,請看英文you can also control the list of descriptive statistics and the columns they apply to using the normal select on a DataFrame:)
In [
5
]:
from
pyspark.sql.functions
import
mean,
min
,
max
In [
6
]: df.select([mean(
'uniform'
),
min
(
'uniform'
),
max
(
'uniform'
)]).show()
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
| AVG(uniform)|
MIN
(uniform)|
MAX
(uniform)|
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
|
0.5215336029384192
|
0.19657711634539565
|
0.9970412477032209
|
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
|
協方差表示的是兩個變量的總體的誤差。正數意味着其中一個增加,另外一個也有增加的趨勢;而負數意味着其中一個數增加,另外一個有降低的趨勢。DataFrame兩列中的樣本協方差計算可以如下:
In [
1
]:
from
pyspark.sql.functions
import
rand
In [
2
]: df
=
sqlContext.
range
(
0
,
10
).withColumn(
'rand1'
, rand(seed
=
10
)).withColumn(
'rand2'
, rand(seed
=
27
))
In [
3
]: df.stat.cov(
'rand1'
,
'rand2'
)
Out[
3
]:
0.009908130446217347
In [
4
]: df.stat.cov(
'id'
,
'id'
)
Out[
4
]:
9.166666666666666
|
正如你從上面看到的,兩個隨機生成的列之間的協方差接近零;而id列和它自己的協方差非常大。
協方差的值爲9.17可能很難解釋,而相關是協方差的歸一化度量,這個相對更好理解,因爲它提供了兩個隨機變量之間的統計相關性的定量測量。
In [
5
]: df.stat.corr(
'rand1'
,
'rand2'
)
Out[
5
]:
0.14938694513735398
In [
6
]: df.stat.corr(
'id'
,
'id'
)
Out[
6
]:
1.0
|
在上面的例子中,ID那列完全與相關本身;而兩個隨機生成的列之間的相關性非常低。
如果同時按幾個變量或特徵,把數據分類列表時,這樣的統計表叫作交叉分類彙總表,其主要用來檢驗兩個變量之間是否存在關係,或者說是否獨立。在Spark 1.4中,我們可以計算DataFrame中兩列之間的交叉分類彙總表,以便獲取計算的兩列中不同對的數量,下面是關於如何使用交叉表來獲取列聯表的例子
In [
1
]:
# Create a DataFrame with two columns (name, item)
In [
2
]: names
=
[
"Alice"
,
"Bob"
,
"Mike"
]
In [
3
]: items
=
[
"milk"
,
"bread"
,
"butter"
,
"apples"
,
"oranges"
]
In [
4
]: df
=
sqlContext.createDataFrame([(names[i
%
3
], items[i
%
5
])
for
i
in
range
(
100
)], [
"name"
,
"item"
])
In [
5
]:
# Take a look at the first 10 rows.
In [
6
]: df.show(
10
)
+
-
-
-
-
-
+
-
-
-
-
-
-
-
+
| name| item|
+
-
-
-
-
-
+
-
-
-
-
-
-
-
+
|Alice| milk|
| Bob| bread|
| Mike| butter|
|Alice| apples|
| Bob|oranges|
| Mike| milk|
|Alice| bread|
| Bob| butter|
| Mike| apples|
|Alice|oranges|
+
-
-
-
-
-
+
-
-
-
-
-
-
-
+
In [
7
]: df.stat.crosstab(
"name"
,
"item"
).show()
+
-
-
-
-
-
-
-
-
-
+
-
-
-
-
+
-
-
-
-
-
+
-
-
-
-
-
-
+
-
-
-
-
-
-
+
-
-
-
-
-
-
-
+
|name_item|milk|bread|apples|butter|oranges|
+
-
-
-
-
-
-
-
-
-
+
-
-
-
-
+
-
-
-
-
-
+
-
-
-
-
-
-
+
-
-
-
-
-
-
+
-
-
-
-
-
-
-
+
| Bob|
6
|
7
|
7
|
6
|
7
|
| Mike|
7
|
6
|
7
|
7
|
6
|
| Alice|
7
|
7
|
6
|
7
|
7
|
+
-
-
-
-
-
-
-
-
-
+
-
-
-
-
+
-
-
-
-
-
+
-
-
-
-
-
-
+
-
-
-
-
-
-
+
-
-
-
-
-
-
-
+
|
我們需要記住,列的基數不能太大。也就是說,name和item distinct之後的數量不能過多。試想,如果item distinct之後的數量爲10億,那麼你如何在屏幕上顯示這個表??
瞭解列中那些頻繁出現的item對於我們瞭解數據集非常重要。在Spark 1.4中,我們可以通過使用DataFrames來發現列中的頻繁項,
In [
1
]: df
=
sqlContext.createDataFrame([(
1
,
2
,
3
)
if
i
%
2
=
=
0
else
(i,
2
*
i, i
%
4
)
for
i
in
range
(
100
)], [
"a"
,
"b"
,
"c"
])
In [
2
]: df.show(
10
)
+
-
+
-
-
+
-
+
|a| b|c|
+
-
+
-
-
+
-
+
|
1
|
2
|
3
|
|
1
|
2
|
1
|
|
1
|
2
|
3
|
|
3
|
6
|
3
|
|
1
|
2
|
3
|
|
5
|
10
|
1
|
|
1
|
2
|
3
|
|
7
|
14
|
3
|
|
1
|
2
|
3
|
|
9
|
18
|
1
|
+
-
+
-
-
+
-
+
In [
3
]: freq
=
df.stat.freqItems([
"a"
,
"b"
,
"c"
],
0.4
)
|
對應上面的DataFrame,下面的代碼可以計算出每列中出現40%的頻繁項
In [
4
]: freq.collect()[
0
]
Out[
4
]: Row(a_freqItems
=
[
11
,
1
], b_freqItems
=
[
2
,
22
], c_freqItems
=
[
1
,
3
])
|
正如你所看到的,11和1是列a的頻繁值。同樣,你也可以獲取到列組合的頻繁項,我們可以通過struct函數來創建列組合
In [
5
]:
from
pyspark.sql.functions
import
struct
In [
6
]: freq
=
df.withColumn(
'ab'
, struct(
'a'
,
|