Spark編程指南《Spark 官方文檔》

《Spark 官方文檔》Spark編程指南

spark-1.6.0 [原文地址]html

Spark編程指南

概述

整體上來講,每一個Spark應用都包含一個驅動器(driver)程序,驅動器運行用戶的main函數,並在集羣上執行各類並行操做。java

Spark最重要的一個抽象概念就是彈性分佈式數據集(resilient distributed dataset – RDD),RDD 是一個可分區的元素集合,其包含的元素能夠分佈在集羣各個節點上,而且能夠執行一些分佈式並行操做。RDD一般是經過,HDFS(或者其餘Hadoop支 持的文件系統)上的文件,或者驅動器中的Scala集合對象,來建立或轉換獲得;其次,用戶也能夠請求Spark將RDD持久化到內存裏,以便在不一樣的並 行操做裏複用之;最後,RDD具有容錯性,能夠從節點失敗中自動恢復數據。python

Spark第二個重要抽象概念是共享變量,共享變量是一種能夠在並行操做之間共享使用的變量。默認狀況下,當Spark把一系列任務 調度到不一樣節點上運行時,Spark會同時把每一個變量的副本和任務代碼一塊兒發送給各個節點。但有時候,咱們須要在任務之間,或者任務和驅動器之間共享一些 變量。Spark提供了兩種類型的共享變量:廣播變量累加器,廣播變量能夠用來在各個節點上緩存數據,而累加器則是用來執行跨節點的「累加」操做,例如:計數和求和。git

本文將會使用Spark所支持的全部語言來展現Spark的特性。若是你能啓動Spark的交互式shell動手實驗一下,效果會更好(對scala請使用bin/spark-shell,而對於python,請使用bin/pyspark)。github

 

連接Spark

Spark 1.6.0 使用了Scala 2.10。用Scala寫應用的話,你須要使用一個兼容的Scala版本(如:2.10.X)web

同時,若是你須要在maven中依賴Spark,能夠用以下maven工件標識:算法

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.0

另外,若是你須要訪問特定版本的HDFS,那麼你可能須要增長相應版本的hadoop-client依賴項,其maven工件標識以下:shell

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最後,你須要以下,在你的代碼裏導入一些Spark class:apache

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(在Spark 1.3.0以前,你須要顯示的 import org.apache.spark.SparkContext._ 來啓用這些重要的隱式轉換)編程

初始化Spark

Spark應用程序須要作的第一件事就是建立一個 SparkContext 對象,SparkContext對象決定了Spark如何訪問集羣。而要新建一個SparkContext對象,你還得須要構造一個 SparkConf 對象,SparkConf對象包含了你的應用程序的配置信息。

每一個JVM進程中,只能有一個活躍(active)的SparkContext對象。若是你非要再新建一個,那首先必須將以前那個活躍的SparkContext 對象stop()掉。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

appName參數值是你的應用展現在集羣UI上的應用名稱。master參數值是Spark, Mesos or YARN cluster URL 或者特殊的「local」(本地模式)。實際上,通常不該該將master參數值硬編碼到代碼中,而是應該用spark-submit腳本的參數來設置。然而,若是是本地測試或單元測試中,你能夠直接在代碼裏給master參數寫死一個」local」值。

使用shell

在Spark shell中,默認已經爲你新建了一個SparkContext對象,變量名爲sc。因此spark-shell裏不能自建SparkContext對 象。你能夠經過–master參數設置要鏈接到哪一個集羣,並且能夠給–jars參數傳一個逗號分隔的jar包列表,以便將這些jar包加到 classpath中。你還能夠經過–packages設置逗號分隔的maven工件列表,以便增長額外的依賴項。一樣,還能夠經過 –repositories參數增長maven repository地址。下面是一個示例,在本地4個CPU core上運行的實例:

$ ./bin/spark-shell –master local[4]

或者,將code.jar添加到classpath下:

$ ./bin/spark-shell --master local[4] --jars code.jar

經過maven標識添加依賴:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

spark-shell –help能夠查看完整的選項列表。實際上,spark-shell是在後臺調用spark-submit來實現其功能的(spark-submit script.)

彈性分佈式數據集(RDD)

Spark的核心概念是彈性分佈式數據集(RDD),RDD是一個可容錯、可並行操做的分佈式元素集合。整體上有兩種方法能夠建立RDD對象:由驅 動程序中的集合對象經過並行化操做建立,或者從外部存儲系統中數據集加載(如:共享文件系統、HDFS、HBase或者其餘Hadoop支持的數據源)。

並行化集合

並行化集合是以一個已有的集合對象(例如:Scala Seq)爲參數,調用 SparkContext.parallelize() 方法建立獲得的RDD。集合對象中全部的元素都將被複制到一個可並行操做的分佈式數據集中。例如,如下代碼將一個1到5組成的數組並行化成一個RDD:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

一旦建立成功,該分佈式數據集(上例中的distData)就能夠執行一些並行操做。如,distData.reduce((a, b) => a + b),這段代碼會將集合中全部元素加和。後面咱們還會繼續討論分佈式數據集上的各類操做。

