Spark1.4中DataFrame功能加強,新增科學和數學函數

社區在Spark 1.3中開始引入了DataFrames,使得Apache Spark更加容易被使用。受R和Python中的data frames激發,Spark中的DataFrames提供了一些API,這些API在外部看起來像是操作單機的數據一樣,而數據科學家對這些API非常地熟悉。統計是日常數據科學的一個重要組成部分。在即將發佈的Spark 1.4中改進支持統計函數和數學函數(statistical and mathematical functions)。

  這篇文章中將介紹一些非常重要的函數,包括:
  1、隨機數據生成(Random data generation);
  2、總結和描述性統計(Summary and descriptive statistics);
  3、樣本協方差和相關性(Sample covariance and correlation);
  4、交叉分類彙總表(又稱列聯表)(Cross tabulation);
  5、頻繁項(Frequent items);
  6、數學函數(Mathematical functions)。

  下面的例子全部是使用Python語言實現,在Scala和Java中存在類似的API。


如果想及時瞭解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號: iteblog_hadoop

一、隨機數據生成(Random data generation)

  隨機數據生成在測試現有的算法和實現隨機算法中非常重要,比如隨機投影。在sql.functions函數裏面提供了生成包含i.i.uniform(rand)和標準的normal(randn)

In [ 1 ]: from pyspark.sql.functions import rand, randn
In [ 2 ]: # Create a DataFrame with one int column and 10 rows.
In [ 3 ]: df = sqlContext. range ( 0 , 10 )
In [ 4 ]: df.show()
+ - - +
| id |
+ - - +
| 0 |
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
+ - - +
 
In [ 4 ]: # Generate two other columns using uniform distribution and normal distribution.
In [ 5 ]: df.select( "id" , rand(seed = 10 ).alias( "uniform" ), randn(seed = 27 ).alias( "normal" )).show()
+ - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - +
| id |            uniform|              normal|
+ - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - +
| 0 | 0.7224977951905031 | - 0.1875348803463305 |
| 1 | 0.2953174992603351 | - 0.26525647952450265 |
| 2 | 0.4536856090041318 | - 0.7195024130068081 |
| 3 | 0.9970412477032209 0.5181478766595276 |
| 4 | 0.19657711634539565 0.7316273979766378 |
| 5 | 0.48533720635534006 | 0.07724879367590629 |
| 6 | 0.7369825278894753 | - 0.5462256961278941 |
| 7 | 0.5241113627472694 | - 0.2542275002421211 |
| 8 | 0.2977697066654349 | - 0.5752237580095868 |
| 9 | 0.5060159582230856 1.0900096472044518 |
+ - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - +

二、總結和描述性統計(Summary and descriptive statistics)

  我們在導入數據之後的第一個操作是想獲取一些數據,來看看他到底是不是我們所要的。對於數字列,瞭解這些數據的描述性統計可以幫助我們理解我們數據的分佈。describe函數返回的是一個DataFrame,而這個DataFrame中包含了每個數字列的很多信息,比如不爲空的實體總數、平均值、標準差以及最大最小值。

In [ 1 ]: from pyspark.sql.functions import rand, randn
In [ 2 ]: # A slightly different way to generate the two random columns
In [ 3 ]: df = sqlContext. range ( 0 , 10 ).withColumn( 'uniform' , rand(seed = 10 )).withColumn( 'normal' , randn(seed = 27 ))
 
In [ 4 ]: df.describe().show()
+ - - - - - - - + - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - +
|summary|                id |            uniform|              normal|
+ - - - - - - - + - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - +
|  count|                10 |                 10 |                  10 |
|   mean|               4.5 | 0.5215336029384192 | - 0.01309370117407197 |
| stddev| 2.8722813232690143 0.229328162820653 0.5756058014772729 |
|    min |                 0 | 0.19657711634539565 | - 0.7195024130068081 |
|    max |                 9 | 0.9970412477032209 1.0900096472044518 |
+ - - - - - - - + - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - +

如果返回的DataFrame含有大量的列,你可以返回其中的一部分列:

In [ 4 ]: df.describe( 'uniform' , 'normal' ).show()
+ - - - - - - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - +
|summary|            uniform|              normal|
+ - - - - - - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - +
|  count|                 10 |                  10 |
|   mean| 0.5215336029384192 | - 0.01309370117407197 |
| stddev|  0.229328162820653 0.5756058014772729 |
|    min | 0.19657711634539565 | - 0.7195024130068081 |
|    max | 0.9970412477032209 1.0900096472044518 |
+ - - - - - - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - +

  當然,雖然describe在那些快速探索性數據分析中可以很好的工作,你還可以控制描述性統計的展示以及那些使用DataFrame中簡單選擇的列(這句話好彆扭,請看英文you can also control the list of descriptive statistics and the columns they apply to using the normal select on a DataFrame:)

In [ 5 ]: from pyspark.sql.functions import mean, min , max
In [ 6 ]: df.select([mean( 'uniform' ), min ( 'uniform' ), max ( 'uniform' )]).show()
+ - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - +
|      AVG(uniform)|       MIN (uniform)|      MAX (uniform)|
+ - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - +
| 0.5215336029384192 | 0.19657711634539565 | 0.9970412477032209 |
+ - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - +

