Spark RDD算子、分區與Shuffle

RDD

Spark的主要抽象是分佈式的元素集合(distributed collection of items),稱爲RDD(Resilient Distributed Dataset,彈性分佈式數據集)html

它可被分發到集羣各個節點上,進行並行操做。RDD能夠經過Hadoop InputFormats建立,如 HDFS,或者從其餘RDD轉化而來。java

RDD的操做分爲兩種,一種是轉化(transformation)操做,一種是執行(action)操做,相似於SQL中的聚合函數。算法

Spark Transformation

Transformation數據庫

Spark Action

Actionapache

RDD建立

RDD的建立2種方式:api

  1. 從已經存在的集合建立RDD
  2. 從外部存儲系統建立RDD(文件系統、hdfs、hbase、MySQL等)

從集合中建立RDD

主要使用parallelize和makeRDD函數網絡

sc.parallelize(List(1,2,3))
sc.makeRDD(List(1,2,3))
sc.parallelize(Array(1 to 10))

從外部存儲建立RDD

sc.textFile("hdfs://127.0.0.1:9000/tmp/in.txt")
sc.textFile("file://G:/tmp/in.txt")

分區(partition)-重要

前面的內容算是一個簡單的複習,這裏咱們介紹Spark中另外一個很是重要的的知識,partition,能夠翻譯爲分區或者分片,在不少涉及分佈式的應用中都有這個概念。分佈式

本質就是將大的數據拆分爲小的數據集合,好比咱們從hdfs目錄建立一個RDD,這個目錄中的數據可能有幾百個G,顯然放在一個一塊兒對於處理是不利的。ide

算法中有一個重要的思想就是分而治之,Spark的partition就利用了這種思想,將一個RDD拆分到不一樣的partition,這樣咱們的算法就能夠利用多機器、多核並行來處理這些數據了。函數

下文中sc沒有特殊說明,都是:

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("create JavaSparkContext");
sparkConf.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

partition方式

HashPartitioner

sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3)
.partitionBy(new HashPartitioner(3))

HashPartitioner肯定分區的方式:partition = key.hashCode () % numPartitions

RangePartitioner

sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3)
.partitionBy(new RangePartitioner(3,counts))

RangePartitioner會對key值進行排序,而後將key值被劃分紅3份key值集合。

自定義

繼承org.apache.spark.Partitioner,重寫getPartition方法。

sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new CustomPartitioner(3))

默認分區

sc.defaultParallelism sc.defaultMinPartitions

sc.defaultParallelism的值能夠經過spark-defaults.conf配置文件配置:

spark.default.parallelism=20

還會受代碼設置影響:

sparkConf.setMaster("local[*]");

local[*]和local,分區數就是運行機器cpu的核數,local[n],分區數就是n的值

還和提交時下面兩個參數相關: --num-executors --executor-cores

從HDFS讀入文件的分區數默認等於HDFS文件的blocks,例如,咱們有一個目錄的數據量是100G, HDFS默認的塊容量大小128MB,那麼hdfs的塊數爲: 100G/128M = 800 那麼Spark讀取SparkContext.textFile()讀取該目錄,默認分區數爲800。

合理設置分區數

分區數太多意味着任務數太多,每次調度任務也是很耗時的,因此分區數太多會致使整體耗時增多。

分區數太少的話,會致使一些結點沒有分配到任務;另外一方面,分區數少則每一個分區要處理的數據量就會增大,從而對每一個結點的內存要求就會提升;還有分區數不合理,會致使數據傾斜問題。

根據任務是IO型仍是CPU型,分區數設置爲executor-cores * num-executor的2到3倍。

shuffle

首先咱們來梳理一下Spark的執行,咱們使用Java編寫Spark應用,會有一個包含main方法的Class,這個通常被稱做是Driver program。

