spark總結

 

RDD及其特色

一、RDD是Spark的核心數據模型,可是個抽象類,全稱爲Resillient Distributed Dataset,即彈性分佈式數據集。java

二、RDD在抽象上來講是一種元素集合,包含了數據。它是被分區的,分爲多個分區,每一個分區分佈在集羣中的不一樣節點上,從而讓RDD中的數據能夠被並行操做。(分佈式數據集)算法

三、RDD一般經過Hadoop上的文件,即HDFS文件或者Hive表,來進行建立;有時也能夠經過應用程序中的集合來建立。sql

四、RDD最重要的特性就是,提供了容錯性,能夠自動從節點失敗中恢復過來。即若是某個節點上的RDDpartition,由於節點故障,致使數據丟了,那麼RDD會自動經過本身的數據來源從新計算該partition。這一切對使用者是透明的。apache

五、RDD的數據默認狀況下存放在內存中的,可是在內存資源不足時,Spark會自動將RDD數據寫入磁盤。(彈性)編程

建立RDD

進行Spark核心編程的第一步就是建立一個初始的RDD。該RDD,一般就表明和包含了Spark應用程序的輸入源數據。而後經過Spark Core提供的transformation算子,對該RDD進行轉換,來獲取其餘的RDD。數組

Spark Core提供了三種建立RDD的方式:緩存

1.使用程序中的集合建立RDD(主要用於測試)網絡

List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);

2.使用本地文件建立RDD(主要用於臨時性處理有大量數據的文件)app

SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("D:\\Users\\Administrator\\Desktop\\spark.txt").javaRDD();

3.使用HDFS文件建立RDD(生產環境的經常使用方式)分佈式

SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();

使用HDFS文件建立RDD對比使用本地文件建立RDD,須要修改的,只有兩個地方:
第一,將SparkSession對象的master("local")方法去掉
第二,咱們針對的不是本地文件了,修改成hadoop hdfs上的真正的存儲大數據的文件

操做RDD

Spark支持兩種RDD操做:transformation和action。

transformation操做

transformation操做會針對已有的RDD建立一個新的RDD。transformation具備lazy特性,即transformation不會觸發spark程序的執行,它們只是記錄了對RDD所作的操做,不會自發的執行。只有執行了一個action,以前的全部transformation纔會執行。

經常使用的transformation介紹:

map :將RDD中的每一個元素傳人自定義函數,獲取一個新的元素,而後用新的元素組成新的RDD。

filter:對RDD中每一個元素進行判斷,若是返回true則保留,返回false則剔除。

flatMap:與map相似,可是對每一個元素均可以返回一個或多個元素。

groupByKey:根據key進行分組,每一個key對應一個Iterable<value>。

reduceByKey:對每一個key對應的value進行reduce操做。

sortByKey:對每一個key對應的value進行排序操做。

join:對兩個包含<key,value>對的RDD進行join操做,每一個keyjoin上的pair,都會傳入自定義函數進行處理。

cogroup:同join,可是每一個key對應的Iterable<value>都會傳入自定義函數進行處理。

sparkRDD算子:
map與flatmap的區別:扁平化
map函數:會對每一條輸入進行指定func操做,而後爲每一條輸入返回一個對象
flatmap函數:先進行map映射,而後在flatten(進行扁平化操做)

reducebykey算子:
首先會觸發shuffle,會進行兩次聚合操做
1,按照key將數據放到一塊兒(本地聚合--Shuffle Write)
2,將相同key的數據聚合(全局聚合--Shuffle Reader)

action操做

action操做主要對RDD進行最後的操做,好比遍歷,reduce,保存到文件等,並能夠返回結果給Driver程序。action操做執行,會觸發一個spark job的運行,從而觸發這個action以前全部的transformation的執行,這是action的特性。

經常使用的action介紹:

reduce:將RDD中的全部元素進行聚合操做。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。

collect:將RDD中全部元素獲取到本地客戶端(通常不建議使用)。

count:獲取RDD元素總數。

take(n):獲取RDD中前n個元素。

 

saveAsTextFile:將RDD元素保存到文件中,對每一個元素調用toString方法。

countByKey:對每一個key對應的值進行count計數。

foreach:遍歷RDD中的每一個元素。

RDD持久化

要持久化一個RDD,只要調用其cache()或者persist()方法便可。在該RDD第一次被計算出來時,就會直接緩存在每一個節點中。可是cache()或者persist()的使用是有規則的,必須在transformation或者textFile等建立了一個RDD以後,直接連續調用cache()或persist()才能夠。

若是你先建立一個RDD,而後單獨另起一行執行cache()或persist()方法,是沒有用的,並且會報錯,大量的文件會丟失。

val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()

Spark提供的多種持久化級別,主要是爲了在CPU和內存消耗之間進行取捨。

通用的持久化級別的選擇建議:

一、優先使用MEMORY_ONLY,若是能夠緩存全部數據的話,那麼就使用這種策略。由於純內存速度最快,並且沒有序列化,不須要消耗CPU進行反序列化操做。

二、若是MEMORY_ONLY策略,沒法存儲全部數據的話,那麼使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操做仍是很是快,只是要消耗CPU進行反序列化。

