Spark編程指南分享

 

轉載自:https://www.2cto.com/kf/201604/497083.htmlhtml

一、概述

在高層的角度上看,每個Spark應用都有一個驅動程序(driver program)。驅動程序就是運行用戶的main主程序並在集羣上執行各類並行操做的程序。Spark中的一個主要的抽象概念就是彈性分佈數據集(resilient distributed dataset,RDD),RDD是分佈在多個節點構成的集羣上的元素的集合,並支持並行操做。RDD能夠由Hadoop的分佈式文件系統(或其餘支持Hadoop分佈式系統的文件系統)中的文件建立,也能夠經過在驅動程序中的Scala集合建立,同時,對一個RDD進行轉換操做(transform)也能夠建立一個新的RDD。用戶也能夠將RDD存儲在內存中,這樣RDD就能夠在後序的並行操做中高效地重複使用。最後,RDD可以從某個節點的實效中進行恢復。java

Spark的第二個抽象概念就是共享變量(shared variables)。共享變量應用於並行操做中。默認狀況下,當Spark在幾個節點構成的集羣上並行執行一系列任務時,Spark會攜帶函數中使用的每個變量到每個任務中。有時,一些變量須要在幾個任務間共享,或者在任務和驅動程序間共享。Spark支持兩種共享變量:將數據緩存中全部節點中的廣播變量(broadcast variables),和只能增長的累加器(accumulators),好比計數器和總和(sums)。算法

這裏我只使用了Scala的版本。Spark還支持Java和Python。若是打開Spark的交互式腳本,很容易理解這個知道的內容。shell

二、鏈接Spark

Spark 1.6.1使用Scala 2.10版本。若是使用Scala編寫Spark應用,應該使用兼容的版本(好比2.10.x)。apache

在Intellij Idea搭建Spark開發環境中介紹了使用Idea+Maven搭建Spark開發環境。若是編寫Spark應用,應該添加Spark的依賴,具體的信息以下:數組

 
 
  1. groupId = org.apache.spark
  2. artifactId = spark-core_2.10
  3. version = 1.6.1 
 

一樣,若是使用HDFS的分佈式文件系統,也要添加hadoop-client的依賴:緩存

 
 
  1. groupId = org.apache.hadoop
  2. artifactId = hadoop-client
  3. version = <your-hdfs-version></your-hdfs-version> 
 

最後,須要在Scala中添加以下import語句:安全

 
 
  1. import org.apache.spark.SparkContext
  2. import org.apache.spark.SparkConf 

 這裏要注意的是,在Spark 1.3.0版本之前,Scala中須要顯示添加import org.apache.spark.SparkContext._來使用重要的隱式轉換。不過1.3.0之後的版本就不須要了。markdown

三、初始化Spark

編寫Spark的第一件事就是建立SparkContext對象,來告訴Spark如何使用一個集羣。建立SparkContext對象須要使用SparkConf對象,這個對象包含一些關於應用程序的信息。網絡

注意,每個JVM上只能有一個活躍的SparkContext對象。因此,必須調用stop()來終止SparkContext對象才能建立另外一個新的SparkContext對象。

 
 
  1. val conf = new SparkConf().setAppName(appName).setMaster(master)
  2. new SparkContext(conf) 
 

其中,appName參數是應用程序在集羣中的名字。master參數指定主節點的位置,它能夠是一個Spark,Mesos或者YARN集羣的URL地址,也可使用本地模式的「local」。通常來講,當應用程序運行在集羣上時,在代碼上硬編碼master並不方便,而是在使用spark-submti提交應用的時候使用參數指定master。對於本地測試來講,可使用「local」來運行Spark程序。

3.一、使用交互式shell

Spark提供了交互式的shell,在這個交互式shell中,已經建立了一個SparkContext對象,這個變量就是sc,不用建立直接使用便可,本身建立的反而不能用。可使用--master參數指定sc連接到哪一個master,還有不少參數能夠選擇,這裏給出幾個例子。

下面使用本地模式4個核:

 
 
  1. ./bin/spark-shell --master local[4] 
 

下面使用--jar指定了要運行的jar包:

 
 
  1. ./bin/spark-shell --master local[4] --jars code.jar 

 下面使用--packages添加了依賴:

 
 
 
  1. ./bin/spark-shell --master local[4] --packages "org.example:example:0.1" 
 

