[Pyspark]RDD經常使用方法總結

aggregate(zeroValue, seqOp, combOp)

  • 入參:html

    • zeroValue表示一組初值 Tuple
    • seqOp表示在各個分區partition中進行 什麼樣的聚合操做,支持不一樣類型的聚合 Func
    • combOp表示將不一樣分區partition聚合後的結果再進行聚合,只能進行同類型聚合 Func
  • 返回:python

    • 聚合後的結果,不是RDD,是一個python對象

下面是對一組數進行累加,並計算數據的長度的例子shell

# sum, sum1, sum2 的數據類型跟zeroValue同樣, 是一個tuple(int, int)
    seqOp = (lambda sum, item: (sum[0] + item, sum[1] + 1))
    combOp = (lambda sum1, sum2: (sum1[0] + sum2[0], sum1[1] + sum2[1]))
    result = sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)

    print(result) # (10, 4)

aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions, partitionFunc)

基本跟aggregate相似,在相同的key下進行聚合操做apache

  • 入參:api

    • zeroValue表示一組初值 Tuple
    • seqFunc表示在各個分區partition中進行 什麼樣的聚合操做,支持不一樣類型的聚合 Func
    • combFunc表示將不一樣分區partition聚合後的結果再進行聚合,只能進行同類型聚合 Func
    • numPartitions表示須要將此操做分割成多少個分區
    • partitionFunc自定義分區方法
  • 返回:app

    • 聚合後的結果是一個RDD,再也不是一個python對象, 須要調用collect()方法取回

下面是對一隊成員的成績進行累加,並計算成員的總分和參加科目的總數ide

seqFunc = (lambda sum, item: (sum[0] + item, sum[1] + 1))
    combFunc = (lambda sum1, sum2: (sum1[0] + sum2[0], sum1[1] + sum2[1]))
    result = sc.parallelize(
        [("A", 83), ("A", 74), ("A", 91), ("A", 82),
         ("B", 69), ("B", 62), ("B", 97), ("B", 80), ("B", 60),
         ("C", 78), ("C", 73), ("C", 68)]) \
        .aggregateByKey((0, 0), seqFunc, combFunc)

    print(result.collect()) # [('B', (368, 5)), ('C', (219, 3)), ('A', (330, 4))]

cache()

將RDD結果存儲在內存中,以便再次利用函數

如下兩條語句相等spa

result = sc.parallelize([1, 2, 3, 4]).cache()
    result2 = sc.parallelize([1, 2, 3, 4]) \
        .persist(storageLevel=StorageLevel.MEMORY_ONLY)

cartesian(rdd)

返回本身與傳入rdd的笛卡爾積code

  • 入參:

    • rdd表示一個rdd對象,能夠存儲不一樣數據類型 RDD
  • 返回:

    • 返回的結果是一個RDD
num_rdd = sc.parallelize([1, 2])
    str_rdd = sc.parallelize(['a', 'y'])
    result = num_rdd.cartesian(str_rdd)

    print(result.collect()) # [(1, 'a'), (1, 'y'), (2, 'a'), (2, 'y')]

coalesce(numPartitions, shuffle)

經常使用於壓縮任務,當分區過多時,將形成並行計算效率下降,調度器在不一樣分區中頻繁切換,沒有充分時間去完成計算任務。

  • 入參:

    • numPartitions表示須要將此操做壓縮成多少個分區 Int
    • shuffle表示是否平均分給每一個分區,並非對數據進行打亂 Boolean
      (由於數據的偏斜, 也將影響並行計算的效率, 簡單理解=> 木桶效應)
  • 返回:

    • 返回的結果是一個RDD
num_rdd = sc.parallelize([i for i in range(0, 12)], 5)
    print(num_rdd.glom().collect()) # [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9, 10, 11]]

    new_rdd = num_rdd.coalesce(2, shuffle=True)
    print(new_rdd.glom().collect()) # [[0, 1, 4, 5, 6, 7], [2, 3, 8, 9, 10, 11]]

    new_rdd2 = num_rdd.coalesce(2, shuffle=False)
    print(new_rdd2.glom().collect()) # [[0, 1, 2, 3], [4, 5, 6, 7, 8, 9, 10, 11]]

