Spark2.x詳解

1、概述html

  Apache Spark 是一個快速的, 多用途的集羣計算系統。 它提供了 Java, Scala, Python 和 R 的高級 API,以及一個支持通用的執行圖計算的優化過的引擎. 它還支持一組豐富的高級工具, 包括使用 SQL 處理結構化數據處理的 Spark SQL, 用於機器學習的 MLlib, 用於圖計算的 GraphX, 以及 Spark Streaming。java

  請注意, 在 Spark 2.0 以前, Spark 的主要編程接口是彈性分佈式數據集(RDD)。 在 Spark 2.0 以後, RDD 被 Dataset 替換, 它是像RDD 同樣的 strongly-typed(強類型), 可是在引擎蓋下更加優化。 RDD 接口仍然受支持,可是, 咱們強烈建議您切換到使用 Dataset(數據集), 其性能要更優於 RDD。node

  每個 Spark 應用程序由一個在集羣上運行着用戶的 main 函數和執行各類並行操做的 driver program(驅動程序)組成。Spark 提供的主要抽象是一個彈性分佈式數據集(RDD),它是能夠執行並行操做且跨集羣節點的元素的集合。RDD 能夠從一個 Hadoop 文件系統(或者任何其它 Hadoop 支持的文件系統),或者一個在 driver program(驅動程序)中已存在的 Scala 集合,以及經過 transforming(轉換)來建立一個 RDD。用戶爲了讓它在整個並行操做中更高效的重用,也許會讓 Spark persist(持久化)一個 RDD 到內存中。最後,RDD 會自動的從節點故障中恢復。程序員

  在 Spark 中的第二個抽象是可以用於並行操做的 shared variables(共享變量),默認狀況下,當 Spark 的一個函數做爲一組不一樣節點上的任務運行時,它將每個變量的副本應用到每個任務的函數中去。有時候,一個變量須要在整個任務中,或者在任務和 driver program(驅動程序)之間來共享。Spark 支持兩種類型的共享變量 : broadcast variables(廣播變量),它能夠用於在全部節點上緩存一個值,和 accumulators(累加器),他是一個只能被 「added(增長)」 的變量,例如 counters 和 sums。web

2、Spark依賴算法

  Spark 2.x 默認使用 Scala 2.11 來構建和發佈直到運行。(固然,Spark 也能夠與其它的 Scala 版本一塊兒運行)。爲了使用 Scala 編寫應用程序,您須要使用可兼容的 Scala 版本(例如,2.11.X)。shell

  要編寫一個 Spark 的應用程序,您須要在 Spark 上添加一個 Maven 依賴。Spark 能夠經過 Maven 中央倉庫獲取:數據庫

groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.2.0

  此外,若是您想訪問一個 HDFS 集羣,則須要針對您的 HDFS 版本添加一個 hadoop-client(hadoop 客戶端)依賴。apache

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

  最後,您須要導入一些 Spark classes(類)到您的程序中去。添加下面幾行:編程

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

3、初始化Spark

  Spark 程序必須作的第一件事情是建立一個 SparkContext 對象,它會告訴 Spark 如何訪問集羣。要建立一個 SparkContext,首先須要構建一個包含應用程序的信息的 SparkConf 對象。

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

  這個 appName 參數是一個在集羣 UI 上展現應用程序的名稱。 master 是一個 Spark, Mesos 或 YARN 的 cluster URL,或者指定爲在 local mode(本地模式)中運行的 「local」 字符串。在實際工做中,當在集羣上運行時,您不但願在程序中將 master 給硬編碼,而是用 使用 spark-submit 啓動應用而且接收它。然而,對於本地測試和單元測試,您能夠經過 「local」 來運行 Spark 進程。

4、彈性分佈式數據集 (RDDs)

  Spark 主要以一個 彈性分佈式數據集(RDD)的概念爲中心,它是一個容錯且能夠執行並行操做的元素的集合。有兩種方法能夠建立 RDD : 在你的 driver program(驅動程序)中 parallelizing 一個已存在的集合,或者在外部存儲系統中引用一個數據集,例如,一個共享文件系統,HDFS,HBase,或者提供 Hadoop InputFormat 的任何數據源。