上面僅僅是一些例子,Spark還有不少啓動參數,能夠運行spark-shell --help得到更多的信息。下面是master可選的值:

 

Master URLs
Master URL 含義
local 在本地運行Spark而且只有一個worker線程(也就是說沒有並行)
local[K] 在本地運行Spark並使用K個worker線程(基本上設置K值爲本地機器的核心數)
local[*] 在本地運行Spark,而且使用本地機器儘量多的核心數
spark://HOST:PORT 鏈接到給定的Spark standalone集羣上。端口號必須是主節點設置使用的端口號,默認使用7077
mesos://HOST:PORT 鏈接到給定的Mesos集羣。端口號默認使用5050
yarn 鏈接到yarn集羣,能夠經過--deploy-mode參數設置使用client或cluster兩種模式
yarn-client 和使用--deploy-mode client鏈接到yarn等價
yarn-cluster 和使用--deploy-mode cluster鏈接到yarn等價

 

四、彈性分佈數據集(RDD)

Spark的一個核心抽象概念就是彈性分佈數據集(resilient distributed dataset,RDD)。RDD是一個能夠並行操做的可容錯的元素集合。有兩種方法能夠建立一個RDD:將驅動程序中已存在的集合進行並行化操做,或者從外部存儲系統中建立RDD。事實上,還能夠經過對已有的RDD進行轉化操做建立一個新的RDD。

4.一、對集合序列化

能夠經過調用SparkContext對象的parallelize方法把一個在驅動程序中已存在的集合序列化爲RDD。集合中的元素會被複製爲分佈式數據集來支持並行計算。下面的例子將一個1到5的數組序列化操做爲一個RDD:

 
 
  1. val data = Array(1, 2, 3, 4, 5)
  2. val distData = sc.parallelize(data) 
 

RDD一旦建立,就能夠並行操做。例如,可使用下面的操做計算全部元素的和:

 
 
  1. val sum=distData.reduce((a,b)=>a+b) 

 稍後會介紹RDD的一些操做。

 

在將一個集合序列化時一個重要的參數就是partitions,也就是說要將這個集合分紅幾個部分。Spark會對每一個部分執行一個任務來達到並行操做的效果。通常來講,集羣中的每一個CPU分配2到4個部分。一般,Spark會基於集羣的配置自動設置這個值。然而,用戶也能夠本身設置這個值:

 
 
  1. val distData=sc.parallelize(data,10) 
 
 

這樣,就把data分爲10個部分。

 

4.二、外部數據集

Spark能夠經過任何支持Hadoop的存儲系統建立分佈式數據集,包括本地文件系統,HDFS,Cassandra,HBase,Amazon S3等等。Spark支持文本文件,SequenceFiles,和任何實現了Hadoop InputFormat接口的文件。

文本文件的RDD能夠經過SparkContext對象的textFile方法建立。這個方法傳遞一個URI參數來指定文件,URI參數可使本地文件路徑或者hdfs://等,而後將文件讀取爲行的集合。例子以下:

 
 
  1. scala> val distFile = sc.textFile("data.txt")
  2. distFile: RDD[String] = MappedRDD@1d4cee08 
 

一旦建立,distFile就能夠進行數據集操做。例如,下面的代碼使用map和reduce操做計算全部行的大小:

 
 
  1. distFile.map(=> s.length).reduce((a, b) => a + b) 

 

 

使用Spark讀取文件時的一些注意事項:

若是傳遞本地文件的路徑,那麼這個文件也必須在集羣的全部worker節點中的相同路徑上。或者將這個文件複製到全部worker上的相同路徑上;Spark中全部關於文件輸入的方法,包括textFile,都支持目錄,壓縮文件和通配符。好比textFile("/my/directory"),textFile("/my/directory/*.txt"), 和textFile("/my/directory/*.gz");textFile方法也能夠經過第二個參數來指定這個文件被分爲幾個部分。默認狀況下,文件基於塊來進行劃分(在HDFS中塊默認爲64MB或128MB,我這裏是128MB)。不過能夠指定的分區數比塊數多,但不能比塊數少;

除了文本文件,Spark的Scala API還支持另外的數據格式:

