spark2.2官方教程筆記-spark編程嚮導

總括

  首先,spark應用程序由一個驅動程序構成,由它運行用戶的main函數,而且在集羣上執行各類並行化操做。這個抽象的spark應用函數提供彈性分部式數據集【Spark provides is a resilient distributed dataset (RDD)】,一個rdd能夠從hadoop文件系統,或者現存的scala集合,或者從其它集合中轉換生成。咱們可讓rdd保存在內存中,可讓rdd能夠高效的作序列化操做。而且RDD還有一個牛逼的功能,就是自動恢復無效的節點。html

  其次,spark支持並行化操做中共享變量。當一個spark在不一樣的節點上運行一系列任務,spark能複製函數中每個變量到每個任務節點中。然而,有時一個變量要在不一樣集羣中共享,spark支持兩種方式解決這個問題:廣播變量(緩存一個數值到全部節點中),累加器(只能用來累加和求和的變量)java

連接相關庫

  創建spark2.2.0須要依賴相關庫,若是須要使用hadoop 集羣還須要使用hdfs的庫,maven示例node

spark coreweb

groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.2.0

hadoopshell

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

初始化spark

 spark編程的第一件事就是建立sparkContext對象,該對象告訴spark如何訪問集羣。數據庫

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

每一個jvm只能實例化一個spark上下文,在建立一個新的sparkContext以前你必須調用stop函數(估計是單例模式?)apache

appName是顯示在集羣控制界面Ui上的名稱,master是運行spark的模式,有 Spark, Mesos or YARN cluster URL,或者是本地模式「local」,若是是運行集羣模式,就不須要以硬編碼方式設置master,而是以spark-submit的方式啓動程序,而且在集羣中接受運行。對於本地測試和單機測試,建議使用設置「local」的方式去在進程中運行spark。編程

在shell中運行

 在spark shell中,一個sparkContext已經內建好了,變量名是sc,建立的SparkContext是不生效的。api

在shell中,能夠經過--master參數設置運行模式,--jars添加jar包,--packages添加額外包數組

$ ./bin/spark-shell --master local[4]
$ ./bin/spark-shell --master local[4] --jars code.jar
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

彈性分佈式數據集(RDD)

spark的核心概念是rdd,一個容錯的可並行處理的分佈式數據集合框架。rdd經過兩種方式建立:1,外部數據源(各類集合對象並行化,文件系統,hdfs,hbase等);2,rdd轉換。

並行化集合

在程序中現有的集合中調用sparkContext的parallelize(集合對象)方法。舉個栗子:建立1到5的並行化集合對象

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

代碼很簡單,spark官方文檔這裏絮絮不休的講了一些廢話,只有一句是重點,

We describe operations on distributed datasets later on.spark是懶執行,既是遇到action操做的時候才運行程序。

對於並行化集合的一個重要的參數是數據集拆分的數量。spark將在集羣上的每一個節點運行一個任務。一般集羣的cpu數量和分區數一致比較好。

外部數據集

spark能經過如下幾種方式建立rdd:本地文件系統,hdfs, Cassandra(一款開源分佈式NOSql圖形數據庫), HBase, Amazon S3等,spark支持文本文件,序列化文件,其它hadoop輸入格式(官方文檔重複了n次了,估計是spark引以自豪的東西)。

對於文本文件,rdd可以使用 SparkContext 的 textFile 方法建立rdd對象。這個方法參數uri能夠是本地路徑,hdfs://,s3n://,而後讀取其中的每一行。舉個栗子,咱們能夠經過這種方式累加文本文件的行數的長度。

distFile.map(s => s.length).reduce((a, b) => a + b).

一些注意點

  • 若是uri是本地參數,那麼集羣中的每一個節點相同路徑下均要可以訪問到這個文件,可使用文件拷貝或者網絡mount共享這文件。
  • spark文件輸入中,支持輸入文件夾,壓縮文件和甚至支持通配符。舉個栗子
