spark RDD操做

Spark開發指南

簡介

總的來講,每個Spark的應用,都是由一個驅動程序(driver program)構成,它運行用戶的main函數,在一個集羣上執行各類各樣的並行操做。Spark提出的最主要抽象概念是彈性分佈式數據集 (resilient distributed dataset,RDD),它是一個元素集合,劃分到集羣的各個節點上,能夠被並行操做。RDDs的建立能夠從HDFS(或者任意其餘支持Hadoop文件系統) 上的一個文件開始,或者經過轉換驅動程序(driver program)中已存在的Scala集合而來。用戶也可讓Spark保留一個RDD在內存中,使其能在並行操做中被有效的重複使用。最後,RDD能自動從節點故障中恢復。

Spark的第二個抽象概念是共享變量(shared variables),能夠在並行操做中使用。在默認狀況下,Spark經過不一樣節點上的一系列任務來運行一個函數,它將每個函數中用到的變量的拷貝傳遞到每個任務中。有時候,一個變量須要在任務之間,或任務與驅動程序之間被共享。Spark 支持兩種類型的共享變量:廣播變量,能夠在內存的全部的結點上緩存變量;累加器:只能用於作加法的變量,例如計數或求和。

本指南將展現這些特性,並給出一些例子。讀者最比如較熟悉Scala,尤爲是閉包的語法。請留意,你也能夠經過spark-shell腳本,來交互式地運行Spark。咱們建議你在接下來的步驟中這樣作。

接入Spark

Spark 0.8.1 須要搭配使用 Scala 2.9.3. 若是你用Scala 來編寫應用,你須要使用相同版本的Scala,更新的大版本極可能不兼容。

要寫一個Spark 應用,你須要給它加上Spark的依賴。若是你使用SBT或者Maven,Spark能夠經過Maven中心庫來得到:

1

2

3

groupId = org.apache.spark

artifactId = spark-core_2.9.3

version = 0.8.1-incubating

另外,若是你想訪問一個HDFS集羣,你須要根據你的HDFS版本,添加一個hadoop-client的依賴:

1

2

3

groupId = org.apache.hadoop

artifactId = hadoop-client

version = <your-hdfs-version>

對於其餘編譯系統,你能夠經過運行sbt/sbt assembly來把Spark及其依賴打包到一個JAR(assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop*.jar)中,而後將其加入到你的CLASSPATH中。並按照這裏的描述設置HDFS版本。

最後,你須要將一些Spark的類和隱式轉換導入到你的程序中。經過以下語句:

1

2

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

Spark初始化

Spark程序須要作的第一件事情,就是建立一個SparkContext對象,它將告訴Spark如何訪問一個集羣。這個一般是經過下面的構造器來實現的:

1

new SparkContext(master, appName, [sparkHome], [jars])

master參數,是一個用於指定所鏈接的Spark or Mesos 集羣URL的字符串,也能夠是一個以下面所描述的用於在local模式運行的特殊字符串「local」。appName是你的應用的名稱,將會在集羣的Web監控UI中顯示。最後,若是部署到集羣,在分佈式模式下運行,最後兩個參數是必須的。後面會有具體的描述。

在Spark shell中,一個特殊的解釋器感知的SparkContext已經爲你建立好了,變量名是sc。建立你本身的SparkContext是不會生效的。你能夠用MASTER環境變量來設置SparkContext鏈接到的master。也能夠用ADD_JARS變量來將JARs加入到你的classpath。例如,若是在四核CPU上運行spark-shell,使用:

1

$ MASTER=local[4] ./spark-shell

或者,同時在classpath中加入code.jar,使用:

1

$ MASTER=local[4] ADD_JARS=code.jar ./spark-shell

Master URLs

傳遞給Spark的master URL能夠是如下任一種形式:

Master URL 含義
local 使用一個Worker線程本地化運行SPARK(徹底不併行)
local[K] 使用K個Worker線程本地化運行Spark(理想狀況下,K應該根據運行機器的CPU核數設定)
spark://HOST:PORT 鏈接到指定的Spark單機版集羣(Spark standalone cluster)master。必須使用master所配置的接口,默認接口是7077.
mesos://HOST:PORT 鏈接到指定的Mesos集羣。host參數是Moses master的hostname。必須使用master所配置的接口,默認接口是5050.