1、A list of partiotions
一組分區(partition),partiotion是一個具體概念,指在一個節點中的連續的空間。一個partiotione確定使在一個節點上,可是一個節點上能夠有多個partiotione。用戶能夠在建立RDD時指定RDD的分區個數。
二、A function for computing each split
對RDD作計算,至關於對RDD的每一個split或partition作計算
3、A list of dependencies on other RDDs
RDD之間有依賴關係,可溯源。
依賴還具體分爲寬依賴和窄依賴,但並非全部的RDD都有依賴。 
RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。
四、Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
能夠按key的hash值分區
五、Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
數據本地性。計算每一個split時,在split所在機器的本地上運行task是最好的,避免了數據的移動;split有多個副本,因此preferred location不止一個
RDD五大特性

 4.1建立RDD

  • 並行集合

  能夠在您的 driver program (a Scala Seq) 中已存在的集合上經過調用 SparkContext 的 parallelize 方法來建立並行集合。該集合的元素從一個能夠並行操做的 distributed dataset(分佈式數據集)中複製到另外一個 dataset(數據集)中去。例如,這裏是一個如何去建立一個保存數字 1 ~ 5 的並行集合。

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

  在建立後,該 distributed dataset(分佈式數據集)(distData)能夠並行的執行操做。例如,咱們能夠調用 distData.reduce((a, b) => a + b) 來合計數組中的元素。後面咱們將介紹 distributed dataset(分佈式數據集)上的操做。

  並行集合中一個很重要參數是 partitions(分區)的數量,它可用來切割 dataset(數據集)。Spark 將在集羣中的每個分區上運行一個任務。一般您但願羣集中的每個 CPU 計算 2-4 個分區。通常狀況下,Spark 會嘗試根據您的羣集狀況來自動的設置的分區的數量。固然,您也能夠將分區數做爲第二個參數傳遞到 parallelize (例如sc.parallelize(data, 10)) 方法中來手動的設置它。

  • 外部 Datasets(數據集)

  Spark 能夠從 Hadoop 所支持的任何存儲源中建立 distributed dataset(分佈式數據集),包括本地文件系統,HDFS,Cassandra,HBase,Amazon S3 等等。 Spark 支持文本文件,SequenceFiles,以及任何其它的 Hadoop InputFormat。

  可使用 SparkContext 的 textFile 方法來建立文本文件的 RDD。此方法須要一個文件的 URI(計算機上的本地路徑 ,hdfs://s3n:// 等等的 URI),而且讀取它們做爲一個 lines(行)的集合。下面是一個調用示例:

JavaRDD<String> distFile = sc.textFile("data.txt");

  使用 Spark 讀取文件時須要注意:

    • 全部 Spark 基於文件的 input 方法, 包括 textFile, 支持在目錄上運行, 壓縮文件, 和通配符. 例如, 您可使用 textFile("/my/directory")textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

    • textFile 方法也能夠經過第二個可選的參數來控制該文件的分區數量. 默認狀況下, Spark 爲文件的每個 block(塊)建立的一 個 partition 分區(HDFS 中塊大小默認是 128MB),固然你也能夠經過傳遞一個較大的值來要求一個較高的分區數量。請注意,分區的數量不可以小於塊的數量。

  除了文本文件以外,Spark 也支持一些其它的數據格式:

    • JavaSparkContext.wholeTextFile 能夠讀取包含多個小文本文件的目錄, 而且將它們做爲一個 (filename, content) pairs 來返回. 這與 textFile 相比, 它的每個文件中的每一行將返回一個記錄. 分區由數據量來肯定, 某些狀況下, 可能致使分區太少. 針對這些狀況, wholeTextFiles 在第二個位置提供了一個可選的參數用戶控制分區的最小數量.
    • 針對 SequenceFiles, 使用 SparkContext 的 sequenceFile[K, V] 方法,其中 K 和 V 指的是文件中 key 和 values 的類型. 這些應該是 Hadoop 的 Writable 接口的子類, 像 IntWritable and Text. 此外, Spark 可讓您爲一些常見的 Writables 指定原生類型; 例如, sequenceFile[Int, String]會自動讀取 IntWritables 和 Texts.
    • 針對其它的 Hadoop InputFormats, 您可使用 SparkContext.hadoopRDD 方法, 它接受一個任意的 JobConf 和 input format class, key class 和 value class. 經過相同的方法你能夠設置你的 input source(輸入源). 你還能夠針對 InputFormats 使用基於 「new」 MapReduce API (org.apache.hadoop.mapreduce) 的 SparkContext.newAPIHadoopRDD.
    • RDD.saveAsObjectFile 和 SparkContext.objectFile 支持使用簡單的序列化的 Java objects 來保存 RDD. 雖然這不像 Avro 這種專用的格式同樣高效,但其提供了一種更簡單的方式來保存任何的 RDD。

 4.2 RDD操做

  RDDs support 兩種類型的操做: transformations(轉換), 它會在一個已存在的 dataset 上建立一個新的 dataset, 和 actions(動做), 將在 dataset 上運行的計算後返回到 driver 程序. 例如, map 是一個經過讓每一個數據集元素都執行一個函數,並返回的新 RDD 結果的 transformation。reduce是 經過執行一些函數,聚合 RDD 中全部元素,並將最終結果給返回驅動程序的action.

  Spark 中全部的 transformations 都是 lazy(懶加載的), 所以它不會馬上計算出結果. 他們只應用於一些基本數據集的轉換 (例如. 文件). 只有當須要返回結果給驅動程序時(action操做時),transformations 纔開始計算. 這種設計使 Spark 的運行更高效.

  默認狀況下,每次你在 RDD 運行一個 action 的時, 每一個 transformed RDD 都會被從新計算。可是,您也可用 persist (或 cache) 方法將 RDD persist(持久化)到內存中;在這種狀況下,Spark 爲了下次查詢時能夠更快地訪問,會把數據保存在集羣上。此外,還支持持續持久化 RDDs 到磁盤,或複製到多個結點。

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});

  第一行從外部文件中定義了一個基本的 RDD,但這個數據集並未加載到內存中或即將被行動: line 僅僅是一個相似指針的東西,指向該文件. 第二行定義了 lineLengths 做爲 map 的結果。請注意,因爲 laziness(延遲加載)lineLengths 不會被當即計算. 最後,咱們運行 reduce,這是一個 action。此時,Spark 分發計算任務到不一樣的機器上運行,每臺機器都運行 map 的一部分並本地運行 reduce,僅僅返回它聚合後的結果給驅動程序.

  若是咱們也但願之後再次使用 lineLengths,咱們能夠在 reduce 以前添加如下代碼,這樣它就會被保存在 memory 中。

