Spark學習之路 (三)Spark之RDD

1、RDD的概述

1.1 什麼是RDD?

RDD(Resilient Distributed Dataset)叫作彈性分佈式數據集是Spark中最基本的數據抽象,它表明一個不可變、可分區、裏面的元素可並行計算的集合。RDD具備數據流模型的特色:自動容錯、位置感知性調度和可伸縮性。RDD容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。java

1.2 RDD的屬性

(1)一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。數據庫

(2)一個計算每一個分區的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。apache

(3)RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。編程

(4)一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,纔會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。api

(5)一個列表,存儲存取每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。數組

1.3 WordCount粗圖解RDD

其中hello.txt緩存

2、RDD的建立方式

2.1 經過讀取文件生成的

由外部存儲系統的數據集建立,包括本地的文件系統,還有全部Hadoop支持的數據集,好比HDFS、Cassandra、HBase等app

scala> val file = sc.textFile("/spark/hello.txt")

2.2 經過並行化的方式建立RDD

由一個已經存在的Scala集合建立。maven

scala> val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:26

scala> 

2.3 其餘方式

讀取數據庫等等其餘的操做。也能夠生成RDD。分佈式

RDD能夠經過其餘的RDD轉換而來的。

3、RDD編程API

Spark支持兩個類型(算子)操做:Transformation和Action

3.1 Transformation

主要作的是就是將一個已有的RDD生成另一個RDD。Transformation具備lazy特性(延遲加載)。Transformation算子的代碼不會真正被執行。只有當咱們的程序裏面遇到一個action算子的時候,代碼纔會真正的被執行。這種設計讓Spark更加有效率地運行。

經常使用的Transformation

轉換

含義

map(func)

返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成

filter(func)

返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成

flatMap(func)

相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素)

mapPartitions(func)

相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是

(Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)

根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子

union(otherDataset)

對源RDD和參數RDD求並集後返回一個新的RDD

intersection(otherDataset)

對源RDD和參數RDD求交集後返回一個新的RDD

distinct([numTasks]))

對源RDD進行去重後返回一個新的RDD

groupByKey([numTasks])

在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似,reduce任務的個數能夠經過第二個可選的參數來設置

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

先按分區聚合 再總的聚合   每次要跟初始值交流 例如:aggregateByKey(0)(_+_,_+_) 對k/y的RDD進行操做

sortByKey([ascending], [numTasks])

在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD

sortBy(func,[ascending], [numTasks])

與sortByKey相似,可是更靈活 第一個參數是根據什麼排序  第二個是怎麼排序 false倒序   第三個排序後分區數  默認與原RDD同樣

join(otherDataset, [numTasks])

在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD  至關於內鏈接(求交集)

cogroup(otherDataset, [numTasks])

在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD

cartesian(otherDataset)

兩個RDD的笛卡爾積  的成不少個K/V

pipe(command, [envVars])

調用外部程序

coalesce(numPartitions)   

從新分區 第一個參數是要分多少區,第二個參數是否shuffle 默認false  少分區變多分區 true   多分區變少分區 false

repartition(numPartitions)

從新分區 必須shuffle  參數是要分多少區  少變多

repartitionAndSortWithinPartitions(partitioner)

從新分區+排序  比先分區再排序效率高  對K/V的RDD進行操做

foldByKey(zeroValue)(seqOp)

該函數用於K/V作摺疊,合併處理 ,與aggregate相似   第一個括號的參數應用於每一個V值  第二括號函數是聚合例如:_+_

combineByKey

合併相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)

partitionBy(partitioner)

