Spark編程入門

1. 交互式Spark-Shell

根據前一節已經搭建好的Hadoop和Spark環境,直接經過腳本啓動Hadoop和Spark服務。若是 http://localhost:8080 可以訪問,說明Spark服務已經啓動。Spark爲咱們提供了PySpark以及Spark-shell,能夠方便的經過交互試界面調試Spark應用。接下來咱們將採用Spark-Shell來調試Spark程序。在終端中輸入以下命令: spark-shell --master spark://spark-B470:7077, master後面的URL就是Spark Master的URL ,能夠在 http://localhost:8080 的頁面上找到。html

  1. hadoop@spark-B470:~/Develop$ spark-shell --master spark://spark-B470:7077
  2. Setting default log level to "WARN".
  3. To adjust logging level use sc.setLogLevel(newLevel).
  4. SLF4J: Class path contains multiple SLF4J bindings.
  5. SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  6. SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  7. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  8. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
  9. 16/10/29 23:04:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  10. 16/10/29 23:04:25 WARN Utils: Your hostname, spark-B470 resolves to a loopback address: 127.0.1.1; using 192.168.1.110 instead (on interface enp4s0)
  11. 16/10/29 23:04:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
  12. 16/10/29 23:04:27 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
  13. Spark context Web UI available at http://192.168.1.110:4040
  14. Spark context available as 'sc' (master = spark://spark-B470:7077, app id = app-20161029230426-0000).
  15. Spark session available as 'spark'.
  16. Welcome to
  17. ____ __
  18. / __/__ ___ _____/ /__
  19. _\ \/ _ \/ _ `/ __/ '_/
  20. /___/ .__/\_,_/_/ /_/\_\ version 2.0.0
  21. /_/
  22. Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
  23. Type in expressions to have them evaluated.
  24. Type :help for more information.
  25. scala>

看到Scala的交互式窗口後,能夠在 http://localhost:8080 的web上 Running Applications 這一欄 看到剛啓動的應用程序,若是你要退出Spark-Shell,按 CTRL D組合鍵退出。java

spark_web_ui

Spark 最主要的抽象是叫 Resilient Distributed Dataset(RDD) 的彈性分佈式集合。RDDs 可使用 Hadoop InputFormats(例如 HDFS 文件)建立,也能夠從其餘的 RDDs 轉換。讓咱們在 Spark 源代碼目錄從 /etc/protocols 文本文件中建立一個新的 RDD。python

  1. scala> val file = sc.textFile("file:///etc/protocols")
  2. file: org.apache.spark.rdd.RDD[String] = file:///etc/protocols MapPartitionsRDD[5] at textFile at <console>:24
  3. scala> file.count()
  4. res3: Long = 64
  5. scala> file.first()
  6. res4: String = # Internet (IP) protocols

上面的操做中建立了一個RDD file,執行了兩個簡單的操做:程序員

  • count() 獲取RDD的行數
  • first() 獲取第一行的內容

咱們繼續執行其餘操做,好比查找有多少行含有tcp和udp字符串:web

  1. scala> file.filter(line=>line.contains("tcp")).count()
  2. res2: Long = 1
  3. scala> file.filter(line=>line.contains("udp")).count()
  4. res3: Long = 2

查看一共有多少個不一樣單詞的方法,這裏用到MapReduce的思路:算法

  1. scala> val wordcount = file.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((x,y)=>x+y)
  2. wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:26
  3. scala> wordcount.count()
  4. res4: Long = 243

2. Spark 核心概念

如今你已經用Spark-shell運行了一段Spark程序,是時候對Spark編程做更細緻的瞭解。shell

從上層來看,每一個Spark應用都由一個驅動器程序(driver program)來發起集羣 上的各類並行操做。驅動器程序包包含應用的 main 函數, 而且定義了集羣上的分佈式數據集,還對這些分佈式數據集應用了相關的操做。在前面的例子裏,實際的驅動器程序就是Spark shell 自己,你只要輸入想運行的操做就能夠了。express

驅動器程序經過一個 SparkContext 對象來訪問Spark,這個對象表明對計算集羣的一個鏈接。shell啓動時已經自動建立了一個 SparkContext 對象,是一個叫做 sc 的變量。咱們能夠經過在shell裏嘗試輸出 sc 來查看它的類型。apache

  1. scala> sc
  2. res1: org.apache.spark.SparkContext = org.apache.spark.SparkContext@1f172892

一旦有了 SparkContext,你就能夠經過它來建立RDD。就像前面的例子,咱們調用了 sc.textFile() 來建立一個表明文件中各行文本的RDD。咱們能夠在這些行上進行各類並行操做,好比 count()編程

要執行這些操做,驅動器程序通常要管理多個執行器(executor)節點。好比,若是咱們在集羣上運行count() 操做,那麼不一樣的節點會統計文件的不一樣部分的行數。因爲咱們剛纔是在本地模式下運行的Spark shell,所以全部的工做會在單個節點上執行,但你能夠將這個shell鏈接到集羣上來進行並行的數據分析。下圖展現Spark如何在一個集羣上運行。

Spark分佈式集羣

3. RDD編程

3.1 RDD基礎

Spark中的RDD就是一個不可變的分佈式對象集合。每一個RDD都被分爲多個分區,這些分區運行在集羣中的不一樣節點上。用戶可使用兩種方法建立RDD:讀取一個外部數據集,或在驅動程序裏分發驅動器程序中的對象集合(好比list和set)。咱們在前面的例子已經使用 SparkContext.textFile() 來讀取文本文件做爲一個字符串RDD。建立出來後的RDD支持兩種類型的操做:轉化操做(transformation) 和 行動操做(action)

轉化操做會由一個RDD生成一個新的RDD。例如,根據謂詞匹配狀況篩選數據就是一個常見的轉化操做。在咱們的文本示例中,咱們能夠用篩選來生成一個只存儲包含單詞tcp的字符串的的新的RDD。示例以下:

  1. scala> val tcpLines = file.filter(line => line.contains("tcp"))
  2. tcpLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:26

行動操做會對RDD計算一個結果,並把結果返回到驅動器程序中,或者把結果存儲到外部存儲系統(如HDFS)中。first() 就是咱們以前調用的一個行動操做,它會返回RDD的一個元素,示例以下:

  1. scala> tcpLines.first()
  2. res3: String = tcp 6 TCP # transmission control protocol

轉化操做和行動操做的區別在於Spark計算RDD的方式不一樣。雖然你能夠在任什麼時候候定義新的RDD,可是Spark只會惰性計算這些RDD。它們只有第一次在一個行動操做中用到時,纔會真正計算。這種策略剛開始看起來可能會顯得有些奇怪,不過在大數據領域是頗有道理的。好比,咱們以一個文本文件定義了數據, 而後把其中包含tcp的行篩選出來。若是Spark在運行 val file = sc.textFile("file:///etc/protocols") 時就把文件中的全部行都讀取並存儲起來,就會消耗不少的存儲空間,而咱們立刻就要篩選掉其中的不少數據。相反,一旦Spark瞭解了完整的轉化操做鏈以後,它就能夠只計算求結果時真正須要的數據。事實上,在行動操做first()中,Spark只須要掃描文件直到找到第一個匹配的行爲止,而不須要讀取整個文件。

最後,默認狀況下,Spark的RDD會在你每次對它們進行行動操做時從新計算。若是想在多個行動操做中重用同一個RDD,可使用 RDD.persist() 讓Spark把這個RDD緩存下來。在第一次對持久化的RDD計算以後,Spark會把RDD的內容保存到內存中(以分區方式存儲到集羣中的各機器上),這樣在以後的行動操做就能夠重用這些數據了。Spark在默認狀況下不進行持久化可能顯得有些奇怪,不過這對於大規模數據集是頗有意義的:若是不會重用該RDD,咱們就不必浪費存儲空間,Spark能夠直接遍歷一遍數據而後計算出結果。

在實際操做中,你會常常用 persist() 來把數據的一部分讀取到內存中,並反覆查詢這部分數據。例如,咱們想屢次對
/etc/protocols 文件包含tcp的行進行計算,就能夠寫出以下腳本:

  1. scala> tcpLines.persist()
  2. res8: tcpLines.type = MapPartitionsRDD[3] at filter at <console>:26
  3. scala> tcpLines.count()
  4. res9: Long = 1
  5. scala> tcpLines.first()
  6. res10: String = tcp 6 TCP # transmission control protocol

總的來講,每一個Spark程序通常的工做流程:

  1. 從外部數據建立輸入RDD。
  2. 使用諸如 filter() 這樣的轉化操做對RDD進行轉化,以定義新的RDD。
  3. 告訴Spark對須要被重用的中間結果RDD執行persist() 操做。
  4. 使用行動操做(如count()和first()等)來觸發一次並行計算,Spark會對計算進行優化後再執行。

3.2 建立RDD

Spark提供了兩種建立RDD 的方式:讀取外部數據集,以及在驅動程序中對一個集合進行並行化。

3.2.1 並行集合

並行集合 (Parallelized collections) 的建立是經過在一個已有的集合(Scala Seq)上調用 SparkContext 的 parallelize 方法實現的。集合中的元素被複制到一個可並行操做的分佈式數據集中。例如,這裏演示瞭如何在一個包含 1 到 10 的數組中建立並行集合:

  1. scala> val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  2. data: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  3. scala> val distData = sc.parallelize(data)
  4. distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:26

一旦建立完成,這個分佈式數據集(distData)就能夠被並行操做。例如,咱們能夠調用 distData.reduce((a, b) => a + b) 將這個數組中的元素相加。咱們之後再描述在分佈式上的一些操做。

並行集合一個很重要的參數是切片數(slices),表示一個數據集切分的份數。Spark 會在集羣上爲每個切片運行一個任務。你能夠在集羣上爲每一個 CPU 設置 2-4 個切片(slices)。正常狀況下,Spark 會試着基於你的集羣情況自動地設置切片的數目。然而,你也能夠經過 parallelize 的第二個參數手動地設置(例如:sc.parallelize(data, 10))。

注意:除了開發原型和測試時,這種方式用的並很少,畢竟這種方式須要把你的整個數據集先放在一臺機器的內存中。

3.2.2 外部數據集

Spark 能夠從任何一個 Hadoop 支持的存儲源建立分佈式數據集,包括本地文件系統,HDFS,Cassandra,HBase,Amazon S3等。 Spark 支持文本文件(text files),SequenceFiles 和其餘 Hadoop InputFormat。

文本文件 RDDs 可使用 SparkContext.textFile() 方法建立。在這個方法裏傳入文件的 URI (機器上的本地路徑 file:// 或 hdfs://,s3n:// 等),而後它會將文件讀取成一個行集合。這裏是一個調用例子:

  1. scala> val distFile = sc.textFile("file:///home/hadoop/Develop/start.sh")
  2. distFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/Develop/start.sh MapPartitionsRDD[14] at textFile at <console>:24

一旦建立完成,distFile 就能作數據集操做。例如,咱們能夠用下面的方式使用 map 和 reduce 操做將全部行的長度相加:distFile.map(s => s.length).reduce((a, b) => a + b) 。

注意,Spark 讀文件時:

  • 若是使用本地文件系統路徑,文件必須能在 work 節點上用相同的路徑訪問到。要麼複製文件到全部的 workers,要麼使用網絡的方式共享文件系統。
  • 全部 Spark 的基於文件的方法,包括 textFile,能很好地支持文件目錄,壓縮過的文件和通配符。例如,你可使用 textFile(「/my/文件目錄」),textFile(「/my/文件目錄/.txt」) 和 textFile(「/my/文件目錄/.gz」)。
  • textFile 方法也能夠選擇第二個可選參數來控制切片(slices)的數目。默認狀況下,Spark 爲每個文件塊(HDFS 默認文件塊大小是 64M)建立一個切片(slice)。可是你也能夠經過一個更大的值來設置一個更高的切片數目。注意,你不能設置一個小於文件塊數目的切片值。

3.3 RDD操做

RDD 支持兩種類型的操做:轉化操做(transformations) 從已經存在的數據集中建立一個新的數據集;行動操做(actions) 在數據集上進行計算以後返回一個值到驅動器程序。例如,map 是一個轉化操做,它將每個數據集元素傳遞給一個函數而且返回一個新的 RDD。另外一方面,reduce 是一個行動操做,它使用相同的函數來聚合 RDD 的全部元素,而且將最終的結果返回到驅動器程序(不過也有一個並行 reduceByKey 能返回一個分佈式數據集)。

3.3.1 向Spark傳遞函數

Spark的大部分轉化操做和一部分行動操做,都須要依賴用戶傳遞函數來計算。這裏推薦兩種方式:

  • 匿名函數 (Anonymous function syntax),能夠在比較短的代碼中使用。
  • 全局單例對象裏的靜態方法。例如,你能夠定義 object MyFunctions 而後傳遞 MyFounctions.func1,像下面這樣:
  1. object MyFunctions {
  2. def func1(s: String): String = { ... }
  3. }
  4. myRdd.map(MyFunctions.func1)

注意,它可能傳遞的是一個類實例裏的一個方法引用(而不是一個單例對象),這裏必須傳送包含方法的整個對象。例如:

  1. class MyClass {
  2. def func1(s: String): String = { ... }
  3. def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
  4. }

這裏,若是咱們建立了一個 new MyClass 對象,而且調用它的 doStuffmap 裏面引用了這個 MyClass 實例中的 func1 方法,因此這個對象必須傳送到集羣上。相似寫成 rdd.map(x => this.func1(x))

以相似的方式,訪問外部對象的字段將會引用整個對象:

  1. class MyClass {
  2. val field = "Hello"
  3. def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
  4. }

至關於寫成 rdd.map(x => this.field + x),引用了整個 this 對象。爲了不這個問題,最簡單的方式是複製 field 到一個本地變量而不是從外部訪問它:

  1. def doStuff(rdd: RDD[String]): RDD[String] = {
  2. val field_ = this.field
  3. rdd.map(x => field_ + x)
  4. }
3.3.2 使用鍵值對

雖然不少 Spark 操做工做在包含任意類型對象的RDD上的,可是少數幾個特殊操做僅僅在鍵值(key-value)對RDD上可用。最多見的是分佈式 「shuffle」 操做,例如根據一個 key 對一組數據進行分組和聚合。

在 Scala 中,這些操做在包含二元組(Tuple2)(在語言的內建元組中,經過簡單的寫 (a, b) 建立) 的 RDD 上自動地變成可用的,只要在你的程序中導入 org.apache.spark.SparkContext._ 來啓用 Spark 的隱式轉換。在 PairRDDFunctions 的類裏鍵值對操做是可使用的,若是你導入隱式轉換它會自動地包裝成元組 RDD。

例如,下面的代碼在鍵值對上使用 reduceByKey 操做來統計在一個文件裏每一行文本內容出現的次數:

  1. val lines = sc.textFile("data.txt")
  2. val pairs = lines.map(s => (s, 1))
  3. val counts = pairs.reduceByKey((a, b) => a + b)

咱們也可使用 counts.sortByKey(),例如,將鍵值對按照字母進行排序,最後 counts.collect() 把它們做爲一個對象數組帶回到驅動程序。

3.3.3 常見的轉化操做

下面的表格列了 Spark 支持的一些經常使用 transformations。

轉化操做 含義
map(func) 返回一個新的分佈式數據集,將數據源的每個元素傳遞給函數 func 映射組成
filter(func) 返回一個新的數據集,從數據源中選中一些元素經過函數 func 返回 true
flatMap(func) 相似於 map,可是每一個輸入項能被映射成多個輸出項(因此 func 必須返回一個 Seq,而不是單個 item)。
mapPartitions(func) 相似於 map,可是分別運行在 RDD 的每一個分區上,因此 func 的類型必須是 Iterator=> Iterator 當運行在類型爲 T 的 RDD 上。
mapPartitionsWithIndex(func) 相似於 mapPartitions,可是 func 須要提供一個 integer 值描述索引(index),因此 func 的類型必須是 (Int, Iterator) => Iterator 當運行在類型爲 T 的 RDD 上。
sample(withReplacement, fraction, seed) 對數據進行採樣。
union(otherDataset) 生成一個包含兩個RDD中全部元素的RDD
intersection(otherDataset) 返回兩個RDD共同的元素的RDD
distinct([numTasks])) 對RDD進行去重
groupByKey([numTasks]) 對具備相同鍵的值進行分組
reduceByKey(func, [numTasks]) 合併具備相同鍵的值
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) aggregateByKey函數對PairRDD中相同Key的值進行聚合操做,在聚合過程當中一樣使用了一箇中立的初始值
sortByKey([ascending], [numTasks]) 返回一個根據鍵升序或降序排序的RDD
join(otherDataset, [numTasks]) 對兩個RDD具備相同鍵的鍵值對進行內鏈接.
cogroup(otherDataset, [numTasks]) 對兩個RDD中擁有的相同鍵的數據分組到一塊兒
cartesian(otherDataset) 與另外一個RDD的笛卡爾積
pipe(command, [envVars]) 對RDD中的元素經過腳本管道執行腳本
coalesce(numPartitions, [shuffle]) 該函數用於將RDD進行重分區,使用HashPartitioner。第一個參數爲重分區的數目,第二個爲是否進行shuffle,默認爲false;
repartition(numPartitions) 對RDD數據集進行重分區操做,該函數其實就是coalesce函數第二個參數爲true的實現

表1. 對一個數據爲{1, 2, 3, 3}的RDD進行基本的RDD轉化操做

函數名 示例 結果
map() rdd.map(x => x + 1) {2, 3, 4, 4}
flatMap() rdd.flatMap(x => x.to(3)) {1, 2, 3, 2, 3, 3, 3}
filter() rdd.filter(x => x != 1) {2, 3, 3}
distinct rdd.distinct() {1, 2, 3}
sample(withReplacement, fraction, seed) rdd.sample(false, 0.5) 非肯定

接下來,咱們在Spark shell裏驗證一下上述的幾個操做,完整的代碼以下:

  1. scala> val mRDD = sc.parallelize(Array(1, 2, 3, 3))
  2. mRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:24
  3. scala> mRDD.map(x => x + 1).collect()
  4. res30: Array[Int] = Array(2, 3, 4, 4)
  5. scala> mRDD.flatMap(x => x.to(3)).collect()
  6. res31: Array[Int] = Array(1, 2, 3, 2, 3, 3, 3)
  7. scala> mRDD.filter(x => x != 1).collect()
  8. res32: Array[Int] = Array(2, 3, 3)
  9. scala> mRDD.distinct().collect()
  10. res33: Array[Int] = Array(1, 2, 3)
  11. scala> mRDD.sample(false, 0.5).collect()
  12. res35: Array[Int] = Array()
  13. scala> mRDD.sample(false, 0.5).collect()
  14. res36: Array[Int] = Array(1, 2, 3, 3)
  15. scala> mRDD.sample(false, 0.5).collect()
  16. res37: Array[Int] = Array(1, 3)

上述的幾個轉化操做中,除sample轉化操做每次返回不固定的元素,其餘幾個轉化操做的結果都是可預期的。上述的例子中,咱們爲了把結果給打印出來,咱們調用了 collect() 行動操做。

表2. 對數據分別爲 {1, 2, 3} 和 {3, 4, 5} 的RDD進行鍼對兩個RDD的轉化操做

函數名 示例 結果
union() rdd.union(other) {1, 2, 3, 3, 4, 5}
intersection() rdd.intersection(other) {3}
subtract() rdd.subtract(other) {1, 2}
cartesian() rdd.cartesian(other) {(1, 3), (1, 4), … (3, 5)}

一樣的,咱們在Spark shell裏驗證一下上述的幾個操做,完整的代碼以下:

  1. scala> val firstRDD = sc.parallelize(Array(1, 2, 3))
  2. firstRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
  3. scala> val secondRDD = sc.parallelize(Array(3, 4, 5))
  4. secondRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
  5. scala> firstRDD.union(secondRDD).collect()
  6. res26: Array[Int] = Array(1, 2, 3, 3, 4, 5)
  7. scala> firstRDD.intersection(secondRDD).collect()
  8. res27: Array[Int] = Array(3)
  9. scala> firstRDD.subtract(secondRDD).collect()
  10. res28: Array[Int] = Array(1, 2)
  11. scala> firstRDD.cartesian(secondRDD).collect()
  12. res29: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5))
3.3.4 常見的行動操做

下面的表格列了 Spark 支持的一些經常使用 actions。詳細內容請參閱 RDD API 文檔(ScalaJavaPython) 和 PairRDDFunctions 文檔(ScalaJava)。

行動操做 含義
reduce(func) 並行整合RDD中全部數據
collect() 返回RDD中的全部元素
count() 返回RDD中的元素個數
first() 返回RDD中的第一個元素
take(n) 從RDD中返回n個元素
takeSample(withReplacement, num, [seed]) 從RDD中返回任意一些元素
takeOrdered(n, [ordering]) 從RDD中按照提供的順序返回最前面的n個元素
saveAsTextFile(path) 把RDD中的元素寫入本地文件系統或者HDFS文件系統
saveAsSequenceFile(path) (Java and Scala) 把RDD中的元素寫入本地文件系統或HDFS文件系統的seqfile
saveAsObjectFile(path) (Java and Scala) 經過Java序列化把RDD中的元素寫入文件系統(本地或HDFS),經過SparkContext.objectFile()加載
countByKey() RDD中各元素的Key出現的次數
foreach(func) 對RDD中的每一個元素使用給定的函數

表3,對一個數據爲 {1, 2, 3, 3} 的RDD進行基本的RDD行動操做:

函數名 示例 結果
collect() rdd.collect() {1, 2, 3, 3}
count() rdd.count() 4
countByValue() rdd.countByValue() {(1, 1), (2, 1), (3, 2)}
take(num) rdd.take(2) {1, 2}
top(num) rdd.top(2) {3, 3}
takeOrdered(num)(ordering) rdd.takeOrdered(2)(myOrdering) {3, 3}
takeSample(num) rdd.takeSample(false, 1) 非肯定的
reduce(num) rdd.reduce((x, y) = > x + y) 9
fold(num) rdd.fold(0)((x, y) => x + y) 9
aggregate(zeroValue)(seqOp, combOp) rdd.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) (9, 4)
foreach(func) rdd.foreach(func)

咱們把上述示例的幾個行動操做在Spark shell中驗證一下,結果以下:

  1. scala> val mRDD = sc.parallelize(Array(1, 2, 3, 3))
  2. mRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[52] at parallelize at <console>:24
  3. scala> mRDD.collect()
  4. res41: Array[Int] = Array(1, 2, 3, 3)
  5. scala> mRDD.count()
  6. res42: Long = 4
  7. scala> mRDD.countByValue()
  8. res43: scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)
  9. scala> mRDD.take(2)
  10. res44: Array[Int] = Array(1, 2)
  11. scala> mRDD.top(2)
  12. res45: Array[Int] = Array(3, 3)
  13. scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
  14. myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@5dc5e110
  15. scala> mRDD.takeOrdered(2)(myOrd)
  16. res99: Array[Int] = Array(3, 3)
  17. scala> mRDD.takeSample(false, 1)
  18. res53: Array[Int] = Array(3)
  19. scala> mRDD.takeSample(false, 1)
  20. res54: Array[Int] = Array(2)
  21. scala> mRDD.takeSample(false, 1)
  22. res55: Array[Int] = Array(3)
  23. scala> mRDD.reduce((x,y) => x+y)
  24. res56: Int = 9
  25. scala> mRDD.fold(0)((x,y) => x+y)
  26. res57: Int = 9
  27. scala> mRDD.aggregate((0, 0))((x,y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
  28. res58: (Int, Int) = (9,4)
  29. scala> mRDD.foreach(println)
  30. 3
  31. 2
  32. 3
  33. 1

備註:上述的takeSample操做爲從RDD中返回任意一個元素,因此每次返回的結果都有可能不一樣。

3.4 RDD持久化

Spark最重要的一個功能是它能夠經過各類操做(operations)持久化(或者緩存)一個集合到內存中。當你持久化一個RDD的時候,每個節點都將參與計算的全部分區數據存儲到內存中,而且這些數據能夠被這個集合(以及這個集合衍生的其餘集合)的動做(action)重複利用。這個能力使後續的動做速度更快(一般快10倍以上)。對應迭代算法和快速的交互使用來講,緩存是一個關鍵的工具。

你能經過 persist() 或者 cache() 方法持久化一個 RDD 。首先,在action中計算獲得RDD;而後,將其保存在每一個節點的內存中。Spark的緩存是一個容錯的技術-若是RDD的任何一個分區丟失,它 能夠經過原有的轉換(transformations)操做自動的重複計算而且建立出這個分區。

此外,咱們能夠利用不一樣的存儲級別存儲每個被持久化的RDD。例如,它容許咱們持久化集合到磁盤上、將集合做爲序列化的Java對象持久化到內存中、在節點間複製集合或者存儲集合到Tachyon中。咱們能夠經過傳遞一個 StorageLevel 對象給 persist() 方法設置這些存儲級別。cache() 方法使用了默認的存儲級別— StorageLevel.MEMORY_ONLY。完整的存儲級別介紹以下所示:

存儲級別 含義
MEMORY_ONLY 將RDD做爲非序列化的Java對象存儲在jvm中。若是RDD不適合存在內存中,一些分區將不會被緩存,從而在每次須要這些分區時都需從新計算它們。這是系統默認的存儲級別。
MEMORY_AND_DISK 將RDD做爲非序列化的Java對象存儲在jvm中。若是RDD不適合存在內存中,將這些不適合存在內存中的分區存儲在磁盤中,每次須要時讀出它們。
MEMORY_ONLY_SER 將RDD做爲序列化的Java對象存儲(每一個分區一個byte數組)。這種方式比非序列化方式更節省空間,特別是用到快速的序列化工具時,可是會更耗費cpu資源—密集的讀操做。
MEMORY_AND_DISK_SER 和MEMORY_ONLY_SER相似,但不是在每次須要時重複計算這些不適合存儲到內存中的分區,而是將這些分區存儲到磁盤中。
DISK_ONLY 僅僅將RDD分區存儲到磁盤中
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 和上面的存儲級別相似,可是複製每一個分區到集羣的兩個節點上面
OFF_HEAP (experimental) 以序列化的格式存儲RDD到Tachyon中。相對於MEMORY_ONLY_SER,OFF_HEAP減小了垃圾回收的花費,容許更小的執行者共享內存池。這使其在擁有大量內存的環境下或者多併發應用程序的環境中具備更強的吸引力。

Spark也會自動持久化一些shuffle操做(如 reduceByKey)中的中間數據,即便用戶沒有調用 persist 方法。這樣的好處是避免了在shuffle出錯狀況下,須要重複計算整個輸入。若是用戶計劃重用 計算過程當中產生的RDD,咱們仍然推薦用戶調用 persist 方法。

3.4.1 如何選擇存儲級別

Spark的多個存儲級別意味着在內存利用率和cpu利用效率間的不一樣權衡。咱們推薦經過下面的過程選擇一個合適的存儲級別:

  • 若是你的RDD適合默認的存儲級別(MEMORY_ONLY),就選擇默認的存儲級別。由於這是cpu利用率最高的選項,會使RDD上的操做盡量的快。

  • 若是不適合用默認的級別,選擇MEMORY_ONLY_SER。選擇一個更快的序列化庫提升對象的空間使用率,可是仍可以至關快的訪問。

  • 除非函數計算RDD的花費較大或者它們須要過濾大量的數據,不要將RDD存儲到磁盤上,不然,重複計算一個分區就會和重磁盤上讀取數據同樣慢。

  • 若是你但願更快的錯誤恢復,能夠利用重複(replicated)存儲級別。全部的存儲級別均可以經過重複計算丟失的數據來支持完整的容錯,可是重複的數據可以使你在RDD上繼續運行任務,而不須要重複計算丟失的數據。

  • 在擁有大量內存的環境中或者多應用程序的環境中,OFF_HEAP具備以下優點:

    • 它運行多個執行者共享Tachyon中相同的內存池
    • 它顯著地減小垃圾回收的花費
    • 若是單個的執行者崩潰,緩存的數據不會丟失
3.4.2 刪除數據

Spark自動的監控每一個節點緩存的使用狀況,利用最近最少使用原則刪除老舊的數據。若是你想手動的刪除RDD,可使用 RDD.unpersist() 方法

4 共享變量

通常狀況下,當一個傳遞給Spark操做(例如map和reduce)的函數在遠程節點上面運行時,Spark操做實際上操做的是這個函數所用變量的一個獨立副本。這些變量被複制到每臺機器上,而且這些變量在遠程機器上的全部更新都不會傳遞迴驅動程序。一般跨任務的讀寫變量是低效的,可是,Spark仍是爲兩種常見的使用模式提供了兩種有限的共享變量:廣播變量(broadcast variable)和累加器(accumulator)

4.1 廣播變量

廣播變量容許程序員緩存一個只讀的變量在每臺機器上面,而不是每一個任務保存一份拷貝。例如,利用廣播變量,咱們可以以一種更有效率的方式將一個大數據量輸入集合的副本分配給每一個節點。Spark也嘗試着利用有效的廣播算法去分配廣播變量,以減小通訊的成本。

一個廣播變量能夠經過調用 SparkContext.broadcast(v) 方法從一個初始變量v中建立。廣播變量是v的一個包裝變量,它的值能夠經過value方法訪問,下面的代碼說明了這個過程:

  1. scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
  2. broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(87)
  3. scala> broadcastVar.value
  4. res114: Array[Int] = Array(1, 2, 3)

廣播變量建立之後,咱們就可以在集羣的任何函數中使用它來代替變量v,這樣咱們就不須要再次傳遞變量v到每一個節點上。另外,爲了保證全部的節點獲得廣播變量具備相同的值,對象v不能在廣播以後被修改。

4.2 累加器

顧名思義,累加器是一種只能經過關聯操做進行「加」操做的變量,所以它可以高效的應用於並行操做中。它們可以用來實現 counters 和 sums。Spark原生支持數值類型的累加器,開發者能夠本身添加支持的類型。若是建立了一個具名的累加器,它能夠在spark的UI中顯示。這對於理解運行階段(running stages)的過程有很重要的做用。

一個累加器能夠經過調用 SparkContext.accumulator(v) 方法從一個初始變量v中建立。運行在集羣上的任務能夠經過 add 方法或者使用 += 操做來給它加值。然而,它們沒法讀取這個值。只有驅動程序可使用 value 方法來讀取累加器的值。以下代碼,展現瞭如何利用累加器將一個數組裏面的全部元素相加:

  1. scala> val accum = sc.accumulator(0, "My Accumulator")
  2. accum: org.apache.spark.Accumulator[Int] = 0
  3. scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
  4. scala> accum.value
  5. res2: Int = 10

這個例子利用了內置的整數類型累加器。開發者能夠利用子類 AccumulatorParam 建立本身的 累加器類型。AccumulatorParam 接口有兩個方法:zero 方法爲你的數據類型提供一個「0 值」(zero value);addInPlace 方法計算兩個值的和。例如,假設咱們有一個Vector 類表明數學上的向量,咱們可以定義以下累加器:

  1. object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  2. def zero(initialValue: Vector): Vector = {
  3. Vector.zeros(initialValue.size)
  4. }
  5. def addInPlace(v1: Vector, v2: Vector): Vector = {
  6. v1 += v2
  7. }
  8. }
  9. // Then, create an Accumulator of this type:
  10. val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

在scala中,Spark支持用更通常的Accumulable接口來累積數據-結果類型和用於累加的元素類型 不同(例如經過收集的元素創建一個列表)。Spark也支持用 SparkContext.accumulableCollection 方法累加通常的scala集合類型。

5. 總結

簡單介紹了Spark核心概念和RDD操做,經過這些基本的轉化操做和行動操做,就能夠進行簡單的Spark應用開發。

相關文章
相關標籤/搜索