lineLengths.persist(StorageLevel.MEMORY_ONLY());

  4.2.1 理解閉包

    在集羣中執行代碼時,一個關於 Spark 更難的事情是理解變量和方法的範圍和生命週期。

int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);   
  • Local(本地)vs. cluster(集羣)模式

    • 上面的代碼行爲是不肯定的,而且可能沒法按預期正常工做。執行做業時,Spark 會分解 RDD 操做到每一個 executor 中的 task 裏。在執行以前,Spark 計算任務的 closure(閉包)。閉包是指 executor 要在RDD上進行計算時必須對執行節點可見的那些變量和方法(在這裏是foreach())。閉包被序列化並被髮送到每一個 executor。
    • 閉包的變量副本發給每一個 executor ,當 counter 被 foreach 函數引用的時候,它已經再也不是 driver node 的 counter 了。雖然在 driver node 仍然有一個 counter 在內存中,可是對 executors 已經不可見。executor 看到的只是序列化的閉包一個副本。因此 counter 最終的值仍是 0,由於對 counter 全部的操做均引用序列化的 closure 內的值。
    • 在 local 本地模式,在某些狀況下的 foreach 功能其實是同一 JVM 上的驅動程序中執行,並會引用同一個原始的 counter 計數器,實際上可能更新值。
    • 若是須要一些全局的聚合功能,應使用 Accumulator(累加器)。當一個執行的任務分配到集羣中的各個 worker 結點時,Spark 的累加器是專門提供安全更新變量的機制。
  • 打印 RDD 的 elements
    • 另外一種常見的語法用於打印 RDD 的全部元素使用 rdd.foreach(println) 或 rdd.map(println)。在一臺機器上,這將產生預期的輸出和打印 RDD 的全部元素。
    • 然而,在集羣 cluster 模式下,stdout 輸出正在被執行寫操做 executors 的 stdout 代替,而不是在一個驅動程序上,所以 stdout 的 driver 程序不會顯示這些!
    • 要打印 driver 程序的全部元素,可使用的 collect() 方法首先把 RDD 放到 driver 程序節點上rdd.collect().foreach(println)。這可能會致使 driver 程序耗盡內存,雖然說,由於 collect() 獲取整個 RDD 到一臺機器; 若是你只須要打印 RDD 的幾個元素,一個更安全的方法是使用 take()rdd.take(100).foreach(println)

  4.2.2 Key-Value Pairs 

    雖然大多數 Spark 操做工做在包含任何類型對象的 RDDs 上,只有少數特殊的操做可用於 Key-Value 對的 RDDs. 最多見的是分佈式 「shuffle」 操做,如經過元素的 key 來進行 grouping 或 aggregating(聚合) 操做。

    在java中, key-value pairs 是使用Scala標準庫中的 scala.Tuple2 類來表明,你可使用new Tuple2(a, b) 來建立一個Tuple,訪問它的元素使用tuple._1() 和 tuple._2()

    可使用 mapToPair 和 flatMapToPair將JavaRDDs轉換爲JavaPairRDDs.

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

    咱們也可使用 counts.sortByKey() ,例如,在對按字母順序排序,最後 counts.collect() 把他們做爲一個數據對象返回給 driver 程序。

    當在 key-value pair 操做中使用自定義的 objects 做爲 key 時, 您必須確保有一個自定義的 equals() 方法和一個 hashCode() 方法。