textFile("/my/directory"), 
textFile("/my/directory/*.txt"),
textFile("/my/directory/*.gz")
  • 這裏講textFile(filepath,minPartitions)第二個參數minPartitions,指定數據最小分區。The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.舉個栗子
val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/"  //local file  
val rdd1 = sc.textFile(path,2)

從本地系統中讀取licenses這個文件夾下的全部文件

這裏特別注意的是,好比這個文件夾下有35個文件,上面分區數設置是2,那麼整個RDD的分區數是35*2?這是錯誤的,這個RDD的分區數無論你的partition數設置爲多少時,只要license這個文件夾下的這個文件a.txt(好比有a.txt)沒有超過128m,那麼a.txt就只有一個partition。那麼就是說只要這35個文件其中沒有一個超過128m,那麼分區數就是 35個。

  • SparkContext.wholeTextFiles() 讀取路徑下的全部文本文件,返回 (filename, content) 對。
  • RDD.saveAsObjectFile and SparkContext.objectFile 將rdd對象序列化後保存。
  • SequenceFiles,文件序列化,使用SparkContext的sequenceFiles【k,v】方法。實際上這是Hadoop的Writable接口的子類,相似的子類有IntWritable和Text。另外,spark容許你對部分Writables進行自定義類型,好比sequenceFile[Int,String]將自動讀取IntWritables和Texts。
  • 對於其餘Hadoop 輸入格式,可使用SparkContext.hadoopRDD方法。

RDD操做

RDD實現類支持兩種類型的操做:Transformations,從另外一個RDD轉換成一個新的數據集;Action,通過從數據中計算後返回一個新的值。好比 map 函數,傳入一個rdd,返回一個新的處理過的rdd。另外一方面,reduce是action操做,其對rdd中全部元素通過某種函數處理,返回一個最終結果給驅動程序。

  全部spark的Transformation是懶處理的,其不會馬上進行計算,而是記錄須要用於處理的數據集。只有當遇到action操做後才進行計算處理。這種方式讓spark運行更高效。

  所以,當你運行一個action的時候每一個rdd會被再次計算。然而,經過persist/cache你也能持久化一個rdd在內存中,在你下次查詢這個rdd的時候,程序能從集羣中快速讀取。

   rdd也支持在硬盤或者在多個節點持久化。

基礎

  先上一段代碼

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

  第一行定義一個來自外部文件的簡單rdd對象。這個數據集不會立刻加載到內存中,lines幾乎只是一個指向文件的指針。第二行定義一個lineLengths做爲map轉換的結果。因此lineLengths由於懶原則不會馬上處理。最終,咱們運行一個reduce,reduce做爲action操做,會立刻出發程序,將任務下發到不一樣機器,每一個機器運行map的一部分和reduction的一部分,最終返回驅動一個結果。

若是咱們會再次使用到lineLengths,建議持久化

lineLengths.persist()

在運行reduce以前,lineLengths在第一次計算後依舊會被保存在內存中數據不會被釋放,下次計算時不須要再次出來以前的lines.map(s => s.length)操做。

給spark傳函數

spark api在集羣中運行過程高度依賴傳函數功能。有兩種推薦的方法:

  1. 匿名函數,經過短碼便可實現。
  2. 全局單例對象的靜態方法。
    object MyFunctions {
      def func1(s: String): String = { ... }
    }
    
    myRdd.map(MyFunctions.func1)

     

注意,你也能夠傳遞一個類對象實例山谷的方法(而不是單個對象),不過這會致使傳遞函數的同時,須要把相應的對象也發送到集羣中各個節點上。例如,咱們定義一個MyClass以下:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

若是咱們 new MyClass 建立一個實例,並調用其 doStuff 方法,同時doStuff中的 map引用了該MyClass實例上的 func1 方法,那麼接下來,這個MyClass對象將被髮送到集羣中全部節點上。rdd.map(x => this.func1(x)) 也會有相似的效果。

 

相似地,若是引用外部對象的成員變量,也會致使對整個對象實例的引用:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

上面的代碼對field的引用等價於 rdd.map(x => this.field + x),這將致使引用整個this對象。