並行化集合的一個重要參數是分區(partition),即這個分佈式數據集能夠分割爲多少片。Spark中每一個任務(task)都 是基於分區的,每一個分區一個對應的任務(task)。典型場景下,通常每一個CPU對應2~4個分區。而且通常而言,Spark會基於集羣的狀況,自動設置 這個分區數。固然,你仍是能夠手動控制這個分區數,只需給parallelize方法再傳一個參數便可(如:sc.parallelize(data, 10) )。注意:Spark代碼裏有些地方仍然使用分片(slice)這個術語,這只不過是分區的一個別名,主要爲了保持向後兼容。

外部數據集

Spark 能夠經過Hadoop所支持的任何數據源來建立分佈式數據集,包括:本地文件系統、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持的文件格式包括:文本文件(text files)、SequenceFiles,以及其餘 Hadoop 支持的輸入格式(InputFormat)。

文本文件建立RDD能夠用 SparkContext.textFile 方法。這個方法輸入參數是一個文件的URI(本地路徑,或者 hdfs://,s3n:// 等),其輸出RDD是一個文本行集合。如下是一個簡單示例:

scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08

建立後,distFile 就能夠執行數據集的一些操做。好比,咱們能夠把全部文本行的長度加和:distFile.map(s => s.length).reduce((a, b) => a + b)

如下是一些Spark讀取文件的要點:

  • 若是是本地文件系統,那麼這個文件必須在全部的worker節點上可以以相同的路徑訪問到。因此要麼把文件複製到全部worker節點上同一路徑下,要麼掛載一個共享文件系統。
  • 全部Spark基於文件輸入的方法(包括textFile)都支持輸入參數爲:目錄,壓縮文件,以及通配符。例如:textFile(「/my /directory」), textFile(「/my/directory/*.txt」), 以及 textFile(「/my/directory/*.gz」)
  • textFile方法同時還支持一個可選參數,用以控制數據的分區個數。默認地,Spark會爲文件的每個block建立一個分區(HDFS上默認block大小爲64MB),你能夠經過調整這個參數來控制數據的分區數。注意,分區數不能少於block個數。

除了文本文件以外,Spark的Scala API還支持其餘幾種數據格式:

  • SparkContext.wholeTextFiles 能夠讀取一個包含不少小文本文件的目錄,而且以 (filename, content) 鍵值對的形式返回結果。這與textFile 不一樣,textFile只返回文件的內容,每行做爲一個元素。
  • 對於SequenceFiles,能夠調用 SparkContext.sequenceFile[K, V],其中 K 和 V 分別是文件中key和value的類型。這些類型都應該是 Writable 接口的子類, 如:IntWritable and Text 等。另外,Spark 容許你爲一些經常使用Writable指定原生類型,例如:sequenceFile[Int, String] 將自動讀取 IntWritable 和 Text。
  • 對於其餘的Hadoop InputFormat,你能夠用 SparkContext.hadoopRDD 方法,並傳入任意的JobConf 對象和 InputFormat,以及key class、value class。這和設置Hadoop job的輸入源是一樣的方法。你還可使用 SparkContext.newAPIHadoopRDD,該方法接收一個基於新版Hadoop MapReduce API (org.apache.hadoop.mapreduce)的InputFormat做爲參數。
  • RDD.saveAsObjectFile 和 SparkContext.objectFile 支持將RDD中元素以Java對象序列化的格式保存成文件。雖然這種序列化方式不如Avro效率高,卻爲保存RDD提供了一種簡便方式。

RDD算子

RDD支持兩種類型的算子(operation):transformation算子 和 action算子;transformation 算子能夠將已有RDD轉換獲得一個新的RDD,而action算子則是基於數據集計算,並將結果返回給驅動器(driver)。例如,map是一個 transformation算子,它將數據集中每一個元素傳給一個指定的函數,並將該函數返回結果構建爲一個新的RDD;而 reduce是一個action算子,它能夠將RDD中全部元素傳給指定的聚合函數,並將最終的聚合結果返回給驅動器(還有一個reduceByKey算 子,其返回的聚合結果是一個數據集)。

Spark中全部transformation算子都是懶惰的,也就是說,這些算子並不當即計算結果,而是記錄下對基礎數據集(如: 一個數據文件)的轉換操做。只有等到某個action算子須要計算一個結果返回給驅動器的時候,transformation算子所記錄的操做纔會被計 算。這種設計使Spark能夠運行得更加高效 – 例如,map算子建立了一個數據集,同時該數據集下一步會調用reduce算子,那麼Spark將只會返回reduce的最終聚合結果(單獨的一個數據) 給驅動器,而不是將map所產生的數據集整個返回給驅動器。

默認狀況下,每次調用action算子的時候,每一個由transformation轉換獲得的RDD都會被從新計算。然而,你也能夠經過調用 persist(或者cache)操做來持久化一個RDD,這意味着Spark將會把RDD的元素都保存在集羣中,所以下一次訪問這些元素的速度將大大提 高。同時,Spark還支持將RDD元素持久化到內存或者磁盤上,甚至能夠支持跨節點多副本。

基礎

如下簡要說明一下RDD的基本操做,參考以下代碼:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

其中,第一行是從外部文件加載數據,並建立一個基礎RDD。這時候,數據集並無加載進內存除非有其餘操做施加於lines,這時候的lines RDD其實能夠說只是一個指向 data.txt 文件的指針。第二行,用lines經過map轉換獲得一個lineLengths RDD,一樣,lineLengths也是懶惰計算的。最後,咱們使用 reduce算子計算長度之和,reduce是一個action算子。此時,Spark將會把計算分割爲一些小的任務,分別在不一樣的機器上運行,每臺機器 上都運行相關的一部分map任務,並在本地進行reduce,並將這些reduce結果都返回給驅動器。

若是咱們後續須要重複用到 lineLengths RDD,咱們能夠增長一行:

lineLengths.persist()

這一行加在調用 reduce 以前,則 lineLengths RDD 首次計算後,Spark會將其數據保存到內存中。

將函數傳給Spark

Spark的API 不少都依賴於在驅動程序中向集羣傳遞操做函數。如下是兩種建議的實現方式:

  • 匿名函數(Anonymous function syntax),這種方式代碼量比較少。
  • 全局單件中的靜態方法。例如,你能夠按以下方式定義一個 object MyFunctions 並傳遞其靜態成員函數 MyFunctions.func1:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

注意,技術上來講,你也能夠傳遞一個類對象實例上的方法(不是單件對象),不過這回致使傳遞函數的同時,須要把相應的對象也發送到集羣中各節點上。例如,咱們定義一個MyClass以下:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

若是咱們 new MyClass 建立一個實例,並調用其 doStuff 方法,同時doStuff中的 map算子引用了該MyClass實例上的 func1 方法,那麼接下來,這個MyClass對象將被髮送到集羣中全部節點上。rdd.map(x => this.func1(x)) 也會有相似的效果。

相似地,若是應用外部對象的成員變量,也會致使對整個對象實例的引用:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

上面的代碼對field的引用等價於 rdd.map(x => this.field + x),這將致使應用整個this對象。爲了不相似問題,最簡單的方式就是,將field執拗到一個本地臨時變量中,而不是從外部直接訪問之,以下:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

理解閉包

Spark裏一個比較難的事情就是,理解在整個集羣上跨節點執行的變量和方法的做用域以及生命週期。Spark裏一個頻繁出現的問題就是RDD算子 在變量做用域以外修改了其值。下面的例子,咱們將會以foreach() 算子爲例,來遞增一個計數器counter,不過相似的問題在其餘算子上也會出現。

示例

考慮以下例子,咱們將會計算RDD中原生元素的總和,若是不是在同一個JVM中執行,其表現將有很大不一樣。例如,這段代碼若是使用Spark本地模 式(–master=local[n])運行,和在集羣上運行(例如,用spark-submit提交到YARN上)結果徹底不一樣。

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

本地模式 v.s. 集羣模式

上面這段代碼其行爲是不肯定的。在本地模式下運行,全部代碼都在運行於單個JVM中,因此RDD的元素都可以被累加並保存到counter變量中,這是由於本地模式下,counter變量和驅動器節點在同一個內存空間中。

然而,在集羣模式下,狀況會更復雜,以上代碼的運行結果就不是所預期的結果了。爲了執行這個做業,Spark會將RDD算子的計算過程分割成多個獨立的任務(task)- 每一個任務分發給不一樣的執行器(executor)去執行。而執行以前,Spark須要計算閉包閉包是 由執行器執行RDD算子(本例中的foreach())時所須要的變量和方法組成的。閉包將會被序列化,併發送給每一個執行器。因爲本地模式下,只有一個執 行器,全部任務都共享一樣的閉包。而在其餘模式下,狀況則有所不一樣,每一個執行器都運行於不一樣的worker節點,而且都擁有獨立的閉包副本。

在上面的例子中,閉包中的變量會跟隨不一樣的閉包副本,發送到不一樣的執行器上,因此等到foreach真正在執行器上運行時,其引用的counter 已經再也不是驅動器上所定義的那個counter副本了,驅動器內存中仍然會有一個counter變量副本,可是這個副本對執行器是不可見的!執行器只能看 到其所收到的序列化閉包中包含的counter副本。所以,最終驅動器上獲得的counter將會是0。

爲了確保相似這樣的場景下,代碼能有肯定的行爲,這裏應該使用累加器Accumulator。累加器是Spark中專門用於集羣跨節點分佈式執行計算中,安全地更新同一變量的機制。本指南中專門有一節詳細說明累加器。

一般來講,閉包(由循環或本地方法組成),不該該改寫全局狀態。Spark中改寫閉包以外對象的行爲是未定義的。這種代碼,有可能在本地模式下能正常工做,但這只是偶然狀況,一樣的代碼在分佈式模式下其行爲極可能不是你想要的。因此,若是須要全局聚合,請記得使用累加器(Accumulator

打印RDD中的元素

另外一種常見習慣是,試圖用 rdd.foreach(println) 或者 rdd.map(println) 來打印RDD中全部的元素。若是是在單機上,這種寫法可以如預期同樣,打印出RDD全部元素。而後,在集羣模式下,這些輸出將會被打印到執行器的標準輸出 (stdout)上,所以驅動器的標準輸出(stdout)上神馬也看不到!若是真要在驅動器上把全部RDD元素都打印出來,你能夠先調用collect 算子,把RDD元素先拉倒驅動器上來,代碼多是這樣:rdd.collect().foreach(println)。不過若是RDD很大的話,有可能 致使驅動器內存溢出,由於collect會把整個RDD都弄到驅動器所在單機上來;若是你只是須要打印一部分元素,那麼take是不更安全的選 擇:rdd.take(100).foreach(println)

使用鍵值對

大部分Spark算子都能在包含任意類型對象的RDD上工做,但也有一部分特殊的算子要求RDD包含的元素必須是鍵值對(key-value pair)。這種算子常見於作分佈式混洗(shuffle)操做,如:以key分組或聚合。

在Scala中,這種操做在包含 Tuple2 (內建與scala語言,能夠這樣建立:(a, b) )類型對象的RDD上自動可用。鍵值對操做是在 PairRDDFunctions 類上可用,這個類型也會自動包裝到包含tuples的RDD上。

例如,如下代碼將使用reduceByKey算子來計算文件中每行文本出現的次數:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

一樣,咱們還能夠用 counts.sortByKey() 來對這些鍵值對按字母排序,最後再用 counts.collect() 將數據以對象數據組的形式拉到驅動器內存中。

注意:若是使用自定義類型對象作鍵值對中的key的話,你須要確保自定義類型實現了 equals() 方法(一般須要同時也實現hashCode()方法)。完整的細節能夠參考:Object.hashCode() documentation

轉換算子 – transformation

如下是Spark支持的一些經常使用transformation算子。詳細請參考 RDD API doc (ScalaJavaPythonR) 以及 鍵值對 RDD 函數 (ScalaJava) 。

transformation算子 做用
map(func) 返回一個新的分佈式數據集,其中每一個元素都是由源RDD中一個元素經func轉換獲得的。
filter(func) 返回一個新的數據集,其中包含的元素來自源RDD中元素經func過濾後(func返回true時才選中)的結果
flatMap(func) 相似於map,但每一個輸入元素能夠映射到0到n個輸出元素(因此要求func必須返回一個Seq而不是單個元素)
mapPartitions(func) 相似於map,但基於每一個RDD分區(或者數據block)獨立運行,因此若是RDD包含元素類型爲T,則 func 必須是 Iterator<T> => Iterator<U> 的映射函數。
mapPartitionsWithIndex(func) 相似於 mapPartitions,只是func 多了一個整型的分區索引值,所以若是RDD包含元素類型爲T,則 func 必須是 Iterator<T> => Iterator<U> 的映射函數。
sample(withReplacementfractionseed) 採樣部分(比例取決於 fraction )數據,同時能夠指定是否使用回置採樣(withReplacement),以及隨機數種子(seed)
union(otherDataset) 返回源數據集和參數數據集(otherDataset)的並集
intersection(otherDataset) 返回源數據集和參數數據集(otherDataset)的交集
distinct([numTasks])) 返回對源數據集作元素去重後的新數據集
groupByKey([numTasks]) 只對包含鍵值對的RDD有效,如源RDD包含 (K, V) 對,則該算子返回一個新的數據集包含 (K, Iterable<V>) 對。
注意:若是你須要按key分組聚合的話(如sum或average),推薦使用 reduceByKey或者 aggregateByKey 以得到更好的性能。
注意:默認狀況下,輸出計算的並行度取決於源RDD的分區個數。固然,你也能夠經過設置可選參數 numTasks 來指定並行任務的個數。
reduceByKey(func, [numTasks]) 若是源RDD包含元素類型 (K, V) 對,則該算子也返回包含(K, V) 對的RDD,只不過每一個key對應的value是通過func聚合後的結果,而func自己是一個 (V, V) => V 的映射函數。
另外,和 groupByKey 相似,能夠經過可選參數 numTasks 指定reduce任務的個數。
aggregateByKey(zeroValue)(seqOpcombOp, [numTasks]) 若是源RDD包含 (K, V) 對,則返回新RDD包含 (K, U) 對,其中每一個key對應的value都是由 combOp 函數 和 一個「0」值zeroValue 聚合獲得。容許聚合後value類型和輸入value類型不一樣,避免了沒必要要的開銷。和 groupByKey 相似,能夠經過可選參數 numTasks 指定reduce任務的個數。
sortByKey([ascending], [numTasks]) 若是源RDD包含元素類型 (K, V) 對,其中K可排序,則返回新的RDD包含 (K, V) 對,並按照 K 排序(升序仍是降序取決於 ascending 參數)
join(otherDataset, [numTasks]) 若是源RDD包含元素類型 (K, V) 且參數RDD(otherDataset)包含元素類型(K, W),則返回的新RDD中將包含內關聯後key對應的 (K, (V, W)) 對。外關聯(Outer joins)操做請參考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 算子。
cogroup(otherDataset, [numTasks]) 若是源RDD包含元素類型 (K, V) 且參數RDD(otherDataset)包含元素類型(K, W),則返回的新RDD中包含 (K, (Iterable<V>, Iterable<W>))。該算子還有個別名:groupWith
cartesian(otherDataset) 若是源RDD包含元素類型 T 且參數RDD(otherDataset)包含元素類型 U,則返回的新RDD包含前兩者的笛卡爾積,其元素類型爲 (T, U) 對。
pipe(command[envVars]) 以shell命令行管道處理RDD的每一個分區,如:Perl 或者 bash 腳本。
RDD中每一個元素都將依次寫入進程的標準輸入(stdin),而後按行輸出到標準輸出(stdout),每一行輸出字符串即成爲一個新的RDD元素。
coalesce(numPartitions) 將RDD的分區數減小到numPartitions。當之後大數據集被過濾成小數據集後,減小分區數,能夠提高效率。
repartition(numPartitions) 將RDD數據從新混洗(reshuffle)並隨機分佈到新的分區中,使數據分佈更均衡,新的分區個數取決於numPartitions。該算子老是須要經過網絡混洗全部數據。
repartitionAndSortWithinPartitions(partitioner) 根據partitioner(spark自帶有HashPartitioner和RangePartitioner等)從新分區RDD,而且在每 個結果分區中按key作排序。這是一個組合算子,功能上等價於先 repartition 再在每一個分區內排序,但這個算子內部作了優化(將排序過程下推到混洗同時進行),所以性能更好。

動做算子 – action

如下是Spark支持的一些經常使用action算子。詳細請參考 RDD API doc (ScalaJavaPythonR) 以及 鍵值對 RDD 函數 (ScalaJava) 。

Action算子 做用
reduce(func) 將RDD中元素按func進行聚合(func是一個 (T,T) => T 的映射函數,其中T爲源RDD元素類型,而且func須要知足 交換律 和 結合律 以便支持並行計算)
collect() 將數據集中全部元素以數組形式返回驅動器(driver)程序。一般用於,在RDD進行了filter或其餘過濾操做後,將一個足夠小的數據子集返回到驅動器內存中。
count() 返回數據集中元素個數
first() 返回數據集中首個元素(相似於 take(1) )
take(n) 返回數據集中前 個元素
takeSample(withReplacement,num, [seed]) 返回數據集的隨機採樣子集,最多包含 num 個元素,withReplacement 表示是否使用回置採樣,最後一個參數爲可選參數seed,隨機數生成器的種子。
takeOrdered(n[ordering]) 按元素排序(能夠經過 ordering 自定義排序規則)後,返回前 n 個元素
saveAsTextFile(path) 將數據集中元素保存到指定目錄下的文本文件中(或者多個文本文件),支持本地文件系統、HDFS 或者其餘任何Hadoop支持的文件系統。
保存過程當中,Spark會調用每一個元素的toString方法,並將結果保存成文件中的一行。
saveAsSequenceFile(path)
(Java and Scala)
將數據集中元素保存到指定目錄下的Hadoop Sequence文件中,支持本地文件系統、HDFS 或者其餘任何Hadoop支持的文件系統。適用於實現了Writable接口的鍵值對RDD。在Scala中,一樣也適用於可以被隱式轉換爲 Writable的類型(Spark實現了全部基本類型的隱式轉換,如:Int,Double,String 等)
saveAsObjectFile(path)
(Java and Scala)
將RDD元素以Java序列化的格式保存成文件,保存結果文件可使用 SparkContext.objectFile 來讀取。
countByKey() 只適用於包含鍵值對(K, V)的RDD,並返回一個哈希表,包含 (K, Int) 對,表示每一個key的個數。
foreach(func) 在RDD的每一個元素上運行 func 函數。一般被用於累加操做,如:更新一個累加器(Accumulator ) 或者 和外部存儲系統互操做。
注意:用 foreach 操做出累加器以外的變量可能致使未定義的行爲。更詳細請參考前面的「理解閉包」(Understanding closures )這一小節。

混洗操做

有一些Spark算子會觸發衆所周知的混洗(Shuffle)事件。Spark中的混洗機制是用於將數據從新分佈,其結果是全部數據將在各個分區間從新分組。通常狀況下,混洗須要跨執行器(executor)或跨機器複製數據,這也是混洗操做通常都比較複雜並且開銷大的緣由。

背景

爲了理解混洗階段都發生了哪些事,我首先以 reduceByKey 算 子爲例來看一下。reduceByKey算子會生成一個新的RDD,將源RDD中一個key對應的多個value組合進一個tuple - 而後將這些values輸入給reduce函數,獲得的result再和key關聯放入新的RDD中。這個算子的難點在於對於某一個key來講,並不是其對 應的全部values都在同一個分區(partition)中,甚至有可能都不在同一臺機器上,可是這些values又必須放到一塊兒計算reduce結 果。

在Spark中,一般是因爲爲了進行某種計算操做,而將數據分佈到所須要的各個分區當中。而在計算階段,單個任務(task)只會操做單個分區中的 數據 – 所以,爲了組織好每一個reduceByKey中reduce任務執行時所需的數據,Spark須要執行一個多對多操做。即,Spark須要讀取RDD的所 有分區,並找到全部key對應的全部values,而後跨分區傳輸這些values,並將每一個key對應的全部values放到同一分區,以便後續計算各 個key對應values的reduce結果 – 這個過程就叫作混洗(Shuffle)。

雖然混洗好後,各個分區中的元素和分區自身的順序都是肯定的,可是分區中元素的順序並不是肯定的。若是須要混洗後分區內的元素有序,能夠參考使用如下混洗操做:

  • mapPartitions 使用 .sorted 對每一個分區排序 
  • repartitionAndSortWithinPartitions 重分區的同時,對分區進行排序,比自行組合repartition和sort更高效
  • sortBy 建立一個全局有序的RDD

會致使混洗的算子有:重分區(repartition)類算子,如: repartition 和 coalesceByKey 類算子(除了計數類的,如 countByKey) 如:groupByKey 和 reduceByKey;以及Join類算子,如:cogroup 和 join.

性能影響

混洗(Shuffle)之因此開銷大,是由於混洗操做須要引入磁盤I/O,數據序列化以及網絡I/O等操做。爲了組織好混洗數據,Spark須要生 成對應的任務集 – 一系列map任務用於組織數據,再用一系列reduce任務來聚合數據。注意這裏的map、reduce是來自MapReduce的術語,和Spark的 map、reduce算子並無直接關係。

在Spark內部,單個map任務的輸出會盡可能保存在內存中,直至放不下爲止。而後,這些輸出會基於目標分區從新排序,並寫到一個文件裏。在reduce端,reduce任務只讀取與之相關的並已經排序好的blocks。

某些混洗算子會致使很是明顯的內存開銷增加,由於這些算子須要在數據傳輸先後,在內存中維護組織數據記錄的各類數據結構。特別 地,reduceByKey和aggregateByKey都會在map端建立這些數據結構,而ByKey系列算子都會在reduce端建立這些數據結 構。若是數據在內存中存不下,Spark會把數據吐到磁盤上,固然這回致使額外的磁盤I/O以及垃圾回收的開銷。

混洗還會再磁盤上生成不少臨時文件。以Spark-1.3來講,這些臨時文件會一直保留到其對應的RDD被垃圾回收才刪除。之因此這樣作,是由於如 果血統信息須要從新計算的時候,這些混洗文件能夠沒必要從新生成。若是程序持續引用這些RDD或者垃圾回收啓動頻率較低,那麼這些垃圾回收可能須要等較長的 一段時間。這就意味着,長時間運行的Spark做業可能會消耗大量的磁盤。Spark的臨時存儲目錄,是由spark.local.dir 配置參數指定的。

混洗行爲能夠由一系列配置參數來調優。參考Spark配置指南(Spark Configuration Guide)中「混洗行爲」這一小節。

RDD持久化

Spark的一項關鍵能力就是它能夠持久化(或者緩存)數據集在內存中,從而跨操做複用這些數據集。若是你持久化了一個RDD,那麼每一個節點上都會 存儲該RDD的一些分區,這些分區是由對應的節點計算出來並保持在內存中,後續能夠在其餘施加在該RDD上的action算子中複用(或者從這些數據集派 生新的RDD)。這使得後續動做的速度提升不少(一般高於10倍)。所以,緩存對於迭代算法和快速交互式分析是一個很關鍵的工具。

你能夠用persist() 或者 cache() 來標記一下須要持久化的RDD。等到該RDD首次被施加action算子的時候,其對應的數據分區就會被保留在內存裏。同時,Spark的緩存具有必定的 容錯性 – 若是RDD的任何一個分區丟失了,Spark將自動根據其原來的血統信息從新計算這個分區。

另外,每一個持久化的RDD可使用不一樣的存儲級別,好比,你能夠把RDD保存在磁盤上,或者以java序列化對象保存到內存裏(爲了省空間),或者跨節點多副本,或者使用 Tachyon 存 到虛擬機之外的內存裏。這些存儲級別均可以由persist()的參數StorageLevel對象來控制。cache() 方法自己就是一個使用默認存儲級別作持久化的快捷方式,默認存儲級別是 StorageLevel.MEMORY_ONLY(以java序列化方式存到內存裏)。完整的存儲級別列表以下:

存儲級別 含義
MEMORY_ONLY 以未序列化的Java對象形式將RDD存儲在JVM內存中。若是RDD不能所有裝進內存,那麼將一部分分區緩存,而另外一部分分區將每次用到時從新計算。這個是Spark的RDD的默認存儲級別。
MEMORY_AND_DISK 以未序列化的Java對象形式存儲RDD在JVM中。若是RDD不能所有裝進內存,則將不能裝進內存的分區放到磁盤上,而後每次用到的時候從磁盤上讀取。
MEMORY_ONLY_SER 以序列化形式存儲RDD(每一個分區一個字節數組)。一般這種方式比未序列化存儲方式要更省空間,尤爲是若是你選用了一個比較好的序列化協議(fast serializer),可是這種方式也相應的會消耗更多的CPU來讀取數據。
MEMORY_AND_DISK_SER 和MEMORY_ONLY_SER相似,只是當內存裝不下的時候,會將分區的數據吐到磁盤上,而不是每次用到都從新計算。
DISK_ONLY RDD數據只存儲於磁盤上。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 和上面沒有」_2″的級別相對應,只不過每一個分區數據會在兩個節點上保存兩份副本。
OFF_HEAP (實驗性的) 將RDD以序列化格式保存到Tachyon。與 MEMORY_ONLY_SER相比,OFF_HEAP減小了垃圾回收開銷,而且使執行器(executor)進程更小且能夠共用同一個內存池,這一特性 在須要大量消耗內存和多Spark應用併發的場景下比較吸引人。並且,由於RDD存儲於Tachyon中,因此一個執行器掛了並不會致使數據緩存的丟失。 這種模式下Tachyon 的內存是可丟棄的。所以,Tachyon並不會重建一個它逐出內存的block。若是你打算用Tachyon作爲堆外存儲,Spark和Tachyon具 有開箱即用的兼容性。請參考這裏,有建議使用的Spark和Tachyon的匹配版本對:page

注意:在Python中存儲的對象老是會使用 Pickle 作序列化,因此這時是否選擇一個序列化級別已經可有可無了。

Spark會自動持久化一些混洗操做(如:reduceByKey)的中間數據,即使用戶根本沒有調用persist。這麼作是爲了不一旦有一個節點在混洗過程當中失敗,就要重算整個輸入數據。固然,咱們仍是建議對須要重複使用的RDD調用其persist算子。

如何選擇存儲級別?

Spark的存儲級別主要可於在內存使用和CPU佔用之間作一些權衡。建議根據如下步驟來選擇一個合適的存儲級別:

  • 若是RDD能使用默認存儲級別(MEMORY_ONLY),那就儘可能使用默認級別。這是CPU效率最高的方式,全部RDD算子都能以最快的速度運行。
  • 若是步驟1的答案是否(不適用默認級別),那麼能夠嘗試MEMORY_ONLY_SER級別,並選擇一個高效的序列化協議(selecting a fast serialization library),這回大大節省數據對象的存儲空間,同時速度也還不錯。
  • 儘可能不要把數據吐到磁盤上,除非:1.你的數據集從新計算的代價很大;2.你的數據集是從一個很大的數據源中過濾獲得的結果。不然的話,重算一個分區的速度極可能和從磁盤上讀取差很少。
  • 若是須要支持容錯,能夠考慮使用帶副本的存儲級別(例如:用Spark來服務web請求)。全部的存儲級別都可以以重算丟失數據的方式來提供容錯性,可是帶副本的存儲級別可讓你的應用持續的運行,而沒必要等待重算丟失的分區。
  • 在一些須要大量內存或者並行多個應用的場景下,實驗性的OFF_HEAP會有如下幾個優點:
    • 這個級別下,能夠容許多個執行器共享同一個Tachyon中內存池。
    • 能夠有效地減小垃圾回收的開銷。
    • 即便單個執行器掛了,緩存數據也不會丟失。

刪除數據

Spark可以自動監控各個節點上緩存使用率,而且以LRU(最近常用)的方式將老數據逐出內存。若是你更喜歡手動控制的話,能夠用RDD.unpersist() 方法來刪除無用的緩存。

共享變量

通常而言,當咱們給Spark算子(如 map 或 reduce)傳遞一個函數時,這些函數將會在遠程的集羣節點上運行,而且這些函數所引用的變量都是各個節點上的獨立副本。這些變量都會以副本的形式複製 到各個機器節點上,若是更新這些變量副本的話,這些更新並不會傳回到驅動器(driver)程序。一般來講,支持跨任務的可讀寫共享變量是比較低效的。不 過,Spark仍是提供了兩種比較通用的共享變量:廣播變量和累加器。

廣播變量

廣播變量提供了一種只讀的共享變量,它是把在每一個機器節點上保存一個緩存,而不是每一個任務保存一份副本。一般能夠用來在每一個節點上保存一個較大的輸 入數據集,這要比常規的變量副本更高效(通常的變量是每一個任務一個副本,一個節點上可能有多個任務)。Spark還會嘗試使用高效的廣播算法來分發廣播變 量,以減小通訊開銷。

Spark的操做有時會有多個階段(stage),不一樣階段之間的分割線就是混洗操做。Spark會自動廣播各個階段用到的公共數據。這些方式廣播的數據都是序列化過的,而且在運行各個任務前須要反序列化。這也意味着,顯示地建立廣播變量,只有在跨多個階段(stage)的任務須要一樣的數據 或者 緩存數據的序列化和反序列化格式很重要的狀況下 纔是必須的。

廣播變量能夠經過一個變量v來建立,只需調用 SparkContext.broadcast(v)便可。這個廣播變量是對變量v的一個包裝,要訪問其值,能夠調用廣播變量的 value 方法。代碼示例以下:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

廣播變量建立以後,集羣中任何函數都不該該再使用原始變量v,這樣才能保證v不會被屢次複製到同一個節點上。另外,對象v在廣播後不該該再被更新, 這樣才能保證全部節點上拿到一樣的值(例如,更新後,廣播變量又被同步到另外一新節點,新節點有可能獲得的值和其餘節點不同)。

累加器

累加器是一種只支持知足結合律的「累加」操做的變量,所以它能夠很高效地支持並行計算。利用累加器能夠實現計數(相似MapReduce中的計數 器)或者求和。Spark原生支持了數字類型的累加器,開發者也能夠自定義新的累加器。若是建立累加器的時候給了一個名字,那麼這個名字會展現在 Spark UI上,這對於瞭解程序運行處於哪一個階段很是有幫助(注意:Python尚不支持該功能)。

創捷累加器時須要賦一個初始值v,調用 SparkContext.accumulator(v) 能夠建立一個累加器。後續集羣中運行的任務可使用 add 方法 或者 += 操做符 (僅Scala和Python支持)來進行累加操做。不過,任務自己並不能讀取累加器的值,只有驅動器程序能夠用 value 方法訪問累加器的值。

如下代碼展現瞭如何使用累加器對一個元素數組求和:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

以上代碼使用了Spark內建支持的Int型累加器,開發者也能夠經過子類化 AccumulatorParam 來 自定義累加器。累加器接口(AccumulatorParam )主要有兩個方法:1. zero:這個方法爲累加器提供一個「零值」,2.addInPlace 將收到的兩個參數值進行累加。例如,假設咱們須要爲Vector提供一個累加機制,那麼可能的實現方式以下:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

若是使用Scala,Spark還支持幾種更通用的接口:1.Accumulable,這個接口能夠支持所累加的數據類型與結果類型不一樣(如:構建一個收集元素的list);2.SparkContext.accumulableCollection 方法能夠支持經常使用的Scala集合類型。

對於在action算子中更新的累加器,Spark保證每一個任務對累加器的更新只會被應用一次,例如,某些任務若是重啓過,則不會再次更新累加器。 而若是在transformation算子中更新累加器,那麼用戶須要注意,一旦某個任務由於失敗被從新執行,那麼其對累加器的更新可能會實施屢次。

累加器並不會改變Spark懶惰求值的運算模型。若是在RDD算子中更新累加器,那麼其值只會在RDD作action算子計算的時候被更新一次。因 此,在transformation算子(如:map)中更新累加器,其值並不能保證必定被更新。如下代碼片斷說明了這一特性:

也不會進行實際的計算val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// 這裏,accum任然是0,由於沒有action算子,因此map

部署到集羣

應用提交指南(application submission guide)中描述瞭如何向集羣提交應用。換句話說,就是你須要把你的應用打包成 JAR文件(Java/Scala)或者一系列 .py 或 .zip 文件(Python),而後再用 bin/spark-submit 腳本將其提交給Spark所支持的集羣管理器。

從Java/Scala中啓動Spark做業

org.apache.spark.launcher 包提供了簡明的Java API,能夠將Spark做業做爲子進程啓動。

單元測試

Spark對全部常見的單元測試框架提供友好的支持。你只須要在測試中建立一個SparkContext對象,而後吧master URL設爲local,運行測試操做,最後調用 SparkContext.stop() 來中止測試。注意,必定要在 finally 代碼塊或者單元測試框架的 tearDown方法裏調用SparkContext.stop(),由於Spark不支持同一程序中有多個SparkContext對象同時運行。

從1.0以前版本遷移過來

Spark 1.0 凍結了Spark Core 1.x 系列的核心API,只要是沒有標記爲 「experimental」 或者 「developer API」的API,在將來的版本中會一直支持。對於Scala用戶來講,惟一的變化就是分組相關的算子,如:groupByKey, cogroup, join,這些算子的返回類型由 (Key, Seq[Value]) 變爲 (Key, Iterable[Value])。

更詳細遷移向導請參考這裏:Spark StreamingMLlib 以及 GraphX.

下一步

你能夠去Spark的官網上看看示例程序(example Spark programs)。另外,Spark代碼目錄下也自帶了很多例子,見 examples 目錄(Scala,JavaPythonR)。你能夠把示例中的類名傳給 bin/run-example 腳原本運行這些例子;例如:

./bin/run-example SparkPi

若是須要運行Python示例,則須要使用 spark-submit 腳本:

./bin/spark-submit examples/src/main/python/pi.py

對R語言,一樣也須要使用 spark-submit:

./bin/spark-submit examples/src/main/r/dataframe.R

配置(configuration)和調優(tuning)指南提供了很多最佳實踐的信息,能夠幫助你優化程序,特別是這些信息能夠幫助你確保數據以一種高效的格式保存在內存裏。集羣模式概覽(cluster mode overview)這篇文章描述了分佈式操做中相關的組件,以及Spark所支持的各類集羣管理器。

最後,完整的API文件見:ScalaJavaPython 以及 R.

 

原創文章,轉載請註明: 轉載自併發編程網 – ifeve.com本文連接地址: 《Spark 官方文檔》Spark編程指南

相關文章
相關標籤/搜索