大數據技術之_19_Spark學習_02_Spark Core 應用解析+ RDD 概念 + RDD 編程 + 鍵值對 RDD + 數據讀取與保存主要方式 + RDD 編程進階 + Spark ...

第1章 RDD 概念1.1 RDD 爲何會產生1.2 RDD 概述1.2.1 什麼是 RDD1.2.2 RDD 的屬性1.3 RDD 彈性1.4 RDD 特色1.4.1 分區1.4.2 只讀1.4.3 依賴1.4.4 緩存1.4.5 CheckPoint第2章 RDD 編程2.1 RDD 編程模型2.2 RDD 建立2.2.1 由一個已經存在的 Scala 集合建立,即集合並行化(測試用)2.2.2 由外部存儲系統的數據集建立(開發用)2.3 RDD 編程2.3.1 Transformation(轉換)2.3.2 Action(行動)2.3.3 數值 RDD 的統計操做2.3.4 向 RDD 操做傳遞函數注意事項2.3.5 在不一樣 RDD 類型間轉換2.4 RDD 持久化2.4.1 RDD 的緩存2.4.2 RDD 緩存方式2.5 RDD 檢查點機制2.5.1 checkpoint 寫流程2.5.2 checkpoint 讀流程2.6 RDD 的依賴關係2.6.1 窄依賴(Narrow Dependency)2.6.2 寬依賴(Wide Dependency)2.6.3 Lineage2.7 DAG 的生成2.8 RDD 相關概念關係2.9 Spark Core 實例練習第3章 鍵值對 RDD3.1 鍵值對 RDD 的轉化操做3.1.1 轉化操做列表3.1.2 聚合操做3.1.3 數據分組3.1.4 鏈接3.1.5 數據排序3.2 鍵值對 RDD 的行動操做3.3 鍵值對 RDD 的數據分區3.3.1 獲取 RDD 的分區方式3.3.2 Hash 分區方式3.3.3 Range 分區方式3.3.4 自定義分區方式(重點)3.3.5 分區 Shuffle 優化3.3.6 基於分區進行操做3.3.7 從分區中獲益的操做第4章 數據讀取與保存主要方式4.1 文本文件輸入輸出4.2 JSON 文件輸入輸出4.3 CSV 文件輸入輸出4.4 SequenceFile 文件輸入輸出4.5 對象文件輸入輸出4.6 Hadoop 輸入輸出格式4.7 文件系統的輸入輸出4.8 數據庫的輸入輸出第5章 RDD 編程進階5.1 累加器5.2 自定義累加器5.3 廣播變量第6章 Spark Core 實例練習6.1 計算獨立 IP 數6.2 統計每一個視頻獨立 IP 數6.3 統計一天中每一個小時間的流量附錄css


第1章 RDD 概念

1.1 RDD 爲何會產生

  RDD:Resilient Distributed Dataset 彈性分佈式數據集
  RDD 是 Spark 的基石,是實現 Spark 數據處理的核心抽象。那麼 RDD 爲何會產生呢?
  Hadoop 的 MapReduce 是一種基於數據集的工做模式,面向數據,這種工做模式通常是從存儲上加載數據集,而後操做數據集,最後寫入物理存儲設備。數據更多面臨的是一次性處理。
  MR 的這種方式對數據領域兩種常見的操做不是很高效。第一種是迭代式的算法。好比機器學習中 ALS、凸優化梯度降低等。這些都須要基於數據集或者數據集的衍生數據反覆查詢反覆操做。MR 這種模式不太合適,即便多 MR 串行處理,性能和時間也是一個問題。數據的共享依賴於磁盤。另一種是交互式數據挖掘,MR 顯然不擅長。
  MR 中的迭代:
  html


  Spark中的迭代:
  
  咱們須要一個效率很是快,且可以支持迭代計算和有效數據共享的模型,Spark 應運而生。RDD 是基於工做集的工做模式,更多的是面向工做流。
  可是不管是 MR 仍是 RDD 都應該具備相似位置感知、容錯和負載均衡等特性。

 

1.2 RDD 概述

1.2.1 什麼是 RDD

  RDD(Resilient Distributed Dataset)叫作分佈式數據集,是 Spark 中最基本的數據抽象,它表明一個不可變可分區(分片)裏面的元素可並行計算的集合(彈性)。在 Spark 中,對數據的全部操做不外乎建立 RDD、轉化已有 RDD 以及調用 RDD 操做進行求值。每一個 RDD 都被分爲多個分區,這些分區運行在集羣中的不一樣節點上。RDD 能夠包含 Python、Java、Scala 中任意類型的對象,甚至能夠包含用戶自定義的對象。RDD 具備數據流模型的特色:自動容錯、位置感知性調度和可伸縮性。RDD 容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。
  RDD 支持兩種操做:轉化操做和行動操做。RDD 的轉化操做是返回一個新的 RDD 的操做,好比 map() 和 filter(),而行動操做則是向驅動器程序返回結果或把結果寫入外部系統的操做。好比 count() 和 first()。
  Spark 採用 惰性計算模式,RDD 只有第一次在一個行動操做中用到時,纔會真正計算。Spark 能夠優化整個計算過程。默認狀況下,Spark 的 RDD 會在你每次對它們進行行動操做時從新計算。若是想在多個行動操做中重用同一個 RDD,可使用 RDD.persist() 讓 Spark 把這個 RDD 緩存下來。java

1.2.2 RDD 的屬性

  1) 一組分片(Partition),即數據集的基本組成單位。對於 RDD 來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立 RDD 時指定 RDD 的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的 CPU Core 的數目。
  2) 一個計算每一個分區的函數。Spark 中 RDD 的計算是以分片爲單位的,每一個 RDD 都會實現 compute 函數以達到這個目的。compute 函數會對迭代器進行復合,不須要保存每次計算的結果。
  3) RDD 之間的依賴關係。RDD 的每次轉換都會生成一個新的 RDD,因此 RDD 之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark 能夠經過這個依賴關係從新計算丟失的分區數據,而不是對 RDD 的全部分區進行從新計算。
  4) 一個 Partitioner,即 RDD 的分片函數。當 前Spark 中實現了兩種類型的分片函數,一個是基於哈希的 HashPartitioner,另一個是基於範圍的 RangePartitioner。只有對於於 key-value 的 RDD,纔會有 Partitioner,非 key-value 的 RDD 的 Parititioner 的值是 None。Partitioner 函數不但決定了 RDD 自己的分片數量,也決定了Parent RDD Shuffle 輸出時的分片數量。
  5) 一個列表,存儲存取每一個 Partition 的優先位置(preferred location)。對於一個 HDFS 文件來講,這個列表保存的就是每一個 Partition 所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark 在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。
  RDD 是一個應用層面的邏輯概念。一個 RDD 多個分片。RDD 就是一個元數據記錄集,記錄了 RDD 內存全部的關係數據。
  node

1.3 RDD 彈性


1) 自動進行內存和磁盤數據存儲的切換
  Spark 優先把數據放到內存中,若是內存放不下,就會放到磁盤裏面,程序進行自動的存儲切換。
2) 基於血統的高效容錯機制
  在 RDD 進行轉換和動做的時候,會造成 RDD 的 Lineage 依賴鏈,當某一個 RDD 失效的時候,能夠經過從新計算上游的 RDD 來從新生成丟失的 RDD 數據。
3) Task 若是失敗會自動進行特定次數的重試
  RDD 的計算任務若是運行失敗,會自動進行任務的從新計算,默認次數是 4 次。
4) Stage 若是失敗會自動進行特定次數的重試
  若是 Job 的某個 Stage 階段計算失敗,框架也會自動進行任務的從新計算,默認次數也是 4 次。
5) Checkpoint 和 Persist 可主動或被動觸發
  RDD 能夠經過 Persist 持久化將 RDD 緩存到內存或者磁盤,當再次用到該 RDD 時直接讀取就行。也能夠將 RDD 進行檢查點,檢查點會將數據存儲在 HDFS 中,該 RDD 的全部父 RDD 依賴都會被移除。
6) 數據調度彈性
  Spark 把這個 JOB 執行模型抽象爲通用的有向無環圖 DAG,能夠將多 Stage 的任務串聯或並行執行,調度引擎自動處理 Stage 的失敗以及 Task 的失敗。
7) 數據分片的高度彈性
  能夠根據業務的特徵,動態調整數據分片的個數,提高總體的應用執行效率。

 

  RDD 全稱叫作彈性分佈式數據集(Resilient Distributed Datasets),它是一種分佈式的內存抽象,表示一個只讀的記錄分區的集合,它只能經過其餘 RDD 轉換而建立,爲此,RDD 支持豐富的轉換操做(如 map, join, filter, groupby 等),經過這種轉換操做,新的 RDD 則包含了如何從其餘 RDDs 衍生所必需的信息,因此說 RDDs 之間是有依賴關係的。基於 RDDs 之間的依賴,RDDs 會造成一個有向無環圖 DAG,該 DAG 描述了整個流式計算的流程,實際執行的時候,RDD 是經過血緣關係(Lineage)一鼓作氣的,即便出現數據分區丟失,也能夠經過血緣關係重建分區,總結起來,基於 RDD 的流式計算任務可描述爲:從穩定的物理存儲(如分佈式文件系統)中加載記錄,記錄被傳入由一組肯定性操做構成的 DAG,而後寫回穩定存儲。另外 RDD 還能夠將數據集緩存到內存中,使得在多個操做之間能夠重用數據集,基於這個特色能夠很方便地構建迭代型應用(圖計算、機器學習等)或者交互式數據分析應用。能夠說 Spark 最初也就是實現 RDD 的一個分佈式系統,後面經過不斷髮展壯大成爲如今較爲完善的大數據生態系統,簡單來說,Spark-RDD 的關係相似於 Hadoop-MapReduce 關係。mysql

1.4 RDD 特色

  RDD 表示只讀的分區的數據集,對 RDD 進行改動,只能經過 RDD 的轉換操做,由一個 RDD 獲得一個新的 RDD,新的 RDD 包含了從其餘 RDD 衍生所必需的信息。RDDs 之間存在依賴,RDD 的執行是按照血緣關係延時計算的。若是血緣關係較長,能夠經過持久化 RDD 來切斷血緣關係。nginx

1.4.1 分區

  RDD 邏輯上是分區的,每一個分區的數據是抽象存在的,計算的時候會經過一個 compute 函數獲得每一個分區的數據。若是 RDD 是經過已有的文件系統構建,則 compute 函數是讀取指定文件系統中的數據,若是 RDD 是經過其餘 RDD 轉換而來,則 compute 函數是執行轉換邏輯將其餘 RDD 的數據進行轉換。
  es6

1.4.2 只讀

以下圖所示,RDD 是隻讀的,要想改變 RDD 中的數據,只能在現有的 RDD 基礎上建立新的 RDD。
  web


  由一個 RDD 轉換到另外一個 RDD,能夠經過豐富的操做算子實現,再也不像 MapReduce 那樣只能寫 map 和 reduce 了,以下圖所示。
  
  RDD 的操做算子包括兩類,一類叫作 transformations,它是用來將 RDD 進行轉化,構建 RDD 的血緣關係;另外一類叫作 actions,它是用來觸發 RDD 的計算,獲得 RDD 的相關計算結果或者將 RDD 保存的文件系統中。下圖是 RDD 所支持的操做算子列表。
  

1.4.3 依賴

  RDDs 經過操做算子進行轉換,轉換獲得的新 RDD 包含了從其餘 RDDs 衍生所必需的信息,RDDs 之間維護着這種血緣關係,也稱之爲依賴。以下圖所示,依賴包括兩種,一種是窄依賴,RDDs 之間分區是一一對應的,另外一種是寬依賴,下游 RDD 的每一個分區與上游 RDD (也稱之爲父 RDD)的每一個分區都有關,是多對多的關係。
  正則表達式


  經過 RDDs 之間的這種依賴關係,一個任務流能夠描述爲 DAG (有向無環圖),以下圖所示,在實際執行過程當中寬依賴對應於 Shuffle (圖中的 reduceByKey 和 join),窄依賴中的全部轉換操做能夠經過相似於管道的方式一鼓作氣執行(圖中 map 和 union 能夠一塊兒執行)。
  

1.4.4 緩存

  若是在應用程序中屢次使用同一個 RDD,能夠將該 RDD 緩存起來,該 RDD 只有在第一次計算的時候會根據血緣關係獲得分區的數據,在後續其餘地方用到該 RDD 的時候,會直接從緩存處取而不用再根據血緣關係計算,這樣就加速後期的重用。以下圖所示,RDD-1 通過一系列的轉換後獲得 RDD-n 並保存到 hdfs,RDD-1 在這一過程當中會有個中間結果,若是將其緩存到內存,那麼在隨後的 RDD-1 轉換到 RDD-m 這一過程當中,就不會計算其以前的 RDD-0 了。
  算法

1.4.5 CheckPoint

  雖然 RDD 的血緣關係自然地能夠實現容錯,當 RDD 的某個分區數據失敗或丟失,能夠經過血緣關係重建。可是對於長時間迭代型應用來講,隨着迭代的進行,RDDs 之間的血緣關係會愈來愈長,一旦在後續迭代過程當中出錯,則須要經過很是長的血緣關係去重建,勢必影響性能。爲此,RDD 支持 checkpoint 將數據保存到持久化的存儲中,這樣就能夠切斷以前的血緣關係,由於 checkpoint 後的 RDD 不須要知道它的父 RDDs 了,它能夠從 checkpoint 處拿到數據。
  給定一個 RDD 咱們至少能夠知道以下幾點信息:
  一、分區數以及分區方式;
  二、由父 RDDs 衍生而來的相關依賴信息;
  三、計算每一個分區的數據,計算步驟爲:
    1)若是被緩存,則從緩存中取的分區的數據;
    2)若是被 checkpoint,則從 checkpoint 處恢復數據;
    3)根據血緣關係計算分區的數據。

第2章 RDD 編程

2.1 RDD 編程模型

  在 Spark 中,RDD 被表示爲對象,經過對象上的方法調用來對 RDD 進行轉換。通過一系列的 transformations 定義 RDD 以後,就能夠調用 actions 觸發 RDD 的計算,action 能夠是嚮應用程序返回結果(count, collect 等),或者是向存儲系統保存數據(saveAsTextFile 等)。在 Spark 中,只有遇到 action,纔會執行 RDD 的計算(即延遲計算),這樣在運行時能夠經過管道的方式傳輸多個轉換。
  要使用 Spark,開發者須要編寫一個 Driver 程序,它被提交到集羣以調度運行 Worker,以下圖所示。Driver 中定義了一個或多個 RDD,並調用 RDD 上的 action,Worker 則執行 RDD 分區計算任務。
  


  Driver 和 Worker 內部示意圖:
  

2.2 RDD 建立

  在 Spark 中建立 RDD 的建立方式大概能夠分爲三種:從集合中建立 RDD;從外部存儲建立 RDD;從其餘 RDD 建立。

2.2.1 由一個已經存在的 Scala 集合建立,即集合並行化(測試用)

scala> val rdd1 = sc.parallelize(Array(12345678))

  Step一、而從集合中建立 RDD,Spark 主要提供了兩種函數:parallelize 和 makeRDD。咱們能夠先看看這兩個函數的聲明:

