經過一個簡單的單詞計數的例子來開始介紹RDD編程。java
import org.apache.spark.{SparkConf, SparkContext} object word { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("word") val sc = new SparkContext(conf) val input = sc.parallelize(List("spark core scala python java spark scala")) val words = input.flatMap(line => line.split(" ")) val counts = words.map(word => (word,1)).reduceByKey{case (x,y) => x + y} counts.foreach(println) } }
使用Scala語言,IDE使用IntelliJ IDEA 。在IDEA上運行Spark應用須要添加Maven依賴。python
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.3</version> </dependency>
運行結果:apache
運行Spark應用,首先須要導入Spark包,這裏使用Maven來鏈接公共倉庫中的Spark包。編程
接下來建立一個SparkConf來配置應用,這裏使用local即本地模式,方便調試代碼。緩存
而後基於這個SparkConf建立一個SparkContext對象。分佈式
有了SparkContext之後,就能夠來訪問Spark,建立RDD。sc.parallelize建立了一個Seq對象 RDD,能夠對這個RDD進行操做。flatmap和map就是對這個RDD進行的一些操做完成單詞計數,這些操做接下來將進行解釋,最後將結果在後臺顯示出來。這樣一個簡單的Spark應用就完成了。函數
RDD編程spa
前一章介紹過 RDD即彈性分佈式數據集,是Spark對數據的核心抽象,RDD就是分佈式的元素集合。scala
Spark對數據的操做無外乎建立RDD,轉化已有RDD以及調用RDD操做進行求值。Spark會自動將RDD中的數據分發到集羣上,並將操做並行化執行。調試
可使用兩種方法建立RDD:讀取一個外部數據集,或在驅動器程序裏分發驅動器程序中的對象集合。
例如:SparkContext.textFile()用來讀取文本文件做爲一個字符串RDD。
lines = sc.textFile("input.txt")
還能夠把程序中一個已有的集合傳給SparkContext的parallelize()方法 如咱們上面單詞計數例子裏的那樣。
RDD建立出來後,能夠進行兩種類型的操做:轉化操做和行動操做。
a、轉化操做
RDD的轉化操做是返回新的RDD的操做,轉化出來的RDD是惰性求值的,只有在行動操做中用到這些RDD時纔會被計算。
常見轉化操做有:
b、行動操做
行動操做把最終求得的結果返回到驅動器程序或者寫入外部存儲系統中。
常見的行動操做有:
val input = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
val result = input.aggregate((0,0))(
(acc , value) => (acc._1 + value , acc._2 + 1)
(part1 , part2) => (part1._1 + part2._1 , part1._2 + part2._2))
val avg = result._1 / result._2.toDouble
aggregate()函數計算過程:
value是input中的值 , 即 1 到 10
在分佈式計算中,將input分紅多個分區,假設3個分區 ,分別計算 (1,2,3) (4,5,6) (7,8,9,10)
第一個分區用(acc , value) => (acc._1 + value , acc._2 + 1) 計算過程,初始值爲0:
0 + 1 , 0 + 1
1 + 2 , 1 + 1
2 + 3 , 2 + 1
最終這個分區獲得 (6,3) 即表示和爲6,有3個元素。一樣,另外兩個分區獲得(15,3) (34,4)
接着調用(part1 , part2) => (part1._1 + part2._1 , part1._2 + part2._2)) 這一步
將三個分區的和以及元素個數加起來獲得(55,10)
Spark在每次調用行動操做時都會重算RDD以及它的全部依賴,爲了不屢次計算同一個RDD,可讓Spark對數據進行持久化。
持久化數據所在的節點發生故障時,Spark會在須要用到緩存的數據時重算丟失的數據分區。最好的作法是將數據備份到多個節點上。
使用persist()用來把數據以序列化的形式緩存,默認狀況下會緩存在JVM的堆空間中。若是緩存的數據太多,內存中放不下,Spark會自動利用最近最少使用的緩存策略把最老的分區從內存中移除。
unpersist()用於手動把持久化的RDD從緩存中移除。