咱們經過集羣運行Spark應用,會把應用和應用依賴的jar包拷貝到全部的節點,固然爲了不拷貝能夠指定一個網絡磁盤來存放依賴jar包。

Spark本質是處理數據,當Spark讀取數據的時候,就會把這些數據拆分爲n個partition,而後封裝爲不一樣的Task發給不一樣的機器去執行。

對於不少操做是有效的,例如map、foreach等,可是對於另外一些操做就須要額外的處理,例如reduceByKey,由於相同的key可能分佈在不一樣的partition中,甚至不一樣的機器節點上,因此必須讀取全部partition中的數據到一個節點,而後執行誇partition的計算來獲取最終結果,這個過程就稱之爲shuffle。

可以形成shuffle的操做:

repartition、coalesce、repartitionAndSortWithinPartitions、groupByKey、reduceByKey、sortByKey、aggregateByKey、groupByKey、cogroup、join

實例

bykey

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ByKeyAccumulatorTest implements Serializable {

    private transient JavaSparkContext sc;

    @Before
    public void setUp(){
        SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
        sc = new JavaSparkContext(conf);
    }

    @Test
    public void reduceByKey(){
        List<Tuple2<String, Integer>> datas = Arrays.asList(
                new Tuple2<>("A", 10),
                new Tuple2<>("B", 20),
                new Tuple2<>("A", 30),
                new Tuple2<>("A", 60),
                new Tuple2<>("C", 30),
                new Tuple2<>("B", 40)
        );
        JavaRDD<Tuple2<String, Integer>> RDDOne = sc.parallelize(datas);

        JavaPairRDD<String, Integer> pairRDDOne = JavaPairRDD.fromJavaRDD(RDDOne);
        JavaPairRDD<String, Integer> rdd = pairRDDOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                System.out.println("v1:" + v1);
                System.out.println("v2:" + v2);
                return v1 + v2;
            }
        });
        Map<String, Integer> stringIntegerMap = rdd.collectAsMap();
        System.out.println(stringIntegerMap);
        rdd.saveAsTextFile("G:\\tmp\\spark");
    }

    @Test
    public void aggregateByKey() {
        List<Tuple2<String, Integer>> datas = Arrays.asList(
                new Tuple2<>("A", 10),
                new Tuple2<>("B", 20),
                new Tuple2<>("A", 30),
                new Tuple2<>("A", 60),
                new Tuple2<>("C", 30),
                new Tuple2<>("B", 40)
        );
        JavaRDD<Tuple2<String, Integer>> RDDOne = sc.parallelize(datas);

        JavaPairRDD<String, Integer> pairRDDOne = JavaPairRDD.fromJavaRDD(RDDOne);

        JavaPairRDD<String, Integer> rdd = pairRDDOne.aggregateByKey(0, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 > v2 ? v1 : v2;
            }
        }, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        Map<String, Integer> stringIntegerMap = rdd.collectAsMap();
        System.out.println(stringIntegerMap);
    }

    @Test
    public void countByKey(){
        List<Tuple2<String, Integer>> datas = Arrays.asList(
                new Tuple2<>("A", 10),
                new Tuple2<>("B", 20),
                new Tuple2<>("A", 30),
                new Tuple2<>("A", 60),
                new Tuple2<>("C", 30),
                new Tuple2<>("B", 40)
        );
        JavaRDD<Tuple2<String, Integer>> RDDOne = sc.parallelize(datas);

        JavaPairRDD<String, Integer> pairRDDOne = JavaPairRDD.fromJavaRDD(RDDOne);
        Map<String, Long> stringLongMap = pairRDDOne.countByKey();
        System.out.println(stringLongMap);
    }
}