def parallelize[T: ClassTag](
        seq: Seq[T],
        numSlices: Int = defaultParallelism): RDD[T]

def makeRDD[T: ClassTag](
        seq: Seq[T],
        numSlices: Int = defaultParallelism): RDD[T]

def makeRDD[T: ClassTag](
        seq: Seq[(T, Seq[String])]): RDD[T]

  Step二、咱們能夠從上面看出 makeRDD 有兩種實現,並且第一個 makeRDD 函數接收的參數和 parallelize 徹底一致。其實第一種 makeRDD 函數實現是依賴了 parallelize 函數的實現,來看看 Spark 中是怎麼實現這個 makeRDD 函數的:

def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}

  Step三、第一個 makeRDD 的實現能夠本身指定分區的數量,它的默認分區取決於 CPU 的核心總數,示例代碼以下:

scala> val rdd2 = sc.makeRDD(Array(123456))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
scala> rdd2.partitions.size
res1: Int = 4       本博主的 hadoop102 虛擬機 CPU 配置的是 4 核

  追蹤默認分區的底層源碼以下:

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
}

def defaultParallelism(): Int

override def defaultParallelism(): Int 
= backend.defaultParallelism()

override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

  Step四、咱們能夠看出,這個 makeRDD 函數徹底和 parallelize 函數一致。可是咱們得看看第二種 makeRDD 函數的具體實現了,它接收的參數類型是 Seq[(T, Seq[String])],Spark 文檔的說明是:

Distribute a local Scala collection to form an RDD, with one or more location preferences (hostnames of Spark nodes) for each object. Create a new partition for each collection item.

分發本地 Scala 集合以造成 RDD,每一個對象具備一個或多個位置首選項(Spark 節點的主機名)。 爲每一個集合項建立一個新分區。

  原來,這個函數還爲數據提供了位置信息,來看看咱們怎麼使用:

scala> val guigu1= sc.parallelize(List(123))
guigu1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:21

scala> val guigu2 = sc.makeRDD(List(123))
guigu2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at makeRDD at <console>:21

scala> val seq = List((1, List("slave01")),| (2, List("slave02")))
seq: List[(Int, List[String])] = List((1, List(slave01)),
 (2,List(slave02)))

scala> val guigu3 = sc.makeRDD(seq)
guigu3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at makeRDD at <console>:23

scala> guigu3.preferredLocations(guigu3.partitions(1))
res26: Seq[String] = List(slave02)

scala> guigu3.preferredLocations(guigu3.partitions(0))
res27: Seq[String] = List(slave01)

scala> guigu1.preferredLocations(guigu1.partitions(0))
res28: Seq[String] = List()

  咱們能夠看到,makeRDD 函數有兩種實現,第一種實現其實徹底和 parallelize 一致;而第二種實現能夠爲數據提供位置信息,而除此以外的實現和 parallelize 函數也是一致的,以下:

def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]

  都是返回 ParallelCollectionRDD,並且這個 makeRDD 的實現不能夠本身指定分區的數量,而是固定爲 seq 參數的 size 大小。

  擴展知識:
RDD 的運行規劃圖

2.2.2 由外部存儲系統的數據集建立(開發用)

包括本地的文件系統,還有全部 Hadoop 支持的數據集,好比 HDFS、Cassandra、HBase 等。

scala> val atguigu = sc.textFile("hdfs://hadoop102:9000/RELEASE")
atguigu: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24

2.3 RDD 編程

  RDD 通常分爲數值 RDD 和鍵值對 RDD,本章不進行具體區分,先統一來看,下一章會對鍵值對 RDD 作專門說明。

2.3.1 Transformation(轉換)

  RDD 中的全部轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個要求返回結果給 Driver
的動做時,這些轉換纔會真正運行。這種設計讓 Spark 更加有效率地運行。

經常使用的 Transformation 以下

一、map(func)
返回一個新的 RDD,該 RDD 由每個輸入元素通過 func 函數轉換後組成。
源碼:

    def map[U: ClassTag](f: T => U): RDD[U]

示例代碼:

scala> val mapSource = sc.parallelize(1 to 10)
mapSource: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> mapSource.collect()
res: Array[Int] = Array(12345678910)

scala> val map = mapSource.map(_ * 2)
map: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26

scala> map.collect      或者    map.collect() 括號能夠省略
res4: Array[Int] = Array(2468101214161820)

二、filter(func)
返回一個新的 RDD,該 RDD 由通過 func 函數計算後返回值爲 true 的輸入元素組成。
源碼:

    def filter(f: T => Boolean): RDD[T] 

示例代碼:

scala> val filterSource = sc.makeRDD(1 to 1010)
filterSource: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24

scala> filterSource.collect
res5: Array[Int] = Array(12345678910)

scala> filterSource.filter(_ % 2 == 0).collect
res6: Array[Int] = Array(246810)

三、 flatMap(func)
相似於 map,可是每個輸入元素能夠被映射爲 0 或多個輸出元素(因此 func 應該返回一個序列,而不是單一元素)。
源碼:

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

示例代碼:

scala> val flatMapSource = sc.makeRDD(Array("a b c""d e f""h i j"))
flatMapSource: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at <console>:24

scala> flatMapSource.map(_.split(" ")).collect
res1: Array[Array[String]] = Array(Array(a, b, c), Array(d, e, f), Array(h, i, j))

scala> flatMapSource.map(_.split(" "))
res2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[4] at map at <console>:27

scala> flatMapSource.flatMap(_.split(" ")).collect
res3: Array[String] = Array(a, b, c, d, e, f, h, i, j)

scala> flatMapSource.flatMap(_.split(" "))
res4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:27

# flatMap(_.split(" ")) 等價於 flatMap(Array(_))

scala> flatMapSource.flatMap(Array(_)).collect
res5: Array[String] = Array(a b c, d e f, h i j)

scala> flatMapSource.collect
res6: Array[String] = Array(a b c, d e f, h i j)

四、mapPartitions(func)
相似於 map,但獨立地在 RDD 的每個分片上運行,所以在類型爲 T 的 RDD 上運行時,func 的函數類型必須是 Iterator[T] => Iterator[U]。
假設有 N 個元素,有 M 個分區,那麼 map 的函數的將被調用 N 次,而 mapPartitions 被調用 M 次,一個函數一次處理全部分區。mapPartitions 的執行效率要比 map 高。
源碼:

    def mapPartitions[U: ClassTag](
        f: Iterator[T] => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U]

示例代碼:

scala> val mapPartSource = sc.makeRDD(1 to 105)
mapPartSource: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:24

scala> mapPartSource.collect
res7: Array[Int] = Array(12345678910)

scala> mapPartSource.mapPartitions(_.map(_ + "a"))
res8: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at mapPartitions at <console>:27

scala> mapPartSource.mapPartitions(_.map(_ + "a")).collect
res9: Array[String] = Array(1a, 2a, 3a, 4a, 5a, 6a, 7a, 8a, 9a, 10a)

scala> mapPartSource.mapPartitions(items => items.map(x => x + "a")).collect
res10: Array[String] = Array(1a, 2a, 3a, 4a, 5a, 6a, 7a, 8a, 9a, 10a)

--------------------如下爲擴展練習代碼--------------------

scala> mapPartSource.partitions.size
res11: Int = 5

scala> mapPartSource.collect
res15: Array[Int] = Array(12345678910)

scala> mapPartSource.mapPartitions(x => Iterator(x.mkString("|"))).collect      mkString() 該函數把集合元素轉化爲字符串,可能還會添加分隔符、前綴、後綴。
res16: Array[String] 
= Array(1|23|45|67|89|10)

scala> val mapPartSource2 = sc.makeRDD(Array("a b c""d e f""h i j"), 5)
mapPartSource2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[13] at makeRDD at <console>:24

scala> mapPartSource2.mapPartitions(x => Iterator(x.mkString("|"))).collect
res17: Array[String] = Array("", a b c, "", d e f, h i j)

scala> val mapPartSource3 = sc.makeRDD(Array("a b c""d e f""h i j""k l m""o p q"), 2)
mapPartSource3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[15] at makeRDD at <console>:24

scala> mapPartSource3.mapPartitions(x => Iterator(x.mkString("|"))).collect
res18: Array[String] = Array(a b c|d e f, h i j|k l m|o p q)

五、mapPartitionsWithIndex(func)
相似於 mapPartitions,但 func 帶有一個整數參數表示分片的索引值,所以在類型爲 T 的 RDD 上運行時,func 的函數類型必須是 (Int, Interator[T]) => Iterator[U]。
源碼:

    private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
        f: (Int, Iterator[T]) => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U]

示例代碼:

scala> val source = sc.makeRDD(1 to 10)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> source.partitions.size
res0: Int = 4

scala> source.mapPartitionsWithIndex((index, x) => Iterator(index + ":" + x.mkString("|"))).collect
res3: Array[String] = Array(0:1|21:3|4|52:6|73:8|9|10)                    

六、sample(withReplacement, fraction, seed)
以指定的隨機種子隨機抽樣出數量爲 fraction 的數據,withReplacement 表示是抽出的數據是否放回,true 爲有放回的抽樣,false 爲無放回的抽樣,seed 用於指定隨機數生成器種子。
例如:從 RDD 中隨機且有放回的抽出 50% 的數據,隨機種子值爲 3(便可能以1 2 3的其中一個起始值)。主要用於觀察大數據集的分佈狀況。
源碼:

    def sample(
        withReplacement: Boolean,
        fraction: Double,
        seed: Long = Utils.random.nextLong)
: RDD[T]

示例代碼:

scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> rdd.collect()
res11: Array[Int] = Array(12345678910)

scala> var sample1 = rdd.sample(true0.42).collect
sample1: Array[Int] = Array(1227789)    爲何抽樣出7個數據呢?

scala> var sample2 = rdd.sample(false0.23).collect
sample2: Array[Int] = Array(19)

七、takeSample
和 sample 的區別是:takeSample 返回的是最終的結果集合。

八、union(otherDataset)
對源 RDD 和參數 RDD 求並集後返回一個新的 RDD。
源碼:

    def union(other: RDD[T]): RDD[T]

示例代碼:

scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:24

scala> val rdd3 = rdd1.union(rdd2).collect
rdd3: Array[Int] = Array(123455678910)

九、intersection(otherDataset)
對源 RDD 和參數 RDD 求交集後返回一個新的 RDD。
源碼:

    def intersection(other: RDD[T]): RDD[T]

示例代碼:

scala> val rdd1 = sc.parallelize(1 to 7)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:24

scala> val rdd3 = rdd1.intersection(rdd2).collect
rdd3: Array[Int] = Array(567

十、distinct([numTasks]))
對原 RDD 進行去重後返回一個新的 RDD。默認狀況下,只有 8 個並行任務來操做,可是能夠傳入一個可選的 numTasks 參數改變它。
源碼:

    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

    def distinct(): RDD[T]

示例代碼:

scala> val distinctRdd = sc.parallelize(List(12152961))
distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> distinctRdd.distinct().collect
res12: Array[Int] = Array(19562)

scala> distinctRdd.distinct(2).collect
res13: Array[Int] = Array(62195)

十一、partitionBy
對 KV 結構 RDD 進行從新分區,若是原有的 partionRDD 和現有的 partionRDD 是一致的話就不進行分區,不然會生成 shuffleRDD。
源碼:

    def partitionBy(partitioner: Partitioner): RDD[(K, V)]

示例代碼:

scala> val rdd = sc.parallelize(Array((1,"aaa"), (2,"bbb"), (3,"ccc"), (4,"ddd")), 2)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[31] at parallelize at <console>:24

scala> rdd.partitions.size
res14: Int = 2

scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(3))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[32] at partitionBy at <console>:26

scala> rdd2.partitions.size
res15: Int = 3

十二、reduceByKey(func, [numTasks])
在一個 (K,V) 的 RDD 上調用,返回一個 (K,V) 的 RDD,使用指定的 reduce 函數,將相同 key 的值聚合到一塊兒,reduce 任務的個數能夠經過第二個可選的參數來設置。
源碼:

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

    def reduceByKey(func: (V, V) => V): RDD[(K, V)]

示例代碼:

scala> val rdd = sc.parallelize(List(("female",1), ("male",5), ("female",5), ("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> val reduce = rdd.reduceByKey((x, y) => x + y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[34] at reduceByKey at <console>:26

scala> reduce.collect()
res16: Array[(String, Int)] = Array((female,6), (male,7))

--------------------如下爲擴展練習代碼--------------------

scala> val rdd = sc.makeRDD(1 to 15).union(sc.makeRDD(7 to 20))
rdd: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at union at <console>:24

scala> rdd.map(x => (x, 1))
res0: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at map at <console>:27

scala> rdd.map((_, 1))
res2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[4] at map at <console>:27

scala> res0.collect
res3: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1), (9,1), (10,1), (11,1), (12,1), (13,1), (14,1), (15,1), (7,1), (8,1), (9,1), (10,1), (11,1), (12,1), (13,1), (14,1), (15,1), (16,1), (17,1), (18,1), (19,1), (20,1))

scala> res2.collect
res4: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1), (9,1), (10,1), (11,1), (12,1), (13,1), (14,1), (15,1), (7,1), (8,1), (9,1), (10,1), (11,1), (12,1), (13,1), (14,1), (15,1), (16,1), (17,1), (18,1), (19,1), (20,1))

scala> val rdd = sc.makeRDD(1 to 15).union(sc.makeRDD(7 to 20)).map((_, 1))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[8] at map at <console>:24

scala> rdd.reduceByKey(_ + _).collect
res5: Array[(Int, Int)] = Array((16,1), (8,2), (1,1), (17,1), (9,2), (18,1), (10,2), (2,1), (19,1), (11,2), (3,1), (4,1), (20,1), (12,2), (13,2), (5,1), (14,2), (6,1), (15,2), (7,2))

scala> rdd.reduceByKey((x, y) => (x + y)).collect
res6: Array[(Int, Int)] = Array((16,1), (8,2), (1,1), (17,1), (9,2), (18,1), (10,2), (2,1), (19,1), (11,2), (3,1), (4,1), (20,1), (12,2), (13,2), (5,1), (14,2), (6,1), (15,2), (7,2))

1三、groupByKey
groupByKey 也是對每一個 key 進行操做,但只生成一個 sequence。
源碼:

    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

示例代碼:

scala> rdd.reduceByKey(_ + _).collect
res5: Array[(Int, Int)] = Array((16,1), (8,2), (1,1), (17,1), (9,2), (18,1), (10,2), (2,1), (19,1), (11,2), (3,1), (4,1), (20,1), (12,2), (13,2), (5,1), (14,2), (6,1), (15,2), (7,2))

scala> rdd.groupByKey
res7: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[11] at groupByKey at <console>:27

scala> rdd.groupByKey().collect
res8: Array[(Int, Iterable[Int])] = Array((16,CompactBuffer(1)), (8,CompactBuffer(11)), (1,CompactBuffer(1)), (17,CompactBuffer(1)), (9,CompactBuffer(11)), (18,CompactBuffer(1)), (10,CompactBuffer(11)), (2,CompactBuffer(1)), (19,CompactBuffer(1)), (11,CompactBuffer(11)), (3,CompactBuffer(1)), (4,CompactBuffer(1)), (20,CompactBuffer(1)), (12,CompactBuffer(11)), (13,CompactBuffer(11)), (5,CompactBuffer(1)), (14,CompactBuffer(11)), (6,CompactBuffer(1)), (15,CompactBuffer(11)), (7,CompactBuffer(11)))