爲了不相似問題,最簡單的方式就是,將field執拗到一個本地臨時變量中,而不是從外部直接訪問之,以下:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

理解閉包

spark中比較須要注意的點是:跨節點執行代碼中的變量和方法的做用域和生命週期。在生命週期範圍外修改rdd中的數值容易形成錯誤。在spark單機模式和集羣模式下,結果每每不同。

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

運行結果counter:0

本地模式和集羣模式的區別

spark把rdd操做分割成多個獨立子操做,每一個任務發佈給不一樣計算節點。執行操做前,rdd會計算閉包,把計算所須要的變量和方法副本序列化後發送給每一個計算節點。節點上的counter變量和驅動器上的counter變量不是同一個變量。因此二者值不同。

未解決這類問題,須要使用累加器。累加器是Spark中專門用於集羣跨節點分佈式執行計算中,安全地更新同一變量的機制。

一般來講,閉包(由循環或本地方法組成),不該該改寫全局狀態。Spark中改寫閉包以外對象的行爲是未定義的。這種代碼,有可能在本地模式下能正常工做,但這只是偶然狀況,一樣的代碼在分佈式模式下其行爲極可能不是你想要的。因此,若是須要全局聚合,請記得使用累加器。

打印rdd

在實際編程中,咱們常常須要把RDD中的元素打印輸出到屏幕上(標準輸出stdout),通常會採用語句rdd.foreach(println)或者rdd.map(println)。當採用本地模式(local)在單機上執行時,這些語句會打印出一個RDD中的全部元素。可是,當採用集羣模式執行時,在worker節點上執行打印語句是輸出到worker節點的stdout中,而不是輸出到任務控制節點Driver Program中,所以,任務控制節點Driver Program中的stdout是不會顯示打印語句的這些輸出內容的。爲了可以把全部worker節點上的打印輸出信息也顯示到Driver Program中,可使用collect()方法,好比,rdd.collect().foreach(println),可是,因爲collect()方法會把各個worker節點上的全部RDD元素都抓取到Driver Program中,所以,這可能會致使內存溢出。所以,當你只須要打印RDD的部分元素時,能夠採用語句rdd.take(100).foreach(println)。

使用鍵值對

當大部分spark均可以在任意類型對象上進行rdd操做,然然而也有部分操做只能在鍵值對上進行。其中最多見的是「shuffle」操做(對應中文是拖動的意思),好比經過key進行分組或聚合元素操做。

Transformations操做

一些通用的rddtransformation函數;

函數名  說明
map

返回一個新的數據集,其中每一個元素都是通過func處理後得來