對RDD進行分區  partitioner是分區器 例如new HashPartition(2

cache

RDD緩存,能夠避免重複計算從而減小時間,區別:cache內部調用了persist算子,cache默認就一個緩存級別MEMORY-ONLY ,而persist則能夠選擇緩存級別

persist

 

 

Subtract(rdd)

返回前rdd元素不在後rdd的rdd

leftOuterJoin

leftOuterJoin相似於SQL中的左外關聯left outer join,返回結果之前面的RDD爲主,關聯不上的記錄爲空。只能用於兩個RDD之間的關聯,若是要多個RDD關聯,多關聯幾回便可。

rightOuterJoin

rightOuterJoin相似於SQL中的有外關聯right outer join,返回結果以參數中的RDD爲主,關聯不上的記錄爲空。只能用於兩個RDD之間的關聯,若是要多個RDD關聯,多關聯幾回便可

subtractByKey

substractByKey和基本轉換操做中的subtract相似只不過這裏是針對K的,返回在主RDD中出現,而且不在otherRDD中出現的元素

3.2 Action

觸發代碼的運行,咱們一段spark代碼裏面至少須要有一個action操做。

經常使用的Action:

動做

含義

reduce(func)

經過func函數彙集RDD中的全部元素,這個功能必須是課交換且可並聯的

collect()

在驅動程序中,以數組的形式返回數據集的全部元素

count()

返回RDD的元素個數

first()

返回RDD的第一個元素(相似於take(1))

take(n)

返回一個由數據集的前n個元素組成的數組

takeSample(withReplacement,num, [seed])

返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子

takeOrdered(n[ordering])

 

saveAsTextFile(path)

將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本

saveAsSequenceFile(path

將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。

saveAsObjectFile(path

 

countByKey()

針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數。

foreach(func)

在數據集的每個元素上,運行函數func進行更新。

aggregate

先對分區進行操做,在整體操做

reduceByKeyLocally

 

lookup

 

top

 

fold

 

foreachPartition

 

 

 

3.3 Spark WordCount代碼編寫

使用maven進行項目構建

 (1)使用scala進行編寫

查看官方網站,須要導入2個依賴包

詳細代碼

SparkWordCountWithScala.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCountWithScala {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    /**
      * 若是這個參數不設置,默認認爲你運行的是集羣模式
      * 若是設置成local表明運行的是local模式
      */
    conf.setMaster("local")
    //設置任務名
    conf.setAppName("WordCount")
    //建立SparkCore的程序入口
    val sc = new SparkContext(conf)
    //讀取文件 生成RDD
    val file: RDD[String] = sc.textFile("E:\\hello.txt")
    //把每一行數據按照,分割
    val word: RDD[String] = file.flatMap(_.split(","))
    //讓每個單詞都出現一次
    val wordOne: RDD[(String, Int)] = word.map((_,1))
    //單詞計數
    val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_)
    //按照單詞出現的次數 降序排序
    val sortRdd: RDD[(String, Int)] = wordCount.sortBy(tuple => tuple._2,false)
    //將最終的結果進行保存
    sortRdd.saveAsTextFile("E:\\result")

    sc.stop()
  }

運行結果

(2)使用java jdk7進行編寫

SparkWordCountWithJava7.java

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class SparkWordCountWithJava7 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> fileRdd = sc.textFile("E:\\hello.txt");

        JavaRDD<String> wordRDD = fileRdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(",")).iterator();
            }
        });

        JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });

        JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        JavaPairRDD<Integer, String> count2WordRDD = wordCountRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return new Tuple2<>(tuple._2, tuple._1);
            }
        });

        JavaPairRDD<Integer, String> sortRDD = count2WordRDD.sortByKey(false);

        JavaPairRDD<String, Integer> resultRDD = sortRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return new Tuple2<>(tuple._2, tuple._1);
            }
        });

        resultRDD.saveAsTextFile("E:\\result7");

    }
}

 (3)使用java jdk8進行編寫

lambda表達式

SparkWordCountWithJava8.java

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class SparkWordCountWithJava8 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("WortCount");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> fileRDD = sc.textFile("E:\\hello.txt");
        JavaRDD<String> wordRdd = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());
        JavaPairRDD<String, Integer> wordOneRDD = wordRdd.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey((x, y) -> x + y);
        JavaPairRDD<Integer, String> count2WordRDD = wordCountRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        JavaPairRDD<Integer, String> sortRDD = count2WordRDD.sortByKey(false);
        JavaPairRDD<String, Integer> resultRDD = sortRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        resultRDD.saveAsTextFile("E:\\result8");

    }

3.4 WordCount執行過程圖

 

 