scala> rdd.groupByKey().map(item => (item._1, item._2.sum)).collect       方式一:僅使用 map
res10: Array[(Int, Int)] = Array((16,1), (8,2), (1,1), (17,1), (9,2), (18,1), (10,2), (2,1), (19,1), (11,2), (3,1), (4,1), (20,1), (12,2), (13,2), (5,1), (14,2), (6,1), (15,2), (7,2))

scala> rdd.groupByKey().map(item => (item._1, item._2.reduceLeft(_ + _))).collect       方式二:使用 map + reduceLeft
res11: Array[(Int, Int)] = Array((16,1), (8,2), (1,1), (17,1), (9,2), (18,1), (10,2), (2,1), (19,1), (11,2), (3,1), (4,1), (20,1), (12,2), (13,2), (5,1), (14,2), (6,1), (15,2), (7,2))

scala> rdd.groupByKey().map{case (x, y) => (x, y.sum)}.collect      方式三:使用模式匹配
res13: Array[(Int, Int)] = Array((16,1), (8,2), (1,1), (17,1), (9,2), (18,1), (10,2), (2,1), (19,1), (11,2), (3,1), (4,1), (20,1), (12,2), (13,2), (5,1), (14,2), (6,1), (15,2), (7,2))

reduceByKey 和 groupByKey 的區別:

1四、combineByKey 此函數博大精深,海納百川

  def combineByKey[C](  
    createCombiner: V => C,  
    mergeValue: (C, V) => C,  
    mergeCombiners: (C, C) => C)

對相同的 K,把 V 合併成一個集合。

createCombiner: combineByKey 會遍歷分區中的全部元素,所以每一個元素的鍵要麼尚未遇到過,要麼就和以前的某個元素的鍵相同。若是這是一個新的元素,combineByKey() 會使用一個叫做 createCombiner() 的函數來建立那個鍵對應的累加器的初始值。

mergeValue: 若是這是一個在處理當前分區以前已經遇到的鍵,它會使用 mergeValue() 方法將該鍵的累加器對應的當前值與這個新的值進行合併。

mergeCombiners: 因爲每一個分區都是獨立處理的,所以對於同一個鍵能夠有多個累加器。若是有兩個或者更多的分區都有對應同一個鍵的累加器,就須要使用用戶提供的 mergeCombiners() 方法將各個分區的結果進行合併。

示例代碼:

scala> val rdd = sc.makeRDD(Array(("a",50),("a",70),("b",60),("a",60),("b",80),("c",90),("b",90),("c",60),("c",80)),3)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[21] at makeRDD at <console>:24

scala> rdd.combineByKey((_, 1), (c:(Int,Int), v) => (c._1 + v, c._2 + 1), (c1:(Int,Int), c2:(Int,Int)) => (c1._1 + c2._1, c1._2 + c2._2))
res16: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[22] at combineByKey at <console>:27

scala> res16.collect
res17: Array[(String, (Int, Int))] = Array((c,(230,3)), (a,(180,3)), (b,(230,3)))

scala> res17.map(v => (v._1, v._2._1 / v._2._2))    方式一:僅使用 map
res18: Array[(String, Int)] = Array((c,76), (a,60), (b,76))

scala> res17.map{case (stu, (sum, count)) => (stu, sum / count)}    方式二:使用模式匹配
res26: Array[(String, Int)] = Array((c,76), (a,60), (b,76))

scala> case class Score(name: String, score: Int)   方式三:使用對象(好比樣例類),將數據轉換爲對象(樣例類),再將對象轉換成 KV 類型的數據(轉換時使用對象的屬性)
defined class Score

scala> val rdd 
= sc.makeRDD(Array(Score("a",50),Score("a",70),Score("b",60),Score("a",60),Score("b",80),Score("c",90),Score("b",90),Score("c",60),Score("c",80)),3)rdd: org.apache.spark.rdd.RDD[Score] = ParallelCollectionRDD[23] at makeRDD at <console>:26

scala> rdd.map(sco => (sco.name, sco.score))
res27: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[24] at map at <console>:29

scala> res27.combineByKey((_,1), (c:(Int,Int), v) => (c._1 + v, c._2 + 1), (c1:(Int,Int), c2:(Int,Int)) => (c1._1 + c2._1, c1._2 + c2._2))
res28: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[25] at combineByKey at <console>:31

scala> res28.collect
res29: Array[(String, (Int, Int))] = Array((c,(230,3)), (a,(180,3)), (b,(230,3)))

圖解以下:

1五、aggregateByKey

  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

是 combineBykey 的簡化操做,zeroValue 相似於 createCombiner, seqOp 相似於 mergeValue, combOp 相似於 mergeCombiner。
在 kv 對的 RDD 中,按 key 將 value 進行分組合並,合併時,將初始值和每一個 value 做爲 seq 函數的參數,進行對應的計算,返回的結果做爲一個新的 kv 對,而後再將結果按照 key 進行合併,最後將每一個分組的 value 傳遞給 combine 函數進行計算(先將前兩個 value 進行計算,將返回結果和下一個 value 傳給 combine 函數,以此類推),將 key 與計算結果做爲一個新的 kv 對輸出。seqOp 函數用於在每個分區中用初始值逐步迭代 value,combOp 函數用於合併每一個分區中的結果。

示例代碼:

scala> val rdd1 = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),1)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.aggregateByKey(0)(math.max(_,_),_+_).collect
res0: Array[(Int, Int)] = Array((1,4), (3,8), (2,3))

scala> val rdd2 = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> rdd2.aggregateByKey(0)(math.max(_,_),_+_).collect
res1: Array[(Int, Int)] = Array((3,8), (1,7), (2,3))

--------------------如下爲擴展練習代碼--------------------

scala> val rdd3 = sc.makeRDD(Array(("a",50),("a",70),("b",60),("a",60),("b",80),("c",90),("b",90),("c",60),("c",80)),3)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at makeRDD at <console>:24

scala> rdd3.aggregateByKey((0,0))((c:(Int,Int), v) => (c._1 + v, c._2 + 1), (c1:(Int,Int), c2:(Int,Int)) => (c1._1 + c2._1, c1._2 + c2._2)).collect
res3: Array[(String, (Int, Int))] = Array((c,(230,3)), (a,(180,3)), (b,(230,3)))

例如:
若是分一個分區,以 key 爲 1 的分區爲例,0 先和 3 比較得 3,3 再和 2 比較得 3,3 再和 4 比較得 4,因此整個 key 爲 1 的組最終結果爲(1,4),同理,key 爲 2 的最終結果爲(2,3),key 爲 3 的最終結果爲 (3,8)。
若是分三個分區,前兩個 kv對 在一個分區,中間兩個 kv對 在一個分區,最後兩個 kv對 在一個分區,第一個分區的最終結果爲 (1,3),第二個分區的最終結果爲 (1,4) 和 (2,3),最後一個分區的最終結果爲 (3,8),combine 後爲 (1,7), (2,3), (3,8)。
圖解以下:

1六、foldByKey

  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]     是 aggregateByKey 的簡化操做,seqop 和 combop 相同。注意:V 的類型不能改變。

示例代碼:

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val fold = rdd.foldByKey(0)(_+_)
fold: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[7] at foldByKey at <console>:26

scala> fold.collect()
res4: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))

圖解以下:

1七、sortByKey([ascending], [numTasks])
在一個 (K,V) 的 RDD 上調用,K 必須實現 Ordered 接口,返回一個按照 key 進行排序的 (K,V) 的 RDD。
源碼:

  def sortByKey(
    ascending: Boolean = true
    numPartitions: Int = self.partitions.length)
: RDD[(K, V)]

示例代碼:

scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> rdd.sortByKey(true).collect()    升序
res5: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))

scala> rdd.sortByKey(false).collect()   降序
res6: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))

1八、sortBy(func, [ascending], [numTasks])
與 sortByKey 相似,可是更靈活,能夠用 func 先對數據進行處理,按照處理後的數據比較結果排序。
源碼:

  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

示例代碼:

scala> val rdd = sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at makeRDD at <console>:24

scala> rdd.partitions.size
res12: Int = 4

scala> rdd.sortBy(_%4).collect
res13: Array[Int] = Array(48591261037)

scala> rdd.map(x => (x % 4, x)).sortByKey().map(x => x._2).collect
res14: Array[Int] = Array(48591261037)

1九、join(otherDataset, [numTasks])
在類型爲 (K,V) 和 (K,W) 的 RDD 上調用,返回一個相同 key 對應的全部元素對在一塊兒的 (K,(V,W)) 的 RDD。即和另外的 RDD 進行 JOIN。
源碼:

    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]     和另外的 RDD 進行 JOIN。

示例代碼:

scala> val rdd1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[48] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[49] at parallelize at <console>:24

scala> rdd1.join(rdd2).collect()
res15: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))

leftOuterJoin、rightOuterJoin、fullOuterJoin 同理。

20、cogroup(otherDataset, [numTasks])
在類型爲 (K,V) 和 (K,W) 的 RDD 上調用,返回一個 (K,(Iterable<V>,Iterable<W>)) 類型的 RDD。
源碼:

    def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]      相似於兩個 RDD 分別作 groupByKey 而後再 全JOIN。

圖解以下:


示例代碼:
scala> val rdd1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[57] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(Array((1,4),(2,5),(5,6)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[58] at parallelize at <console>:24

scala> rdd1.cogroup(rdd2).collect()
res17: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (5,(CompactBuffer(),CompactBuffer(6))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer())))

scala> rdd1.groupByKey.join(rdd2.groupBy)
groupBy   groupByKey

scala> rdd1.groupByKey.join(rdd2.groupByKey).collect
res18: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))))

scala> rdd1.groupByKey.fullOuterJoin(rdd2.groupByKey).collect
res19: Array[(Int, (Option[Iterable[String]], Option[Iterable[Int]]))] = Array((1,(Some(CompactBuffer(a)),Some(CompactBuffer(4)))), (5,(None,Some(CompactBuffer(6)))), (2,(Some(CompactBuffer(b)),Some(CompactBuffer(5)))), (3,(Some(CompactBuffer(c)),None)))

2一、cartesian(otherDataset)
笛卡爾積

    def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]      笛卡爾積

示例代碼:

scala> val rdd1 = sc.parallelize(1 to 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[74] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(2 to 5)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[75] at parallelize at <console>:24

scala> rdd1.cartesian(rdd2).count
res21: Long = 12

scala> rdd1.cartesian(rdd2).collect()
res22: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5))

2二、pipe(command, [envVars])
對於每一個分區,支持使用外部腳本好比 shell、perl 等處理分區內的數據。
注意:shell 腳本須要集羣中的全部節點都能訪問到。即腳本文件要分發到其餘機器節點。

    def pipe(command: String): RDD[String]

shell 腳本
pipe.sh

#!/bin/sh
echo "AA"
while read LINE; do
   echo ">>>"${LINE}
done

示例代碼:

scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1)     一個分區
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[78] at parallelize at <console>:24

scala> rdd.pipe("/home/atguigu/bin/test/pipe.sh")
res23: org.apache.spark.rdd.RDD[String] = PipedRDD[79] at pipe at <console>:27

scala> rdd.pipe("/home/atguigu/bin/test/pipe.sh").collect
res24: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)

scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),3)     三個分區
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[82] at parallelize at <console>:24

scala> rdd.pipe("/home/atguigu/bin/test/pipe.sh").collect
res25: Array[String] = Array(AA, >>>hi, AA, >>>Hello, >>>how, AA, >>>are, >>>you)

2三、coalesce(numPartitions)
縮減分區數,用於大數據集過濾後,提升小數據集的執行效率。

    def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

示例代碼:

scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[84] at parallelize at <console>:24

scala> rdd.partitions.size
res27: Int = 4

scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[85] at coalesce at <console>:26

scala> coalesceRDD.partitions.size
res28: Int = 3

2四、repartition(numPartitions)
根據分區數,從新經過網絡隨機洗牌全部數據。

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

2五、repartitionAndSortWithinPartitions(partitioner)
repartitionAndSortWithinPartitions 函數是 repartition 函數的變種,與 repartition 函數不一樣的是,repartitionAndSortWithinPartitions 在給定的 partitioner 內部進行排序,性能比 repartition 要高。

2六、glom
將每個分區造成一個數組,造成新的 RDD 類型是 RDD[Array[T]]。
源碼:

    def glom(): RDD[Array[T]]       將每個分區中的全部數據轉換爲一個 Array 數組,造成新的 RDD。

示例代碼:

scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[86] at parallelize at <console>:24

scala> rdd.glom().collect()
res29: Array[Array[Int]] = Array(Array(1234), Array(5678), Array(9101112), Array(13141516))

2七、mapValues
針對於 (K,V) 形式的類型只對 V 進行操做。
源碼:

    def mapValues[U](f: V => U): RDD[(K, U)]        只對 KV 結構中 value 數據進行映射。value 能夠改變類型。

示例代碼:

scala> val rdd = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[88] at parallelize at <console>:24

scala> rdd.mapValues(_+"|||").collect()
res30: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))

2八、subtract
計算差的一種函數,去除兩個 RDD 中相同的元素,不一樣的 RDD 將保留下來。舉例說明:A - B = A - A 與 B 的交集
源碼:

    def subtract(other: RDD[T]): RDD[T]     求差集。

示例代碼:

scala> val rdd1 = sc.parallelize(3 to 8)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[90] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(1 to 5)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[91] at parallelize at <console>:24

