Spark入門(六)--Spark的combineByKey、sortBykey

spark的combineByKey

combineByKey的特色

combineByKey的強大之處,在於提供了三個函數操做來操做一個函數。第一個函數,是對元數據處理,從而得到一個鍵值對。第二個函數,是對鍵值鍵值對進行一對一的操做,即一個鍵值對對應一個輸出,且這裏是根據key進行整合。第三個函數是對key相同的鍵值對進行操做,有點像reduceByKey,但真正實現又有着很大的不一樣。java

Spark入門(五)--Spark的reduce和reduceByKey中,咱們用reduce進行求平均值。用combineByKey咱們則能夠求比平均值更爲豐富的事情。如今有一個數據集,每一行數據包括一個a-z字母和一個整數,其中字母和整數之間以空格分隔。如今要求得每一個字母的平均數。這個場景有點像多個學生,每一個學生多門成績,求得學生的平均分。但這裏將問題簡化,其中數據集放在grades中。數據集以及下面的代碼均可以在github上下載。python

combineByKey求多個平均值

scala實現

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.{SparkConf, SparkContext}

object SparkCombineByKey {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")

    val sc = new SparkContext(conf)

    sc.textFile("./grades").map(line=>{
      val splits = line.split(" ")
      (splits(0),splits(1).toInt)
    }).combineByKey(
      value => (value,1),
      (x:(Int,Int),y)=>(x._1+y,x._2+1),
      (x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
    ).map(x=>(x._1,x._2._1/x._2._2)).foreach(println)

  }

}
複製代碼

scala運行結果

(d,338451)
(e,335306)
(a,336184)
(i,346279)
(b,333069)
(h,334343)
(f,341380)
(j,320145)
(g,334042)
(c,325022)

複製代碼

java實現:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.sources.In;
import scala.Tuple2;

public class SparkCombineByKeyJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKeyJava");

        JavaSparkContext sc = new JavaSparkContext(conf);

        combineByKeyJava(sc);

        combineByKeyJava8(sc);


    }


    public static void combineByKeyJava(JavaSparkContext sc){

        JavaPairRDD<String,Integer> splitData = sc.textFile("./grades").mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                String[] splits = s.split(" ");
                return new Tuple2<>(splits[0],Integer.parseInt(splits[1]));
            }
        });

        splitData.combineByKey(new Function<Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
                return new Tuple2<>(integer, 1);
            }
        }, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> integerIntegerTuple2, Integer integer) throws Exception {
                return new Tuple2<>(integerIntegerTuple2._1 + integer, integerIntegerTuple2._2 + 1);
            }
        }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> integerIntegerTuple2, Tuple2<Integer, Integer> integerIntegerTuple22) throws Exception {
                return new Tuple2<>(integerIntegerTuple2._1+integerIntegerTuple22._1,integerIntegerTuple2._2+integerIntegerTuple22._2);
            }
        }).map(new Function<Tuple2<String,Tuple2<Integer,Integer>>, Tuple2<String,Double>>() {
            @Override
            public Tuple2<String,Double> call(Tuple2<String, Tuple2<Integer, Integer>> stringTuple2Tuple2) throws Exception {
                return new Tuple2<>(stringTuple2Tuple2._1,stringTuple2Tuple2._2._1*1.0/stringTuple2Tuple2._2._2);
            }
        }).foreach(new VoidFunction<Tuple2<String, Double>>() {
            @Override
            public void call(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
                System.out.println(stringDoubleTuple2._1+" "+stringDoubleTuple2._2);
            }
        });

    }

    public static void combineByKeyJava8(JavaSparkContext sc){

        JavaPairRDD<String,Integer> splitData = sc.textFile("./grades").mapToPair(line -> {
            String[] splits = line.split(" ");
            return new Tuple2<>(splits[0],Integer.parseInt(splits[1]));
        });

        splitData.combineByKey(
                x->new Tuple2<>(x,1),
                (x,y)->new Tuple2<>(x._1+y,x._2+1),
                (x,y)->new Tuple2<>(x._1+y._1,x._2+y._2)
        ).map(x->new Tuple2(x._1,x._2._1*1.0/x._2._2)).foreach(x->System.out.println(x._1+" "+x._2));

    }
}

複製代碼

java運行結果

d 338451.6
e 335306.7480769231
a 336184.95321637427
i 346279.497029703
b 333069.8589473684
h 334343.75
f 341380.94444444444
j 320145.7618069815
g 334042.37605042016
c 325022.4183673469
複製代碼

分析

在開始python以前,咱們先觀察java和scala兩個程序。咱們發現java7的代碼很是冗餘,而java8和scala則相比起來很是乾淨利落。固然,咱們難說好壞,可是這也表現出當代語言開始從繁就簡的一個轉變。到了python這一特色就體現的更加淋漓盡致。git

但咱們不光說語言,咱們分析這個求平均的實現方式,因爲java中對數值作了一個處理,所以有保留小數,而scala則沒有,但至少能夠判斷二者的結果是一致的。固然,這不是重點,重點是,這個combinByKey很是複雜,有三個函數。咱們很難觀察到每一個過程作了什麼。所以咱們在這裏,對scala程序進行進一步的輸出,從而觀察combineByKey到底作了什麼。github

scala修改

import org.apache.spark.{SparkConf, SparkContext}

object SparkCombineByKey {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")

    val sc = new SparkContext(conf)