SparkContext.wholeTextFiles容許讀取一個目錄下全部的小文本文件,而後返回(文件名,內容)鍵值對。這和textFile返回文件中的全部行不一樣;對於SequenceFiles,可使用SparkContext對象的sequenceFile[K,V]方法,其中K和V是文件中鍵和值的類型。這些文件必須是實現Hadoop的Writable接口的類,好比IntWritable和Text。並且,Spark也容許使用基本類型,好比sequenceFile[Int,String]就會自動讀取IntWritable和Text;對於其它的Hadoop輸入格式,可使用SparkContext.hadoopRDD方法;RDD.saveAsObjectFile和SparkContext.objectFiles方法支持將一個RDD中的Java對象序列化爲對象文件。不過這個方法並不像Avro那樣高效;

4.三、RDD操做

RDD支持兩種類型的操做:轉換操做(transformations)和行動操做(actions)。轉換操做從一個已有的RDD建立一個新的RDD;行動操做對這個RDD數據集進行一系列運算後返回驅動程序一個結果。區分兩種操做的辦法就是看返回結果的類型,若是返回的是一個RDD,那麼就是轉換操做,不然就是行動操做。好比,map就是一個轉換操做,它把數據集中的每個元素都調用一個函數,將結果做爲新的RDD的元素。而reduce就是一個行動操做,它對數據集的全部元素調用一個聚合函數,而後把最終結果返回驅動程序(儘管還存在一個並行的方法reduceByKey返回一個分佈數據集)。

Spark中全部的轉換操做都是惰性求值的。所謂惰性求值,是說並不立刻計算結果,而僅僅記住對這個RDD進行的轉換操做序列。只有當對這個RDD調用一個須要返回給驅動程序一個結果的行動操做時才計算結果。Spark的這個設計使得程序運行得更有效。好比,使用map方法建立的一個RDD極可能調用reduce來計算結果並將結果返回個驅動程序,惰性求值使得只需給驅動程序返回reduce計算的結果,不然要返回一個map操做建立的RDD,顯然這個代價太大了。

默認狀況下,對經過轉換操做造成的RDD執行行動操做時都會從新計算這個RDD。這種狀況下,能夠將RDD經過persist或者cache方法存儲在內存中,Spark會把數據集中的元素存在集羣中的全部節點上,這樣下次計算的時候就能夠快速獲得結果。一樣,Spark也能夠把RDD存儲在磁盤上,或者在多個節點間進行復制。

4.3.一、基礎

下面的程序給出了RDD的基礎操做:

 
 
  1. val lines = sc.textFile("data.txt")
  2. val lineLengths = lines.map(=> s.length)
  3. val totalLength = lineLengths.reduce((a, b) => a + b) 
 

第一行經過SparkContext對象的textFile讀取文件建立了一個RDD變量lines,lines數據集並不會加載到內存中或採起其它行動,它僅僅是一個指向文件的指針。第二行定義了一個變量lineLengths保存map操做的結果,這是一個轉換操做,所以lineLengths也是一個RDD,map方法須要一個函數參數,對象Spark傳遞一個參數後面會介紹,這裏的函數參數將每一行映射爲一個數值,這個數值就是這一行的長度。注意,因爲Spark的惰性求值程序進行到這裏並無計算lineLengths。最終,對lineLengths調用reduce方法,這是一個行動操做。這時,Spark將計算分解爲多個任務運行在集羣的多個機器上,而後每一個機器執行本身那部分的map和reduce操做,計算完後將本身這部分的結果返回給驅動程序。

 

若是在後序的操做中還會用到lineLengths,就能夠將它存儲在內存中:

 
 
  1. lineLengths.persist() 
 

這樣,下一次調用reduce的時候就不用再計算了。

 

4.3.二、給Spark傳遞函數

Spark的API嚴重依賴從驅動程序傳遞函數給集羣。有兩種方式來傳遞函數:

 

匿名函數語法,能夠減小代碼;在一個單獨的object中定義一個函數。好比,能夠定義一個object MyFunctions,而後傳遞MyFunctions:

 
 
  1. object MyFunctions {
  2.   def func1(s: String): String = { ... }
  3. }
  4.  
  5. myRdd.map(MyFunctions.func1) 

 

 

 

注意還能夠傳遞一個類實例的方法引用(和object相反),不過這須要把包含這個類的對象同這個方法傳遞過去。好比,考慮下面的代碼:

 
 
  1. class MyClass {
  2.   def func1(s: String): String = { ... }
  3.   def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
  4. } 
 