scala> rdd1.subtract(rdd2).collect()
res31: Array[Int] = Array(867

2.3.2 Action(行動)

經常使用的 Action 以下:

一、reduce(func)
經過 func 函數彙集 RDD 中的全部元素,這個功能必須是可交換且可並聯的。
源碼:

    def reduce(f: (T, T) => T): T        歸約某個 RDD

示例代碼:

scala> val rdd = sc.makeRDD(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[97] at makeRDD at <console>:24

scala> rdd.reduce(_+_)
res33: Int = 55

二、collect()
將數據返回到 Driver,是以數組的形式返回數據集的全部元素(簡單測試用,生產環境中不用)

三、count()
返回 RDD 的元素個數。

四、first()
返回 RDD 的第一個元素(相似於 take(1))。

五、take(n)
返回一個由數據集的前 n 個元素組成的數組。

六、takeSample(withReplacement, num, [seed])
返回一個數組,該數組由從數據集中隨機採樣的 num 個元素組成,能夠選擇是否用隨機數替換不足的部分,seed 用於指定隨機數生成器種子。
圖解以下:

七、takeOrdered(n)
返回前幾個的排序。
示例代碼:

scala> var rdd = sc.makeRDD(Seq(1042123))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[103] at makeRDD at <console>:24

scala> rdd.top(2)
res50: Array[Int] = Array(1210)

scala> rdd.takeOrdered(2)
res51: Array[Int] = Array(23)

八、aggregate
操做的是數值型數據,aggregate 函數將每一個分區裏面的元素經過 seqOp 和初始值進行聚合,而後用 combine 函數將每一個分區的結果和初始值 (zeroValue) 進行 combine 操做。這個函數最終返回的類型不須要和 RDD 中元素類型一致。

    def aggregate(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

圖解以下:


示例代碼:
scala> var rdd = sc.makeRDD(1 to 4,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24

scala> rdd.aggregate(1)((_*_),(_+_))
res53: Int = 15

九、fold(zeroValue)(func)
摺疊操做,aggregate 的簡化操做,seqop 和 combop同樣。

十、saveAsTextFile(path) 以文本的方式保存到 HDFS 兼容的文件系統
將數據集的元素以 textfile 的形式保存到 HDFS 文件系統或者其餘支持的文件系統,對於每一個元素,Spark 將會調用 toString 方法,將它裝換爲文件中的文本。
示例代碼:

scala> var rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[108] at parallelize at <console>:24

scala> rdd.saveAsTextFile("hdfs://hadoop102:9000/textFile")

HDFS 上查看

[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -cat hdfs://hadoop102:9000/textFile/par*
1
2
3
4
5
6
7
8
9
10

十一、saveAsSequenceFile(path) 以 SequenceFile 形式來存文件
將數據集中的元素以 Hadoop sequencefile 的格式保存到指定的目錄下,能夠是 HDFS 或者其餘 Hadoop 支持的文件系統。
示例代碼:

scala> var rdd = sc.parallelize(List((1,5),(1,6),(1,7),(2,8),(2,9),(3,10)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[110] at parallelize at <console>:24

scala> rdd.saveAsSequenceFile("hdfs://hadoop102:9000/seqFile")

HDFS 上查看

十二、saveAsObjectFile(path) 以 ObjectFile 形式來存文件
用於將 RDD 中的元素序列化成對象,存儲到文件中。
示例代碼:

scala> var rdd = sc.parallelize(List((1,5),(1,6),(1,7),(2,8),(2,9),(3,10)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[112] at parallelize at <console>:24

scala> rdd.saveAsObjectFile("hdfs://hadoop102:9000/objFile")

scala> rdd.partitions.size
res57: Int = 4

HDFS 上查看

1三、countByKey() 返回 Map 結構,獲取每個 key 的數量
針對 (K,V) 類型的 RDD,返回一個 (K,Int) 的 map,表示每個 key 對應的元素個數。
示例代碼:

scala> var rdd = sc.parallelize(List((1,5),(1,6),(1,7),(2,8),(2,9),(3,10)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[115] at parallelize at <console>:24

scala> rdd.countByKey()
res58: scala.collection.Map[Int,Long] = Map(1 -> 32 -> 23 -> 1)

1四、foreach(func) 在數據集上的每個元素運行 func 函數。
示例代碼:

scala> var rdd = sc.makeRDD(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[119] at makeRDD at <console>:24

scala> var sum = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
sum: org.apache.spark.Accumulator[Int] = 0

scala> rdd.foreach(sum+=_)

scala> sum.value
res61: Int = 55

scala> rdd.collect().foreach(println)
1
2
3
4
5
6
7
8
9
10

2.3.3 數值 RDD 的統計操做

  Spark 對包含數值數據的 RDD 提供了一些描述性的統計操做。 Spark 的數值操做是經過流式算法實現的,容許以每次一個元素的方式構建出模型。這些統計數據都會在調用 stats() 時經過一次遍歷數據計算出來,並以 StatsCounter 對象返回。


示例代碼:
scala> var rdd = sc.makeRDD(1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[122] at makeRDD at <console>:24

scala> rdd.count()
res64: Long = 100

scala> rdd.mean()
res65: Double = 50.5

scala> rdd.sum()
res66: Double = 5050.0

scala> rdd.max()
res67: Int = 100

scala> rdd.min()
res68: Int = 1

2.3.4 向 RDD 操做傳遞函數注意事項

  Spark 的大部分轉化操做和一部分行動操做,都須要依賴用戶傳遞的函數來計算。在 Scala 中,咱們能夠把定義的內聯函數、方法的引用或靜態方法傳遞給 Spark,就像 Scala 的其餘函數式 API 同樣。咱們還要考慮其餘一些細節,好比所傳遞的函數及其引用的數據須要是可序列化的(實現了 Java 的 Serializable 接口)。 傳遞一個對象的方法或者字段時,會包含對整個對象的引用。
  小結:傳遞函數的時候須要注意:若是你的 RDD 轉換操做中的函數使用到了類的方法或者變量,那麼你須要注意該類可能須要可以序列化。
示例代碼:

class SearchFunctions(val queryStringextends java.io.Serializable{
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  def getMatchesFunctionReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = {
    // 問題:"isMatch"表示"this.isMatch",所以咱們要傳遞整個"this" 
    rdd.filter(isMatch)
  }
  def getMatchesFieldReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = { 
    // 問題:"query"表示"this.query",所以咱們要傳遞整個"this" 
    rdd.filter(x => x.contains(query)) 
  }
  def getMatchesNoReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = {
    // 安全:只把咱們須要的字段拿出來放入局部變量中 
    val query_ = this.query
    rdd.filter(x => x.contains(query_))
  } 

  若是在 Scala 中出現了 NotSerializableException,一般問題就在於咱們傳遞了一個不可序列化的類中的函數或字段。

2.3.5 在不一樣 RDD 類型間轉換

  有些函數只能用於特定類型的 RDD,好比 mean() 和 variance() 只能用在數值 RDD 上,而 join() 只能用在鍵值對 RDD 上。在 Scala 和 Java 中,這些函數都沒有定義在標準的 RDD 類中,因此要訪問這些附加功能,必需要確保得到了正確的專用 RDD 類。
  在 Scala 中,將 RDD 轉爲有特定函數的 RDD(好比在 RDD[Double] 上進行數值操做)是由隱式轉換來自動處理的。

2.4 RDD 持久化

2.4.1 RDD 的緩存

  Spark 速度很是快的緣由之一,就是在不一樣操做中能夠在內存中持久化或緩存個數據集。當持久化某個 RDD 後,每個節點都將把計算的分片結果保存在內存中,並在對此 RDD 或衍生出的 RDD 進行的其餘動做中重用。這使得後續的動做變得更加迅速。RDD 相關的持久化和緩存,是 Spark 最重要的特徵之一。能夠說,緩存是 Spark 構建迭代式算法快速交互式查詢的關鍵。若是一個有持久化數據的節點發生故障,Spark 會在須要用到緩存的數據時重算丟失的數據分區。若是但願節點故障的狀況不會拖累咱們的執行速度,也能夠把數據備份到多個節點上。
  持久化也是懶執行的,持久化有兩個操做:persist(StorageLevel),另一個是 cache, cache 就至關於 MEMORY_ONLY 的 persist。

2.4.2 RDD 緩存方式

  RDD 經過 persist 方法或 cache 方法能夠將前面的計算結果緩存,默認狀況下 persist() 會把數據以序列化的形式緩存在 JVM 的堆空 間中。可是並非這兩個方法被調用時當即緩存,而是觸發後面的 action 時,該 RDD 將會被緩存在計算節點的內存中,並供後面重用。
  經過查看源碼發現 cache 最終也是調用了 persist 方法,默認的存儲級別都是僅在內存存儲一份,Spark 的存儲級別還有好多種,存儲級別在 object StorageLevel 中定義的。

  在存儲級別的末尾加上「_2」來把持久化數據存爲兩份。


  緩存有可能丟失,或者存儲存儲於內存的數據因爲內存不足而被刪除,RDD 的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於 RDD 的一系列轉換,丟失的數據會被重算,因爲 RDD 的各個 Partition 是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有 Partition。
   注意:使用 Tachyon 能夠實現堆外緩存。
  

示例代碼:

scala> val rdd = sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> val nocache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")
nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at map at <console>:26

scala> nocache.collect
res0: Array[String] = Array(1[1556167699748], 2[1556167699748], 3[1556167699546], 4[1556167699546], 5[1556167699546], 6[1556167699747], 7[1556167699747], 8[1556167699545], 9[1556167699545], 10[1556167699545])

scala> nocache.collect  時間戳變化
res1: Array[String] = Array(1[1556167711837], 2[1556167711837], 3[1556167711813], 4[1556167711813], 5[1556167711813], 6[1556167711841], 7[1556167711841], 8[1556167711814], 9[1556167711814], 10[1556167711814])

scala>  val cache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")
cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:26

scala> cache.cache      緩存一把
res2: cache.type = MapPartitionsRDD[2] at map at <console>:26

scala> cache.collect
res3: Array[String] = Array(1[1556167795046], 2[1556167795054], 3[1556167795023], 4[1556167795023], 5[1556167795023], 6[1556167795046], 7[1556167795046], 8[1556167795023], 9[1556167795025], 10[1556167795025])

scala> cache.collect    時間戳不變
res4: Array[String] = Array(1[1556167795046], 2[1556167795054], 3[1556167795023], 4[1556167795023], 5[1556167795023], 6[1556167795046], 7[1556167795046], 8[1556167795023], 9[1556167795025], 10[1556167795025])

2.5 RDD 檢查點機制

  Spark 中對於數據的保存除了持久化操做以外,還提供了一種檢查點的機制,檢查點(本質是經過將 RDD 寫入 Disk 作檢查點)是爲了經過 lineage 作容錯的輔助,lineage 過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的 RDD 開始重作 Lineage,就會減小開銷。檢查點經過將數據寫入到 HDFS 文件系統實現了 RDD 的檢查點功能。
  cache 和 checkpoint 是有顯著區別的,緩存把 RDD 計算出來而後放在內存中,可是 RDD 的依賴鏈(至關於數據庫中的 redo 日誌),也不能丟掉,當某個點某個 executor 宕了,上面 cache 的 RDD 就會丟掉, 須要經過依賴鏈重放計算出來,不一樣的是,checkpoint 是把 RDD 保存在 HDFS 中,是多副本可靠存儲,因此依賴鏈就能夠丟掉了,就斬斷了依賴鏈, 是經過複製實現的高容錯。
  若是存在如下場景,則比較適合使用檢查點機制:
  1) DAG 中的 Lineage 過長,若是重算,則開銷太大(如在 PageRank 中)。
  2) 在寬依賴上作 checkpoint 得到的收益更大。
  爲當前 RDD 設置檢查點。該函數將會建立一個二進制的文件,並存儲到 checkpoint 目錄中,該目錄是用 SparkContext.setCheckpointDir() 設置的。在 checkpoint 的過程當中,該 RDD 的全部依賴於父 RDD 中的信息將所有被移出。對 RDD 進行 checkpoint 操做並不會立刻被執行,必須執行 Action 操做才能觸發。

示例代碼:

scala> val data = sc.parallelize(1 to 100,5)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> sc.setCheckpointDir("hdfs://hadoop102:9000/checkpoint")

scala> data.count
res6: Long = 100

scala> data.checkpoint

scala> val ch1 = sc.parallelize(1 to 2)
ch1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> val ch2 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
ch2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at map at <console>:26

scala> val ch3 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
ch3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at <console>:26

scala> ch3.checkpoint

scala> ch2.collect
res9: Array[String] = Array(1[1556169758749], 2[1556169758749])

scala> ch2.collect
res10: Array[String] = Array(1[1556169761316], 2[1556169761316])

scala> ch3.collect
res11: Array[String] = Array(1[1556169778793], 2[1556169778790])                

scala> ch3.collect
res12: Array[String] = Array(1[1556169780524], 2[1556169780525])

scala> ch3.collect
res13: Array[String] = Array(1[1556169780524], 2[1556169780525])

2.5.1 checkpoint 寫流程

  RDD checkpoint 過程當中會通過幾個狀態:[ initialized → marked for checkpointing → checkpointing in progress → checkpointed ]
  轉換流程以下:
  

  1) data.checkpoint 這個函數調用中設置的目錄中全部依賴的 RDD 都會被刪除,函數必須在 job 運行以前調用執行,強烈建議 RDD 緩存在內存中(又提到一次,千萬要注意喲)!不然保存到文件的時候須要從頭計算。初始化 RDD 的 checkpointData 變量爲 ReliableRDDCheckpointData。這時候標記爲 Initialized 狀態。

  2) 在全部 job action 的時候,runJob 方法中都會調用 rdd.doCheckpoint,這個會向前遞歸調用全部的依賴的 RDD,看看需不須要 checkpoint。須要 checkpoint,而後調用 checkpointData.get.checkpoint(),裏面標記狀態爲 CheckpointingInProgress,裏面調用具體實現類的 ReliableRDDCheckpointData 的 doCheckpoint 方法。

  3) doCheckpoint -> writeRDDToCheckpointDirectory,注意這裏會把 job 再運行一次,若是已經 cache 了,就能夠直接使用緩存中的 RDD 了,就不須要重頭計算一遍了(怎麼又說了一遍),這時候直接把 RDD,輸出到 hdfs 每一個分區一個文件,會先寫到一個臨時文件,若是所有輸出完,則進行 rename ,若是輸出失敗,就回滾 delete。

  4) 標記狀態爲 Checkpointed,markCheckpointed 方法中清除全部的依賴,怎麼清除依賴的呢?就是把 RDD 變量的強引用設置爲 null,垃圾回收了,會觸發 ContextCleaner 裏面的監聽,清除實際 BlockManager 緩存中的數據。

2.5.2 checkpoint 讀流程

  若是一個 RDD 咱們已經 checkpoint 了,那麼是何時用呢?checkpoint 將 RDD 持久化到 HDFS 或本地文件夾,若是不被手動 remove 掉,是一直存在的,也就是說能夠被下一個 driver program 使用。好比 spark streaming 掛掉了, 重啓後就可使用以前 checkpoint 的數據進行 recover,固然在同一個 driver program 也可使用。
  咱們講下在同一個 driver program 中是怎麼使用 checkpoint 數據的。若是一個 RDD 被checkpoint 了,若是這個 RDD 上有 action 操做時候,或者回溯的這個 RDD 的時候,這個 RDD 進行計算的時候,裏面判斷若是已經 checkpoint 過,對分區和依賴的處理都是使用的 RDD 內部的 checkpointRDD 變量。

  具體細節以下:
  若是一個 RDD 被 checkpoint 了,那麼這個 RDD 中對分區和依賴的處理都是使用的 RDD 內部的 checkpointRDD 變量,具體實現是 ReliableCheckpointRDD 類型。這個是在 checkpoint 寫流程中建立的。依賴和獲取分區方法中先判斷是否已經 checkpoint,若是已經 checkpoint 了,就斬斷依賴,使用 ReliableCheckpointRDD 來處理依賴和獲取分區。
  若是沒有,才往前回溯依賴。依賴就是沒有依賴,由於已經斬斷了依賴,獲取分區數據就是讀取 checkpoint 到 hdfs 目錄中不一樣分區保存下來的文件。

2.6 RDD 的依賴關係

  RDD 和它依賴的父 RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
  

2.6.1 窄依賴(Narrow Dependency)

  窄依賴指的是每個父 RDD 的 Partition 最多被子 RDD 的一個 Partition 使用。
  總結:窄依賴咱們形象的比喻爲獨生子女。

2.6.2 寬依賴(Wide Dependency)

  寬依賴指的是多個子 RDD 的 Partition 會依賴同一個父 RDD 的 Partition,會引發 shuffle。
  總結:寬依賴咱們形象的比喻爲超生子女。

2.6.3 Lineage

  RDD 只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立 RDD 的一系列 Lineage(即血統)記錄下來,以便恢復丟失的分區。RDD 的 Lineage 會記錄 RDD 的元數據信息和轉換行爲,當該 RDD 的部分分區數據丟失時,它能夠根據這些信息來從新運算和恢復丟失的數據分區。
  

2.7 DAG 的生成

  


  DAG(Directed Acyclic Graph) 叫作有向無環圖,原始的 RDD 經過一系列的轉換就就造成了 DAG,根據 RDD 之間的依賴關係的不一樣將 DAG 劃分紅不一樣的 Stage,對於窄依賴,partition 的轉換處理在 Stage 中完成計算。對於寬依賴,因爲有 Shuffle 的存在,只能在 parent RDD 處理完成後,才能開始接下來的計算,所以寬依賴是劃分 Stage(階段) 的依據。
  
  RDD 的任務切分
  
  Application:一個可以打成 jar 包的 Spark 程序就是一個應用。裏面應該有一個 SparkContext。
  Job:一個應用中每個 Action 操做所涉及到的全部轉換叫一個 Job。
  Stage:一個 Job 根據 RDD 之間的寬窄依賴關係劃分爲多個 Stage,Stage 之間是根據依賴關係來逐個執行的。
  Task: 一個 Stage 運行的時候,RDD 的每個分區都會被一個 Task 去處理,也能夠認爲是 並行度
  RDD 的運行規劃圖
  
  寫代碼時咱們都是從前日後寫,可是劃分 Stage 是從後往前劃分。劃分過程以下:
  一、首先先把全部代碼劃分紅爲一個 Stage,而後該 Stage 入棧。
  二、從最後的代碼往前走,若是發現 RDD 之間的依賴關係是寬依賴,那麼將寬依賴前面的全部代碼劃分爲第二個 Stage,而後該 Stage 入棧。
  三、根據 2 規則繼續往前走,直到代碼開頭。
  Spark 核心組件交互流程簡圖
  

2.8 RDD 相關概念關係

  


  輸入可能以多個文件的形式存儲在 HDFS 上,每一個 File 都包含了不少塊,稱爲 Block。
  當 Spark 讀取這些文件做爲輸入時,會根據具體數據格式對應的 InputFormat 進行解析,通常是將若干個 Block 合併成一個輸入分片,稱爲 InputSplit,注意 InputSplit 不能跨越文件。
  隨後將爲這些輸入分片生成具體的 Task。InputSplit 與 Task 是一一對應的關係。隨後這些具體的 Task 每一個都會被分配到集羣上的某個節點的某個 Executor 去執行。
  1) 每一個節點能夠起一個或多個Executor。
  2) 每一個 Executor 由若干 core 組成,每一個 Executor 的每一個 core 一次只能執行一個 Task。
  3) 每一個 Task 執行的結果就是生成了目標 RDD 的一個 partiton。

 

  注意:這裏的 core 是虛擬的 core 而不是機器的物理 CPU 核,能夠理解爲就是 Executor 的一個工做線程。
  而 Task 被執行的併發度 = Executor 數目 * 每一個 Executor 核數。
  至於 partition 的數目:
  1) 對於數據讀入階段,例如 sc.textFile,輸入文件被劃分爲多少 InputSplit 就會須要多少初始 Task。
  2) 在 Map 階段 partition 數目保持不變。
  3) 在 Reduce 階段,RDD 的聚合會觸發 shuffle 操做,聚合後的 RDD 的 partition 數目跟具體操做有關,例如 repartition 操做會聚合成指定分區數,還有一些算子是可配置的。

  RDD 在計算的時候,每一個分區都會起一個 task,因此 RDD 的分區數目決定了總的的 Task 數目。
  申請的計算節點(Executor)數目和每一個計算節點核數,決定了你同一時刻能夠並行執行的 Task。
  好比你的 RDD 有 100 個分區,那麼計算的時候就會生成 100 個 Task,你的資源配置爲 10 個計算節點,每一個兩 2 個核,同一時刻能夠並行的 Task 數目爲 20,計算這個 RDD 就須要 5 個輪次。
  若是計算資源不變,你有 101 個 Task 的話,就須要 6 個輪次,在最後一輪中,只有一個 Task 在執行,其他核都在空轉。
  若是資源不變,你的 RDD 只有 2 個分區,那麼同一時刻只有 2 個 Task 運行,其他 18 個核空轉,形成資源浪費。這就是在 spark 調優中,增大 RDD 分區數目,增大任務並行度的作法。

2.9 Spark Core 實例練習

格式:  timestamp   province    city        userid      adid
        某個時間點  某個省份    某個城市    某個用戶    某個廣告

用戶 ID 範圍: 0 - 99
省份、城市 ID 相同: 0 - 9
adid範圍:0 - 19

需求:統計每個省份點擊 TOP3 的廣告 ID
需求:統計每個省份每個小時點擊 TOP3 廣告的 ID

agent.log

1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
1516609143869 2 8 92 9
1516609143869 6 7 84 24
1516609143869 1 8 95 5
1516609143869 8 1 90 29
1516609143869 3 3 36 16
1516609143869 3 3 54 22
1516609143869 7 6 33 5
......
......

示例代碼:
Practice.scala

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * timestamp   province    city        userid      adid
  * 某個時間點  某個省份    某個城市    某個用戶    某個廣告
  *
  * 用戶 ID 範圍: 0 - 99
  * 省份、城市 ID 相同: 0 - 9
  * adid範圍:0 - 19
  *
  * 需求1:統計每個省份點擊 TOP3 的廣告 ID
  * 需求2:統計每個省份每個小時點擊 TOP3 廣告的 ID
  */

case class Click(timestamp: Long, province: Int, city: Int, userid: Int, adid: Int)

object Practice 
{
  def main(args: Array[String]): Unit = {
    // 建立 SparkConf() 並設置 App 名稱
    val sparkConf = new SparkConf().setMaster("local[*]") setAppName ("practice")

    // 建立 SparkContext,該對象是提交 Spark App 的入口
    val sc = new SparkContext(sparkConf)

    // 載入數據
    // 1516609143867 6 7 64 16
    val click: RDD[String] = sc.textFile("D:\learn\JetBrains\workspace_idea\spark\sparkcore_ad\src\main\resources\agent.log")

    // 將字符串類型的數據轉換爲 Click 對象
    val clickRDD: RDD[Click] = click.map { item =>
      val param = item.split(" ")
      Click(param(0).toLong, param(1).toInt, param(2).toInt, param(3).toInt, param(4).toInt)
    }

    clickRDD.cache()

    // 需求1:統計每個省份點擊 TOP3 的廣告 ID
    // 統計每個省份點擊 TOP3 的廣告 ID,先建立一個最小粒度
    // pro-adid
    val proAndAdid2CountRDD: RDD[(String, Int)] = clickRDD.map(click => (click.province + "_" + click.adid, 1)) // (6_16, 1) (6_12, 1) (6_19, 1) (6_16, 1) (6_12, 1) ...
    val proAndAdid2CountsRDD: RDD[(String, Int)] = proAndAdid2CountRDD.reduceByKey(_ + _) // (6_16, 15) (6_12, 22) (6_19, 20) ....

    // 再逐漸放大粒度
    val pro2AdidCountsRDD = proAndAdid2CountsRDD.map { item =>
      val param = item._1.split("_")
      (param(0).toInt, (param(1).toInt, item._2)) // (6, (16,15))
    }
    val pro2AdidsRDD: RDD[(Int, Iterable[(Int, Int)])] = pro2AdidCountsRDD.groupByKey() // (6, (16,15),(12,22),(19,20),(2,13),...)

    val result = pro2AdidsRDD.mapValues { item =>
      item.toList.sortWith(_._2 > _._2).take(3// (6, (12,22),(19,20),(16,15))
    }

    println("---")

    println(result.collect())

    val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH")

    // 需求2:統計每個省份每個小時點擊 TOP3 廣告的 ID
    // pro-hour-adid
    val proAndTimeAndAdid2CountRDD: RDD[(String, Int)] = clickRDD.map { click =>
      (click.province + "_" + simpleDateFormat.format(new Date(click.timestamp)) + "_" + click.adid, 1// (6_2019-4-25 10_16, 1)
    }
    val proAndTimeAndAdid2CountsRDD: RDD[(String, Int)] = proAndTimeAndAdid2CountRDD.reduceByKey(_ + _) // (6_2019-4-25 10_16, 15)

    // 再逐漸放大粒度
    val result2 = proAndTimeAndAdid2CountsRDD.map { item =>
      val param = item._1.split("_")
      (param(0) + "_" + param(1), (param(2).toInt, item._2)) // ((6_2019-4-25 10), (16, 15))
    }.groupByKey().mapValues { item =>
      item.toList.sortWith(_._2 > _._2).take(3// ((6_2019-4-25 10), (16, 15))
    }.map{ item =>
      val param = item._1.split("_")
      (param(0).toInt, (param(1), item._2)) // (6, (2019-4-25 10, (16, 15))
    }.groupByKey()

    println(result2.collect())

    println("---")

    sc.stop()
  }
}

調試結果截圖:
需求1:統計每個省份點擊 TOP3 的廣告 ID


需求2:統計每個省份每個小時點擊 TOP3 廣告的 ID

第3章 鍵值對 RDD

  鍵值對 RDD 是 Spark 中許多操做所須要的常見數據類型。本章作特別講解。除了在基礎 RDD 類中定義的操做以外,Spark 爲包含鍵值對類型的 RDD 提供了一些專有的操做,在 PairRDDFunctions 專門進行了定義。這些 RDD 被稱爲 pair RDD。
  有不少種方式建立 pair RDD,在輸入輸出章節會講解。通常若是從一個普通的 RDD 轉 爲 pair RDD 時,能夠調用 map() 函數來實現,傳遞的函數須要返回鍵值對。

3.1 鍵值對 RDD 的轉化操做

3.1.1 轉化操做列表

上一章進行了練習,這一章會重點講解。
針對一個 pair RDD的轉化操做

針對兩個 pair RDD的轉化操做

3.1.2 聚合操做

  當數據集以鍵值對形式組織的時候,聚合具備相同鍵的元素進行一些統計是很常見的操做。以前講解過基礎 RDD 上的 fold()、combine()、reduce() 等行動操做,pair RDD 上則 有相應的針對鍵的轉化操做。Spark 有一組相似的操做,能夠組合具備相同鍵的值。這些操做返回 RDD,所以它們是轉化操做而不是行動操做。
  reduceByKey() 與 reduce() 至關相似,它們都接收一個函數,並使用該函數對值進行合併。 reduceByKey() 會爲數據集中的每一個鍵進行並行的歸約操做,每一個歸約操做會將鍵相同的值合併起來。由於數據集中可能有大量的鍵,因此 reduceByKey() 沒有實現爲向用戶程序返回一個值的行動操做。實際上,它會返回一個由各鍵和對應鍵歸約出來的結果值組成的新的 RDD。
  foldByKey() 則與 fold() 至關相似,它們都使用一個與 RDD 和合並函數中的數據類型相同的零值做爲初始值。與 fold() 同樣,foldByKey() 操做所使用的合併函數對零值與另外一個元素進行合併,結果仍爲該元素。

  求均值操做:版本一

input.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).map{ case (key, value) => (key, value._1 / value._2.toFloat) }

圖解以下:

  combineByKey() 是最爲經常使用的基於鍵進行聚合的函數。大多數基於鍵聚合的函數都是用它實現的。和 aggregate() 同樣,combineByKey() 可讓用戶返回與輸入數據的類型不一樣的返回值。
  要理解 combineByKey(),要先理解它在處理數據時是如何處理每一個元素的。因爲 combineByKey() 會遍歷分區中的全部元素,所以每一個元素的鍵要麼尚未遇到過,要麼就和以前的某個元素的鍵相同。
  若是這是一個新的元素,combineByKey() 會使用一個叫做 createCombiner() 的函數來建立那個鍵對應的累加器的初始值。須要注意的是,這一過程會在每一個分區中第一次出現各個鍵時發生,而不是在整個 RDD 中第一次出現一個鍵時發生。
  若是這是一個在處理當前分區以前已經遇到的鍵,它會使用 mergeValue() 方法將該鍵的累加器對應的當前值與這個新的值進行合併。
  因爲每一個分區都是獨立處理的,所以對於同一個鍵能夠有多個累加器。若是有兩個或者更多的分區都有對應同一個鍵的累加器,就須要使用用戶提供的 mergeCombiners() 方法將各個分區的結果進行合併。

  求均值:版本二

val result = input.combineByKey(
  (v) => (v, 1),
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }

result.collectAsMap().map(println(_))

3.1.3 數據分組

  若是數據已經以預期的方式提取了鍵,groupByKey() 就會使用 RDD 中的鍵來對數據進行分組。對於一個由類型 K 的鍵和類型 V 的值組成的 RDD,所獲得的結果 RDD 類型會是 [K, Iterable[V]]。
  groupBy() 能夠用於未成對的數據上,也能夠根據除鍵相同之外的條件進行分組。它能夠接收一個函數,對源 RDD 中的每一個元素使用該函數,將返回結果做爲鍵再進行分組。
  多個 RDD 分組,可使用 cogroup 函數,cogroup() 的函數對多個共享同一個鍵的 RDD 進行分組。對兩個鍵的類型均爲 K 而值的類型分別爲 V 和 W 的 RDD 進行 cogroup() 時,獲得的結果 RDD 類型爲 [(K, (Iterable[V], Iterable[W]))]。若是其中的 一個 RDD 對於另外一個 RDD 中存在的某個鍵沒有對應的記錄,那麼對應的迭代器則爲空。 cogroup() 提供了爲多個 RDD 進行數據分組的方法。

3.1.4 鏈接

  鏈接主要用於多個 pair RDD 的操做,鏈接方式多種多樣:右外鏈接、左外鏈接、交叉鏈接以及內鏈接。
  普通的 join 操做符表示內鏈接 2。只有在兩個 pair RDD 中都存在的鍵才叫輸出。當一個輸入對應的某個鍵有多個值時,生成的 pair RDD 會包括來自兩個輸入 RDD 的每一組相對應的記錄。
  leftOuterJoin() 產生的 pair RDD 中,源 RDD 的每個鍵都有對應的記錄。每一個鍵相應的值是由一個源 RDD 中的值與一個包含第二個 RDD 的值的 Option(在 Java 中爲 Optional)對象組成的二元組。
  rightOuterJoin() 幾乎與 leftOuterJoin() 徹底同樣,只不過預期結果中的鍵必須出如今第二個 RDD 中,而二元組中的可缺失的部分則來自於源 RDD 而非第二個 RDD。

示例圖解以下:

3.1.5 數據排序

sortByKey() 函數接收一個叫做 ascending 的參數,表示咱們是否想要讓結果按升序排序(默認值爲 true,默認升序)。

3.2 鍵值對 RDD 的行動操做

3.3 鍵值對 RDD 的數據分區

  Spark 目前支持 Hash 分區和 Range 分區,用戶也能夠自定義分區,Hash 分區爲當前的默認分區,Spark 中分區器直接決定了 RDD 中分區的個數、RDD 中每條數據通過 Shuffle 過程屬於哪一個分區和 Reduce 的個數。

注意
  (1) 只有 Key-Value 類型的 RDD 纔有分區的,非 Key-Value 類型的 RDD 分區的值是 None。
  (2) 每一個 RDD 的分區 ID 範圍:0~numPartitions-1,決定這個值是屬於那個分區的。

3.3.1 獲取 RDD 的分區方式

  能夠經過使用 RDD 的 partitioner 屬性來獲取 RDD 的分區方式。它會返回一個 scala.Option 對象, 經過 get 方法獲取其中的值。

3.3.2 Hash 分區方式

  HashPartitioner 分區的原理:對於給定的 key,計算其 hashCode,併除以分區的個數取餘,若是餘數小於 0,則用 餘數+分區的個數,最後返回的值就是這個 key 所屬的分區 ID。

示例代碼:

scala> nopar.partitioner
res20: Option[org.apache.spark.Partitioner] = None

scala> val nopar = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)
nopar: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala>nopar.mapPartitionsWithIndex((index,iter)=>{ Iterator(index.toString+" : "+iter.mkString("|")) }).collect
res0: Array[String] = Array("0 : "1 : (1,3), 2 : (1,2), 3 : (2,4), "4 : "5 : (2,3), 6 : (3,6), 7 : (3,8)) 
scala> val hashpar = nopar.partitionBy(new org.apache.spark.HashPartitioner(7))
hashpar: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[12] at partitionBy at <console>:26

scala> hashpar.count
res18: Long = 6

scala> hashpar.partitioner
res21: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@7)

scala> hashpar.mapPartitions(iter => Iterator(iter.length)).collect()
res19: Array[Int] = Array(0312000)

3.3.3 Range 分區方式

  HashPartitioner 分區弊端:可能致使每一個分區中數據量的不均勻,極端狀況下會致使某些分區擁有 RDD 的所有數據。
  RangePartitioner 分區優點:儘可能保證每一個分區中數據量的均勻,並且分區與分區之間是有序的,一個分區中的元素確定都是比另外一個分區內的元素小或者大。
  可是分區內的元素是不能保證順序的。簡單的說就是將必定範圍內的數映射到某一個分區內
  RangePartitioner 做用:將必定範圍內的數映射到某一個分區內,在實現中,分界的算法尤其重要。用到了水塘抽樣算法

3.3.4 自定義分區方式(重點)

  要實現自定義的分區器,你須要繼承 org.apache.spark.Partitioner 類並實現下面3個方法:

numPartitions: Int  返回建立出來的分區數。
getPartition(key: Any): Int  返回給定鍵的分區編號( 0 到 numPartitions-1 )。
equals()  Java 判斷相等性的標準方法。這個方法的實現很是重要,Spark 須要用這個方法來檢查你的分區器對象是否和其餘分區器實例相同,這樣 Spark 才能夠判斷兩個 RDD 的分區方式是否相同。

  假設咱們須要將相同後綴的數據寫入相同的文件,咱們經過將相同後綴的數據分區到相同的分區並保存輸出來實現。

示例代碼1:
在 IDEA 中寫代碼

package com.atguigu.spark

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

/**
  * 自定義分區案例
  */

class CustomerPartitioner(numPartsIntextends Partitioner {

  // 覆蓋分區數
  override def numPartitions: Int = numParts

  // 覆蓋分區號獲取函數
  override def getPartition(key: Any): Int = {
    val ckey: String = key.toString
    ckey.substring(ckey.length - 1).toInt % numParts
  }
}

object CustomerPartitionerDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[*]")setAppName("partitioner")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(List("aa.2""bb.2""cc.3""dd.3""ee.5"))

    data.map((_, 1)).partitionBy(new CustomerPartitioner(5)).keys.saveAsTextFile("hdfs://hadoop102:9000/partitioner")

    println("---")

    sc.stop()
  }
}

示例代碼2:
在 Spark Shell 中寫代碼

scala> val data=sc.parallelize(List("aa.2","bb.2","cc.3","dd.3","ee.5").zipWithIndex,2)    函數 zipWithIndex 做用是快速構建 KV 結構的數據
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:27

scala> data.collect
res4: Array[(String, Int)] = Array((aa.2,0), (bb.2,1), (cc.3,2), (dd.3,3), (ee.5,4))

scala> data.mapPartitionsWithIndex((index,iter) => Iterator(index.toString +" : "+ iter.mkString("|"))).collect
res5: Array[String] = Array(0 : (aa.2,0)|(bb.2,1), 1 : (cc.3,2)|(dd.3,3)|(ee.5,4))

scala> :paste
// Entering paste mode (ctrl-D to finish)
class CustomerPartitioner(numPartsIntextends org.apache.spark.Partitioner{

  // 覆蓋分區數
  override def numPartitions: Int = numParts

  // 覆蓋分區號獲取函數
  override def getPartition(key: Any): Int = {
    val ckey: String = key.toString
    ckey.substring(ckey.length - 1).toInt % numParts
  }
}

// Exiting paste mode, now interpreting.

defined class CustomerPartitioner

scaladata.partitionBy(new CustomerPartitioner(4))     分4個區
res7org.apache.spark.rdd.RDD[(StringInt)] 
= ShuffledRDD[9] at partitionBy at <console>:31

scala> res7.mapPartitionsWithIndex((index,iter) => Iterator(index.toString +" : "+ iter.mkString("|"))).collect
res8: Array[String] = Array("0 : "1 : (ee.5,4), 2 : (aa.2,0)|(bb.2,1), 3 : (cc.3,2)|(dd.3,3))

  使用自定義的 Partitioner 是很容易的:只要把它傳給 partitionBy() 方法便可。Spark 中有許多依賴於數據混洗的方法,好比 join() 和 groupByKey(),它們也能夠接收一個可選的 Partitioner 對象來控制輸出數據的分區方式。

3.3.5 分區 Shuffle 優化

  在分佈式程序中,通訊的代價是很大的,所以控制數據分佈以得到最少的網絡傳輸能夠極大地提高總體性能。
  Spark 中全部的鍵值對 RDD 均可以進行分區。系統會根據一個針對鍵的函數對元素進行分組。主要有哈希分區範圍分區,固然用戶也能夠自定義分區函數。
  經過分區能夠有效提高程序性能。以下例子:
  分析這樣一個應用,它在內存中保存着一張很大的用戶信息,也就是一個由 (UserID, UserInfo) 對組成的 RDD,其中 UserInfo 包含一個該用戶所訂閱的主題的列表。該應用會週期性地將這張表與一個小文件進行組合,這個小文件中存着過去五分鐘內發生的事件,其實就是一個由 (UserID, LinkInfo) 對組成的表,存放着過去五分鐘內某網站各用戶的訪問狀況。例如,咱們可能須要對用戶訪問其未訂閱主題的頁面的狀況進行統計。
  解決方案一:
  


  圖解以下:
  
  這段代碼能夠正確運行,可是不夠高效。這是由於在每次調用 processNewLogs() 時都會用到 join() 操做,而咱們對數據集是如何分區的卻一無所知。默認狀況下,鏈接操做會將兩個數據集中的全部鍵的哈希值都求出來,將該哈希值相同的記錄經過網絡傳到同一臺機器上,而後在那臺機器上對全部鍵相同的記錄進行鏈接操做。由於 userData 表比每五分鐘出現的訪問日誌表 events 要大得多,因此要浪費時間作不少額外工做,在每次調用時都對 userData 表進行哈希值計算和跨節點數據混洗,下降了程序的執行效率。
  優化方法:
  
  圖解以下:
  

3.3.6 基於分區進行操做

  基於分區對數據進行操做可讓咱們避免爲每一個數據元素進行重複的配置工做。諸如打開數據庫鏈接或建立隨機數生成器等操做,都是咱們應當儘可能避免爲每一個元素都配置一次的工做。Spark 提供基於分區的 mapPartition 和 foreachPartition,讓你的部分代碼只對 RDD 的每一個分區運行一次,這樣能夠幫助下降這些操做的代價。

3.3.7 從分區中獲益的操做

  可以從數據分區中得到性能提高的操做有 cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey() 以及 lookup() 等。

第4章 數據讀取與保存主要方式

4.1 文本文件輸入輸出

  當咱們將一個文本文件讀取爲 RDD 時,輸入的每一行都會成爲 RDD 的一個元素。也能夠將多個完整的文本文件一次性讀取爲一個 pair RDD,其中鍵是文件名,值是文件內容。

val input = sc.textFile("./README.md"

  若是傳遞目錄,則將目錄下的全部文件讀取做爲 RDD。
  文件路徑支持通配符。
  經過 wholeTextFiles() 對於大量的小文件讀取效率比較高,大文件效果沒有那麼高。
  Spark 經過 saveAsTextFile() 進行文本文件的輸出,該方法接收一個路徑,並將 RDD 中的內容都輸入到路徑對應的文件中。Spark 將傳入的路徑做爲目錄對待,會在那個目錄下輸出多個文件。這樣,Spark 就能夠從多個節點上並行輸出了。

result.saveAsTextFile(outputFile)

示例代碼:

scala> val readme = sc.textFile("./README.md")
readme: org.apache.spark.rdd.RDD[String] = ./README.md MapPartitionsRDD[3] at textFile at <console>:24

scala> readme.collect
res1: Array[String] = Array(# Apache Spark, "", Spark is a fast and general cluster computing system for Big Data. It provides, high-level APIs in Scala, Java, Python, and R, and an optimized engine that, supports general computation graphs for data analysis. It also supports a, rich set of higher-level tools including Spark SQL for SQL and DataFrames,, MLlib for machine learning, GraphX for graph processing,, and Spark Streaming for stream processing., "", <http://spark.apache.org/>, "", "", ## Online Documentation, "", You can find the latest Spark documentation, including a programming, guide, on the [project web page](http://spark.apache.org/documentation.html)., This README file only contains basic setup instructions., "", ## Building Spark, "", Spark is built using [Apache Maven](...

scala> val readme = sc.textFile("hdfs://hadoop102:9000/README.md")
readme: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/README.md MapPartitionsRDD[7] at textFile at <console>:24

scala> readme.collect
res7: Array[String] = Array(# Apache Spark, "", Spark is a fast and general cluster computing system for Big Data. It provides, high-level APIs in Scala, Java, Python, and R, and an optimized engine that, supports general computation graphs for data analysis. It also supports a, rich set of higher-level tools including Spark SQL for SQL and DataFrames,, MLlib for machine learning, GraphX for graph processing,, and Spark Streaming for stream processing., "", <http://spark.apache.org/>, "", "", ## Online Documentation, "", You can find the latest Spark documentation, including a programming, guide, on the [project web page](http://spark.apache.org/documentation.html)., This README file only contains basic setup instructions., "", ## Building Spark, "", Spark is built using [Apache Maven](...

scala> readme.saveAsTextFile("hdfs://hadoop102:9000/test")

scala> readme.saveAsTextFile("./saveTest")      注意:Spark Shell 若是開啓的集羣模式,則文件分散的存儲在其餘節點上;若是開啓的是 Client 模式,則文件存儲在本地當前目錄

4.2 JSON 文件輸入輸出

  若是 JSON 文件中每一行就是一個 JSON 記錄,那麼能夠經過將 JSON 文件當作文本文件來讀取,而後利用相關的 JSON 庫對每一條數據進行 JSON 解析。

示例代碼:

scala> import org.json4s._      須要導入一些 jar 包支持,或者在打開 spark shell 的時候在 --jars 中導入
import org.json4s._ 

scala> import org.json4s.jackson.JsonMethods._      須要導入一些 jar 包支持,或者在打開 spark shell 的時候在 --jars 中導入
import org.json4s.jackson.JsonMethods._     

scala> import org.json4s.jackson.Serialization      須要導入一些 jar 包支持,或者在打開 spark shell 的時候在 --jars 中導入
import org.json4s.jackson.Serialization

scala> var result = sc.textFile("/opt/module/spark-2.1.1-bin-hadoop2.7/examples/src/main/resources/people.json")
result: org.apache.spark.rdd.RDD[String] = /opt/module/spark-2.1.1-bin-hadoop2.7/examples/src/main/resources/people.json MapPartitionsRDD[3] at textFile at <console>:40

scala> result.collect
res4: Array[String] = Array({"name":"Michael""age":30}, {"name":"Andy""age":30}, {"name":"Justin""age":19})

scala> class Person(var nameStringvar ageInt)
defined class Person

scalaresult.collect().foreach(x 
=> {var c = parse(x).extract[Person];println(c.name + "," + c.age)})
Michael,30
Andy,30
Justin,19

  若是 JSON 數據是跨行的,那麼只能讀入整個文件,而後對整個文件進行解析。
  JSON 數據的輸出主要是經過在輸出以前將由結構化數據組成的 RDD 轉爲字符串 RDD,而後使用 Spark 的文本文件 API 寫出去。說白了仍是以文本文件的形式存儲,只是文本的格式已經在程序中轉換爲 JSON。

4.3 CSV 文件輸入輸出

  讀取 CSV/TSV 數據和讀取 JSON 數據類似,都須要先把文件看成普通文本文件來讀取數據,而後經過將每一行進行解析實現對 CSV 的讀取。
  CSV/TSV 數據的輸出也是須要將結構化 RDD 經過相關的庫轉換成字符串 RDD,而後使用 Spark 的文本文件 API 寫出去。

4.4 SequenceFile 文件輸入輸出

  SequenceFile 文件是 Hadoop 用來存儲二進制形式的 key-value 對而設計的一種平面文件(Flat File)。
  Spark 有專門用來讀取 SequenceFile 文件的接口。在 SparkContext 中,能夠調用 sequenceFile[keyClass, valueClass](path)

類型對應表以下:


圖解以下:

示例代碼:

scala> val data = sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> data.saveAsSequenceFile("hdfs://hadoop102:9000/sequdata")

scala> val sdata = sc.sequenceFile[Int,String]("hdfs://hadoop102:9000/sequdata/p*")
sdata: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[19] at sequenceFile at <console>:24

scala> sdata.collect()
res14: Array[(Int, String)] = Array((2,aa), (3,bb), (4,cc), (5,dd), (6,ee))

  能夠直接調用 saveAsSequenceFile(path) 保存你的 pair RDD,它會幫你寫出數據。須要鍵和值可以自動轉爲 Writable 類型。

4.5 對象文件輸入輸出

  對象文件是將對象序列化後保存的文件,採用 Java 的序列化機制。能夠經過 objectFile[k,v](path) 函數接收一個路徑,讀取對象文件,返回對應的 RDD,也能夠經過調用 saveAsObjectFile() 實現對對象文件的輸出。由於是序列化,因此要指定類型。

示例代碼:

scala> val data = sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[20] at parallelize at <console>:24

scala> data.saveAsObjectFile("hdfs://hadoop102:9000/objdata")
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> val objrdd: org.apache.spark.rdd.RDD[(Int,String)] = sc.objectFile[(Int,String)]("hdfs://hadoop102:9000/objdata/p*")
objrdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[28] at objectFile at <console>:25

scala> objrdd.collect()
res20: Array[(Int, String)] = Array((2,aa), (3,bb), (4,cc), (5,dd), (6,ee))

4.6 Hadoop 輸入輸出格式

  Spark 的整個生態系統與 Hadoop 是徹底兼容的,因此對於 Hadoop 所支持的文件類型或者數據庫類型,Spark 也一樣支持。另外,因爲 Hadoop 的 API 有新舊兩個版本,因此 Spark 爲了可以兼容 Hadoop 全部的版本,也提供了兩套建立操做接口。對於外部存儲建立操做而言,hadoopRDD 和 newHadoopRDD 是最爲抽象的兩個函數接口,主要包含如下四個參數:
  1) 輸入格式(InputFormat): 指定數據輸入的類型,如 TextInputFormat 等,新舊兩個版本所引用的版本分別是 org.apache.hadoop.mapred.InputFormat 和 org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
  2) 鍵類型: 指定 [K,V] 鍵值對中 K 的類型
  3) 值類型: 指定 [K,V] 鍵值對中 V 的類型
  4) 分區值: 指定由外部存儲生成的 RDD 的 partition 數量的最小值,若是沒有指定,系統會使用默認值 defaultMinSplits
  其餘建立操做的 API 接口都是爲了方便最終的 Spark 程序開發者而設置的,是這兩個接口的高效實現版本。例如,對於 textFile 而言,只有 path 這個指定文件路徑的參數,其餘參數在系統內部指定了默認值。

兼容舊版本 HadoopAPI 的建立操做


兼容新版本 HadoopAPI 的建立操做

注意:
  1. 在 Hadoop 中以壓縮形式存儲的數據,不須要指定解壓方式就可以進行讀取,由於 Hadoop 自己有一個解壓器會根據壓縮文件的後綴推斷解壓算法進行解壓。
  2. 若是用 Spark 從 Hadoop 中讀取某種類型的數據不知道怎麼讀取的時候,上網查找一個使用 map-reduce 的時候是怎麼讀取這種這種數據的,而後再將對應的讀取方式改寫成上面的 hadoopRDD 和 newAPIHadoopRDD 兩個類就好了。

示例代碼:

scala> val data = sc.parallelize(Array((30,"hadoop"), (71,"hive"), (11,"cat")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[47] at parallelize at <console>:35

scala> import org.apache.hadoop.io._     須要導入一些 jar 包支持,或者在打開 spark shell 的時候在 --jars 中導入

scala> import org.apache.hadoop.mapreduce.lib.output._     須要導入一些 jar 包支持,或者在打開 spark shell 的時候在 --jars 中導入

scala> data.saveAsNewAPIHadoopFile("hdfs://hadoop102:9000/output/", classOf[LongWritable] ,classOf[Text] ,classOf[org.apache.hadoop.mapreduce.lib.output.TextOutputFormat[LongWritable, Text]])

  對於 RDD 最後的歸宿除了返回爲集合和標量,也能夠將 RDD 存儲到外部文件系統或者數據庫中,Spark 系統與 Hadoop 是徹底兼容的,因此 MapReduce 所支持的讀寫文件或者數據庫類型 Spark 也一樣支持。另外,因爲 Hadoop 的 API 有新舊兩個版本,因此 Spark 爲了可以兼容 Hadoop 全部的版本了,也提供了兩套讀取 Hadoop 文件 API。
  將 RDD 保存到 HDFS 中在一般狀況下須要關注或者設置五個參數,即文件保存的路徑、Key值的class類型、Value值的class類型、RDD的輸出格式(OutputFormat,如 TextOutputFormat/SequenceFileOutputFormat)、以及最後一個相關的參數 codec (這個參數表示壓縮存儲的壓縮形式,如 DefaultCodec、Gzip、Codec 等)

兼容舊版 API
源碼:

saveAsObjectFile(path: String): Unit
saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
saveAsTextFile(path: String): Unit
saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit
saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit
saveAsHadoopDataset(conf: JobConf): Unit

  這裏列出的 API,前面 6 個都是 saveAsHadoopDataset 的簡易實現版本,僅僅支持將 RDD 存儲到 HDFS 中,而 saveAsHadoopDataset 的參數類型是 JobConf,因此其不只可以將 RDD 存儲到 HDFS 中,也能夠將 RDD 存儲到其餘數據庫中,如 HBase、MangoDB、Cassandra 等。

兼容新版 API
源碼:

saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit
saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
saveAsNewAPIHadoopDataset(conf: Configuration): Unit

  一樣的,前 2 個 API 是 saveAsNewAPIHadoopDataset 的簡易實現,只能將 RDD 存到 HDFS 中,而 saveAsNewAPIHadoopDataset 比較靈活,新版的 API 沒有 codec 的參數,因此要壓縮存儲文件到 HDFS 中須要使用 hadoopConfiguration 參數,設置對應 mapreduce.map.output.compress.codec 參數和 mapreduce.map.output.compress 參數。

注意:
  1. 若是不知道怎麼將 RDD 存儲到 Hadoop 生態的系統中,主要上網搜索一下對應的 map-reduce 是怎麼將數據存儲進去的,而後改寫成對應的 saveAsHadoopDataset 或 saveAsNewAPIHadoopDataset 就能夠了。

示例代碼:

scala> import org.apache.hadoop.io._     須要導入一些 jar 包支持,或者在打開 spark shell 的時候在 --jars 中導入

scala> import org.apache.hadoop.mapreduce.lib.input._     須要導入一些 jar 包支持,或者在打開 spark shell 的時候在 --jars 中導入

scala> val read = sc.newAPIHadoopFile[LongWritable, Text, org.apache.hadoop.mapreduce.lib.input.TextInputFormat]("hdfs://hadoop102:9000/output/part*", classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat], classOf[LongWritable], classOf[Text])
read: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = hdfs://hadoop102:9000/output/part* NewHadoopRDD[48] at newAPIHadoopFile at <console>:35

scala> read.map{case (k, v) => v.toString}.collect
res44: Array[String] = Array(30 hadoop, 71      hive, 11        cat)

4.7 文件系統的輸入輸出

  Spark 支持讀寫不少種文件系統,像本地文件系統、Amazon S三、HDFS 等。

4.8 數據庫的輸入輸出

關係型數據庫鏈接:支持經過 Java JDBC 訪問關係型數據庫。須要經過 JdbcRDD 進行,示例以下:

注意:須要先將 mysql-connector-java-5.1.27-bin.jar 拷貝至 spark 的 /opt/module/spark-2.1.1-bin-hadoop2.7/jars 目錄下,而後分發至其餘機器節點,而後再啓動 spark-shell 交互窗口。

Mysql 讀取:
示例代碼:

def main (args: Array[String] ) {
  val sparkConf = new SparkConf ().setMaster ("local[2]").setAppName ("JdbcApp")
  val sc = new SparkContext (sparkConf)

  val rdd = new org.apache.spark.rdd.JdbcRDD(
    sc,
    () => {
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      java.sql.DriverManager.getConnection ("jdbc:mysql://hadoop102:3306/rdd""root""123456")
    },
    "select * from rddtable where id >= ? and id <= ?;",
    1,
    10,
    1,
    r => (r.getInt(1), r.getString(2)))

  println(rdd.count ())
  rdd.foreach(println (_))
  sc.stop ()
}

Mysql 寫入:
示例代碼:

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("JdbcApp")
  val sc = new SparkContext(sparkConf)
  val data = sc.parallelize(List("Female""Male""Female"))

  data.foreachPartition(insertData)
}

def insertData(iterator: Iterator[String]): Unit = {
  Class.forName ("com.mysql.jdbc.Driver").newInstance()
  val conn = java.sql.DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd""root""123456")
  iterator.foreach(data => {
    val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
    ps.setString(1, data) 
    ps.executeUpdate()
  })
}

spark-shell 交互界面操做代碼:

[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ bin/spark-shell 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/04/26 11:10:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/04/26 11:10:29 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.25.102:4040
Spark context available as '
sc' (master = spark://hadoop102:7077, app id = app-20190426111025-0004).
Spark session available as '
spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '
_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import com.mysql
import com.mysql

scala> import com.mysql.jdbc.Driver
import com.mysql.jdbc.Driver

# 從 Mysql 的數據庫表中讀取數據
scala> val rdd 
new org.apache.spark.rdd.JdbcRDD(sc,() => {Class.forName("com.mysql.jdbc.Driver").newInstance();java.sql.DriverManager.getConnection ("jdbc:mysql://hadoop102:3306/rdd""root""123456")},"select * from rddtable where id >= ? and id <= ?;",1,10,1,r => (r.getInt(1), r.getString(2)))
rdd: org.apache.spark.rdd.JdbcRDD[(Int, String)] = JdbcRDD[0] at JdbcRDD at <console>:26

scala> rdd.collect
res0: Array[(Int, String)] = Array((1,abc), (2,dddd), (3,ddds))

# 向 Mysql 的數據庫表中寫入數據
scala> :paste
// Entering paste mode (ctrl-D to finish)

def insertData(iterator: Iterator[String]): Unit = {
  Class.forName ("com.mysql.jdbc.Driver").newInstance()
  val conn = java.sql.DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd""root""123456")
  iterator.foreach(data => {
    val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
    ps.setString(1, data) 
    ps.executeUpdate()
  })
}

// Exiting paste mode, now interpreting.

insertData: (iterator: Iterator[String])Unit

scala> val data = sc.parallelize(List("Female""Male""Female"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> data.foreachPartition(insertData)

# 從 Mysql 的數據庫表中再次讀取數據
scala> val rdd = new org.apache.spark.rdd.JdbcRDD(sc,() => {Class.forName("com.mysql.jdbc.Driver").newInstance();java.sql.DriverManager.getConnection ("jdbc:mysql://hadoop102:3306/rdd""root""123456")},"select * from rddtable where id >= ? and id <= ?;",1,10,1,r => (r.getInt(1), r.getString(2)))
rdd: org.apache.spark.rdd.JdbcRDD[(Int, String)] = JdbcRDD[2] at JdbcRDD at <console>:26

scala> rdd.collect
res2: Array[(Int, String)] = Array((1,abc), (2,dddd), (3,ddds), (4,Female), (5,Female), (6,Male))

JdbcRDD 接收這樣幾個參數:
  首先,要提供一個用於對數據庫建立鏈接的函數。這個函數讓每一個節點在鏈接必要的配置後建立本身讀取數據的鏈接。
  接下來,要提供一個能夠讀取必定範圍內數據的查詢,以及查詢參數中 lowerBound 和 upperBound 的值。這些參數可讓 Spark 在不一樣機器上查詢不一樣範圍的數據,這樣就不會因嘗試在一個節點上讀取全部數據而遭遇性能瓶頸。
  這個函數的最後一個參數是一個能夠將輸出結果從轉爲對操做數據有用的格式的函數。若是這個參數空缺,Spark 會自動將每行結果轉爲一個對象數組。

Cassandra 數據庫和 ElasticSearch 集成:

HBase 數據庫
因爲 org.apache.hadoop.hbase.mapreduce.TableInputFormat 類的實現,Spark 能夠經過 Hadoop 輸入格式訪問 HBase。這個輸入格式會返回鍵值對數據,其中鍵的類型爲 org.apache.hadoop.hbase.io.ImmutableBytesWritable,而值的類型爲 org.apache.hadoop.hbase.client.Result。

HBase 讀取:
示例代碼:

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
  val sc = new SparkContext(sparkConf)

  val conf = HBaseConfiguration.create()
  // HBase 中的表名
  conf.set(TableInputFormat.INPUT_TABLE, "fruit")

  val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])

  val count = hBaseRDD.count()
  println("hBaseRDD RDD Count:"+ count)
  hBaseRDD.cache()
  hBaseRDD.foreach {
    case (_, result) =>
      val key = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
      val color = Bytes.toString(result.getValue("info".getBytes, "color".getBytes))
      println("Row key:" + key + " Name:" + name + " Color:" + color)
  }
  sc.stop()
}

HBase 寫入:
示例代碼:

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
  val sc = new SparkContext(sparkConf)

  val conf = HBaseConfiguration.create()
  val jobConf = new JobConf(conf)
  jobConf.setOutputFormat(classOf[TableOutputFormat])
  jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")

  val fruitTable = TableName.valueOf("fruit_spark")
  val tableDescr = new HTableDescriptor(fruitTable)
  tableDescr.addFamily(new HColumnDescriptor("info".getBytes))

  val admin = new HBaseAdmin(conf)
  if (admin.tableExists(fruitTable)) {
    admin.disableTable(fruitTable)
    admin.deleteTable(fruitTable)
  }
  admin.createTable(tableDescr)

  def convert(triple: (Int, String, Int)= {
    val put = new Put(Bytes.toBytes(triple._1))
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
    (new ImmutableBytesWritable, put)
  }
  val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))
  val localData = initialRDD.map(convert)

  localData.saveAsHadoopDataset(jobConf)
}