若是沒有指定的msater URL, spark shell 的默認值是「local」。

若是在YARN上運行,Spark會在YARN上,啓動一個standalone部署的集羣實例,查看 running on YARN得到更多詳情。

在集羣上部署代碼

若是你要在集羣上運行應用,你須要給SparkContext指定兩個可選參數,使其能找到你的代碼:

  • sparkHome:你的集羣機器上Spark的安裝路徑(全部機器上路徑必須一致)

  • jars: 在本地機器上的JAR文件列表,其中包括你應用的代碼以及任何的依賴,Spark將會把他們部署到全部的集羣結點上。你須要使用你的編譯系統將你的應用打包成一系列JAR文件。例如,若是你使用SBT,用sbt-assembly插件將你的代碼和全部依賴變成一個JAR文件是一個好的辦法。

若是你在一個集羣上運行spark-shell, 在啓動以前你能夠經過指定ADD_JAR環境變量將JAR文件們加載在集羣上,這個變量須要包括一個用逗號分隔的JAR文件列表。例如,ADD_JARS=a.jar,b.jar ./spark-shell將啓動一個在classpath中帶有a.jar和b.jar的shell。另外,在shell中定義的任何新類,都會被自動分發出去。

彈性分佈式數據集

Spark圍繞的概念是彈性分佈式數據集(RDD),這是一個有容錯機制並能夠被並行操做的元素集合。目前有兩種類型的RDD:並行集合(Parallelized Collections):接收一個已經存在的Scala集合,而後進行各類並行計算。 Hadoop數據集(Hadoop Datasets):在一個文件的每條記錄上運行函數。只要文件系統是HDFS,或者hadoop支持的任意存儲系統便可。 這兩種類型的RDD均可以經過相同的方式進行操做。

並行集合(Parallelized Collections)

並行集合是經過調用SparkContext的parallelize方法,在一個已經存在的Scala集合上建立的(一個Seq對象)。集合的對象將會被拷貝,建立出一個能夠被並行操做的分佈式數據集。例如,下面的解釋器輸出,演示瞭如何從一個數組建立一個並行集合:

1

2

3

4

5

scala> val data = Array(1, 2, 3, 4, 5)

data: Array[Int] = Array(1, 2, 3, 4, 5)

 

scala> val distData = sc.parallelize(data)

distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e

一旦分佈式數據集(distData)被建立好,它們將能夠被並行操做。例如,咱們能夠調用distData.reduce(_ +_)來將數組的元素相加。咱們會在後續的分佈式數據集運算中進一步描述。

並行集合的一個重要參數是slices,表示數據集切分的份數。Spark將會在集羣上爲每一份數據起一個任務。典型地,你能夠在集羣的每一個CPU上分佈2-4個slices. 通常來講,Spark會嘗試根據集羣的情況,來自動設定slices的數目。然而,你也能夠經過傳遞給parallelize的第二個參數來進行手動設置。(例如:sc.parallelize(data, 10)).

Hadoop數據集(Hadoop Datasets)

Spark能夠從存儲在HDFS,或者Hadoop支持的其它文件系統(包括本地文件,Amazon S3, Hypertable, HBase等等)上的文件建立分佈式數據集。Spark能夠支持TextFile,SequenceFiles以及其它任何Hadoop輸入格式。(Python接口目前還不支持SequenceFile,很快會支持吧)