這裏,若是咱們建立一個新的MyClass實例而後調用doStuff方法,裏面的map引用這個MyClass實例裏面的func1方法,因此整個對象須要被傳遞到集羣中。這和rdd.map(x => this.func1(x))類似。

 

一樣,若是訪問外部屬性也須要傳遞整個對象:

 
 
  1. class MyClass {
  2.   val field = "Hello"
  3.   def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(=> field + x) }
  4. } 
 

這和rdd.map(x => this.field + x)類似。爲了不這樣,能夠將屬性複製到方法內部而不是在外部訪問:

 
 
  1. def doStuff(rdd: RDD[String]): RDD[String] = {
  2.   val field_ = this.field
  3.   rdd.map(=> field_ + x)
  4. } 

 

 

4.3.三、理解閉包

Spark的一個難點就是理解在集羣執行代碼時變量和方法的做用域和聲明週期。對在RDD做用域外部的變量進行修改操做很讓人迷惑。在下面的例子中,咱們考察使用foreach()方法增長的代碼,不過一樣的事情也會發生在其它的操做中。

4.3.3.一、一個例子

考慮下面的RDD元素相加的例子,這個例子是否運行在同一個JVM上會產生不一樣的結果。好比,運行在本地模式(--master=local[n])和將Spark應用部署到集羣上(spark-submit):

 
 
  1. var counter = 0
  2. var rdd = sc.parallelize(data)
  3.  
  4. // Wrong: Don't do this!!
  5. rdd.foreach(=> counter += x)
  6.  
  7. println("Counter value: " + counter) 
 


4.3.3.二、本地模式與集羣模式

上面代碼的行爲是不肯定的,可能不會獲得預期結果。執行工做(job)時,Spark會把RDD的操做分解爲多個任務(task),每個任務由一個執行者(executor)執行。在執行以前,Spark會計算任務的閉包(closure)。閉包就是執行者在對RDD進行本身那部分計算時須要可見的變量和方法(在這個例子中是foreach)。這些閉包會被序列化並傳遞到每一個執行者上。

傳遞到每一個執行者上的閉包中的變量如今被賦值,如今foreach方法中的counter再也不是驅動程序中的counter。在驅動節點的內存中仍然有一個counter,但這個counter並不能被執行者訪問到,執行者只能訪問到在閉包的序列化中複製到執行者上的counter。所以,counter的最終結果仍然是0,由於對counter的全部操做都是在閉包序列化中的counter值。

在本地模式,一些狀況下foreach方法會像驅動程序同樣運行在同一個JVM上,所以會引用原始的counter,並正確更新它的值。

在這種狀況下爲了寫出良好行爲的代碼,應該使用累加器(accumulator)。Spark中的累加器在特殊狀況下使用,它提供一種在集羣中執行被分解的狀況下安全更新變量值的機制。累加器的細節會在這個指導的累加器部分討論。

通常的,閉包,不該該被用來改變全局狀態。Spark不定義也不保證對從閉包外部引用的對象的作出改變的行爲。一些代碼可能在本地模式下可以運行,不過這僅僅是巧合,在分佈式模式下就不會獲得指望的結果。若是須要全局的聚合操做就使用累加器。

4.3.3.三、打印RDD中的元素

另外一個常見用法就是使用rdd.foreach(println)或rdd.map(println)方法打印RDD中的元素。在一臺機器上,這個操做會獲得預期的結果,打印出RDD中的全部元素。然而在集羣模式下,執行者調用的打印方法會在執行者的標準輸出(stdout)上打印結果,而不是驅動程序的標準輸出上,所以驅動程序的標準輸出上並無結果。若是要將結果打印在驅動程序的標準輸出上,須要使用collect()方法先使執行者將各自的RDD部分返回給驅動程序,而後調用foreach或map,即:

 
 
  1. rdd.collect().map(println) 
 

不過,這樣可能會耗盡驅動程序的內存,由於大多數狀況下RDD很大。若是隻想查看RDD中的部分元素 ,可使用take()方法:

 
 
  1. rdd.take(100).foreach(println) 

這樣只取RDD中的100個元素。

 

4.3.四、鍵值對操做