第5章 RDD 編程進階

5.1 累加器

  累加器用來對信息進行聚合,一般在向 Spark 傳遞函數時,好比使用 map() 函數或者用 filter() 傳條件時,可使用驅動器程序中定義的變量,可是集羣中運行的每一個任務都會獲得這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。 若是咱們想實現全部分片處理時更新共享變量的功能,那麼累加器能夠實現咱們想要的效果。
  針對一個輸入的日誌文件,若是咱們想計算文件中全部空行的數量,咱們能夠編寫如下程序:

示例代碼:

scala> val notice = sc.textFile("./NOTICE")
notice: org.apache.spark.rdd.RDD[String] = ./NOTICE MapPartitionsRDD[40] at textFile at <console>:32

scala> val blanklines = sc.accumulator(0)   聲明一個累加器,並傳入初始值 0 
warning: there were two deprecation warnings; re-run with -deprecation for details
blanklines: org.apache.spark.Accumulator[Int] = 0

scala> val tmp = notice.flatMap(line => {
     |    if (line == "") {
     |       blanklines += 1    加加
     |    }
     |    line.split(" ")
     | })
tmp: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at flatMap at <console>:36

scala> tmp.count()
res31: Long = 3213

scala> blanklines.value     在 Driver 中訪問累加器的值
res32: Int = 171