Text file的RDDs能夠經過SparkContext’s textFile的方式建立,該方法接受一個文件的URI地址(或者機器上的一個本地路徑,或者一個hdfs://, sdn://,kfs://,其它URI). 下面是一個調用例子:

1

2

scala> val distFile = sc.textFile("data.txt")

distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08

一旦建立完成,distFile能夠被進行數據集操做。例如,咱們能夠經過使用以下的map和reduce操做:distFile.map(_.size).reduce(_ + _ )將全部數據行的長度相加。

textFile方法也能夠經過輸入一個可選的第二參數,來控制文件的分片數目。默認狀況下,Spark爲每一塊文件建立一個分片(HDFS默認的塊大小爲64MB),可是你也能夠經過傳入一個更大的值,來指定一個更高的片值。注意,你不能指定一個比塊數更小的片值(和Map數不能小於Block數同樣,可是能夠比它多)

對於SequenceFiles,可使用SparkContext的sequenceFile[K, V]方法建立,其中K和V是文件中的key和values的類型。像IntWritableText同樣,它們必須是Hadoop的Writable interface的子類。另外,對於幾種通用Writable類型,Spark容許你指定原生類型來替代。例如:sequencFile[Int, String]將會自動讀取IntWritable和Texts。

最後,對於其餘類型的Hadoop輸入格式,你可使用SparkContext.hadoopRDD方法,它能夠接收任意類型的JobConf和輸入格式類,鍵類型和值類型。按照像Hadoop做業同樣的方法,來設置輸入源就能夠了。

RDD 的操做

RDD支持兩種操做:轉換(transformation從現有的數據集建立一個新的數據集;而動做(actions)在數據集上運行計算後,返回一個值給驅動程序。 例如,map就是一種轉換,它將數據集每個元素都傳遞給函數,並返回一個新的分佈數據集表示結果。另外一方面,reduce是一種動做,經過一些函數將全部的元素疊加起來,並將最終結果返回給Driver程序。(不過還有一個並行的reduceByKey,能返回一個分佈式數據集)

Spark中的全部轉換都是惰性的,也就是說,他們並不會直接計算結果。相反的,它們只是記住應用到基礎數據集(例如一個文件)上的這些轉換動做。只有當發生一個要求返回結果給Driver的動做時,這些轉換纔會真正運行。這個設計讓Spark更加有效率的運行。例如,咱們能夠實現:經過map建立的一個新數據集,並在reduce中使用,最終只返回reduce的結果給driver,而不是整個大的新數據集。

默認狀況下,每個轉換過的RDD都會在你在它之上執行一個動做時被從新計算。不過,你也可使用persist(或者cache)方法,持久化一個RDD在內存中。在這種狀況下,Spark將會在集羣中,保存相關元素,下次你查詢這個RDD時,它將能更快速訪問。在磁盤上持久化數據集,或在集羣間複製數據集也是支持的,這些選項將在本文檔的下一節進行描述。

下面的表格列出了目前所支持的轉換和動做(詳情請參見 RDD API doc):

轉換(transformation

 轉換 含義
map(func) 返回一個新分佈式數據集,由每個輸入元素通過func函數轉換後組成
filter(func) 返回一個新數據集,由通過func函數計算後返回值爲true的輸入元素組成
flatMap(func) 相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)
mapPartitions(func) 相似於map,但獨立地在RDD的每個分塊上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]
mapPartitionsWithSplit(func) 相似於mapPartitions, 但func帶有一個整數參數表示分塊的索引值。所以在類型爲T的RDD上運行時,func的函數類型必須是(Int, Iterator[T]) => Iterator[U]
sample(withReplacement,fractionseed) 根據fraction指定的比例,對數據進行採樣,能夠選擇是否用隨機數進行替換,seed用於指定隨機數生成器種子
union(otherDataset) 返回一個新的數據集,新數據集是由源數據集和參數數據集聯合而成
distinct([numTasks])) 返回一個包含源數據集中全部不重複元素的新數據集
groupByKey([numTasks]) 在一個(K,V)對的數據集上調用,返回一個(K,Seq[V])對的數據集
注意:默認狀況下,只有8個並行任務來作操做,可是你能夠傳入一個可選的numTasks參數來改變它
reduceByKey(func, [numTasks]) 在一個(K,V)對的數據集上調用時,返回一個(K,V)對的數據集,使用指定的reduce函數,將相同key的值聚合到一塊兒。相似groupByKey,reduce任務個數是能夠經過第二個可選參數來配置的
sortByKey([ascending], [numTasks]) 在一個(K,V)對的數據集上調用,K必須實現Ordered接口,返回一個按照Key進行排序的(K,V)對數據集。升序或降序由ascending布爾參數決定
join(otherDataset, [numTasks]) 在類型爲(K,V)和(K,W)類型的數據集上調用時,返回一個相同key對應的全部元素對在一塊兒的(K, (V, W))數據集
cogroup(otherDataset, [numTasks]) 在類型爲(K,V)和(K,W)的數據集上調用,返回一個 (K, Seq[V], Seq[W])元組的數據集。這個操做也能夠稱之爲groupwith
cartesian(otherDataset) 笛卡爾積,在類型爲 T 和 U 類型的數據集上調用時,返回一個 (T, U)對數據集(兩兩的元素對)