儘管Spark中的大多數操做都支持任何類型的RDD,不過Spark爲包含鍵值對類型的RDD提供了一些特殊的操做。最多見的操做就是「混洗」(shuffle),好比根據鍵來對元素進行分組或聚合。

在Scala中,這些操做自動支持包含Tuple2(Scala中內置的元組類型,可使用(a,b)來建立)對象的RDD。

好比,下面的代碼使用reduceByKey方法操做鍵值對RDD來計算每一行出現的次數:

 
 
  1. val lines = sc.textFile("data.txt")
  2. val pairs = lines.map(s =>(s,1))
  3. val counts = pairs.reduceByKey((a, b)=> a + b) 
 

第一行,使用textFile方法從文本文件建立一個RDD變量lines,第二行使用map操做,將lines中的每個元素(即一行)映射爲一個元組(line,1),構成一個鍵值對RDD,即pairs,第三行調用行動操做reduceByKey,根據鍵進行reduce操做,將鍵相同的值相加,就獲得了每一行元素出現的次數。

 

咱們也可使用counts.sortByKey()來對結果進行排序,還可使用counts.collect()將結果以數組的形式返回給驅動程序。

注意:當在鍵值對中的鍵使用自定義類型時,必須保證這個自定義類型的equals方法和hashCode方法匹配。也就是說,若是兩個自定義類型的變量a,b的hashCode方法的返回值相同,那麼a.equals(b)也必定返回true。

4.3.五、轉換操做

下表列出了Spark支持的轉換操做。具體的細節能夠查看RDD的API文檔(包括Scala,Java,Python,R),和鍵值對RDD的函數文檔(Scala和R):

 

轉換操做 含義
map(func) 對RDD中的每一個元素調用func函數,而後返回結果構成新的RDD
filter(func) 返回一個由經過傳給filter的函數的元素組成的RDD
flatMap(func) 將函數應用於RDD中的每個元素,將返回的迭代器的全部內容構成新的RDD
mapPartitions(func) 和map相似,不過運行在RDD的不一樣分塊上,所以func的類型必須是Iterator=>Iterator
mapPartitionsWithIndex(func) 和mapPartitions相似,不過func函數提供一個整數值表示分塊的下標,因此函數的類型是(Int,Iterator=>Iterator)
sample(withReplacement,fraction,seed) 對RDD採樣,以及是否替換
union(otherDataset) 生成一個包含兩個RDD中全部元素的RDD
intersection(otherDataset) 返回由兩個RDD中共同元素組成的RDD
distinct([numTasks]) 返回去除原RDD中重複元素的新的RDD
groupByKey([numTasks]) 對具備相同鍵的值進行分組。注意若是僅僅是爲了聚合,使用reduceByKey或aggregateByKey性能更好
reduceByKey(func,[numTasks]) 合併既有相同鍵的值
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]) 和reduceByKey相似,不過須要提供一個初始值
sortByKey([ascending],[numTasks]) 返回一個根據鍵排序的RDD
join(otherDataset,[numTasks]) 對兩個RDD進行內鏈接。其它的鏈接操做還有leftOuterJoin,rightOuterJoin和fullOuterJoin
cogroup(otherDataset,[numTasks]) 也叫groupWith,對類型(K,V)和(K,W)的RDD進行操做,返回(K,(Iterable,Iterable))類型的RDD
cartesian(otherDataset) 對類型T和U的RDD進行操做,返回(T,U)類型的RDD
pipe(command,[envVars]) 將RDD的每一個分區經過管道傳給一個shell腳本
coalesce(numPartitions) 減小RDD的分區數量。當對一個大的RDD執行filter操做後使用會有效
repartition(numPartitions) 對RDD從新分區
repartitionAndSortWithinPartitions(partitioner) 根據給定的partitioner對RDD從新分區,在每一個分區再根據鍵排序

4.3.六、行動操做

下面列出了RDD的行動操做:

 

 