4、RDD的寬依賴和窄依賴

4.1 RDD依賴關係的本質內幕

因爲RDD是粗粒度的操做數據集,每一個Transformation操做都會生成一個新的RDD,因此RDD之間就會造成相似流水線的先後依賴關係;RDD和它依賴的父RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。如圖所示顯示了RDD之間的依賴關係。

從圖中可知:

窄依賴:是指每一個父RDD的一個Partition最多被子RDD的一個Partition所使用,例如map、filter、union等操做都會產生窄依賴;(獨生子女)

寬依賴:是指一個父RDD的Partition會被多個子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操做都會產生寬依賴;(超生)

須要特別說明的是對join操做有兩種狀況:

(1)圖中左半部分join:若是兩個RDD在進行join操做時,一個RDD的partition僅僅和另外一個RDD中已知個數的Partition進行join,那麼這種類型的join操做就是窄依賴,例如圖1中左半部分的join操做(join with inputs co-partitioned);

(2)圖中右半部分join:其它狀況的join操做就是寬依賴,例如圖1中右半部分的join操做(join with inputs not co-partitioned),因爲是須要父RDD的全部partition進行join的轉換,這就涉及到了shuffle,所以這種類型的join操做也是寬依賴。

總結:

在這裏咱們是從父RDD的partition被使用的個數來定義窄依賴和寬依賴,所以能夠用一句話歸納下:若是父RDD的一個Partition被子RDD的一個Partition所使用就是窄依賴,不然的話就是寬依賴。由於是肯定的partition數量的依賴關係,因此RDD之間的依賴關係就是窄依賴;由此咱們能夠得出一個推論:即窄依賴不只包含一對一的窄依賴,還包含一對固定個數的窄依賴。

一對固定個數的窄依賴的理解:即子RDD的partition對父RDD依賴的Partition的數量不會隨着RDD數據規模的改變而改變;換句話說,不管是有100T的數據量仍是1P的數據量,在窄依賴中,子RDD所依賴的父RDD的partition的個數是肯定的,而寬依賴是shuffle級別的,數據量越大,那麼子RDD所依賴的父RDD的個數就越多,從而子RDD所依賴的父RDD的partition的個數也會變得愈來愈多。

4.2 依賴關係下的數據流視圖

在spark中,會根據RDD之間的依賴關係將DAG圖(有向無環圖)劃分爲不一樣的階段,對於窄依賴,因爲partition依賴關係的肯定性,partition的轉換處理就能夠在同一個線程裏完成,窄依賴就被spark劃分到同一個stage中,而對於寬依賴,只能等父RDD shuffle處理完成後,下一個stage才能開始接下來的計算。

所以spark劃分stage的總體思路是:從後往前推,遇到寬依賴就斷開,劃分爲一個stage;遇到窄依賴就將這個RDD加入該stage中。所以在圖2中RDD C,RDD D,RDD E,RDDF被構建在一個stage中,RDD A被構建在一個單獨的Stage中,而RDD B和RDD G又被構建在同一個stage中。

在spark中,Task的類型分爲2種:ShuffleMapTaskResultTask

簡單來講,DAG的最後一個階段會爲每一個結果的partition生成一個ResultTask,即每一個Stage裏面的Task的數量是由該Stage中最後一個RDD的Partition的數量所決定的!而其他全部階段都會生成ShuffleMapTask;之因此稱之爲ShuffleMapTask是由於它須要將本身的計算結果經過shuffle到下一個stage中;也就是說上圖中的stage1和stage2至關於mapreduce中的Mapper,而ResultTask所表明的stage3就至關於mapreduce中的reducer。

在以前動手操做了一個wordcount程序,所以可知,Hadoop中MapReduce操做中的Mapper和Reducer在spark中的基本等量算子是map和reduceByKey;不過區別在於:Hadoop中的MapReduce天生就是排序的;而reduceByKey只是根據Key進行reduce,但spark除了這兩個算子還有其餘的算子;所以從這個意義上來講,Spark比Hadoop的計算算子更爲豐富。

相關文章
相關標籤/搜索