完整的轉換列表能夠在RDD API doc中得到。

動做(actions)

 動做 含義
reduce(func) 經過函數func(接受兩個參數,返回一個參數)彙集數據集中的全部元素。這個功能必須可交換且可關聯的,從而能夠正確的被並行執行。
collect() 在驅動程序中,以數組的形式,返回數據集的全部元素。這一般會在使用filter或者其它操做並返回一個足夠小的數據子集後再使用會比較有用。
count() 返回數據集的元素的個數。
first() 返回數據集的第一個元素(相似於take(1))
take(n) 返回一個由數據集的前n個元素組成的數組。注意,這個操做目前並不是並行執行,而是由驅動程序計算全部的元素
takeSample(withReplacement,numseed) 返回一個數組,在數據集中隨機採樣num個元素組成,能夠選擇是否用隨機數替換不足的部分,Seed用於指定的隨機數生成器種子
saveAsTextFile(path) 將數據集的元素,以textfile的形式,保存到本地文件系統,HDFS或者任何其它hadoop支持的文件系統。對於每一個元素,Spark將會調用toString方法,將它轉換爲文件中的文本行
saveAsSequenceFile(path) 將數據集的元素,以Hadoop sequencefile的格式,保存到指定的目錄下,本地系統,HDFS或者任何其它hadoop支持的文件系統。這個只限於由key-value對組成,並實現了Hadoop的Writable接口,或者隱式的能夠轉換爲Writable的RDD。(Spark包括了基本類型的轉換,例如Int,Double,String,等等)
countByKey() 對(K,V)類型的RDD有效,返回一個(K,Int)對的Map,表示每個key對應的元素個數
foreach(func) 在數據集的每個元素上,運行函數func進行更新。這一般用於邊緣效果,例如更新一個累加器,或者和外部存儲系統進行交互,例如HBase

完整的轉換列表能夠在RDD API doc中得到。

RDD 的持久化

Spark最重要的一個功能,就是在不一樣操做間,持久化(或緩存)一個數據集在內存中。當你持久化一個RDD,每個結點都將把它的計算分塊結果保存在內存中,並在對此數據集(或者衍生出的數據集)進行的其它動做中重用。這將使得後續的動做(Actions)變得更加迅速(一般快10倍)。緩存是用Spark構建迭代算法的關鍵。

你能夠用persist()或cache()方法來標記一個要被持久化的RDD,而後一旦首次被一個動做(Action)觸發計算,它將會被保留在計算結點的內存中並重用。Cache有容錯機制,若是RDD的任一分區丟失了,經過使用原先建立它的轉換操做,它將會被自動重算(不須要所有重算,只計算丟失的部分)。

此外,每個RDD均可以用不一樣的保存級別進行保存,從而容許你持久化數據集在硬盤,或者在內存做爲序列化的Java對象(節省空間),甚至於跨結點複製。這些等級選擇,是經過將一個org.apache.spark.storage.StorageLevel對象傳遞給persist()方法進行肯定。cache()方法是使用默認存儲級別的快捷方法,也就是StorageLevel.MEMORY_ONLY(將反序列化的對象存入內存)。

完整的可選存儲級別以下:

存儲級別  意義
MEMORY_ONLY 將RDD做爲反序列化的的對象存儲JVM中。若是RDD不能被內存裝下,一些分區將不會被緩存,而且在須要的時候被從新計算。這是是默認的級別
MEMORY_AND_DISK 將RDD做爲反序列化的的對象存儲在JVM中。若是RDD不能被與內存裝下,超出的分區將被保存在硬盤上,而且在須要時被讀取
MEMORY_ONLY_SER 將RDD做爲序列化的的對象進行存儲(每一分區佔用一個字節數組)。一般來講,這比將對象反序列化的空間利用率更高,尤爲當使用fast serializer,但在讀取時會比較佔用CPU
MEMORY_AND_DISK_SER 與MEMORY_ONLY_SER類似,可是把超出內存的分區將存儲在硬盤上而不是在每次須要的時候從新計算
DISK_ONLY 只將RDD分區存儲在硬盤上
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 與上述的存儲級別同樣,可是將每個分區都複製到兩個集羣結點上