cogroup(rdd, numPartitions)

將兩個RDD中相同key進行合併,

  • 入參:

    • rdd表示一個rdd對象,能夠存儲不一樣數據類型 RDD
    • numPartitions表示須要將此操做壓縮成多少個分區 Int
  • 返回:

    • 返回的結果是一個RDD
x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2), ("y", 4)])
    z = x.cogroup(y) \
        .map(lambda item: (item[0], list(item[1][0]), list(item[1][1])))

    print(z.collect()) # [('b', [4], []), ('y', [], [4]), ('a', [1], [2])]

collect()

將數據以List取回本地
官網提示,建議只在任務結束時在調用collect方法,不然很容易OOM

  • 返回:
    • 返回的結果是一個List

collectAsMap()

將數據以key-value對的形式取回本地

  • 返回:
    • 返回的結果是一個Dict

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions, partitionFunc)

基本跟aggregate相似,在相同的key下進行聚合操做, 計算過程發生在Driver端

  • 入參:
    • createCombiner表示一個處理初值的函數 Func
    • mergeValue表示在所在的Map節點上進行什麼樣的聚合操做,支持不一樣類型的聚合 Func
    • mergeCombiners表示將不一樣Map節點上相同key聚合後的結果再進行聚合,只能進行同類型聚合 Func
    • numPartitions表示須要將此操做分割成多少個分區
    • partitionFunc自定義分區方法
init = (lambda val: [val])
    seqFunc = (lambda sum_list, item: sum_list + [item])
    combFunc = (lambda sum_list1, sum_list2: sum_list1 + sum_list2)
    result = sc.parallelize(
        [("A", 83), ("A", 74), ("A", 91), ("A", 82),
         ("B", 69), ("B", 62), ("B", 97), ("B", 80), ("B", 60),
         ("C", 78), ("C", 73), ("C", 68)]) \
        .combineByKey(init, seqFunc, combFunc)

    print(result.collect()) 
    # [('B', [69, 62, 97, 80, 60]), ('C', [78, 73, 68]), ('A', [83, 74, 91, 82])]

count()

返回RDD內存儲的數據長度(List形式)

  • 返回:
    • 返回的結果是一個Int

countApprox(timeout, confidence)

計算結果的估計數量;返回在timeout時間內完成的計算任務 的數據長度(List形式)

  • 入參:

    • timeout表示一個最大的計算時間 (毫秒) Int
    • confidence表示置信區間 Float
  • 返回:

    • 返回的結果是一個Int
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    print(rdd.countApprox(100)) # 3

countByKey()

返回每一個key對應的元素數量

  • 返回:
    • 返回的結果是一個Dict
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    print(rdd.countByKey()) # defaultdict(<class 'int'>, {'a': 2, 'b': 1})

countByValue()

返回每一個value出現的次數

  • 返回:
    • 返回的結果是一個Dict
rdd2 = sc.parallelize([1, 2, 1, 2, 2], 2)
    print(rdd2.countByValue())  # defaultdict(<class 'int'>, {1: 2, 2: 3})

distinct()

遍歷所有元素,並返回包含的不一樣元素的總數

  • 入參:

    • numPartitions表示須要將此操做分割成多少個分區
  • 返回:

    • 返回的結果是一個Int

filter(func)

遍歷所有元素,篩選符合傳入方法的元素

  • 入參:

    • func表示須要應用到每一個元素的篩選方法
  • 返回:

    • 返回的結果是一個RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
    rdd.filter(lambda x: x % 2 == 0)
    print(rdd.collect()) # [2, 4]

flatMap(func, preservesPartitioning)

遍歷所有元素,將傳入方法應用到每一個元素上,並將最後結果展平(壓成一個List)

  • 入參:

    • func表示須要應用到每一個元素的方法
    • preservesPartitioning是否保持當前分區方式,默認從新分區
  • 返回:

    • 返回的結果是一個RDD
rdd = sc.parallelize([2, 3, 4])
    sorted(rdd.flatMap(lambda x: range(1, x)).collect()) #[1, 1, 1, 2, 2, 3]
    sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) #[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]