三、若是須要進行快速的失敗恢復,那麼就選擇帶後綴爲_2的策略,進行數據的備份,這樣在失敗時,就不須要從新計算了。

四、能不使用DISK相關的策略,就不用使用,有的時候,從磁盤讀取數據,還不如從新計算一次。

共享變量

Spark提供了兩種共享變量:Broadcast Variable(廣播變量)和Accumulator(累加變量)。

BroadcastVariable會將使用到的變量,僅僅爲每一個節點拷貝一份,更大的用處是優化性能,減小網絡傳輸以及內存消耗。廣播變量是隻讀的。

val factor = 3
val broadcastVars = sc.broadcast(factor);
val numberList = Array(1,2,3,4,5)
val number = sc.parallelize(numberList).map( num => num * broadcastVars.value)  //廣播變量讀值broadcastVars.value

Accumulator則可讓多個task共同操做一份變量,主要能夠進行累加操做。task只能對Accumulator進行累加操做,不能讀取它的值。只有Driver程序能夠讀取Accumulator的值。

 
val numberList = Array(1,2,3,4,5)
val numberRDD = sc.parallelize(numberList,1)
val sum = sc.accumulator(0)
numberRDD.foreach{m => sum += m}

 

小案例

案例需求:

一、對文本文件內的每一個單詞都統計出其出現的次數。
二、按照每一個單詞出現次數的數量,降序排序。

步驟:

  • 1.建立RDD
  • 2.將文本進行拆分 (flatMap)
  • 3.將拆分後的單詞進行統計 (mapToPair,reduceByKey)
  • 4.反轉鍵值對 (mapToPair)
  • 5.按鍵升序排序 (sortedByKey)
  • 6.再次反轉鍵值對 (mapToPair)
  • 7.打印輸出(foreach)

scala編寫:

package cn.spark.study.core
import org.apache.spark.sql.SparkSession
object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("WordCount").master("local").getOrCreate()
    val lines = spark.sparkContext.textFile("D:\\spark.txt")
    val words = lines.flatMap{line => line.split(" ")}
    val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _)
    val countWord = wordCounts.map{word =>(word._2,word._1)}
    val sortedCountWord = countWord.sortByKey(false)
    val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)}
    sortedWordCount.foreach(s=>
    {
      println("word \""+s._1+ "\" appears "+s._2+" times.")
    })
    spark.stop()
  }
}

  

小案例實戰2

需求:

一、按照文件中的第一列排序。
二、若是第一列相同,則按照第二列排序。

實現步驟:

    • 一、實現自定義的key,要實現Ordered接口和Serializable接口,在key中實現本身對多個列的排序算法
    • 二、將包含文本的RDD,映射成key爲自定義key,value爲文本的JavaPairRDD(map)
    • 三、使用sortByKey算子按照自定義的key進行排序(sortByKey)
    • 四、再次映射,剔除自定義的key,只保留文本行(map)
    • 五、打印輸出(foreach)

scala編寫:

class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{
  override def compare(that: SecondSortKey): Int = {
    if(this.first - that.first !=0){
      this.first-that.first
    }else{
      this.second-that.second
    }
  }
}
object SecondSort {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate()
    val lines = spark.sparkContext.textFile("D:\\sort.txt")
    val pairs = lines.map{line => (
      new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
    )}
    val sortedParis = pairs.sortByKey()
    val sortedLines = sortedParis.map(pairs => pairs._2)
    sortedLines.foreach(s => println(s))
    spark.stop()
  }
}

  

小案例實戰3

需求:

對每一個班級內的學生成績,取出前3名。(分組取topn)

實現步驟:

1.建立初始RDD

2.對初始RDD的文本行按空格分割,映射爲key-value鍵值對

3.對鍵值對按鍵分組

4.獲取分組後每組前3的成績:

  • 4.1 遍歷每組,獲取每組的成績
  • 4.2 將一組成績轉換成一個數組緩衝
  • 4.3 將數組緩衝按從大到小排序
  • 4.4 對排序後的數組緩衝取其前三

5.打印輸出

如下是使用scala實現:

object GroupTop3 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate()
    //建立初始RDD
    val lines = spark.sparkContext.textFile("D:\\score.txt")
    //對初始RDD的文本行按空格分割,映射爲key-value鍵值對
    val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))
    //對pairs鍵值對按鍵分組
    val groupedPairs = pairs.groupByKey()
    //獲取分組後每組前3的成績
    val top3Score = groupedPairs.map(classScores => {
      var className = classScores._1
      //獲取每組的成績,將其轉換成一個數組緩衝,並按從大到小排序,取其前三
      var top3 = classScores._2.toBuffer.sortWith(_>_).take(3)
      Tuple2(className,top3)
    })
    top3Score.foreach(m => {
      println(m._1)
      for(s <- m._2) println(s)
      println("------------------")
    })
  }
}

  以上三個小案例都用Scala實現了,用到了Scala中的集合的操做、高階函數、鏈式調用、隱式轉換等知識,本身動手實現,對Scala有個比較好的理解和掌握。

相關文章
相關標籤/搜索