    sc.textFile("./grades").map(line=>{
      val splits = line.split(" ")
      (splits(0),splits(1).toInt)
    }).combineByKey(
      value => {
        println("這是第一個函數")
        println("將全部的值遍歷,並放在元組中,標記1")
        println(value)
        (value,1)
      },
      (x:(Int,Int),y)=>{
        println("這是第二個函數")
        println("將x中的第一個值進行累加求和,第二個值加一,求得元素總個數")
        println("x:"+x.toString())
        println("y:"+y)
        (x._1+y,x._2+1)
      },
      (x:(Int,Int),y:(Int,Int))=>{
        (x._1+y._1,x._2+y._2)
      }
    ).map(x=>(x._1,x._2._1/x._2._2)).foreach(println)

  }

}

複製代碼

獲得結果

這是第一個函數
將全部的值遍歷,並放在元組中,標記1
222783
這是第一個函數
將全部的值遍歷,並放在元組中,標記1
48364
這是第一個函數
將全部的值遍歷,並放在元組中,標記1
204950
這是第一個函數
將全部的值遍歷,並放在元組中,標記1
261777
...
...
...
這是第二個函數
將x中的第一個值進行累加求和,第二個值加一,求得元素總個數
x:(554875,2)
y:357748
這是第二個函數
將x中的第一個值進行累加求和,第二個值加一,求得元素總個數
x:(912623,3)
y:202407
這是第一個函數
將全部的值遍歷,並放在元組中,標記1
48608
這是第二個函數
將x中的第一個值進行累加求和,第二個值加一,求得元素總個數
x:(1115030,4)
y:69003
這是第一個函數
將全部的值遍歷,並放在元組中,標記1
476893

...
...
...

(d,338451)
(e,335306)
(a,336184)
(i,346279)
(b,333069)
(h,334343)
(f,341380)
(j,320145)
(g,334042)
(c,325022)
複製代碼

這裏咱們發現了,函數的順序並不先所有執行完第一個函數,再執行第二個函數。而是分區並行,即第一個分區執行完第一個函數,並不等待其餘分區執行完第一個函數,而是緊接着執行第二個函數,最後在第三個函數進行處理。在本地單機下,該並行特色並不能充分發揮,但在集羣環境中,各個分區在不一樣節點計算,而後處理完結果彙總處理。這樣,當數據量十分龐大時,集羣節點數越多,該優點就表現地越明顯。sql

此外還有一個很是值得關注的特色,當咱們把foreach(println)這句話去掉時apache

foreach(println)
複製代碼

咱們運行程序,發現程序沒有任何輸出。這是因爲spark的懶加載特色,spark只用在對數據執行具體操做時,如輸出、保存等纔會執行計算。這看起來有點不合理,但實際上這樣作在不少場景下能大幅度提高效率,但若是沒有處理好,可能會致使spark每次執行操做都會從頭開始計算該過程。所以當一個操做結果須要被頻繁或者屢次調用的時候,咱們應該將結果存下來。api

python實現

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")

sc = SparkContext(conf=conf)

sc.textFile("./grades")\
    .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\
    .combineByKey(
    lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[0],x[1][0]/x[1][1])).foreach(print)
複製代碼

獲得結果

('b', 333069.8589473684)
('f', 341380.94444444444)
('j', 320145.7618069815)
('h', 334343.75)
('a', 336184.95321637427)
('g', 334042.37605042016)
('d', 338451.6)
('e', 335306.7480769231)
('c', 325022.4183673469)
複製代碼

spark的sortByKey

sortByKey進行排序

sortByKey很是簡單,也很是經常使用。這裏依然採用上述文本,將處理後的結果,進行排序,獲得平均值最大的字母。在實際運用中咱們這裏能夠當作求得按照成績排序,或者按照姓名排序。bash

scala實現

import org.apache.spark.{SparkConf, SparkContext}

object SparkSortByKey {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")

    val sc = new SparkContext(conf)


    val result = sc.textFile("./grades").map(line=>{
      val splits = line.split(" ")
      (splits(0),splits(1).toInt)
    }).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
    ).map(x=>(x._1,x._2._1/x._2._2))

    //按照名字排序,順序
    result.sortByKey(true).foreach(println)

    //按照名字排序,倒序
    result.sortByKey(false).foreach(println)


    val result1 = sc.textFile("./grades").map(line=>{
      val splits = line.split(" ")
      (splits(0),splits(1).toInt)
    }).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
    ).map(x=>(x._2._1/x._2._2,x._1))

    //按照成績排序,順序
    result1.sortByKey(true).foreach(println)

    //按照成績排序,倒序
    result1.sortByKey(false).foreach(println)


  }

}
複製代碼

python實現

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")

sc = SparkContext(conf=conf)

result = sc.textFile("./grades")\
    .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\
    .combineByKey(
    lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[0],x[1][0]/x[1][1]))

result.sortByKey(True).foreach(print)

result.sortByKey(False).foreach(print)


result1 = sc.textFile("./grades")\
    .map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\
    .combineByKey(
    lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[1][0]/x[1][1],x[0]))

result1.sortByKey(True).foreach(print)

result1.sortByKey(False).foreach(print)
複製代碼

獲得結果

(a,336184)
(b,333069)
(c,325022)
(d,338451)
(e,335306)
(f,341380)
(g,334042)
(h,334343)
(i,346279)
(j,320145)



(j,320145)
(i,346279)
(h,334343)
(g,334042)
(f,341380)
(e,335306)
(d,338451)
(c,325022)
(b,333069)
(a,336184)


(320145,j)
(325022,c)
(333069,b)
(334042,g)
(334343,h)
(335306,e)
(336184,a)
(338451,d)
(341380,f)
(346279,i)


(346279,i)
(341380,f)
(338451,d)
(336184,a)
(335306,e)
(334343,h)
(334042,g)
(333069,b)
(325022,c)
(320145,j)
複製代碼

數據集以及代碼均可以在github上下載。ide

相關文章
相關標籤/搜索