4.3 算子  

Transformations(轉換)

下表列出了一些 Spark 經常使用的 transformations(轉換). 詳情請參考 RDD API 文檔 (Scala, Java, Python, R) 和 pair RDD 函數文檔 (Scala, Java).

Transformation(轉換) Meaning(含義)
map(func) 返回一個新的 distributed dataset(分佈式數據集),它由每一個 source(數據源)中的元素應用一個函數 func 來生成.
filter(func) 返回一個新的 distributed dataset(分佈式數據集),它由每一個 source(數據源)中應用一個函數 func 且返回值爲 true 的元素來生成.
flatMap(func) 與 map 相似,可是每個輸入的 item 能夠被映射成 0 個或多個輸出的 items(因此 func 應該返回一個 Seq 而不是一個單獨的 item).
mapPartitions(func) 與 map 相似,可是單獨的運行在在每一個 RDD 的 partition(分區,block)上,因此在一個類型爲 T 的 RDD 上運行時 func 必須是 Iterator<T> => Iterator<U> 類型.(一次處理一個partition的數據,當不會內存溢出時可代替map,減小如數據庫鏈接等次數)
mapPartitionsWithIndex(func) 與 mapPartitions 相似,可是也須要提供一個表明 partition 的 index(索引)的 interger value(整型值)做爲參數的 func,因此在一個類型爲 T 的 RDD 上運行時 func 必須是 (Int, Iterator<T>) => Iterator<U> 類型.
sample(withReplacementfractionseed) 樣本數據,設置是否放回(withReplacement), 採樣的百分比(fraction)、使用指定的隨機數生成器的種子(seed).
union(otherDataset) 反回一個新的 dataset,它包含了 source dataset(源數據集)和 otherDataset(其它數據集)的並集.
intersection(otherDataset) 返回一個新的 RDD,它包含了 source dataset(源數據集)和 otherDataset(其它數據集)的交集.
distinct([numTasks])) 返回一個新的 dataset,它包含了 source dataset(源數據集)中去重的元素.
groupByKey([numTasks]) 在一個 (K, V) pair 的 dataset 上調用時,返回一個 (K, Iterable<V>) . 
Note: 若是分組是爲了在每個 key 上執行聚合操做(例如,sum 或 average),此時使用 reduceByKey 或 aggregateByKey 來計算性能會更好(shuffle的map以後自帶combiner,會執行邏輯運算,同一個parition中的相同key,只用傳一次結果到reduce). 
Note: 默認狀況下,並行度取決於父 RDD 的分區數。能夠傳遞一個可選的 numTasks 參數來設置不一樣的任務數.
reduceByKey(func, [numTasks]) 在 (K, V) pairs 的 dataset 上調用時, 返回 dataset of (K, V) pairs 的 dataset, 其中的 values 是針對每一個 key 使用給定的函數 func 來進行聚合的, 它必須是 type (V,V) => V 的類型. 像 groupByKey 同樣, reduce tasks 的數量是能夠經過第二個可選的參數來配置的.
aggregateByKey(zeroValue)(seqOpcombOp, [numTasks]) 在 (K, V) pairs 的 dataset 上調用時, 返回 (K, U) pairs 的 dataset,其中的 values 是針對每一個 key 使用給定的 combine 函數以及一個 neutral "0" 值來進行聚合的. 容許聚合值的類型與輸入值的類型不同, 同時避免沒必要要的配置. 像 groupByKey 同樣, reduce tasks 的數量是能夠經過第二個可選的參數來配置的.(shuffle的map和reduce邏輯不同時候,不能用reducebykey,好比求average,不能先在map階段就取平均,而是總體取平均)
sortByKey([ascending], [numTasks]) 在一個 (K, V) pair 的 dataset 上調用時,其中的 K 實現了 Ordered,返回一個按 keys 升序或降序的 (K, V) pairs 的 dataset, 由 boolean 類型的 ascending 參數來指定.
join(otherDataset, [numTasks]) 在一個 (K, V) 和 (K, W) 類型的 dataset 上調用時,返回一個 (K, (V, W)) pairs 的 dataset,它擁有每一個 key 中全部的元素對。Outer joins 能夠經過 leftOuterJoinrightOuterJoin 和 fullOuterJoin 來實現.
cogroup(otherDataset, [numTasks]) 在一個 (K, V) 和的 dataset 上調用時,返回一個 (K, (Iterable<V>, Iterable<W>)) tuples 的 dataset. 這個操做也調用了 groupWith.
cartesian(otherDataset) 在一個 T 和 U 類型的 dataset 上調用時,返回一個 (T, U) pairs 類型的 dataset(全部元素的 pairs,即笛卡爾積).
pipe(command[envVars]) 經過使用 shell 命令來將每一個 RDD 的分區給 Pipe。例如,一個 Perl 或 bash 腳本。RDD 的元素會被寫入進程的標準輸入(stdin),而且 lines(行)輸出到它的標準輸出(stdout)被做爲一個字符串型 RDD 的 string 返回.
coalesce(numPartitions) Decrease(下降)RDD 中 partitions(分區)的數量爲 numPartitions。對於執行filter後一個大的 dataset 操做是更有效的.默認無shuffle
repartition(numPartitions) Reshuffle(從新洗牌)RDD 中的數據以建立或者更多的 partitions(分區)並將每一個分區中的數據儘可能保持均勻. 該操做老是經過網絡來 shuffles 全部的數據.
repartitionAndSortWithinPartitions(partitioner) 根據給定的 partitioner(分區器)對 RDD 進行從新分區,並在每一個結果分區中,按照 key 值對記錄排序。這比每個分區中先調用 repartition 而後再 sorting(排序)效率更高,由於它能夠將排序過程推送到 shuffle 操做的機器上進行.