flatMapValues(func)

遍歷某個元素的元素值,將傳入方法應用到每一個元素值上,並將最後結果展平(壓成一個List)

  • 入參:

    • func表示須要應用到每一個元素值的方法
  • 返回:

    • 返回的結果是一個RDD
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
    x.flatMapValues(lambda val: val).collect() # [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

fold(zeroValue, func)

fold()與reduce()相似,接收與reduce接收的函數簽名相同的函數,另外再加上一個初始值做爲第一次調用的結果。(例如,加法初始值應爲0,乘法初始值應爲1)

  • 入參:

    • zeroValue表示一組初值 Tuple
    • func表示須要應用到每一個元素上的方法 Func
  • 返回:

    • 聚合後的結果,不是RDD,是一個python對象
x = sc.parallelize([1, 2, 3, 4, 5])
    x.fold(0, add) # 15

foldByKey(zeroValue, func, numPartitions, partitionFunc)

基本跟fold()相似,在相同的key下進行聚合操做

  • 入參:

    • zeroValue表示一組初值 Tuple
    • func表示須要應用到每一個元素上的方法 Func
    • numPartitions表示須要將此操做分割成多少個分區
    • partitionFunc自定義分區方法
  • 返回:

    • 返回的結果是一個RDD
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    x.foldByKey(0, add).collect() # [('a', 2), ('b', 1)]

foreach(func)

用於遍歷RDD中的元素,將函數func應用於每個元素。

  • 入參:

    • func表示須要應用到每一個元素的方法, 但這個方法不會在客戶端執行
  • 返回:

    • 返回的結果是一個RDD
def f(x): print(x)
    sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

foreachPartition(func)

遍歷某個分區下的所有元素,將函數func應用於每個元素。

  • 入參:

    • func表示須要應用到每一個元素的方法, 但這個方法不會在客戶端執行
  • 返回:

    • 返回的結果是一個RDD
def f(iterator):
      for x in iterator:
           print(x)
    sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)

glom()

按分區對元素進行聚合, 返回一個二維列表

  • 返回:
    • 返回的結果是一個RDD
rdd = sc.parallelize([1, 2, 3, 4], 2)
    sorted(rdd.glom().collect()) # [[1, 2], [3, 4]]

groupyBy(func, numPartitions, partitionFunc)

這個算子接收一個Func,應用函數後的返回值做爲key,而後經過這個key來對裏面的元素進行分組。

  • 入參:

    • func表示須要應用到每一個元素上的方法 Func
    • numPartitions表示須要將此操做分割成多少個分區
    • partitionFunc自定義分區方法
  • 返回:

    • 返回的結果是一個RDD
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
    result = rdd.groupBy(lambda x: x % 2).collect()
    sorted([(x, sorted(y)) for (x, y) in result]) # [(0, [2, 8]), (1, [1, 1, 3, 5])]

groupByKey(numPartitions, partitionFunc)

與groupBy相似,不須要再傳入func

groupWith(rdd, *rdd)

cogroup的增強版,能夠用於多於兩個的RDD合併

  • 入參:

    • rdd表示一個rdd對象,能夠存儲不一樣數據類型 RDD
    • *rdd表示一個rdd可變列表對象,能夠是多個RDD對象 RDD
  • 返回:

    • 返回的結果是一個RDD
x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2), ("y", 4)])
    w = sc.parallelize([("c", 3), ("a", 6)])
    z = x.groupWith(y, w) \
        .map(lambda item: (item[0], list(item[1][0]), list(item[1][1])))
    print(z.collect()) # [('b', [4], []), ('y', [], [4]), ('a', [1], [2]), ('c', [], [])]

join(rdd, numPartitions)

內鏈接,將兩個RDD中具備相同的key時進行鏈接

  • 入參:

    • rdd表示一個rdd對象,能夠存儲不一樣數據類型 RDD
    • numPartitions表示須要將此操做分割成多少個分區
  • 返回:

    • 返回的結果是一個RDD
x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2), ("a", 3)])
    sorted(x.join(y).collect()) # [('a', (1, 2)), ('a', (1, 3))]