filter 返回一個新的數據集,其中每一個元素都是通過func過濾後得來
flatmap 和map類似,可是其中的每一個輸入元素可能有0或多個輸出。
mapPartitions 和map類似,只是map輸入對應於每一個分區
mapPartitionsWithIndex  
sample(withReplacement, fraction, seed)     採樣部分(比例取決於 fraction )數據,同時能夠指定是否使用回置採樣(withReplacement),以及隨機數種子(seed)
Union(other Dataset) 返回數據集的並集
intersection(otherDataset) 返回原數據集和參數數據集的交集
distinct([numTasks]) 去重
groupByKey([numTasks]) 只對包含鍵值對的RDD有效,如源RDD包含 (K, V) 對,則該算子返回一個新的數據集包含 (K, Iterable<V>) 對。
注意:若是你須要按key分組聚合的話(如sum或average),推薦使用 reduceByKey或者 aggregateByKey 以得到更好的性能。
注意:默認狀況下,輸出計算的並行度取決於源RDD的分區個數。固然,你也能夠經過設置可選參數 numTasks 來指定並行任務的個數。
reduceByKey(func, [numTasks]) 若是源RDD包含元素類型 (K, V) 對,則該算子也返回包含(K, V) 對的RDD,只不過每一個key對應的value是通過func聚合後的結果,而func自己是一個 (V, V) => V 的映射函數。
另外,和 groupByKey 相似,能夠經過可選參數 numTasks 指定reduce任務的個數。
aggregateByKey(zeroValue)(seqOpcombOp, [numTasks]) 若是源RDD包含 (K, V) 對,則返回新RDD包含 (K, U) 對,其中每一個key對應的value都是由 combOp 函數 和 一個「0」值zeroValue 聚合獲得。容許聚合後value類型和輸入value類型不一樣,避免了沒必要要的開銷。和 groupByKey 相似,能夠經過可選參數 numTasks 指定reduce任務的個數。
sortByKey([ascending], [numTasks]) 若是源RDD包含元素類型 (K, V) 對,其中K可排序,則返回新的RDD包含 (K, V) 對,並按照 K 排序(升序仍是降序取決於 ascending 參數)
join(otherDataset, [numTasks]) 若是源RDD包含元素類型 (K, V) 且參數RDD(otherDataset)包含元素類型(K, W),則返回的新RDD中將包含內關聯後key對應的 (K, (V, W)) 對。外關聯(Outer joins)操做請參考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 算子。
cogroup(otherDataset, [numTasks]) 若是源RDD包含元素類型 (K, V) 且參數RDD(otherDataset)包含元素類型(K, W),則返回的新RDD中包含 (K, (Iterable<V>, Iterable<W>))。該算子還有個別名:groupWith
cartesian(otherDataset) 若是源RDD包含元素類型 T 且參數RDD(otherDataset)包含元素類型 U,則返回的新RDD包含前兩者的笛卡爾積,其元素類型爲 (T, U) 對。
pipe(command[envVars]) 以shell命令行管道處理RDD的每一個分區,如:Perl 或者 bash 腳本。
RDD中每一個元素都將依次寫入進程的標準輸入(stdin),而後按行輸出到標準輸出(stdout),每一行輸出字符串即成爲一個新的RDD元素。
coalesce(numPartitions) 將RDD的分區數減小到numPartitions。當之後大數據集被過濾成小數據集後,減小分區數,能夠提高效率。
repartition(numPartitions) 將RDD數據從新混洗(reshuffle)並隨機分佈到新的分區中,使數據分佈更均衡,新的分區個數取決於numPartitions。該算子老是須要經過網絡混洗全部數據。
repartitionAndSortWithinPartitions(partitioner) 根據partitioner(spark自帶有HashPartitioner和RangePartitioner等)從新分區RDD,而且在每一個結果分區中按key作排序。這是一個組合算子,功能上等價於先 repartition 再在每一個分區內排序,但這個算子內部作了優化(將排序過程下推到混洗同時進行),所以性能更好。
 

mapPartitions說明

map()的輸入函數是應用於RDD中每一個元素,而mapPartitions()的輸入函數是應用於每一個分區

package test

import scala.Iterator

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TestRdd {
  def sumOfEveryPartition(input: Iterator[Int]): Int = {
    var total = 0
    input.foreach { elem =>
      total += elem
    }
    total
  }
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Rdd Test")
    val spark = new SparkContext(conf)
    val input = spark.parallelize(List(1, 2, 3, 4, 5, 6), 2)//RDD有6個元素,分紅2個partition
    val result = input.mapPartitions(
      partition => Iterator(sumOfEveryPartition(partition)))//partition是傳入的參數,是個list,要求返回也是list,即Iterator(sumOfEveryPartition(partition))
    result.collect().foreach {
      println(_)//6 15
    }
    spark.stop()
  }
}

 

Action