行動操做 含義
reduce(func) 使用func函數並行整合RDD中的全部元素
collect() 返回RDD中的全部元素
count() 返回RDD中的元素個數
first() 返回RDD中的第一個元素
take(n) 返回RDD中的n個元素
takeSample(withReplacement,num,[seed]) 從RDD中返回任意一些元素,結果不肯定
takeOrdered(n,[ordering]) 從RDD中按照提供的順序返回最前面的n個元素
saveAsTextFile(path) 將RDD中的元素寫入文本文件,Spark會調用元素的toString方法
saveAsSequenceFile(path) 將RDD中的元素保存爲Hadoop的SequenceFile文件
saveAsObjectFile(path) 將RDD中的元素使用Java中的序列化保存爲對象文件,可使用SparkContext.objectFile()讀取
countByKey() 操做鍵值對RDD,根據鍵值分別計數
foreach(func) 對RDD中的每一個元素調用給定的函數func

 

4.3.七、混洗操做(Shuffle)

在Spark中,一些操做會觸發一個叫作混洗(shuffle)的事件。混洗是Spark的機制,經過對數據進行從新分組使得同一組的在同一個分區。這一般會致使在執行者和機器之間數據的複製與傳遞,所以混洗操做是一個複雜並消耗性能的操做。

4.3.7.一、背景

咱們以reduceByKey操做來理解混洗操做期間發生了什麼。reduceByKey操做會生成一個新的RDD,原鍵值對RDD中的全部元素根據鍵的不一樣分組後使用reduce操做獲得一個結果,由全部的鍵和這個結果構成的鍵值對元素構成了這個新的RDD。問題在於並非每全部鍵相同的元素都在同一個分區上,甚至不在同一個機器上,但爲了計算結果,它們必須從新存儲到同一個位置。

在Spark中,數據通常不會爲了某個操做而具體地根據須要進行分組存儲。在計算過程當中,每個執行者對本身的分區進行計算,所以,爲了給執行者組織對應的分區,Spark須要執行一個滿射操做來從新組織數據。Spark必須讀取全部的分區來獲得全部鍵,而後對每一個鍵將鍵相同的元素組織到一塊兒執行reduce操做來計算結果。這就是混洗。

儘管通過混洗後每一個分區的元素集合分區自己都是肯定的,可是元素的順序不肯定。若是要是數據具備肯定的順序,可使用下面的混洗方法:

 

使用mapPartitions來對分區排序;使用reparttitionAndSortWithinPartitions高效的在重分區的同時排序分區;使用sortBy排序一個RDD;

 

能夠致使混洗的操做有重分區操做好比reparation和coalesce,ByKey操做(除了計數counting)好比groupByKey和reduceByKey,還有鏈接操做(join)好比cogroup和join。

4.3.7.二、性能影響

因爲涉及到磁盤I/O,數據序列化和網絡I/O,因此混洗操做性能消耗較大。爲了從新組織數據,Spark會產生一些map任務來組織數據,一些reduce任務來進行聚合。這一名稱來自於MapRedece,但並不直接和Spark的map和reduce操做相關。

本質上,map任務的結果會存在內存中直到存不下爲止。而後,這些結果根據所在的分區進行排序,寫入單一的文件中。在reduce階段,任務會讀取這些排好序的相關分塊。

一些混洗操做會消耗大量的堆空間,由於它們在轉化記錄以前或以後會之內存數據結構組織記錄。具體來講,reduceByKey和aggregateByKey在map階段構造這些數據結構,而後ByKey系列操做在reduce階段生成數據。當內存中存不下這些數據時,Spark會將這些數據存到磁盤中,致使額外的磁盤I/O並增長垃圾收集。

混洗也會在磁盤上產生大量的中間數據。在Spark 1.3中,這些數據會一直保存到相關的RDD不會再次使用,而後被當作垃圾收集。這對於操做譜系還會從新計算的時候是有益的。若是應用常用這些RDD或者垃圾收集機制沒能常常收集,垃圾收集會通過很長一段時間才發生。這意味着長時間運行的應用會佔用大量的磁盤空間。在建立SparkContext時可使用spark.local.dir來指定這個臨時存儲路徑。

經過調整各類參數配置能夠設置混洗的行爲。這會在Spark配置裏介紹。

4.四、RDD的持久化(緩存)

Spark中的一個重要特徵就是在操做過程當中將數據集緩存在內存中。當緩存一個RDD後,計算RDD的節點會分別保存它們所求出的分區數據,而後在隨後的操做中重複使用。這使得後序的操做執行的更快(一般快10幾倍)。持久化是迭代式算法和快速交互式使用的關鍵。