累加器的用法以下所示:
  經過在驅動器中調用 SparkContext.accumulator(initialValue) 方法,建立出存有初始值的累加器。返回值爲 org.apache.spark.Accumulator[T] 對象,其中 T 是初始值 initialValue 的類型。
  Spark 閉包裏的執行器代碼可使用累加器的 += 方法(在 Java 中是 add)增長累加器的值。
  Driver 驅動器程序能夠調用累加器的 value 屬性(在 Java 中使用 value() 或 setValue() )來訪問累加器的值。
  注意:工做節點上的任務不能訪問累加器的值。從這些任務的角度來看,累加器是一個只寫變量。
  對於要在行動操做中使用的累加器,Spark 只會把每一個任務對各累加器的修改應用一次。所以,若是想要一個不管在失敗仍是重複計算時都絕對可靠的累加器,咱們必須把它放在 foreach() 這樣的行動操做中使用。轉換操做中累加器可能會發生不止一次更新,因此通常不推薦在轉換操做中使用。

5.2 自定義累加器

  自定義累加器類型的功能在 1.X 版本中就已經提供了,可是使用起來比較麻煩,在 2.0 版本後,累加器的易用性有了較大的改進,並且官方還提供了一個新的抽象類:AccumulatorV2 來提供更加友好的自定義類型累加器的實現方式。實現自定義類型累加器須要繼承 AccumulatorV2 並至少覆寫下例中出現的方法,下面這個累加器能夠用於在程序運行過程當中收集一些文本類信息,最終以 Set[String] 的形式返回。