leftOuterJoin(other, numPartitions=None)

左外鏈接, 與join相似

lookup(key)

  • 入參:

    • key表示須要查找元素的key
  • 返回:

    • 返回的結果是一個List
rdd = sc.parallelize(list(zip(range(100), range(100, 200))), 10) #[(0,100), (1,101), ...]
    result = rdd.lookup(6)
    print(result) # 106

map(func, preservesPartitioning)

對於每一個元素都應用這個func

  • 入參:

    • func表示須要應用到每一個元素的方法
    • preservesPartitioning是否保持當前分區方式,默認從新分區
  • 返回:

    • 返回的結果是一個RDD
rdd = sc.parallelize(["b", "a", "c"])
    sorted(rdd.map(lambda x: (x, 1)).collect()) #[('a', 1), ('b', 1), ('c', 1)]

mapPartitions(func, preservesPartitioning)

對於每一個分區應用這個func

  • 入參:

    • func表示須要應用到每一個元素的方法
    • preservesPartitioning是否保持當前分區方式,默認從新分區
  • 返回:

    • 返回的結果是一個RDD
rdd = sc.parallelize([1, 2, 3, 4], 2)
    def f(iterator): yield sum(iterator)
    rdd.mapPartitions(f).collect() # [3, 7]

mapPartitionsWithIndex(func, preservesPartitioning)

對於每一個分區應用這個func,但同時會被傳入分區的index

  • 入參:

    • func表示須要應用到每一個元素的方法
    • preservesPartitioning是否保持當前分區方式,默認從新分區
  • 返回:

    • 返回的結果是一個RDD
rdd = sc.parallelize([1, 2, 3, 4], 4)
  def f(splitIndex, iterator): yield splitIndex
  rdd.mapPartitionsWithIndex(f).sum() # 6 =>  0 + 1 + 2 + 3

mapValues(func)

對鍵值對中每一個value都應用這個func,並保持key不變

  • 入參:

    • func表示須要應用到每一個元素值上的方法
  • 返回:

    • 返回的結果是一個RDD
x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
    def f(x): return len(x)
    x.mapValues(f).collect() # [('a', 3), ('b', 1)]

partitionBy(numPartitions, partitionFunc)

返回一個原始RDD通過自定義分區方法的拷貝

  • 入參:

    • numPartitions表示須要將此操做分割成多少個分區
    • partitionFunc自定義分區方法
  • 返回:

    • 返回的結果是一個RDD
rdd = sc.parallelize([(0, 1), (1, 2), (1, 3), (0, 2), (3, 5), (5, 6)])
    new_rdd = rdd.partitionBy(numPartitions=3, partitionFunc=lambda x: hash(x))
    print(rdd.glom().collect()) # [[], [(0, 1)], [(1, 2)], [(1, 3)], [], [(0, 2)], [(3, 5)], [(5, 6)]]
    print(new_rdd.glom().collect()) # [[(0, 1), (0, 2), (3, 5)], [(1, 2), (1, 3)], [(5, 6)]]

persist(storageLevel)

當RDD第一次計算完成以後,保存起來,具體保存在什麼位置根據storageLevel來決定

  • 入參:
    • storageLevel表示RDD的計算結果保存的位置, 內存或者硬盤中(以文件形式)
rdd = sc.parallelize(["b", "a", "c"])
    rdd.persist().is_cached # True

pipe(command, env=None, checkCode=False)

將由管道命令建立的數據以RDD形式拉取到內存中

  • 入參:

    • command表示須要執行的命令
    • env前置環境配置參數
    • checkCode是否檢查shell命令的返回值
  • 返回:

    • 返回的結果是一個RDD
x = sc.parallelize(['A', 'Ba', 'C', 'AD'])
    y = x.pipe('grep -i "A"') 
    print(x.collect()) # ['A', 'Ba', 'C', 'AD']
    print(y.collect()) # ['A', 'Ba', 'AD']

reduce(func)

對於每一個元素值都應用這個func

  • 入參:

    • func表示須要應用到每一個元素的方法
  • 返回:

    • 返回的結果是一個Python obj, 與元素值得數據類型一致