三、樣本協方差和相關性(Sample covariance and correlation)

  協方差表示的是兩個變量的總體的誤差。正數意味着其中一個增加,另外一個也有增加的趨勢;而負數意味着其中一個數增加,另外一個有降低的趨勢。DataFrame兩列中的樣本協方差計算可以如下:

In [ 1 ]: from pyspark.sql.functions import rand
In [ 2 ]: df = sqlContext. range ( 0 , 10 ).withColumn( 'rand1' , rand(seed = 10 )).withColumn( 'rand2' , rand(seed = 27 ))
 
In [ 3 ]: df.stat.cov( 'rand1' , 'rand2' )
Out[ 3 ]: 0.009908130446217347
 
In [ 4 ]: df.stat.cov( 'id' , 'id' )
Out[ 4 ]: 9.166666666666666

正如你從上面看到的,兩個隨機生成的列之間的協方差接近零;而id列和它自己的協方差非常大。

協方差的值爲9.17可能很難解釋,而相關是協方差的歸一化度量,這個相對更好理解,因爲它提供了兩個隨機變量之間的統計相關性的定量測量。

In [ 5 ]: df.stat.corr( 'rand1' , 'rand2' )
Out[ 5 ]: 0.14938694513735398
 
In [ 6 ]: df.stat.corr( 'id' , 'id' )
Out[ 6 ]: 1.0

在上面的例子中,ID那列完全與相關本身;而兩個隨機生成的列之間的相關性非常低。

四、交叉分類彙總表(又稱列聯表)(Cross tabulation)

  如果同時按幾個變量或特徵,把數據分類列表時,這樣的統計表叫作交叉分類彙總表,其主要用來檢驗兩個變量之間是否存在關係,或者說是否獨立。在Spark 1.4中,我們可以計算DataFrame中兩列之間的交叉分類彙總表,以便獲取計算的兩列中不同對的數量,下面是關於如何使用交叉表來獲取列聯表的例子

In [ 1 ]: # Create a DataFrame with two columns (name, item)
In [ 2 ]: names = [ "Alice" , "Bob" , "Mike" ]
In [ 3 ]: items = [ "milk" , "bread" , "butter" , "apples" , "oranges" ]
In [ 4 ]: df = sqlContext.createDataFrame([(names[i % 3 ], items[i % 5 ]) for i in range ( 100 )], [ "name" , "item" ])
 
In [ 5 ]: # Take a look at the first 10 rows.
In [ 6 ]: df.show( 10 )
+ - - - - - + - - - - - - - +
| name|   item|
+ - - - - - + - - - - - - - +
|Alice|   milk|
|  Bob|  bread|
| Mike| butter|
|Alice| apples|
|  Bob|oranges|
| Mike|   milk|
|Alice|  bread|
|  Bob| butter|
| Mike| apples|
|Alice|oranges|
+ - - - - - + - - - - - - - +
 
In [ 7 ]: df.stat.crosstab( "name" , "item" ).show()
+ - - - - - - - - - + - - - - + - - - - - + - - - - - - + - - - - - - + - - - - - - - +
|name_item|milk|bread|apples|butter|oranges|
+ - - - - - - - - - + - - - - + - - - - - + - - - - - - + - - - - - - + - - - - - - - +
|      Bob|   6 |    7 |     7 |     6 |      7 |
|     Mike|   7 |    6 |     7 |     7 |      6 |
|    Alice|   7 |    7 |     6 |     7 |      7 |
+ - - - - - - - - - + - - - - + - - - - - + - - - - - - + - - - - - - + - - - - - - - +

我們需要記住,列的基數不能太大。也就是說,name和item distinct之後的數量不能過多。試想,如果item distinct之後的數量爲10億,那麼你如何在屏幕上顯示這個表??

五、頻繁項(Frequent items)

  瞭解列中那些頻繁出現的item對於我們瞭解數據集非常重要。在Spark 1.4中,我們可以通過使用DataFrames來發現列中的頻繁項,

In [ 1 ]: df = sqlContext.createDataFrame([( 1 , 2 , 3 ) if i % 2 = = 0 else (i, 2 * i, i % 4 ) for i in range ( 100 )], [ "a" , "b" , "c" ])
 
In [ 2 ]: df.show( 10 )
+ - + - - + - +
|a| b|c|
+ - + - - + - +
| 1 | 2 | 3 |
| 1 | 2 | 1 |
| 1 | 2 | 3 |
| 3 | 6 | 3 |
| 1 | 2 | 3 |
| 5 | 10 | 1 |
| 1 | 2 | 3 |
| 7 | 14 | 3 |
| 1 | 2 | 3 |
| 9 | 18 | 1 |
+ - + - - + - +
 
In [ 3 ]: freq = df.stat.freqItems([ "a" , "b" , "c" ], 0.4 )

對應上面的DataFrame,下面的代碼可以計算出每列中出現40%的頻繁項

In [ 4 ]: freq.collect()[ 0 ]
Out[ 4 ]: Row(a_freqItems = [ 11 , 1 ], b_freqItems = [ 2 , 22 ], c_freqItems = [ 1 , 3 ])

正如你所看到的,11和1是列a的頻繁值。同樣,你也可以獲取到列組合的頻繁項,我們可以通過struct函數來創建列組合

In [ 5 ]: from pyspark.sql.functions import struct
 
In [ 6 ]: freq = df.withColumn( 'ab' , struct( 'a' ,
相關文章
相關標籤/搜索