示例代碼:

package com.atguigu.spark

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions._

/**
  * 自定義累加器案例
  */

class LogAccumulator extends AccumulatorV2[Stringjava.util.Set[String]] {

  // 定義一個累加器的內存結構,用於保存帶有字母的字符串
  private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()

  // 重寫方法檢測累加器內部數據結構是否爲空
  override def isZero: Boolean = {
    // 檢查 logArray 是否爲空
    _logArray.isEmpty
  }

  // 重置你的累加器數據結構
  override def reset(): Unit = {
    // clear 方法清空 _logArray 的全部內容
    _logArray.clear()
  }

  // 提供轉換或者行動操做中添加累加器值的方法
  override def add(v: String): Unit = {
    // 將帶有字幕的字符串添加到 _logArray 內存結構中
    _logArray.add(v)
  }

  // 多個分區的累加器的值進行合併的操做函數
  override def merge(other: AccumulatorV2[String, java.util.Set[String]]): Unit = {
    // 經過類型監測將o這個累加器的值加入到當前 _logArray 結構中
    other match {
      case o: LogAccumulator => _logArray.addAll(o.value)
    }
  }

  override def value: java.util.Set[String] = {
    java.util.Collections.unmodifiableSet(_logArray)
  }

  // 讓 Spark 框架可以調用 copy 函數產生一個新的系統的類,即累加器實例
  override def copy(): AccumulatorV2[String, java.util.Set[String]] = {
    val newAcc = new LogAccumulator()
    _logArray.synchronized {
      newAcc._logArray.addAll(_logArray)
    }
    newAcc
  }
}

