Getting Started with Spark (5) -- Spark's reduce and reduceByKey

The difference between reduce and reduceByKey

reduce and reduceByKey are both used very frequently in Spark; word count is the classic use of reduceByKey. So what is the difference between them? reduce treats the dataset as a whole: it visits every element once and folds all of them into a single result. reduceByKey, by contrast, works many-to-one per key: it merges all values that share the same key, and the merge function is one you define yourself.
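To make the contrast concrete, here is a minimal sketch (assuming an existing SparkContext named sc and a tiny made-up dataset, not the one used later in this post):

val nums  = sc.parallelize(Seq(1, 2, 3, 4))
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

nums.reduce((x, y) => x + y)                  // a single value for the whole RDD: 10
pairs.reduceByKey((x, y) => x + y).collect()  // one value per key: (a,4) and (b,2)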

Example

In word count we use reduceByKey. For every word we build a key-value pair (key, value), with the word as the key (key = word) and value = 1, because each occurrence of a word encountered during the traversal is marked with a 1. reduceByKey then merges all pairs with the same key and, using the merge function we define, accumulates the values, which finally gives the number of occurrences of each word. reduce, on the other hand, has no per-key merging; it merges all the values together and processes them as a whole.
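For reference, the word count described here can be sketched roughly as follows in Scala (a sketch only; the input path "./words" is an assumption, not a file used later in this post):

sc.textFile("./words")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))             // each occurrence of a word becomes (word, 1)
  .reduceByKey((x, y) => x + y)       // merge the 1s of identical words into a count
  .foreach(println)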

Spark's reduce

Let's use Scala to compute the average of all the values in a dataset. The dataset contains 5000 values; the dataset and all of the code below can be downloaded from GitHub, and the dataset file is named "avg". To compute the average, we first use map to process the text data and convert each line to a Long.

Dataset contents:

Computing the average with reduce: Scala implementation

import org.apache.spark.{SparkConf, SparkContext}

object SparkReduce {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkReduce")

    val sc = new SparkContext(conf)

    // Convert each String line to a Long
    val numData = sc.textFile("./avg").map(num => num.toLong)

    // reduce processes every value; x and y are printed to trace the accumulation
    println(numData.reduce((x,y)=>{
      println("x:"+x)
      println("y:"+y)
      x+y
    })/numData.count())

  }

}

Computing the average with reduce: Java implementation

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

public class SparkReduceJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setAppName("SparkReduceJava").setMaster("local");


        JavaSparkContext sc = new JavaSparkContext(conf);

        reduceJava(sc);

        reduceJava8(sc);
    }


    public static void reduceJava(JavaSparkContext sc){
        JavaRDD<Long> textData = sc.textFile("./avg").map(new Function<String, Long>() {
            @Override
            public Long call(String s) throws Exception {
                return Long.parseLong(s);
            }
        });

        System.out.println(
                textData.reduce(new Function2<Long, Long, Long>() {
                    @Override
                    public Long call(Long aLong, Long aLong2) throws Exception {
                        System.out.println("x:"+aLong);
                        System.out.println("y:"+aLong2);
                        return aLong+aLong2;
                    }
                })/textData.count()
        );
    }

    public static void reduceJava8(JavaSparkContext sc){
        JavaRDD<Long> textData = sc.textFile("./avg").map(s->Long.parseLong(s));
        System.out.println(textData.reduce((x,y)->x+y)/textData.count());
    }

}


Computing the average with reduce: Python implementation

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkReduce")

sc = SparkContext(conf=conf)


numData = sc.textFile("./avg").map(lambda s:int(s))



print(numData.reduce(lambda x,y:x+y)/numData.count())

Run results

Looking at the output, it is easy to see that x holds the accumulated value so far while y is the current value. x starts out as the first element and from then on carries the result of the previous step, while y is the next value to fold in; repeatedly computing x + y and feeding the result back in as the next x produces the total sum. Dividing that sum by the count gives the average.
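As a minimal illustration of this folding behaviour, here is the same idea on a plain Scala collection (no Spark involved):

val nums = Seq(3, 5, 2)
val sum = nums.reduce((x, y) => {
  println("x:" + x + " y:" + y)   // prints x:3 y:5, then x:8 y:2
  x + y
})                                // sum is 10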

Scala and Java results

The average keeps only the integer part, because both the sum and the count are Longs and the division is integer division.

x:222783
y:48364
x:271147
y:204950
x:476097
y:261777
x:737874
y:166827
x:904701
y:154005
x:1058706
y:150029
x:1208735
y:140158
x:1348893
y:404846
x:1753739
y:542750
...
...
The average is: 334521

Python result

Python keeps the decimal part by default, since / in Python 3 performs floating-point division.

334521.2714
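The two numbers agree with each other: the Python value implies a total of 334521.2714 × 5000 = 1672606357, and dividing that total by 5000 as Longs simply drops the fraction. A quick sanity check (the total here is derived from the printed average, not read from the dataset):

println(1672606357L / 5000L)   // 334521       -- integer (Long) division truncates
println(1672606357.0 / 5000)   // 334521.2714  -- floating-point division keeps the fraction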

Spark's reduceByKey

reduceByKey treats the values it processes selectively: only values with the same key are merged, which means the input to reduceByKey must consist of key-value pairs. Since the avg dataset above was generated from random numbers, we can use reduceByKey for a different task: counting how many of those numbers end in each of the digits 0-9.

Scala implementation

import org.apache.spark.{SparkConf, SparkContext}

object SparkReduceByKey {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkReduce")

    val sc = new SparkContext(conf)

    // Map each number to a (last digit, 1) pair
    val numData = sc.textFile("./avg").map(num => (num.toLong%10,1))

    numData.reduceByKey((x,y)=>x+y).foreach(println(_))
  }

}


Java implementation

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class SparkReduceByKeyJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setAppName("SparkReduceJava").setMaster("local");


        JavaSparkContext sc = new JavaSparkContext(conf);

        reduceByKeyJava(sc);

        reduceByKeyJava8(sc);

    }


    public static void reduceByKeyJava(JavaSparkContext sc){

        JavaPairRDD<Integer,Integer> numData = sc.textFile("./avg").mapToPair(new PairFunction<String, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(String s) throws Exception {
                return new Tuple2<Integer, Integer>(Integer.parseInt(s)%10,1);
            }
        });


        System.out.println(numData.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        }).collectAsMap());

    }

    public static void reduceByKeyJava8(JavaSparkContext sc){
        JavaPairRDD<Integer,Integer> numData = sc.textFile("./avg").mapToPair(s->new Tuple2<>(Integer.parseInt(s)%10,1));

        System.out.println(numData.reduceByKey((x,y)->x+y).collectAsMap());
    }

}


Python implementation

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkReduce")

sc = SparkContext(conf=conf)


print(sc.textFile("./avg").map(lambda s:(int(s)%10,1)).reduceByKey(lambda x,y:x+y).collectAsMap())


Run results

Scala result

(4,522)
(0,462)
(1,495)
(6,519)
(3,463)
(7,544)
(9,518)
(8,533)
(5,483)
(2,461)

Java result

{8=533, 2=461, 5=483, 4=522, 7=544, 1=495, 9=518, 3=463, 6=519, 0=462}

Python result

{3: 463, 4: 522, 0: 462, 7: 544, 5: 483, 9: 518, 8: 533, 6: 519, 2: 461, 1: 495}

Notice that the three programs print their results in different orders, yet the underlying results are identical. This reflects how Spark works: results come back partition by partition with no global ordering guaranteed. Running locally on a single machine, that only shows up as the same input producing results in a different order; on a cluster it means each node processes its own share of the data and returns its results independently, and that benefit grows as the amount of data grows.

Sorting the results

To make the output order consistent, we can sort the data before printing it, which is where sortByKey comes in.

Scala implementation

import org.apache.spark.{SparkConf, SparkContext}

object SparkReduceByKey {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkReduce")

    val sc = new SparkContext(conf)

    // Map each number to a (last digit, 1) pair
    val numData = sc.textFile("./avg").map(num => (num.toLong%10,1))

    
    // Sort by key before printing
    numData.reduceByKey((x,y)=>x+y).sortByKey().foreach(println(_))
  }

}


Java implementation

Note in particular that collect is used here rather than collectAsMap, because converting the result to a Map in Java loses the sorted order.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class SparkReduceByKeyJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setAppName("SparkReduceJava").setMaster("local");


        JavaSparkContext sc = new JavaSparkContext(conf);

        reduceByKeyJava(sc);

        reduceByKeyJava8(sc);

    }


    public static void reduceByKeyJava(JavaSparkContext sc){

        JavaPairRDD<Integer,Integer> numData = sc.textFile("./avg").mapToPair(new PairFunction<String, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(String s) throws Exception {
                return new Tuple2<Integer, Integer>(Integer.parseInt(s)%10,1);
            }
        });


        System.out.println(numData.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        }).sortByKey().collect());

    }

    public static void reduceByKeyJava8(JavaSparkContext sc){
        JavaPairRDD<Integer,Integer> numData = sc.textFile("./avg").mapToPair(s->new Tuple2<>(Integer.parseInt(s)%10,1));

        System.out.println(numData.reduceByKey((x,y)->x+y).sortByKey().collect());
    }

}


Python implementation

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkReduce")

sc = SparkContext(conf=conf)


print(sc.textFile("./avg").map(lambda s:(int(s)%10,1)).reduceByKey(lambda x,y:x+y).sortByKey().collect())  # collect() rather than collectAsMap() so the sorted order is kept


The result is shown below. Only the Scala output is given here; the other two produce the same result, just presented differently.

(0,462)
(1,495)
(2,461)
(3,463)
(4,522)
(5,483)
(6,519)
(7,544)
(8,533)
(9,518)

Both the dataset and the code can be downloaded from GitHub.