可使用persist或cache方法持久化一個RDD。當對一個RDD第一次執行行動操做時,RDD會保存在節點的內存中。Spark的緩存是可容錯的,意味着若是某個分區丟失了,RDD會自動根據在建立這個RDD時的轉換操做從新計算。

並且,每個持久化的RDD可使用不一樣的持久化級別,容許將數據持久化到磁盤,做爲Java序列化對象存儲在內存,在節點中備份,或者存在堆外空間上。這些持久化級別能夠經過傳遞一個StorageLevel對象給persist方法。cache方法只能使用默認的StorageLevel.MEMORY_ONLY這一個持久化級別。下面是全部的級別:

 

持久化級別 含義
MEMORY_ONLY 在JVM上存儲非序列化的Java對象。若是內存不夠,一些分區不會存儲,直到須要的時候從新計算。這是默認的級別。
MEMORY_AND_DISK 若是內存不夠,會把剩餘的分區存儲在磁盤上。
MEMORY_ONLY_SER 存儲序列化的Java對象。這通常比序列化空間效率高,不過讀取的時候消耗CPU較多。
MEMORY_AND_DISK_SER 和上一個類似,不過若是內存不夠會存儲到磁盤上,內存中存放序列化後的數據。
DISK_ONLY 僅存儲在磁盤上。
MEMORY_ONLY_2,MEMORY_AND_DISK_2 和上一個類似,只不過會在集羣中的兩個節點上備份。
OFF_HEAP(實驗中)  

 

注意,在Python中,會始終序列化要存儲的數據,因此持久化級別默認值就是以序列化後的對象存儲在JVM堆空間中。

Spark會在混洗操做(好比reduceByKey)中自動持久化一些中間數據,儘管用戶並無調用persist。這就避免了在混洗過程當中若是某個節點發生故障而從新計算整個數據集。咱們仍然建議若是打算重複使用RDD就使用persist對其進行持久化。

4.4.一、選擇哪一個持久化級別呢?

Spark的持久化級別是爲了提供在內存使用和CPU效率間不一樣的平衡選擇。咱們建議經過如下步驟進行級別選擇:

 

若是RDD適合默認的級別(MEMORY_ONLY),那麼就使用默認值。這是CPU效率最高的選項,使得對RDD的操做盡量的快;若是RDD不適合MEMORY_ONLY,嘗試使用MEMORY_ONLY_SER,而後選擇一個快的序列化庫對數據進行序列化來高效使用空間,不過訪問仍是很快;除非計算數據集的函數很是耗時,或者這些函數過濾掉大多數的數據,不然不要將數據持久化到磁盤上。否則,從新計算數據可能會和從磁盤中讀取同樣快;若是想出錯時儘快恢復,就是用備份。全部的級別都使用從新計算保證容錯性,但備份級別能夠保證程序繼續執行而不用等待從新計算丟失的分區;在有大量內存空間和多應用程序的實驗中,實驗中的OFF_HEAP模式有以下的優勢:容許多個執行者能共享Tachyon的內存池;有效的下降了垃圾收集的消耗;若是單個執行者發生故障緩存的數據不會丟失;

 

4.4.二、刪除數據

Spark會自動跟蹤每一個節點的緩存使用狀況,而且會根據最近最少使用原則(LRU)將最老的分區從內存中刪除。若是想手動刪除數據,使用unpersist方法。

五、共享變量

一般,當一個傳遞給一個Spark操做(好比map或reduce)的函數執行在遠程的集羣節點上時,它是對函數中使用的變量的另外一份副本進行操做的。這些變量會被複制到每個機器上,而且全部對這些變量的更新不會返回到驅動程序那裏。在任務間支持通用的、讀寫共享的變量並不有效。然而,Spark提供兩種經常使用形式的有限類型的共享變量:廣播變量(broadcast)和累加器(accumulators)。

5.一、廣播變量

廣播變量容許開發者在每一個機器上緩存一個只讀變量而不是把它在任務間複製。它們給每個節點一個大規模數據的一個副本,並經過高效的方式完成。Spark也會試圖使用更好的廣播算法來分佈式存儲廣播變量來減小網絡流量。

