[Spark] 關於函數 combineByKey

combineByKey:python

Generic function to combine the elements for each key using a custom set of aggregation functions.函數

概述

.combineByKey 方法是基於鍵進行聚合的函數(大多數基於鍵聚合的函數都是用它實現的),因此這個方法仍是挺重要的。3d

咱們設聚合前Pair RDD的鍵值對格式爲:鍵爲K,鍵值格式爲V;而聚合後,鍵格式不便,鍵值格式爲C。code

combineByKey函數的定義爲:blog

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc35dbc8e60>)

該函數的參數主要爲前三個:element

  • createCombiner
  • mergeValue
  • mergeCombiners

示意圖以下:hash

combineByKey-illustrated

一個例子

仍是先看一個例子,暫時看不懂能夠先看下面再回來。it

>>> test = sc.parallelize([('panda', (1,2)), ('pink',(7,2)), ('pirate',(3,1))])
>>> xx = test.combineByKey((lambda x : (x,1)),\
...                     (lambda x,y: (x[0] + y, x[1]+ 1)),\
...                     (lambda x,y : (x[0] + y[0], x[1] + y[1])) )
>>> xx.collect()
[('coffee', (3, 2)), ('panda', (3, 1))]

這裏,三個參數分別用了3個lambda表達式代替,分別爲:io

  • createCombiner : lambda x : (x,1)
  • mergeValue : lambda x , y : (x[0] + y , x[1] + 1 )
  • mergeCombiners : lambda x , y : (x[0] + y[0], x[1] + y[1])

下面解釋這三個參數。table

createCombiner

因爲聚合操做會遍分區中全部的元素,所以每一個元素(這裏指的是鍵值對)的鍵只有兩種狀況:

  • 之前沒出現過
  • 之前出現過

若是之前沒出現過,則執行的是createCombiner方法;不然執行mergeValue方法,即:

Key-Value-Pair

.createCombiner()會在新遇到的鍵對應的累加器中賦予初始值。

該函數在格式上是由 V -> C 的,在上面的例子裏面,是由 整數類型 -> 二元元組類型,這個二元元組第二個元素爲1。

mergeValue

對於已經出現過的鍵(key),調用mergeValue來進行聚合操做,對該鍵的累加器對應的當前值(C格式)於這個新的值(V格式)進行合併。

mergeCombiners

若是有兩個或者更多的分區(這裏的例子裏沒提到)都有對應同一個鍵的累加器,就須要使用用戶提供的mergeCombiners()方法將各個分區的結果(全是C格式)進行合併。

相關文章
相關標籤/搜索