Action函數 做用
reduce(func) 將RDD中元素按func進行聚合(func是一個 (T,T) => T 的映射函數,其中T爲源RDD元素類型,而且func須要知足 交換律 和 結合律 以便支持並行計算)
collect() 將數據集中全部元素以數組形式返回驅動器(driver)程序。一般用於,在RDD進行了filter或其餘過濾操做後,將一個足夠小的數據子集返回到驅動器內存中。
count() 返回數據集中元素個數
first() 返回數據集中首個元素(相似於 take(1) )
take(n) 返回數據集中前 個元素
takeSample(withReplacement,num, [seed]) 返回數據集的隨機採樣子集,最多包含 num 個元素,withReplacement 表示是否使用回置採樣,最後一個參數爲可選參數seed,隨機數生成器的種子。
takeOrdered(n[ordering]) 按元素排序(能夠經過 ordering 自定義排序規則)後,返回前 n 個元素
saveAsTextFile(path) 將數據集中元素保存到指定目錄下的文本文件中(或者多個文本文件),支持本地文件系統、HDFS 或者其餘任何Hadoop支持的文件系統。
保存過程當中,Spark會調用每一個元素的toString方法,並將結果保存成文件中的一行。
saveAsSequenceFile(path)
(Java and Scala)
將數據集中元素保存到指定目錄下的Hadoop Sequence文件中,支持本地文件系統、HDFS 或者其餘任何Hadoop支持的文件系統。適用於實現了Writable接口的鍵值對RDD。在Scala中,一樣也適用於可以被隱式轉換爲Writable的類型(Spark實現了全部基本類型的隱式轉換,如:Int,Double,String 等)
saveAsObjectFile(path)
(Java and Scala)
將RDD元素以Java序列化的格式保存成文件,保存結果文件可使用 SparkContext.objectFile 來讀取。
countByKey() 只適用於包含鍵值對(K, V)的RDD,並返回一個哈希表,包含 (K, Int) 對,表示每一個key的個數。
foreach(func) 在RDD的每一個元素上運行 func 函數。一般被用於累加操做,如:更新一個累加器(Accumulator ) 或者 和外部存儲系統互操做。
注意:用 foreach 操做出累加器以外的變量可能致使未定義的行爲。更詳細請參考前面的「理解閉包」(Understanding closures )這一小節。

混洗操做

有一些Spark算子會觸發衆所周知的混洗(Shuffle)事件。Spark中的混洗機制是用於將數據從新分佈,其結果是全部數據將在各個分區間從新分組。通常狀況下,混洗須要跨執行器(executor)或跨機器複製數據,這也是混洗操做通常都比較複雜並且開銷大的緣由。

背景

爲了理解混洗階段都發生了哪些事,我首先以reduceByKey 爲例來看一下。reduceByKey會生成一個新的RDD,將源RDD中一個key對應的多個value組合進一個tuple - 而後將這些values輸入給reduce函數,獲得的result再和key關聯放入新的RDD中。這個的難點在於對於某一個key來講,並不是其對應的全部values都在同一個分區(partition)中,甚至有可能都不在同一臺機器上,可是這些values又必須放到一塊兒計算reduce結果。

在Spark中,一般是因爲爲了進行某種計算操做,而將數據分佈到所須要的各個分區當中。而在計算階段,單個任務(task)只會操做單個分區中的數據 – 所以,爲了組織好每一個reduceByKey中reduce任務執行時所需的數據,Spark須要執行一個多對多操做。即,Spark須要讀取RDD的全部分區,並找到全部key對應的全部values,而後跨分區傳輸這些values,並將每一個key對應的全部values放到同一分區,以便後續計算各個key對應values的reduce結果 – 這個過程就叫作混洗(Shuffle)。

shuffle開銷大。

RDD持久化

rdd持久化分七個級別

Storage Level Meaning
MEMORY_ONLY

Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

未序列化java對象存儲在jvm內,若是內存不足,部分數據不會存儲,且再次使用的過程會從新計算。這是默認級別,且cpu處理器最有效率的方式。

MEMORY_AND_DISK

Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.

未序列化java對象存儲在jvm內,若是內存不足,部分數據存儲在硬盤,且再次使用的過程會讀取硬盤。

MEMORY_ONLY_SER 
(Java and Scala)

Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

以序列化形式存儲RDD(每一個分區一個字節數組)。一般這種方式比未序列化存儲方式要更省空間,尤爲是若是你選用了一個比較好的序列化協議(fast serializer),可是這種方式也相應的會消耗更多的CPU來讀取數據。

MEMORY_AND_DISK_SER 
(Java and Scala)

Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