foreach\filter

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.junit.Before;
import org.junit.Test;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class ForeachAccumulatorTest implements Serializable {

    private transient JavaSparkContext sc;

    private transient List<String> datas;

    @Before
    public void setUp(){
        SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
        sc = new JavaSparkContext(conf);
        datas = Arrays.asList("hello", "spark", "hadoop", "瑞雪兆豐年","Java");
    }

    @Test
    public void foreach(){
        JavaRDD<String> dataRDD = sc.parallelize(datas);
        dataRDD.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s.length());
            }
        });
    }

    @Test
    public void filter(){
        JavaRDD<String> dataRDD = sc.parallelize(datas);
        JavaRDD<String> resultRDD = dataRDD.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String v1) throws Exception {
                return v1.contains("h");
            }
        });
        List<String> collect = resultRDD.collect();
        collect.forEach(System.out::println);
    }
}

map

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFlatMapFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

public class MapAccumulatorTest implements Serializable {

    private transient JavaSparkContext sc;

    private transient List<String> datas;

    @Before
    public void setUp(){
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("create JavaSparkContext");
        sparkConf.setMaster("local[*]");
        sc = new JavaSparkContext(sparkConf);
        //datas = sc.textFile("file:///G:/tmp/in.txt");
        datas = Arrays.asList("hello", "spark", "hadoop", "瑞雪兆豐年","Java");
    }

    @Test
    public void map(){
        JavaRDD<String> toDealRDD = sc.parallelize(datas, 3);
        JavaRDD<Integer> map = toDealRDD.map(s -> s.length());
        //收集因此數據到驅動節點
        List<Integer> results = map.collect();
        results.forEach(System.out::println);
    }

    @Test
    public void mapPartitions(){
        JavaRDD<String> toDealRDD = sc.parallelize(datas, 3);
        toDealRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
            @Override
            public Iterator<Integer> call(Iterator<String> stringIterator) throws Exception {
                LinkedList<Integer> lengths = new LinkedList<>();
                //處理外部資源等公共耗時操做
                while (stringIterator.hasNext()) {
                    String content = stringIterator.next();
                    lengths.add(content.length());
                }
                //返回的是iterator
                return lengths.iterator();
            }
        });
    }

    @Test
    public void mapPartitionsToParis(){
        JavaRDD<String> toDealRDD = sc.parallelize(datas, 3);
        toDealRDD.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, Integer>() {
            @Override
            public Iterator<Tuple2<String, Integer>> call(Iterator<String> stringIterator) throws Exception {
                List<Tuple2<String,Integer>> result = new LinkedList<>();
                //處理外部資源等公共耗時操做
                while (stringIterator.hasNext()) {
                    String content = stringIterator.next();
                    Tuple2<String, Integer> tuple = new Tuple2<>(content, content.length());
                    result.add(tuple);
                }
                return result.iterator();
            }
        });
    }

    @Test
    public void mapPartitionsToDouble(){
        JavaRDD<String> toDealRDD = sc.parallelize(datas, 3);
        toDealRDD.mapPartitionsToDouble(new DoubleFlatMapFunction<Iterator<String>>() {
            @Override
            public Iterator<Double> call(Iterator<String> stringIterator) throws Exception {
                LinkedList<Double> result = new LinkedList<>();
                while (stringIterator.hasNext()) {
                    String content = stringIterator.next();
                    result.add(Double.valueOf(content.length()));
                }
                return result.iterator();
            }
        });
    }
    
    @Test
    public void flatMap(){
        List<String> listOne = Arrays.asList("hello", "spark", "hadoop", "瑞雪兆豐年", "Java");
        List<String> listTwo = Arrays.asList("語言", "C", "C++", "Java", "Python");
        List<List<String>> datas = Arrays.asList(listOne, listTwo);
//        JavaRDD<List<String>> toDealRDD = sc.parallelize(datas, 3);
        JavaRDD<List<String>> toDealRDD = sc.parallelize(datas);
        JavaRDD<Integer> mapResult = toDealRDD.map(new Function<List<String>, Integer>() {
            @Override
            public Integer call(List<String> content) throws Exception {
                return content.size();
            }
        });

        List<Integer> mapList = mapResult.collect();
        mapList.forEach(System.out::println);
        System.out.println("------------------");
        JavaRDD<Integer> flatMapRDD = toDealRDD.flatMap(new FlatMapFunction<List<String>, Integer>() {
            @Override
            public Iterator<Integer> call(List<String> strings) throws Exception {
                LinkedList<Integer> result = new LinkedList<>();
                strings.forEach(content -> result.add(content.length()));
                return result.iterator();
            }
        });
        List<Integer> flatMapResult = flatMapRDD.collect();
        flatMapResult.forEach(System.out::println);
    }

}