存儲級別的選擇

Spark的不一樣存儲級別,旨在知足內存使用和CPU效率權衡上的不一樣需求。咱們建議經過如下的步驟來進行選擇:

  • 若是你的RDDs能夠很好的與默認的存儲級別(MEMORY_ONLY)契合,就不須要作任何修改了。這已是CPU使用效率最高的選項,它使得RDDs的操做盡量的快。

  • 若是不行,試着使用MEMORY_ONLY_SER而且選擇一個快速序列化的庫使得對象在有比較高的空間使用率的狀況下,依然能夠較快被訪問。

  • 儘量不要存儲到硬盤上,除非計算數據集的函數,計算量特別大,或者它們過濾了大量的數據。不然,從新計算一個分區的速度,和與從硬盤中讀取基本差很少快。

  • 若是你想有快速故障恢復能力,使用複製存儲級別(例如:用Spark來響應web應用的請求)。全部的存儲級別都有經過從新計算丟失數據恢復錯誤的容錯機制,可是複製存儲級別可讓你在RDD上持續的運行任務,而不須要等待丟失的分區被從新計算。

若是你想要定義你本身的存儲級別(好比複製因子爲3而不是2),可使用StorageLevel 單例對象的apply()方法。

共享變量

通常來講,當一個函數被傳遞給Spark操做(例如map和reduce),在一個遠程集羣上運行,它實際上操做的是這個函數用到的全部變量的獨立拷貝。這些變量會被拷貝到每一臺機器,在遠程機器上對變量的全部更新都不會被傳播回驅動程序。一般看來,在任務之間中,讀寫共享變量顯然不夠高效。然而,Spark仍是爲兩種常見的使用模式,提供了兩種有限的共享變量:廣播變量和累加器。

廣播變量

廣播變量容許程序員保留一個只讀的變量,緩存在每一臺機器上,而非每一個任務保存一份拷貝。他們能夠這樣被使用,例如,以一種高效的方式給每一個結點一個大的輸入數據集。Spark會嘗試使用一種高效的廣播算法來傳播廣播變量,從而減小通訊的代價。

廣播變量是經過調用SparkContext.broadcast(v)方法從變量v建立的。廣播變量是一個v的封裝器,它的值能夠經過調用value方法得到。以下模塊展現了這個:

1

2

3

4

5

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

 

scala> broadcastVar.value

res0: Array[Int] = Array(1, 2, 3)

在廣播變量被建立後,它應該在集羣運行的任何函數中,代替v值被調用,從而v值不須要被再次傳遞到這些結點上。另外,對象v不能在廣播後修改,這樣能夠保證全部結點的收到的都是如出一轍的廣播值。

累加器

累加器是一種只能經過關聯操做進行「加」操做的變量,所以能夠高效被並行支持。它們能夠用來實現計數器(如MapReduce中)和求和器。Spark原生就支持Int和Double類型的累加器,開發者能夠本身添加新的支持類型。

一個累加器能夠經過調用SparkContext.accumulator(v)方法從一個初始值v中建立。運行在集羣上的任務,能夠經過使用+=來給它加值。然而,他們不能讀取這個值。只有驅動程序可使用value的方法來讀取累加器的值。

以下的解釋器模塊,展現瞭如何利用累加器,將一個數組裏面的全部元素相加:

1

2

3

4

5

6

7

8

9

scala> val accum = sc.accumulator(0)

accum: spark.Accumulator[Int] = 0

 

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

...

10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

 

scala> accum.value

res2: Int = 10

更多信息

你能夠在Spark的網站上看到spark程序的樣例。Spark還在examples/src/main/scala上收入了一些例子,其中一些既有Spark版本,又有本地(非並行)版本。這些案例讓你看到要讓程序以集羣化的方式跑起來的話,須要作什麼修改。你能夠經過將類名傳遞給spark中的run-example腳原本運行它們,例如:

1

./run-example org.apache.spark.examples.SparkPi

任何樣例程序在運行時若是沒有提供任何參數,都會打印使用幫助。

當須要優化程序的幫助,configuration tuning指導提供了最佳實踐信息。它們對於確保你的數據以高效的格式存儲在內存中,相當重要。

相關文章
相關標籤/搜索