走近RDD

  RDD(Resilient Distributed Datasets)彈性分佈式數據集。RDD能夠當作是一個簡單的"數組",對其進行操做也只須要調用有限的"數組"中的方法便可,但它與通常數組的區別在於:RDD是分佈式存儲,能夠跟好的利用現有的雲數據平臺,並在內存中進行。此處的彈性指的是數據的存儲方式,及數據在節點中進行存儲的時候,既可使用內存也可使用磁盤。此外,RDD還具備很強的容錯性,在spark運行計算的過程當中,不會由於某個節點錯誤而使得整個任務失敗;不通節點中併發運行的數據,若是在某個節點發生錯誤時,RDD會自動將其在不一樣的節點中重試。apache

  RDD一大特性是延遲計算,即一個完整的RDD運行任務被分紅2部分:Transformation和Action。數組

  Transformation用於對RDD的建立。在spark中,RDD只能使用Transformation來建立,同時Transformation還提供了大量的操做方法。RDD還能夠利用Transformation來生成新的RDD,這樣能夠在有限的內存空間中生成竟可能多的數據對象。不管發生了多少次Transformation,此時,在RDD中真正數據計算運行的操做Action都沒真正的開始運行。併發

 

 

  Action是數據的執行部分,其也提供了大量的方法去執行數據的計算操做部分。分佈式

 

   RDD能夠將其當作一個分佈在不一樣節點中的分佈式數據集,並將數據以數據塊(Block)的形式存儲在各個節點的計算機中。每一個BlockMaster管理着若干個BlockSlave,而每一個BlockSlave又管理着若干個BlockNode。當BlockSlave得到了每一個Node節點的地址,又會反向向BlockMaster註冊每一個Node的基本信息,這樣就造成了分層管理。ide

 

  RDD依賴spa

    窄依賴 (narrowdependencies) 和寬依賴 (widedependencies) 。窄依賴是指 父 RDD 的每一個分區都只被子 RDD 的一個分區所使用,例如map、filter。相應的,那麼寬依賴就是指父 RDD 的分區被多個子 RDD 的分區所依賴,例如groupByKey、reduceByKey等操做。若是父RDD的一個Partition被一個子RDD的Partition所使用就是窄依賴,不然的話就是寬依賴。
  這種劃分有兩個用處。首先,窄依賴支持在一個結點上管道化執行。例如基於一對一的關係,能夠在 filter 以後執行 map 。其次,窄依賴支持更高效的故障還原。由於對於窄依賴,只有丟失的父 RDD 的分區須要從新計算。而對於寬依賴,一個結點的故障可能致使來自全部父 RDD 的分區丟失,所以就須要徹底從新執行。所以對於寬依賴,Spark 會在持有各個父分區的結點上,將中間數據持久化來簡化故障還原,就像 MapReduce 會持久化 map 的輸出同樣。對於join操做有兩種狀況,若是join操做的使用每一個partition僅僅和已知的Partition進行join,此時的join操做就是窄依賴;其餘狀況的join操做就是寬依賴;由於是肯定的Partition數量的依賴關係,因此就是窄依賴,得出一個推論,窄依賴不只包含一對一的窄依賴,還包含一對固定個數的窄依賴(也就是說對父RDD的依賴的Partition的數量不會隨着RDD數據規模的改變而改變)
                
 
   下面就是RDD API
  一、parallelize
   def parallelize[T](seq : scala.Seq[T], numSlices : scala.Int = { /* compiled code */ }) //第一個參數是數據,同時還有一個帶有默認數值的參數,改參數爲1,該參數表示的是將數據分佈在多少個數據節點中存放。
  二、aggregate
   def aggregate[U](zeroValue : U)(seqOp : scala.Function2[U, T, U], combOp : scala.Function2[U, U, U]) //seqOp 是給定的計算方法,combOp 是合併方法,將第一個計算方法得出的結果與源碼中的zeroValue進行合併。實例:
import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr=sc.parallelize(Array(1,2,3,4,5,6,7,8))//parallelize將內存數據讀入Spark系統中,做爲總體數據集
    val result=arr.aggregate(0)(math.max(_,_),_+_)//_+_ 對傳遞的第一個方法的結果集進行進一步處理
    println(result)
  }
}

結果爲8scala

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr=sc.parallelize(Array("abd","hello world","hello sb"))//parallelize將內存數據讀入Spark系統中,做爲總體數據集
    val result=arr.aggregate("")((value,word)=>value+word,_+_)//_+_ 對傳遞的第一個方法的結果集進行進一步處理
    println(result)
  }
}

結果爲abdhello worldhello sbcode

  三、cache是將數據內容計算並保存在計算節點的內存中orm

  四、cartesion是用於對不一樣的數組進行笛卡爾操做,要求是數組的長度必須相同對象

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr1=sc.parallelize(Array(1,2,3,4))//parallelize將內存數據讀入Spark系統中,做爲總體數據集
    val arr2=sc.parallelize(Array(4,3,2,1))
    val res=arr1.cartesian(arr2)
    res.foreach(print)
  }
}

結果:(1,4)(1,3)(1,2)(1,1)(2,4)(2,3)(2,2)(2,1)(3,4)(3,3)(3,2)(3,1)(4,4)(4,3)(4,2)(4,1)

  五、Coalesce是將已經存儲的數據從新分片後再進行存儲(repartition與Coalesce相似)

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr1=sc.parallelize(Array(1,2,3,4,5,6))//parallelize將內存數據讀入Spark系統中,做爲總體數據集
    val arr2=arr1.coalesce(2,true)
    val res1=arr1.aggregate(0)(math.max(_,_),_+_)
    println(res1)
    val res2=arr2.aggregate(0)(math.max(_,_),_+_)
    println(res2)
  }
}

結果爲6    11

  六、countByValue是計算數據集中某個數據出現的個數,並將其以map的形式返回

  七、countByKey是計算數據集中元數據鍵值對key出現的個數

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr1=sc.parallelize(Array((1,"a"),(2,'b'),(1,'c'),(1,'d'),(2,'a')))//parallelize將內存數據讀入Spark系統中,做爲總體數據集
    val res1=arr1.countByValue()
    res1.foreach(println)
    val res2=arr1.countByKey()
    res2.foreach(println)
  }
}
//結果:((1,c),1)
((2,a),1)
((1,a),1)
((1,d),1)
((2,b),1)
(1,3)
(2,2)
View Code

  八、filter是對數據集進行過濾

  九、flatMap是對RDD中的數據進行總體操做的一個特殊方法,其在定義時就是針對數據集進行操做

  十、map能夠對RDD中的數據集進行逐個操做,其與flatmap不一樣得是,flatmap是將數據集中的數據做爲一個總體去處理,以後再對其中的數據作計算,而map則直接對數據集中的數據作單獨的處理

  十一、groupBy是將傳入的數據進行分組

  十二、keyBy是爲數據集中的每一個個體數據添加一個key,從而造成鍵值對

  1三、reduce同時對2個數據進行處理,主要是對傳入的數據進行合併處理

  1四、sortBy是對已有的RDD進行從新排序

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr1=sc.parallelize(Array((1,"a"),(2,"c"),(3,"b"),(4,"x"),(5,"f")))//parallelize將內存數據讀入Spark系統中,做爲總體數據集
    val res1=arr1.sortBy(word=>word._1,true)
    val res2=arr1.sortBy(word=>word._2,true)
    res1.foreach(println)
    res2.foreach(println)
  }
}

  1五、zip能夠將若干個RDD壓縮成一個新的RDD

相關文章
相關標籤/搜索