Spark的行動操做在一系列階段(stage)執行,經過混洗操做進行分割。Spark會自動廣播每一個階段任務都須要的數據。這些數據以序列化的形式緩存而後再每一個任務使用以前反序列化。這意味着只有當任務須要在跨多個階段執行過程當中使用同一個數據時,或者以反序列化緩存數據是重要的時候,顯示廣播數據纔有用。

能夠經過使用SparkContext.broadcast()方法來對變量v建立一個廣播變量。廣播變量將v包裹起來,能夠經過調用value方法獲取v。下面的代碼演示了廣播變量的用法:

 
 
  1. scala> val broadcastVar = sc.broadcast(Array(1,2,3))
  2. broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]]=Broadcast(0)
  3.  
  4. scala> broadcastVar.value
  5. res0:Array[Int]=Array(1,2,3) 
 


當廣播變量建立後,在集羣中全部使用變量v的方法都應該使用這個廣播變量,所以v不會被複制到節點超過一次。並且,對象v在廣播以後不該該改變,這樣全部的節點都會得到廣播變量中相同的值。

 

5.二、累加器

累加器就是在相應操做中只能增長的變量,所以能更有效的支持並行操做。累加器能夠用來做爲計數器和求和。Spark支持數字類型的累加器,開發者能夠增長新類型的支持。若是累加器在建立時設置了一個名字,那麼名字就會在Spark的UI中顯示。這對於理解階段的執行過程有必定的幫助。

能夠經過調用SparkContext.accumulator(v)對變量v建立一個累加器。以後集羣上的任務就能夠經過add方法或+=操做符增長累加器。然而,任務並不能讀取累加器的值。只用驅動程序才能夠經過value方法讀取累加器的值。

下面的代碼演示了使用累加器計算數組的和:

 
 
  1. scala> val accum = sc.accumulator(0,"My Accumulator")
  2. accum: spark.Accumulator[Int]=0
  3.  
  4. scala> sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x)
  5. ...
  6. 10/09/2918:41:08 INFO SparkContext:Tasks finished in0.317106 s
  7.  
  8. scala> accum.value
  9. res2:Int=10 
 


上面的代碼使用內置支持的Int類型建立了累加器,開發者能夠經過實現AccumulatorParam接口來對本身的類型增長累加器支持。AccumulatorParam接口有兩個方法:zero方法提供一個本身的類型的零值,addInPlace方法定義兩個值相加的操做。好比,假設咱們有一個能夠表明數學上向量的類型Vector,能夠這樣寫:

 
 
  1. objectVectorAccumulatorParamextendsAccumulatorParam[Vector]{
  2.   def zero(initialValue:Vector):Vector={
  3.     Vector.zeros(initialValue.size)
  4.   }
  5.   def addInPlace(v1:Vector, v2:Vector):Vector={
  6.     v1 += v2
  7.   }
  8. }
  9.  
  10. // Then, create an Accumulator of this type:
  11. val vecAccum = sc.accumulator(newVector(...))(VectorAccumulatorParam) 

 

 


Scala中,Spark也支持更經常使用的A吃醋姆拉不了接口來累加那些結果類型和元素類型不一樣的數據(好比經過收集數據構成一個list列表),還有一個SparkContext.accumulableCollection方法累加常見的Scala集合類型。

 

對累加器來講,更新老是在行動操做中執行,Spark保證每個任務對累加器的更新只有一次,好比重啓的任務不會更新這個值。在轉換操做中,用戶應該意識到若是任務或者job階段重複執行,那每一個任務的更新操做可能執行屢次。

累加器並無改變Spark的惰性求值策略。若是一個RDD的一個累加器被更新了,RDD在行動操做中累加器的值只更新一次。所以,當在惰性的轉換操做好比map中,累加器的更新並不能保證會執行。下面的代碼片斷演示了這個屬性:

 
 
  1. val accum = sc.accumulator(0)
  2. data.map { x => accum += x; f(x)}
  3. // Here, accum is still 0 because no actions have caused the map to be computed. 
 

六、部署到集羣上

 

在應用提交指導中介紹如何提交應用到集羣上。簡單來講,一旦應用打包成jar文件,spark-submit能夠將你的應用部署到任何集羣上。

七、更多

在Spark的網站上有一些Spark應用的例子。並且,在Spark的examples目錄下也有一些程序實例。你能夠經過run-example腳本運行例子:

 
 
  1. ./bin/run-example SparkPi
相關文章
相關標籤/搜索