save

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SaveAccumulatorTest implements Serializable {

    private transient JavaSparkContext sc;

    private transient List<Tuple2<String, Integer>> datas;

    @Before
    public void setUp(){
        SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
        sc = new JavaSparkContext(conf);
        datas = new ArrayList<>();
        datas.add(new Tuple2<>("A", 10));
        datas.add(new Tuple2<>("B", 1));
        datas.add(new Tuple2<>("A", 6));
        datas.add(new Tuple2<>("C", 5));
        datas.add(new Tuple2<>("B", 3));
    }

    @Test
    public void saveAsTextFile(){
        JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(datas);
        String path = "G:\\tmp\\spark\\saveAsTextFile\\" + System.currentTimeMillis();
        dataRDD.saveAsTextFile(path);
    }

    @Test
    public void saveAsSequenceFile(){
        List<Tuple2<String,Integer>> datas = new ArrayList<>();
        datas.add(new Tuple2<>("A", 10));
        datas.add(new Tuple2<>("B", 1));
        datas.add(new Tuple2<>("A", 6));
        datas.add(new Tuple2<>("C", 5));
        datas.add(new Tuple2<>("B", 3));
        JavaPairRDD<String, Integer> dataRDD = sc.parallelizePairs(datas);
        String path = "G:\\tmp\\spark\\saveAsHadoopFile\\" + System.currentTimeMillis();
        dataRDD.saveAsHadoopFile(path, String.class, IntWritable.class, TextOutputFormat.class);
    }

    @Test
    public void saveAsObjectFile(){
        JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(datas);
        String path = "G:\\tmp\\spark\\saveAsObjectFile\\" + System.currentTimeMillis();
        dataRDD.saveAsObjectFile(path);
    }
}

map、filter、foreach

map、filter、foreach都會遍歷元素,可是他們仍是有一些差異的。

map是把dataset中的元素映射爲另外一個元素,甚至是另外一種類型。

filter接收的函數返回值是boolean類型,主要是過濾元素用,減小dataset中的數據量。

foreach沒有返回值。

coalesce、repartition、repartitionAndSortWithinPartitions

coalesce、repartition、repartitionAndSortWithinPartitions這三個算子都會從新分區,可是仍是有明顯的區別。

coalesce主要是用於減小分區數量,在filter算子以後數據量大量減小以後,減小分區數量能夠提升效率。

repartition能夠用於增長或者減小分區數量,老是會shuffle全部的數據

repartitionAndSortWithinPartitions,若是分區以後要排序,那麼使用repartitionAndSortWithinPartitions,而不是先使用repartition,而後使用sortBy。

有partition的算子與無partition算子的區別

  1. 無partition的算子,是對RDD的每個元素都調用一次處理函數
  2. 有partition的算子,是對RDD中的每個partition調用一次處理函數

有什麼區別呢?

假如處理函數內部存在數據庫連接、文件等的建立及關閉,無partition的算子會致使處理每一個元素時建立一次連接或者句柄,致使性能問題。

因此在處理函數中須要訪問外部資源的時候,咱們應該儘可能選擇有partition的算子。這樣函數傳入的參數是整個分區數據的迭代器,能夠一次處理一個partition,能夠減小外部資源處理時間。

Spark RDD

相關文章
相關標籤/搜索