x = sc.parallelize([1, 2, 3])
    y = x.reduce(lambda a, b : a + b )
    print(x.collect()) # [1, 2, 3]
    print(y) # 6

reduceByKey(func, numPartitions=None, partitionFunc)

對於這個key對應的元素值都應用這個func

  • 入參:

    • func表示須要應用到每一個元素值的方法
    • numPartitions表示須要將此操做分割成多少個分區
    • partitionFunc自定義分區方法
  • 返回:

    • 返回的結果是一個RDD
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    sorted(rdd.reduceByKey(add).collect()) # [('a', 2), ('b', 1)]

reduceByKeyLocally(func)

功能跟reduceByKey相同,可是計算髮生在mapper節點中,計算結果直接傳回主節點,相似combiner

  • 入參:

    • func表示須要應用到每一個元素值的方法 Func
  • 返回:

    • 返回的結果是一個RDD
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    sorted(rdd.reduceByKey(add).collect()) # [('a', 2), ('b', 1)]

repartition(numPartitions)

對RDD按指定分區數量進行從新分區

  • 入參:

    • numPartitions表示須要將此操做分割成多少個分區 Int
  • 返回:

    • 返回的結果是一個RDD
rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
    sorted(rdd.glom().collect()) # [[1], [2, 3], [4, 5], [6, 7]]
    len(rdd.repartition(2).glom().collect()) # 2
    len(rdd.repartition(10).glom().collect()) # 10

rightOuterJoin(other, numPartitions)

右外鏈接, 與join相似

sample(withReplacement, fraction, seed)

返回RDD數據的一個子集

  • 入參:

    • withReplacement表示是否試放回抽樣 Boolean
    • fraction表示抽樣比例 Float
    • seed隨機種子
  • 返回:

    • 返回的結果是一個RDD
rdd = sc.parallelize(range(100), 4)
    sample = rdd.sample(False, 0.1, 666)
    print(sample.count()) # 11

sortBy(keyfunc, ascending, numPartitions)

根據keyfunc對RDD進行排序

  • 入參:

    • keyfunc表示須要被排序的key Func
    • ascending表示升序或者降序 Boolean 默認升序
    • numPartitions表示須要將此操做分割成多少個分區 Int
  • 返回:

    • 返回的結果是一個RDD
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    sc.parallelize(tmp).sortBy(lambda x: x[0]).collect() # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

sortByKey(ascending, numPartitions, keyfunc)

對RDD進行排序,默認RDD內的數據是tuple(key,value)形式

  • 入參:

    • ascending表示升序或者降序 Boolean 默認升序
    • numPartitions表示須要將此操做分割成多少個分區 Int
    • keyfunc表示須要被排序的key Func
  • 返回:

    • 返回的結果是一個RDD
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    sc.parallelize(tmp).sortByKey(True, 1).collect() # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

takeOrdered(num, key)

返回排序後的前num個數據

  • 入參:

    • num表示須要返回元素的數量
    • key表示須要被排序的key Func
  • 返回:

    • 返回的結果是一個List
sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) # [1, 2, 3, 4, 5, 6]
    sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x) # [10, 9, 7, 6, 5, 4]

takeSample(withReplacement, num, seed)

返回RDD數據的一個子集

  • 入參:

    • withReplacement表示是否試放回抽樣 Boolean
    • num表示須要返回元素的數量
    • seed隨機種子
  • 返回:

    • 返回的結果是一個RDD
rdd = sc.parallelize(range(0, 10))
    len(rdd.takeSample(True, 20, 1)) # 20

zip(rdd)

參考python的內置方法zip

  • 入參:

    • rdd表示一個rdd對象,能夠存儲不一樣數據類型,但數量須要相同 RDD
  • 返回:

    • 返回的結果是一個RDD
x = sc.parallelize(range(0,5))
    y = sc.parallelize(range(1000, 1005))
    x.zip(y).collect() # [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

zipWithIndex()

與元素自己的index進行zip操做

  • 返回:
    • 返回的結果是一個RDD
rdd = sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
    print(rdd.collect()) #[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
相關文章
相關標籤/搜索