和MEMORY_ONLY_SER相似,只是當內存裝不下的時候,會將分區的數據吐到磁盤上,而不是每次用到都從新計算。

DISK_ONLY

Store the RDD partitions only on disk.

只存儲在磁盤上。這種緩存估計用在處理超大文件的過程。

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

Same as the levels above, but replicate each partition on two cluster nodes.

和上面沒有」_2″的級別相對應,只不過每一個分區數據會在兩個節點上保存兩份副本。

OFF_HEAP (experimental) Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.

如何選擇存儲級別?

Spark的存儲級別主要可於在內存使用和CPU佔用之間作一些權衡。建議根據如下步驟來選擇一個合適的存儲級別:

  • 若是RDD能使用默認存儲級別(MEMORY_ONLY),那就儘可能使用默認級別。這是CPU效率最高的方式,全部RDD算子都能以最快的速度運行。
  • 若是步驟1的答案是否(不適用默認級別),那麼能夠嘗試MEMORY_ONLY_SER級別,並選擇一個高效的序列化協議(selecting a fast serialization library),這回大大節省數據對象的存儲空間,同時速度也還不錯。
  • 儘可能不要把數據吐到磁盤上,除非:1.你的數據集從新計算的代價很大;2.你的數據集是從一個很大的數據源中過濾獲得的結果。不然的話,重算一個分區的速度極可能和從磁盤上讀取差很少。
  • 若是須要支持容錯,能夠考慮使用帶副本的存儲級別(例如:用Spark來服務web請求)。全部的存儲級別都可以以重算丟失數據的方式來提供容錯性,可是帶副本的存儲級別可讓你的應用持續的運行,而沒必要等待重算丟失的分區。

刪除數據

Spark可以自動監控各個節點上緩存使用率,而且以LRU(last recent used)的方式將老數據逐出內存。若是你更喜歡手動控制的話,能夠用RDD.unpersist() 方法來刪除無用的緩存。

共享變量

通常來講,當spark傳遞一個函數操做到遠程集羣節點,驅動程序會把相關的數據以副本形式發送到各個節點。由於跨節點的讀寫效率過低了,因此通常不會對遠程的副本數據進行更新。然而有時候也須要對一些數據進行讀寫,這就是廣播變量和累加器。

廣播變量

廣播變量(groadcast varible)爲只讀變量,它有運行SparkContext的驅動程序建立後發送給參與計算的節點。對那些須要讓工做節點高效地訪問相同數據的應用場景,好比機器學習。咱們能夠在SparkContext上調用broadcast方法建立廣播變量:

  val broadcastList = sc.broadcast(List("Spark","Impala","Hadoop"))

廣播變量也能夠被非驅動程序所在節點(即工做節點)訪問,訪問方法就是調用該變量的value方法

  sc.parallelize(List("1","2","3")).map(x => broadcastList.value ++ x).collect

使用廣播變量能夠優化資源提升性能

廣播變量的優點:是由於不是每一個task一份變量副本,而是變成每一個節點的executor才一份副本。這樣的話,就可讓變量產生的副本大大減小。

廣播變量,初始的時候,就在Drvier上有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在本身本地的Executor對應的

BlockManager中,嘗試獲取變量副本;若是本地沒有,BlockManager,也許會從遠程的Driver上面去獲取變量副本;也有可能從距離比較近的其餘

節點的Executor的BlockManager上去獲取,並保存在本地的BlockManager中;BlockManager負責管理某個Executor對應的內存和磁盤上的數據,

此後這個executor上的task,都會直接使用本地的BlockManager中的副本。

例如,50個executor,1000個task。一個map,10M:

默認狀況下,1000個task,1000份副本。10G的數據,網絡傳輸,在集羣中,耗費10G的內存資源。

若是使用了廣播變量。50個execurtor,50個副本。500M的數據,網絡傳輸,並且不必定都是從Driver傳輸到每一個節點,還多是就近從最近的

節點的executor的bockmanager上拉取變量副本,網絡傳輸速度大大增長;500M,大大下降了內存消耗。

相關文章
相關標籤/搜索