Actions(動做)

下表列出了一些 Spark 經常使用的 actions 操做。詳細請參考 RDD API 文檔 (Scala, Java, Python, R)和 pair RDD 函數文檔 (Scala, Java).

Action(動做) Meaning(含義)
reduce(func) 使用函數 func 聚合 dataset 中的元素,這個函數 func 輸入爲兩個元素,返回爲一個元素。這個函數應該是可交換(commutative )和關聯(associative)的,這樣才能保證它能夠被並行地正確計算.
collect() 在 driver 程序中,以一個 array 數組的形式返回 dataset 的全部元素。這在過濾器(filter)或其餘操做(other operation)以後返回足夠小(sufficiently small)的數據子集一般是有用的.
count() 返回 dataset 中元素的個數.
first() 返回 dataset 中的第一個元素(相似於 take(1).
take(n) 將數據集中的前 n 個元素做爲一個 array 數組返回.
takeSample(withReplacementnum, [seed]) 對一個 dataset 進行隨機抽樣,返回一個包含 num 個隨機抽樣(random sample)元素的數組,參數 withReplacement 指定是否有放回抽樣,參數 seed 指定生成隨機數的種子.
takeOrdered(n[ordering]) 返回 RDD 按天然順序(natural order)或自定義比較器(custom comparator)排序後的前 n 個元素.
saveAsTextFile(path) 將 dataset 中的元素以文本文件(或文本文件集合)的形式寫入本地文件系統、HDFS 或其它 Hadoop 支持的文件系統中的給定目錄中。Spark 將對每一個元素調用 toString 方法,將數據元素轉換爲文本文件中的一行記錄.
saveAsSequenceFile(path
(Java and Scala)
將 dataset 中的元素以 Hadoop SequenceFile 的形式寫入到本地文件系統、HDFS 或其它 Hadoop 支持的文件系統指定的路徑中。該操做能夠在實現了 Hadoop 的 Writable 接口的鍵值對(key-value pairs)的 RDD 上使用。在 Scala 中,它還能夠隱式轉換爲 Writable 的類型(Spark 包括了基本類型的轉換,例如 Int, Double, String 等等).
saveAsObjectFile(path
(Java and Scala)
使用 Java 序列化(serialization)以簡單的格式(simple format)編寫數據集的元素,而後使用 SparkContext.objectFile() 進行加載.
countByKey() 僅適用於(K,V)類型的 RDD 。返回具備每一個 key 的計數的 (K , Int)pairs 的 hashmap.
foreach(func) 對 dataset 中每一個元素運行函數 func 。這一般用於反作用(side effects),例如更新一個 Accumulator(累加器)或與外部存儲系統(external storage systems)進行交互。Note:修改除 foreach()以外的累加器之外的變量(variables)可能會致使未定義的行爲(undefined behavior)。詳細介紹請閱讀 Understanding closures(理解閉包) 部分.

4.2.4  shuffle操做

  Spark 裏的某些操做會觸發 shuffle。shuffle 是spark 從新分配數據的一種機制,使得這些數據能夠跨不一樣的區域進行分組。這一般涉及在 executors和機器之間拷貝數據,這使得 shuffle 成爲一個複雜的、代價高的操做。

  爲了明白 reduceByKey 操做的過程,咱們以 reduceByKey 爲例。reduceBykey 操做產生一個新的 RDD,其中 key 全部相同的的值組合成爲一個 tuple - key 以及與 key 相關聯的全部值在 reduce 函數上的執行結果。面臨的挑戰是,一個 key 的全部值不必定都在一個同一個 paritition 分區裏,甚至是不必定在同一臺機器裏,可是它們必須共同被計算。

  在 spark 裏,特定的操做須要數據不跨分區分佈。在計算期間,一個任務在一個分區上執行,爲了全部數據都在單個 reduceByKey 的 reduce 任務上運行,咱們須要執行一個 all-to-all 操做。它必須從全部分區讀取全部的 key 和 key對應的全部的值,而且跨分區彙集去計算每一個 key 的結果 - 這個過程就叫作 shuffle.。

  儘管每一個分區新 shuffle 的數據集將是肯定的,分區自己的順序也是這樣,可是這些數據的順序是不肯定的。若是但願 shuffle 後的數據是有序的,可使用:

  • mapPartitions 對每一個 partition 分區進行排序,例如, .sorted
  • repartitionAndSortWithinPartitions 在分區的同時對分區進行高效的排序.
  • sortBy 對 RDD 進行全局的排序

觸發的 shuffle 操做包括 repartition 操做,如 repartition 和 coalesce‘ByKey 操做像 groupByKey 和 reduceByKey, 和 join操做, 像 cogroup 和 join.

4.2.5 RDD Persistence(持久化)

  Spark 中一個很重要的能力是將數據 persisting 持久化(或稱爲 caching 緩存),在多個操做間均可以訪問這些持久化的數據。當持久化一個 RDD 時,每一個節點的其它分區均可以使用 RDD 在內存中進行計算,在該數據上的其餘 action 操做將直接使用內存中的數據。這樣會讓之後的 action 操做計算速度加快(一般運行速度會加速 10 倍)。緩存是迭代算法和快速的交互式使用的重要工具。

  RDD 可使用 persist() 方法或 cache() 方法進行持久化。數據將會在第一次 action 操做時進行計算,並緩存在節點的內存中。Spark 的緩存具備容錯機制,若是一個緩存的 RDD 的某個分區丟失了,Spark 將按照原來的計算過程,自動從新計算並進行緩存。

  另外,每一個持久化的 RDD 可使用不一樣的 storage level 存儲級別進行緩存,例如,持久化到磁盤、已序列化的 Java 對象形式持久化到內存(能夠節省空間)、跨節點間複製、以 off-heap 的方式存儲在 Tachyon。這些存儲級別經過傳遞一個 StorageLevel 對象給 persist() 方法進行設置。cache() 方法是使用默認存儲級別的快捷設置方法,默認的存儲級別是 StorageLevel.MEMORY_ONLY(將反序列化的對象存儲到內存中)。詳細的存儲級別介紹以下:

Storage Level(存儲級別) Meaning(含義)
MEMORY_ONLY 將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中. 若是內存空間不夠,部分數據分區將再也不緩存,在每次須要用到這些數據時從新進行計算. 這是默認的級別.
MEMORY_AND_DISK 將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中。若是內存空間不夠,將未緩存的數據分區存儲到磁盤,在須要使用這些分區時從磁盤讀取.
MEMORY_ONLY_SER 
(Java and Scala)
將 RDD 以序列化的 Java 對象的形式進行存儲(每一個分區爲一個 byte 數組)。這種方式會比反序列化對象的方式節省不少空間,尤爲是在使用 fast serializer 時會節省更多的空間,可是在讀取時會增長 CPU 的計算負擔.
MEMORY_AND_DISK_SER 
(Java and Scala)
相似於 MEMORY_ONLY_SER ,可是溢出的分區會存儲到磁盤,而不是在用到它們時從新計算.
DISK_ONLY 只在磁盤上緩存 RDD.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 與上面的級別功能相同,只不過每一個分區在集羣中兩個節點上創建副本.
OFF_HEAP (experimental 實驗性) 相似於 MEMORY_ONLY_SER, 可是將數據存儲在 off-heap memory 中. 這須要啓用 off-heap 內存.

  Spark 會自動監視每一個節點上的緩存使用狀況,並使用 least-recently-used(LRU)的方式來丟棄舊數據分區。 若是您想手動刪除 RDD 而不是等待它掉出緩存,使用 RDD.unpersist() 方法。

4.2.6 共享變量

  • 廣播變量
    • Broadcast variables(廣播變量)容許程序員將一個 read-only(只讀的)變量緩存到每臺機器上,而不是給任務傳遞一個副本。它們是如何來使用呢,例如,廣播變量能夠用一種高效的方式給每一個節點傳遞一份比較大的 input dataset(輸入數據集)副本。在使用廣播變量時,Spark 也嘗試使用高效廣播算法分發 broadcast variables(廣播變量)以下降通訊成本。
    • Spark 的 action(動做)操做是經過一系列的 stage(階段)進行執行的,這些 stage(階段)是經過分佈式的 「shuffle」 操做進行拆分的。Spark 會自動廣播出每一個 stage(階段)內任務所須要的公共數據。這種狀況下廣播的數據使用序列化的形式進行緩存,並在每一個任務運行前進行反序列化。這也就意味着,只有在跨越多個 stage(階段)的多個任務會使用相同的數據,或者在使用反序列化形式的數據特別重要的狀況下,使用廣播變量會有比較好的效果。
    • 廣播變量經過在一個變量 v 上調用 SparkContext.broadcast(v) 方法來進行建立。廣播變量是 v 的一個 wrapper(包裝器),能夠經過調用 value方法來訪問它的值。代碼示例以下:
      Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
      broadcastVar.value();
      // returns [1, 2, 3]
    • 在建立廣播變量以後,在集羣上執行的全部的函數中,應該使用該廣播變量代替原來的 v 值,因此節點上的 v 最多分發一次。另外,對象 v 在廣播後不該該再被修改,以保證分發到全部的節點上的廣播變量具備一樣的值。

  • Accumulators(累加器)

    • Accumulators(累加器)是一個僅能夠執行 「added」(添加)的變量來經過一個關聯和交換操做,所以能夠高效地執行支持並行。累加器能夠用於實現 counter( 計數,相似在 MapReduce 中那樣)或者 sums(求和)。原生 Spark 支持數值型的累加器,而且程序員能夠添加新的支持類型。

    • 做爲一個用戶,您能夠建立 accumulators(累加器)而且重命名. 以下圖所示, 一個命名的 accumulator 累加器(在這個例子中是 counter)將顯示在 web UI 中,用於修改該累加器的階段。 Spark 在 「Tasks」 任務表中顯示由任務修改的每一個累加器的值。

    • 能夠經過調用 SparkContext.longAccumulator() 或 SparkContext.doubleAccumulator() 方法建立數值類型的 accumulator(累加器)以分別累加 Long 或 Double 類型的值。集羣上正在運行的任務就可使用 add 方法來累計數值。然而,它們不可以讀取它的值。只有 driver program(驅動程序)纔可使用 value 方法讀取累加器的值。
      LongAccumulator accum = jsc.sc().longAccumulator();
      
      sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
      // ...
      // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
      
      accum.value();
      // returns 10
    • 雖然此代碼使用 Long 類型的累加器的內置支持, 可是開發者經過 AccumulatorV2 它的子類來建立本身的類型. AccumulatorV2 抽象類有幾個須要 override(重寫)的方法: reset 方法可將累加器重置爲 0, add 方法可將其它值添加到累加器中, merge 方法可將其餘一樣類型的累加器合併爲一個. 其餘須要重寫的方法可參考 API documentation. 例如, 假設咱們有一個表示數學上 vectors(向量)的 MyVector 類,咱們能夠寫成:

      class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {
      
        private MyVector myVector = MyVector.createZeroVector();
      
        public void reset() {
          myVector.reset();
        }
      
        public void add(MyVector v) {
          myVector.add(v);
        }
        ...
      }
      
      // Then, create an Accumulator of this type:
      VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
      // Then, register it into spark context:
      jsc.sc().register(myVectorAcc, "MyVectorAcc1");

      注意,在開發者定義本身的 AccumulatorV2 類型時, resulting type(返回值類型)可能與添加的元素的類型不一致。

5、調度流程

5.1 寬窄依賴

  

窄依賴:

父RDD和子RDD partition之間的關係是一對一的。或者父RDD一個partition只對應一個子RDD的partition狀況下的父RDD和子RDD partition關係是多對一的。不會有shuffle的產生。父RDD一個分區去到子RDD的一個分區

寬依賴:

父RDD與子RDD partition之間的關係是一對多會有shuffle的產生。父RDD的一個分區的數據去到子RDD的不一樣分區裏面。

(其實區分寬窄依賴主要就是看父RDD的一個Partition的流向,要是流向一個的話就是窄依賴,流向多個的話就是寬依賴。)

 5.2 stage切分

  

  Spark任務會根據RDD之間的依賴關係,造成一個DAG有向無環圖,DAG會提交給DAGScheduler,DAGScheduler會把DAG劃分相互依賴的多個stage,劃分stage的依據就是RDD之間的寬窄依賴。遇到寬依賴就劃分stage,每一個stage包含一個或多個task任務。而後將這些task以taskSet的形式提交給TaskScheduler運行stage是由一組並行的task組成。

  切割規則:從後往前遇到寬依賴就切割stage。

  一個stage內的窄依賴是pipeline管道計算模式,pipeline只是一種計算思想,模式。

  • Spark的pipeLine的計算模式,至關於執行了一個高階函數f3(f2(f1(textFile))) !+!+!=3 也就是來一條數據而後計算一條數據,把全部的邏輯走完,而後落地,準確的說一個task處理遺傳分區的數據 由於跨過了不一樣的邏輯的分區。而MapReduce是 1+1=2,2+1=3的模式,也就是計算完落地,而後在計算,而後再落地到磁盤或內存,最後數據是落在計算節點上,按reduce的hash分區落地。因此這也是比Mapreduce快的緣由,徹底基於內存計算。
  • 管道中的數據什麼時候落地:shuffle write的時候,對RDD進行持久化的時候。
  • Stage的task並行度是由stage的最後一個RDD的分區數來決定的 。通常來講,一個partiotion對應一個task,但最後reduce的時候能夠手動改變reduce的個數,也就是分區數,即改變了並行度。例如reduceByKey(XXX,3),GroupByKey(4),union由的分區數由前面的相加。
  • 如何提升stage的並行度:reduceBykey(xxx,numpartiotion),join(xxx,numpartiotion)

5.3 執行流程

   

  Driver運行在客戶端:

    • 客戶端啓動後直接運行用戶程序,啓動Driver相關的工做,初始化SparkContext時候最重要的就是構造一個DAGScheduler和TaskScheduler。
    • DAGScheduler首先建立一個finalStage,而後遞歸方式倒着切割Stage
    • 客戶端的Driver向Master註冊。
    • Master還會讓Worker啓動Exeuctor。Worker建立一個ExecutorRunner線程,ExecutorRunner會啓動ExecutorBackend進程。
    • ExecutorBackend啓動後會向Driver的SchedulerBackend註冊。
    • 每一個Stage包含的Task經過TaskScheduler分配給Executor執行。
    • 全部stage都完成後做業結束。

    (若是Driver運行在Worker上,客戶端提交做業給Master,Master讓一個Worker啓動Driver,即SchedulerBackend。)

  Spark on Yarn:

  

 5.4 任務調度

  

相關文章
相關標籤/搜索