Spark學習筆記2:RDD編程

 經過一個簡單的單詞計數的例子來開始介紹RDD編程。java

import org.apache.spark.{SparkConf, SparkContext}

object word {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("word")
    val sc = new SparkContext(conf)
    val input = sc.parallelize(List("spark core scala python java spark scala"))
    val words = input.flatMap(line => line.split(" "))
    val counts = words.map(word => (word,1)).reduceByKey{case (x,y) => x + y}
    counts.foreach(println)
  }

}

 使用Scala語言,IDE使用IntelliJ IDEA 。在IDEA上運行Spark應用須要添加Maven依賴。python

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.3</version>
</dependency>

運行結果:apache

 

運行Spark應用,首先須要導入Spark包,這裏使用Maven來鏈接公共倉庫中的Spark包。編程

接下來建立一個SparkConf來配置應用,這裏使用local即本地模式,方便調試代碼。緩存

而後基於這個SparkConf建立一個SparkContext對象。分佈式

有了SparkContext之後,就能夠來訪問Spark,建立RDD。sc.parallelize建立了一個Seq對象 RDD,能夠對這個RDD進行操做。flatmap和map就是對這個RDD進行的一些操做完成單詞計數,這些操做接下來將進行解釋,最後將結果在後臺顯示出來。這樣一個簡單的Spark應用就完成了。函數

 

RDD編程spa

前一章介紹過 RDD即彈性分佈式數據集,是Spark對數據的核心抽象,RDD就是分佈式的元素集合。scala

Spark對數據的操做無外乎建立RDD,轉化已有RDD以及調用RDD操做進行求值。Spark會自動將RDD中的數據分發到集羣上,並將操做並行化執行。調試

  • 建立RDD

可使用兩種方法建立RDD:讀取一個外部數據集,或在驅動器程序裏分發驅動器程序中的對象集合。

例如:SparkContext.textFile()用來讀取文本文件做爲一個字符串RDD。

    lines = sc.textFile("input.txt")

還能夠把程序中一個已有的集合傳給SparkContext的parallelize()方法 如咱們上面單詞計數例子裏的那樣。

  • RDD操做

RDD建立出來後,能夠進行兩種類型的操做:轉化操做和行動操做

a、轉化操做

RDD的轉化操做是返回新的RDD的操做,轉化出來的RDD是惰性求值的,只有在行動操做中用到這些RDD時纔會被計算。

常見轉化操做有:

  • map()操做用於接收一個函數,把這個函數用於RDD中的每一個元素,將函數的返回結果做爲結果RDD中對應元素的值。如 val counts = words.map(word =>(word,1)) 將每個單詞單詞映射成鍵值對。
  • filter()接受函數,將RDD中知足該函數的元素放入新的RDD中返回。
  • flatMap()被分別應用到輸入RDD的每一個元素上,返回的是一個包含各個迭代器可訪問的全部元素的RDD。能夠用於將輸入的字符串切分紅單詞。
  • 集合操做:distinct() 生成只包含不一樣元素的新RDD, union(other)返回一個包含兩個RDD中全部元素的RDD , intersection(other)只返回兩個RDD中都有的元素,subtract(other)返回只存在於第一個RDD而不存在於第二個RDD中的元素,cartesian(other)笛卡兒積,返回全部可能的(a,b)對,a,b分別來自兩個不一樣的RDD。

b、行動操做

行動操做把最終求得的結果返回到驅動器程序或者寫入外部存儲系統中。

常見的行動操做有:

  • count()用來返回計數結果
  • take(n)用來收集RDD中的n個元素
  • collect()用來獲取整個RDD中的元素
  • reduce()用來接收一個函數做爲參數,這個函數要操做兩個相同元素類型的RDD數據並返回一個一樣類型的新元素。 如 val sum = rdd.reduce( (x,y) => x+ y ) 用於計算輸入集合中元素的和。
  • fold() 和reduce()的區別是須要指定一個初始值做爲每一個分區第一次調用的結果。如 val sum = rdd.fold(1)(_ + _)
  • aggregate()函數不一樣於fold()和reduce(),不要求返回值類型必須與所操做的RDD類型相同,使用aggregate()時須要提供返回類型的初始值。使用aggregate()計算RDD平均值以下:

  val  input = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))

  val result = input.aggregate((0,0))(

    (acc , value) => (acc._1 + value , acc._2 + 1)

    (part1 , part2) => (part1._1 + part2._1 , part1._2 + part2._2))

  val avg = result._1 / result._2.toDouble

  aggregate()函數計算過程:

  value是input中的值 , 即 1 到 10

  在分佈式計算中,將input分紅多個分區,假設3個分區 ,分別計算  (1,2,3)  (4,5,6) (7,8,9,10)

  第一個分區用(acc , value) => (acc._1 + value , acc._2 + 1) 計算過程,初始值爲0:

    0 + 1 , 0 + 1 

    1 + 2 , 1 + 1

    2 + 3  ,   2 + 1

 最終這個分區獲得 (6,3) 即表示和爲6,有3個元素。一樣,另外兩個分區獲得(15,3) (34,4)

   接着調用(part1 , part2) => (part1._1 + part2._1 , part1._2 + part2._2)) 這一步  

   將三個分區的和以及元素個數加起來獲得(55,10)

  • 持久化緩存

Spark在每次調用行動操做時都會重算RDD以及它的全部依賴,爲了不屢次計算同一個RDD,可讓Spark對數據進行持久化。

持久化數據所在的節點發生故障時,Spark會在須要用到緩存的數據時重算丟失的數據分區。最好的作法是將數據備份到多個節點上。

使用persist()用來把數據以序列化的形式緩存,默認狀況下會緩存在JVM的堆空間中。若是緩存的數據太多,內存中放不下,Spark會自動利用最近最少使用的緩存策略把最老的分區從內存中移除。

unpersist()用於手動把持久化的RDD從緩存中移除。

相關文章
相關標籤/搜索