// 過濾掉帶字母的
object LogAccumulatorDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[*]")setAppName("LogAccumulator")
    val sc = new SparkContext(conf)

    val accum = new LogAccumulator
    sc.register(accum, "logAccum"// 先註冊一個自定義的累加器
    val sum = sc.parallelize(Array("1""2a""3""4b""5""6""7cd""8""9"), 2).filter(line => {
      val pattern = """^-?(\d+)"""
      val flag = line.matches(pattern)
      if (!flag) {
        accum.add(line)
      }
      flag
    }).map(_.toInt).reduce(_ + _)

    println("sum: " + sum)
    for (v <- accum.value) print(v + " ")
    println()
    sc.stop()
  }
}

輸出結果以下:

sum: 32
7cd 42

5.3 廣播變量

  廣播變量用來高效分發較大的對象。向全部工做節點發送一個較大的只讀值,以供一個或多個 Spark 操做使用。好比,若是你的應用須要向全部節點發送一個較大的只讀查詢表,甚至是機器學習算法中的一個很大的特徵向量,廣播變量用起來都很順手。
  傳統方式下,Spark 會自動把閉包中全部引用到的變量發送到工做節點上。雖然這很方便,但也很低效。緣由有二:首先,默認的任務發射機制是專門爲小任務進行優化的;其次,事實上你可能會在多個並行操做中使用同一個變量,可是 Spark 會爲每一個任務分別發送。

示例代碼:

scala> val broadcastVar = sc.broadcast(Array(123))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)

scala> broadcastVar.value
res33: Array[Int] = Array(123)

使用廣播變量的過程以下:
  (1) 經過對一個類型 T 的對象調用 SparkContext.broadcast 建立出一個 Broadcast[T] 對象。任何可序列化的類型均可以這麼實現。
  (2) 經過 value 屬性訪問該對象的值 (在 Java 中爲 value() 方法)。
  (3) 變量只會被髮到各個節點一次,應做爲只讀值處理(修改這個值不會影響到別的節點)。

第6章 Spark Core 實例練習

  結合實際生產狀況編寫一個統計功能,經過分析 CDN 或者 Nginx 的日誌文件,統計出訪問的 PV、UV、IP 地址、訪問來源等相關數據。

日誌的格式爲:

IP        命中率    響應時間      請求時間     請求方法  請求URL   請求協議   狀態碼      響應大小     referer   用戶代理
ClientIP  Hit/Miss  ResponseTime  [Time Zone]  Method    URL       Protocol   StatusCode  TrafficSize  Referer   UserAgent

日誌文件 cdh.txt 內容示例:

100.79.121.48 HIT 33 [15/Feb/2017:00:00:46 +0800] "GET http://cdn.v.abc.com.cn/videojs/video.js HTTP/1.1" 200 174055 "http://www.abc.com.cn/" "Mozilla/4.0+(compatible;+MSIE+6.0;+Windows+NT+5.1;+Trident/4.0;)"
111.19.97.15 HIT 18 [15/Feb/2017:00:00:39 +0800] "
GET http://cdn.v.abc.com.cn/videojs/video-js.css HTTP/1.1" 200 14727 "http://www.zzqbsm.com/" "Mozilla/5.0+(Linux;+Android+5.1;+vivo+X6Plus+D+Build/LMY47I)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Version/4.0+Chrome/35.0.1916.138+Mobile+Safari/537.36+T7/7.4+baiduboxapp/8.2.5+(Baidu;+P1+5.1)"
218.108.100.234 HIT 1 [15/Feb/2017:00:00:57 +0800] "
GET http://cdn.v.abc.com.cn/videojs/video.js HTTP/1.1" 200 174050 "http://www.abc.com.cn/" "Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/53.0.2785.116+Safari/537.36"
......
......

6.1 計算獨立 IP 數

計算思路
  1. 從每行日誌中篩選出 IP 地址
  2. 去除重複的 IP 獲得獨立 IP 數

計算過程
  flatMap(x => IPPattern findFirstIn(x)) 經過正則取出每行日誌中的 IP 地址
  map(x => (x,1)) 將每行中的 IP 映射成 (IP,1),造成一個 pair RDD
  reduceByKey((x,y) => x+y) 將相同的 IP 合併,獲得 (IP,數量)
  sortBy(_._2, false) 按 IP 大小排序

統計結果

(114.55.227.102,9348)
(220.191.255.197,2640)
(115.236.173.94,2476)
(183.129.221.102,2187)
(112.53.73.66,1794)
(115.236.173.95,1650)
(220.191.254.129,1278)
(218.88.25.200,751)
(183.129.221.104,569)
(115.236.173.93,529)
獨立IP數:43649

6.2 統計每一個視頻獨立 IP 數

  有時咱們不但須要知道全網訪問的獨立 IP 數,更想知道每一個視頻訪問的獨立 IP 數

計算思路
  1. 篩選視頻文件將每行日誌拆分紅 (文件名, IP地址) 形式
  2. 按文件名分組,至關於數據庫的 group by,這時 RDD 的結構爲 (文件名, [IP1, IP1, IP2, …]),這時 IP 有重複
  3. 將每一個文件名中的 IP 地址去重,這時 RDD 的結果爲 (文件名, [IP1, IP2, …]),這時 IP 沒有重複

計算過程
  filter(x => x.matches(「.([0-9]+).mp4.「)) 篩選日誌中的視頻請求
  map(x => getFileNameAndIp(x)) 將每行日誌格式化成 (文件名, IP) 這種格式
  groupByKey() 按文件名分組,這時 RDD 結構爲 (文件名, [IP1, IP1, IP2, …]),IP 有重複
  map(x => (x.1, x._2.toList.distinct)) 去除 value 中重複的 IP 地址   sortBy(._2.size, false) 按 IP 數排序

計算結果

視頻:141081.mp4 獨立IP:2393
視頻:140995.mp4 獨立IP:2050
視頻:141027.mp4 獨立IP:1784
視頻:141090.mp4 獨立IP:1702
視頻:141032.mp4 獨立IP:1528
視頻:89973.mp4 獨立IP:1523
視頻:141080.mp4 獨立IP:1425
視頻:141035.mp4 獨立IP:1321
視頻:141082.mp4 獨立IP:1272
視頻:140938.mp4 獨立IP:816

6.3 統計一天中每一個小時間的流量

  有時我想知道網站每小時視頻的觀看流量,看看用戶都喜歡在什麼時間段過來看視頻

計算思路
  1. 將日誌中的訪問時間及請求大小兩個數據提取出來造成 RDD (訪問時間, 訪問大小),這裏要去除 404 之類的非法請求
  2. 按訪問時間分組造成 RDD (訪問時間, [大小1, 大小2, …])
  3. 將訪問時間對應的大小相加造成 (訪問時間, 總大小)

計算過程
  filter(x => isMatch(httpSizePattern, x)).filter(x => isMatch(timePattern, x)) 過濾非法請求
  map(x => getTimeAndSize(x)) 將日誌格式化成 RDD (請求小時, 請求大小)
  groupByKey() 按請求時間分組造成 RDD (請求小時, [大小1, 大小2, …])
  map(x => (x._1, x._2.sum)) 將每小時的請求大小相加,造成 RDD (請求小時, 總大小)

計算結果

00時 CDN流量=14G
01時 CDN流量=3G
02時 CDN流量=5G
03時 CDN流量=3G
04時 CDN流量=3G
05時 CDN流量=4G
06時 CDN流量=11G
07時 CDN流量=22G
08時 CDN流量=43G
09時 CDN流量=52G
10時 CDN流量=61G
11時 CDN流量=45G
12時 CDN流量=46G
13時 CDN流量=51G
14時 CDN流量=55G
15時 CDN流量=45G
16時 CDN流量=45G
17時 CDN流量=44G
18時 CDN流量=45G
19時 CDN流量=51G
20時 CDN流量=55G
21時 CDN流量=53G
22時 CDN流量=42G
23時 CDN流量=25G

示例代碼以下:

package com.atguigu.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

import scala.util.matching.Regex

object CdnStatics {

  val logger = LoggerFactory.getLogger(CdnStatics.getClass)

  // 匹配 IP 地址
  val IPPattern = "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))".r

  // 匹配視頻文件名
  val videoPattern = "([0-9]+).mp4".r // .r()方法簡介:Scala 中將字符串轉換爲正則表達式

  // [15/Feb/2017:11:17:13 +0800]  匹配 2017:11 按每小時播放量統計
  val timePattern = ".*(2017):([0-9]{2}):[0-9]{2}:[0-9]{2}.*".r

  // 匹配 http 響應碼和請求數據大小
  val httpSizePattern = ".*\\s(200|206|304)\\s([0-9]+)\\s.*".r

  def main(args: Array[String]): Unit 
= {
    val conf = new SparkConf().setMaster("local[*]").setAppName("CdnStatics")
    val sc = new SparkContext(conf)

    val input = sc.textFile("D:\\learn\\JetBrains\\workspace_idea\\spark\\sparkcore_cdh\\src\\main\\resources\\cdn.txt").cache()

    // 統計獨立 IP 訪問量前 10 位
    ipStatics(input)

    // 統計每一個視頻獨立 IP 數
    videoIpStatics(input)

    // 統計一天中每一個小時間的流量
    flowOfHour(input)

    sc.stop()
  }

  // 統計獨立IP訪問量前 10 位
  def ipStatics(data: RDD[String]): Unit = {

    // 一、統計獨立 IP 數
    val ipNums = data.map(x => (IPPattern.findFirstIn(x).get, 1)).reduceByKey(_ + _).sortBy(_._2, false)

    // 輸出 IP 訪問數前量前 10 位
    ipNums.take(10).foreach(println)

    println("獨立IP數:" + ipNums.count())
  }

  // 統計每一個視頻獨立 IP 數
  def videoIpStatics(data: RDD[String]): Unit = {

    def getFileNameAndIp(line: String) = {
      (videoPattern.findFirstIn(line).mkString, IPPattern.findFirstIn(line).mkString)
    }

    // 二、統計每一個視頻獨立IP數
    data.filter(x => x.matches(".*([0-9]+)\\.mp4.*")).map(x => getFileNameAndIp(x)).groupByKey().map(x => (x._1, x._2.toList.distinct)).
      sortBy(_._2.size, false).take(10).foreach(x => println("視頻:" + x._1 + " 獨立IP數:" + x._2.size))
  }

  // 統計一天中每一個小時間的流量
  def flowOfHour(data: RDD[String]): Unit = {

    def isMatch(pattern: Regex, str: String) = {
      str match {
        case pattern(_*) => true
        case _ => false
      }
    }

    /**
      * 獲取日誌中小時和http 請求體大小
      *
      * @param line
      * @return
      */

    def getTimeAndSize(line: String) = {
      var res = (""0L)
      try {
        val httpSizePattern(code, size) line
        val timePattern(year, hour) 
= line
        res = (hour, size.toLong)
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
      res
    }

    // 三、統計一天中每一個小時間的流量
    data.filter(x => isMatch(httpSizePattern, x)).filter(x => isMatch(timePattern, x)).map(x => getTimeAndSize(x)).groupByKey()
      .map(x => (x._1, x._2.sum)).sortByKey().foreach(x => println(x._1 + "時 CDN 流量=" + x._2 / (1024 * 1024 * 1024) + "G"))
  }
}

輸出結果以下:

(114.55.227.102,9348)
(220.191.255.197,2640)
(115.236.173.94,2476)
(183.129.221.102,2187)
(112.53.73.66,1794)
(115.236.173.95,1650)
(220.191.254.129,1278)
(218.88.25.200,751)
(183.129.221.104,569)
(115.236.173.93,529)
獨立IP數:43649
視頻:141081.mp4 獨立IP數:2393
視頻:140995.mp4 獨立IP數:2050
視頻:141027.mp4 獨立IP數:1784
視頻:141090.mp4 獨立IP數:1702
視頻:141032.mp4 獨立IP數:1528
視頻:89973.mp4 獨立IP數:1523
視頻:141080.mp4 獨立IP數:1425
視頻:141035.mp4 獨立IP數:1321
視頻:141082.mp4 獨立IP數:1272
視頻:140938.mp4 獨立IP數:816
08時 CDN 流量=43G
00時 CDN 流量=14G
16時 CDN 流量=45G
01時 CDN 流量=3G
09時 CDN 流量=52G
02時 CDN 流量=5G
17時 CDN 流量=44G
03時 CDN 流量=3G
10時 CDN 流量=61G
04時 CDN 流量=3G
18時 CDN 流量=45G
05時 CDN 流量=4G
06時 CDN 流量=11G
07時 CDN 流量=22G
11時 CDN 流量=45G
19時 CDN 流量=51G
12時 CDN 流量=46G
20時 CDN 流量=55G
13時 CDN 流量=51G
21時 CDN 流量=53G
14時 CDN 流量=55G
22時 CDN 流量=42G
15時 CDN 流量=45G
23時 CDN 流量=25G

附錄

兩張圖看清 IT 行業
IT行業橫向圖


IT行業縱向圖

2 種計算方式


2 種數據庫架構圖

原文出處:https://www.cnblogs.com/chenmingjun/p/10777091.html

相關文章
相關標籤/搜索