【大數據】SparkCore學習筆記

 

1RDD概述

1.1 什麼是RDD

RDDResilient Distributed Dataset)叫作分佈式數據集,是Spark中最基本的數據抽象。代碼中是一個抽象類,它表明一個不可變、可分區、裏面的元素可並行計算的集合。java

1.2 RDD的屬性

 

1) 一組分區(Partition),即數據集的基本組成單位;mysql

2) 一個計算每一個分區的函數;es6

3) RDD之間的依賴關係;面試

4) 一個Partitioner,即RDD的分片函數;算法

5) 一個列表,存儲存取每一個Partition的優先位置(preferred location)。sql

1.3 RDD特色

RDD表示只讀的分區的數據集,對RDD進行改動,只能經過RDD的轉換操做,由一個RDD獲得一個新的RDD,新的RDD包含了從其餘RDD衍生所必需的信息。RDDs之間存在依賴,RDD的執行是按照血緣關係延時計算的。若是血緣關係較長,能夠經過持久化RDD來切斷血緣關係。shell

1.3.1 分區

RDD邏輯上是分區的,每一個分區的數據是抽象存在的,計算的時候會經過一個compute函數獲得每一個分區的數據。若是RDD是經過已有的文件系統構建,則compute函數是讀取指定文件系統中的數據,若是RDD是經過其餘RDD轉換而來,則compute函數是執行轉換邏輯將其餘RDD的數據進行轉換。數據庫

 

1.3.2 只讀

以下圖所示,RDD是隻讀的,要想改變RDD中的數據,只能在現有的RDD基礎上建立新的RDDapache

 

 由一個RDD轉換到另外一個RDD,能夠經過豐富的操做算子實現,再也不像MapReduce那樣只能寫mapreduce了,以下圖所示。編程

 

 RDD的操做算子包括兩類,一類叫作transformations,它是用來將RDD進行轉化,構建RDD的血緣關係;另外一類叫作actions,它是用來觸發RDD的計算,獲得RDD的相關計算結果或者將RDD保存的文件系統中。下圖是RDD所支持的操做算子列表。

1.3.3 依賴

RDDs經過操做算子進行轉換,轉換獲得的新RDD包含了從其餘RDDs衍生所必需的信息,RDDs之間維護着這種血緣關係,也稱之爲依賴。以下圖所示,依賴包括兩種,一種是窄依賴,RDDs之間分區是一一對應的,另外一種是寬依賴,下游RDD的每一個分區與上游RDD(也稱之爲父RDD)的每一個分區都有關,是多對多的關係。

 

1.3.4 緩存

若是在應用程序中屢次使用同一個RDD,能夠將該RDD緩存起來,該RDD只有在第一次計算的時候會根據血緣關係獲得分區的數據,在後續其餘地方用到該RDD的時候,會直接從緩存處取而不用再根據血緣關係計算,這樣就加速後期的重用。以下圖所示,RDD-1通過一系列的轉換後獲得RDD-n並保存到hdfsRDD-1在這一過程當中會有個中間結果,若是將其緩存到內存,那麼在隨後的RDD-1轉換到RDD-m這一過程當中,就不會計算其以前的RDD-0了。

 

1.3.5 CheckPoint

雖然RDD的血緣關係自然地能夠實現容錯,當RDD的某個分區數據失敗或丟失,能夠經過血緣關係重建。可是對於長時間迭代型應用來講,隨着迭代的進行,RDDs之間的血緣關係會愈來愈長,一旦在後續迭代過程當中出錯,則須要經過很是長的血緣關係去重建,勢必影響性能。爲此,RDD支持checkpoint將數據保存到持久化的存儲中,這樣就能夠切斷以前的血緣關係,由於checkpoint後的RDD不須要知道它的父RDDs了,它能夠從checkpoint處拿到數據。

2RDD編程

2.1 編程模型

Spark中,RDD被表示爲對象,經過對象上的方法調用來對RDD進行轉換。通過一系列的transformations定義RDD以後,就能夠調用actions觸發RDD的計算,action能夠是嚮應用程序返回結果(count, collect),或者是向存儲系統保存數據(saveAsTextFile)。在Spark中,只有遇到action,纔會執行RDD的計算(即延遲計算),這樣在運行時能夠經過管道的方式傳輸多個轉換。

    要使用Spark,開發者須要編寫一個Driver程序,它被提交到集羣以調度運行Worker,以下圖所示。Driver中定義了一個或多個RDD,並調用RDD上的actionWorker則執行RDD分區計算任務。

 

 

2.2 RDD的建立

Spark中建立RDD的建立方式能夠分爲三種:從集合中建立RDD;從外部存儲建立RDD;從其餘RDD建立。

2.2.1 從集合中建立

從集合中建立RDDSpark主要提供了兩種函數:parallelizemakeRDD

1)使用parallelize()從集合建立

scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

2)使用makeRDD()從集合建立

scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24

 

2.2.2 由外部存儲系統的數據集建立

包括本地的文件系統,還有全部Hadoop支持的數據集,好比HDFSCassandraHBase等,咱們會在第4章詳細介紹。

scala> val rdd2= sc.textFile("hdfs://hadoop102:9000/RELEASE")

rdd2: org.apache.spark.rdd.RDD[String] = hdfs:// hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24

2.2.3 從其餘RDD建立

詳見2.3節

2.3 RDD的轉換(面試開發重點)

RDD總體上分爲Value類型和Key-Value類型

2.3.1 Value類型

2.3.1.1 map(func)案例

1. 做用:返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成

2. 需求:建立一個1-10數組的RDD,將全部元素*2造成新的RDD

1)建立

scala> var source  = sc.parallelize(1 to 10)

source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

2)打印

scala> source.collect()

res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

(3)將全部元素*2

scala> val mapadd = source.map(_ * 2)

mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26

4)打印最終結果

scala> mapadd.collect()

res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

2.3.1.2 mapPartitions(func) 案例

1. 做用:相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲TRDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]假設有N個元素M個分區,那麼map的函數的將被調用N,mapPartitions被調用M,一個函數一次處理全部分區。

2. 需求:建立一個RDD,使每一個元素*2組成新的RDD

1)建立一個RDD

scala> val rdd = sc.parallelize(Array(1,2,3,4))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

2)使每一個元素*2組成新的RDD

scala> rdd.mapPartitions(x=>x.map(_*2))

res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:27

3)打印新的RDD

scala> res3.collect

res4: Array[Int] = Array(2, 4, 6, 8)

2.3.1.3 mapPartitionsWithIndex(func) 案例

1. 做用:相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲TRDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U]

2. 需求:建立一個RDD,使每一個元素跟所在分區造成一個元組組成一個新的RDD

1)建立一個RDD

scala> val rdd = sc.parallelize(Array(1,2,3,4))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

2)使每一個元素跟所在分區造成一個元組組成一個新的RDD

scala> val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_))))

indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:26

3)打印新的RDD

scala> indexRdd.collect

res2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))

2.3.1.4 flatMap(func) 案例

1. 做用:相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素)

2. 需求:建立一個元素爲1-5RDD,運用flatMap建立一個新的RDD,新的RDD爲原RDD的每一個元素的擴展(1->1,2->1,2……5->1,2,3,4,5)

1)建立

scala> val sourceFlat = sc.parallelize(1 to 5)

sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24

2)打印

scala> sourceFlat.collect()

res11: Array[Int] = Array(1, 2, 3, 4, 5)

3)根據原RDD建立新RDD1->1,2->1,2……5->1,2,3,4,5)

scala> val flatMap = sourceFlat.flatMap(1 to _)

flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26

4)打印新RDD

scala> flatMap.collect()

res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

2.3.1.5 map()mapPartition()的區別

1. map():每次處理一條數據。

2. mapPartition():每次處理一個分區的數據,這個分區的數據處理完後,原RDD中分區的數據才能釋放,可能致使OOM

3. 開發指導:當內存空間較大的時候建議使用mapPartition(),以提升處理效率。

2.3.1.6 glom案例

1. 做用:將每個分區造成一個數組,造成新的RDD類型時RDD[Array[T]]

2. 需求:建立一個4個分區的RDD,並將每一個分區的數據放到一個數組

(1)建立

scala> val rdd = sc.parallelize(1 to 16,4)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

2)將每一個分區的數據放到一個數組並收集到Driver端打印

scala> rdd.glom().collect()

res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))

2.3.1.7 groupBy(func)案例

1. 做用:分組,按照傳入函數的返回值進行分組。將相同的key對應的值放入一個迭代器。

2. 需求:建立一個RDD,按照元素模以2的值進行分組。

1)建立

scala> val rdd = sc.parallelize(1 to 4)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

2)按照元素模以2的值進行分組

scala> val group = rdd.groupBy(_%2)

group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26

(3)打印結果

scala> group.collect

res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))

2.3.1.8 filter(func) 案例

1. 做用:過濾。返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成。

2. 需求:建立一個RDD(由字符串組成),過濾出一個新RDD(包含」xiao」子串)

1)建立

scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))

sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24

2)打印

scala> sourceFilter.collect()

res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)

3)過濾出含」 xiao」子串的造成一個新的RDD

scala> val filter = sourceFilter.filter(_.contains("xiao"))

filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26

4)打印新RDD

scala> filter.collect()

res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

2.3.1.9 sample(withReplacement, fraction, seed) 案例

1. 做用:以指定的隨機種子隨機抽樣出數量爲fraction的數據,withReplacement表示是抽出的數據是否放回,true爲有放回的抽樣,false爲無放回的抽樣seed用於指定隨機數生成器種子。例子RDD中隨機且有放回的抽出50%的數據,隨機種子值爲3(便可能以1 2 3的其中一個起始值)

2. 需求:建立一個RDD1-10),從中選擇放回和不放回抽樣

1)建立RDD

scala> val rdd = sc.parallelize(1 to 10)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24

2)打印

scala> rdd.collect()

res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

3)放回抽樣

scala> var sample1 = rdd.sample(true,0.4,2)

sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at <console>:26

4)打印放回抽樣結果

scala> sample1.collect()

res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9)

5)不放回抽樣

scala> var sample2 = rdd.sample(false,0.2,3)

sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at <console>:26

(6)打印不放回抽樣結果

scala> sample2.collect()

res17: Array[Int] = Array(1, 9)

2.3.1.10 distinct([numTasks])) 案例

1. 做用:對源RDD進行去重後返回一個新的RDD默認狀況下,只有8個並行任務來操做,可是能夠傳入一個可選的numTasks參數改變它。

2. 需求:建立一個RDD,使用distinct()對其去重。

1)建立一個RDD

scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1))

distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24

2)對RDD進行去重(不指定並行度)

scala> val unionRDD = distinctRdd.distinct()

unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at <console>:26

3)打印去重後生成的新RDD

scala> unionRDD.collect()

res20: Array[Int] = Array(1, 9, 5, 6, 2)

4)對RDD(指定並行度爲2

scala> val unionRDD = distinctRdd.distinct(2)

unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at <console>:26

(5)打印去重後生成的新RDD

scala> unionRDD.collect()

res21: Array[Int] = Array(6, 2, 1, 9, 5)

2.3.1.11 coalesce(numPartitions) 案例

1. 做用:縮減分區數,用於大數據集過濾後,提升小數據集的執行效率。

2. 需求:建立一個4個分區的RDD,對其縮減分區

1)建立一個RDD

scala> val rdd = sc.parallelize(1 to 16,4)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24

(2)查看RDD的分區數

scala> rdd.partitions.size

res20: Int = 4

(3)對RDD從新分區

scala> val coalesceRDD = rdd.coalesce(3)

coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26

(4)查看新RDD的分區數

scala> coalesceRDD.partitions.size

res21: Int = 3

2.3.1.12 repartition(numPartitions) 案例

1. 做用:根據分區數,從新經過網絡隨機洗牌全部數據。

2. 需求:建立一個4個分區的RDD,對其從新分區

1)建立一個RDD

scala> val rdd = sc.parallelize(1 to 16,4)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24

2)查看RDD的分區數

scala> rdd.partitions.size

res22: Int = 4

3)對RDD從新分區

scala> val rerdd = rdd.repartition(2)

rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at <console>:26

4)查看新RDD的分區數

scala> rerdd.partitions.size

res23: Int = 2

2.3.1.13 coalescerepartition的區別

1. coalesce從新分區,能夠選擇是否進行shuffle過程。由參數shuffle: Boolean = false/true決定。

2. repartition其實是調用的coalesce,默認是不進行shuffle的。源碼以下:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

2.3.1.14 sortBy(func,[ascending], [numTasks]) 案例

1. 做用;使用func先對數據進行處理,按照處理後的數據比較結果排序,默認爲正序。

2. 需求:建立一個RDD,按照不一樣的規則進行排序

1)建立一個RDD

scala> val rdd = sc.parallelize(List(2,1,3,4))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

2)按照自身大小排序

scala> rdd.sortBy(x => x).collect()

res11: Array[Int] = Array(1, 2, 3, 4)

3)按照與3餘數的大小排序

scala> rdd.sortBy(x => x%3).collect()

res12: Array[Int] = Array(3, 4, 1, 2)

2.3.1.15 pipe(command, [envVars]) 案例

1. 做用:管道,針對每一個分區,都執行一個shell腳本,返回輸出的RDD

注意:腳本須要放在Worker節點能夠訪問到的位置

2. 需求:編寫一個腳本,使用管道將腳本做用於RDD上。

1)編寫一個腳本

Shell腳本

#!/bin/sh

echo "AA"

while read LINE; do

   echo ">>>"${LINE}

done

2)建立一個只有一個分區的RDD

scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1)

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24

3)將腳本做用該RDD並打印

scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()

res18: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)

(4)建立一個有兩個分區的RDD

scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2)

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at <console>:24

(5)將腳本做用該RDD並打印

scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()

res19: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)

2.3.2 Value類型交互

2.3.2.1 union(otherDataset) 案例

1. 做用:對源RDD和參數RDD求並集後返回一個新的RDD

2. 需求:建立兩個RDD,求並集

1)建立第一個RDD

scala> val rdd1 = sc.parallelize(1 to 5)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

2)建立第二個RDD

scala> val rdd2 = sc.parallelize(5 to 10)

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

(3)計算兩個RDD的並集

scala> val rdd3 = rdd1.union(rdd2)

rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[25] at union at <console>:28

4)打印並集結果

scala> rdd3.collect()

res18: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)

2.3.2.2 subtract (otherDataset) 案例

1. 做用:計算差的一種函數,去除兩個RDD中相同的元素,不一樣的RDD將保留下來

2. 需求:建立兩個RDD,求第一個RDD與第二個RDD的差集

1)建立第一個RDD

scala> val rdd = sc.parallelize(3 to 8)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at <console>:24

2)建立第二個RDD

scala> val rdd1 = sc.parallelize(1 to 5)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:24

3)計算第一個RDD與第二個RDD的差集並打印

scala> rdd.subtract(rdd1).collect()

res27: Array[Int] = Array(8, 6, 7)

2.3.2.3 intersection(otherDataset) 案例

1. 做用:對源RDD和參數RDD求交集後返回一個新的RDD

2. 需求:建立兩個RDD,求兩個RDD的交集

1)建立第一個RDD

scala> val rdd1 = sc.parallelize(1 to 7)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24

(2)建立第二個RDD

scala> val rdd2 = sc.parallelize(5 to 10)

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24

(3)計算兩個RDD的交集

scala> val rdd3 = rdd1.intersection(rdd2)

rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at intersection at <console>:28

(4)打印計算結果

scala> rdd3.collect()

res19: Array[Int] = Array(5, 6, 7)

2.3.2.4 cartesian(otherDataset) 案例

1. 做用:笛卡爾積(儘可能避免使用)

2. 需求:建立兩個RDD,計算兩個RDD的笛卡爾積

1)建立第一個RDD

scala> val rdd1 = sc.parallelize(1 to 3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at parallelize at <console>:24

2)建立第二個RDD

scala> val rdd2 = sc.parallelize(2 to 5)

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[48] at parallelize at <console>:24

3)計算兩個RDD的笛卡爾積並打印

scala> rdd1.cartesian(rdd2).collect()

res17: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5))

2.3.2.5 zip(otherDataset)案例

1. 做用:將兩個RDD組合成Key/Value形式的RDD,這裏默認兩個RDDpartition數量以及元素數量都相同,不然會拋出異常。

2. 需求:建立兩個RDD,並將兩個RDD組合到一塊兒造成一個(k,v)RDD

1)建立第一個RDD

scala> val rdd1 = sc.parallelize(Array(1,2,3),3)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

2)建立第二個RDD(與1分區數相同)

scala> val rdd2 = sc.parallelize(Array("a","b","c"),3)

rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24

3)第一個RDD組合第二個RDD並打印

scala> rdd1.zip(rdd2).collect

res1: Array[(Int, String)] = Array((1,a), (2,b), (3,c))

4)第二個RDD組合第一個RDD並打印

scala> rdd2.zip(rdd1).collect

res2: Array[(String, Int)] = Array((a,1), (b,2), (c,3))

5)建立第三個RDD(與1,2分區數不一樣)

scala> val rdd3 = sc.parallelize(Array("a","b","c"),2)

rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24

6)第一個RDD組合第三個RDD並打印

scala> rdd1.zip(rdd3).collect

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(3, 2)

  at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

  at scala.Option.getOrElse(Option.scala:121)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)

  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)

  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)

  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

  at org.apache.spark.rdd.RDD.collect(RDD.scala:935)

  ... 48 elided

2.3.3 Key-Value類型

2.3.3.1 partitionBy案例

1. 做用:對pairRDD進行分區操做,若是原有的partionRDD和現有的partionRDD是一致的話就不進行分區, 不然會生成ShuffleRDD,即會產生shuffle過程。

2. 需求:建立一個4個分區的RDD,對其從新分區

1)建立一個RDD

scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)

rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24

2)查看RDD的分區數

scala> rdd.partitions.size

res24: Int = 4

(3)對RDD從新分區

scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))

rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26

4查看新RDD的分區數

scala> rdd2.partitions.size

res25: Int = 2

2.3.3.2 groupByKey案例

1. 做用:groupByKey也是對每一個key進行操做,但只生成一個sequence

2. 需求:建立一個pairRDD,將相同key對應值聚合到一個sequence中,並計算相同key對應值的相加結果。

1)建立一個pairRDD

scala> val words = Array("one", "two", "two", "three", "three", "three")

words: Array[String] = Array(one, two, two, three, three, three)

 

scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26

2)將相同key對應值聚合到一個sequence中

scala> val group = wordPairsRDD.groupByKey()

group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28

3)打印結果

scala> group.collect()

res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))

4)計算相同key對應值的相加結果

scala> group.map(t => (t._1, t._2.sum))

res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31

5)打印結果

scala> res2.collect()

res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))

2.3.3.3 reduceByKeygroupByKey的區別

1. reduceByKey:按照key進行聚合,在shuffle以前有combine(預聚合)操做,返回結果是RDD[k,v].

2. groupByKey:按照key進行分組,直接進行shuffle

3. 開發指導:reduceByKey比groupByKey,建議使用。可是須要注意是否會影響業務邏輯。

2.3.3.4 reduceByKey(func, [numTasks]) 案例

1. 在一個(K,V)RDD上調用,返回一個(K,V)RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,reduce任務的個數能夠經過第二個可選的參數來設置。

2. 需求:建立一個pairRDD,計算相同key對應值的相加結果

1)建立一個pairRDD

scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24

2)計算相同key對應值的相加結果

scala> val reduce = rdd.reduceByKey((x,y) => x+y)

reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26

3)打印結果

scala> reduce.collect()

res29: Array[(String, Int)] = Array((female,6), (male,7))

2.3.3.5 aggregateByKey案例

參數:(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)

1. 做用:在kv對的RDD中,,按keyvalue進行分組合並,合併時,將每一個value和初始值做爲seq函數的參數,進行計算,返回的結果做爲一個新的kv對,而後再將結果按照key進行合併,最後將每一個分組的value傳遞給combine函數進行計算(先將前兩個value進行計算,將返回結果和下一個value傳給combine函數,以此類推),將key與計算結果做爲一個新的kv對輸出。

2. 參數描述:

1zeroValue給每個分區中的每個key一個初始值;

2seqOp函數用於在每個分區中用初始值逐步迭代value

3combOp函數用於合併每一個分區中的結果。

3. 需求:建立一個pairRDD,取出每一個分區相同key對應值的最大值,而後相加

4. 需求分析

 

1-aggregate案例分析

1)建立一個pairRDD

scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

2)取出每一個分區相同key對應值的最大值,而後相加

scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)

agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26

3)打印結果

scala> agg.collect()

res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))

2.3.3.6 foldByKey案例

參數:(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

  1. 做用:aggregateByKey的簡化操做,seqopcombop相同
  2. 需求:建立一個pairRDD,計算相同key對應值的相加結果

1)建立一個pairRDD

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[91] at parallelize at <console>:24

2)計算相同key對應值的相加結果

scala> val agg = rdd.foldByKey(0)(_+_)

agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[92] at foldByKey at <console>:26

3)打印結果

scala> agg.collect()

res61: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))

2.3.3.7 combineByKey[C] 案例

參數:(createCombiner: V => C,  mergeValue: (C, V) => C,  mergeCombiners: (C, C) => C)

  1. 做用:對相同K,把V合併成一個集合。
  2. 參數描述:

1createCombiner: combineByKey() 會遍歷分區中的全部元素,所以每一個元素的鍵要麼尚未遇到過,要麼就和以前的某個元素的鍵相同。若是這是一個新的元素,combineByKey()會使用一個叫做createCombiner()的函數來建立那個鍵對應的累加器的初始值

2mergeValue: 若是這是一個在處理當前分區以前已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合併

3mergeCombiners: 因爲每一個分區都是獨立處理的, 所以對於同一個鍵能夠有多個累加器。若是有兩個或者更多的分區都有對應同一個鍵的累加器, 就須要使用用戶提供的 mergeCombiners() 方法將各個分區的結果進行合併。

  1. 需求:建立一個pairRDD,根據key計算每種key的均值。(先計算每一個key出現的次數以及能夠對應值的總和,再相除獲得結果)
  2. 需求分析:

 

2- combineByKey案例分析

1)建立一個pairRDD

scala> val input = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)

input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:26

(2)將相同key對應的值相加,同時記錄該key出現的次數,放入一個二元組

scala> val combine = input.combineByKey((_,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2))

combine: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[5] at combineByKey at <console>:28

(3)打印合並後的結果

scala> combine.collect

res5: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3)))

(4)計算平均值

scala> val result = combine.map{case (key,value) => (key,value._1/value._2.toDouble)}

result: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[54] at map at <console>:30

(5)打印結果

scala> result.collect()

res33: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))

2.3.3.8 sortByKey([ascending], [numTasks]) 案例

1. 做用:在一個(K,V)RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)RDD

2. 需求:建立一個pairRDD,按照key的正序和倒序進行排序

1)建立一個pairRDD

scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))

rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24

2)按照key的正序

scala> rdd.sortByKey(true).collect()

res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))

(3)按照key的倒序

scala> rdd.sortByKey(false).collect()

res10: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))

2.3.3.9 mapValues案例

1. 針對於(K,V)形式的類型只對V進行操做

2. 需求:建立一個pairRDD,並將value添加字符串"|||"

1)建立一個pairRDD

scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))

rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at <console>:24

2)對value添加字符串"|||"

scala> rdd3.mapValues(_+"|||").collect()

res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))

2.3.3.10 join(otherDataset, [numTasks]) 案例

1. 做用:在類型爲(K,V)(K,W)RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))RDD

2. 需求:建立兩個pairRDD,並將key相同的數據聚合到一個元組。

1)建立第一個pairRDD

scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))

rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at <console>:24

(2)建立第二個pairRDD

scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))

rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24

(3join操做並打印結果

scala> rdd.join(rdd1).collect()

res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))

2.3.3.11 cogroup(otherDataset, [numTasks]) 案例

1. 做用:在類型爲(K,V)(K,W)RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD

2. 需求:建立兩個pairRDD,並將key相同的數據聚合到一個迭代器。

1)建立第一個pairRDD

scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))

rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[37] at parallelize at <console>:24

(2)建立第二個pairRDD

scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))

rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:24

(3cogroup兩個RDD並打印結果

scala> rdd.cogroup(rdd1).collect()

res14: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))

2.3.4 案例實操

1. 數據結構:時間戳,省份,城市,用戶,廣告,中間字段使用空格分割。

 

樣本以下: 

1516609143867 6 7 64 16

1516609143869 9 4 75 18

1516609143869 1 7 87 12

2. 需求:統計出每個省份廣告被點擊次數的TOP3

3. 實現過程:

package com.atguigu.practice

 

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

 

//需求:統計出每個省份廣告被點擊次數的TOP3

object Practice {

 

  def main(args: Array[String]): Unit = {

 

    //1.初始化spark配置信息並創建與spark的鏈接

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Test")

    val sc = new SparkContext(sparkConf)

 

    //2.讀取數據生成RDDTSProvinceCityUserAD

    val line = sc.textFile("E:\\IDEAWorkSpace\\SparkTest\\src\\main\\resources\\agent.log")

 

    //3.按照最小粒度聚合:((Province,AD),1)

    val provinceAdAndOne = line.map { x =>

      val fields: Array[String] = x.split(" ")

      ((fields(1), fields(3)), 1)

    }

 

    //4.計算每一個省中每一個廣告被點擊的總數:((Province,AD),sum)

    val provinceAdToSum = provinceAdAndOne.reduceByKey(_ + _)

 

    //5.將省份做爲key,廣告加點擊數爲value(Province,(AD,sum))

    val provinceToAdSum = provinceAdToSum.map(x => (x._1._1, (x._1._2, x._2)))

 

    //6.將同一個省份的全部廣告進行聚合(Province,List((AD1,sum1),(AD2,sum2)...))

    val provinceGroup = provinceToAdSum.groupByKey()

 

    //7.對同一個省份全部廣告的集合進行排序並取前3條,排序規則爲廣告點擊總數

    val provinceAdTop3 = provinceGroup.mapValues { x =>

      x.toList.sortWith((x, y) => x._2 > y._2).take(3)

    }

 

    //8.將數據拉取到Driver端並打印

    provinceAdTop3.collect().foreach(println)

 

    //9.關閉與spark的鏈接

    sc.stop()

 

  }

  

}

2.4 Action

2.4.1 reduce(func)案例

1. 做用:經過func函數彙集RDD中的全部元素,先聚合分區內數據,再聚合分區間數據。

2. 需求:建立一個RDD,將全部元素聚合獲得結果

1)建立一個RDD[Int]

scala> val rdd1 = sc.makeRDD(1 to 10,2)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24

2)聚合RDD[Int]全部元素

scala> rdd1.reduce(_+_)

res50: Int = 55

(3)建立一個RDD[String]

scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))

rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24

4)聚合RDD[String]全部數據

scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))

res51: (String, Int) = (adca,12)

2.4.2 collect()案例

1. 做用:在驅動程序中,以數組的形式返回數據集的全部元素。

2. 需求:建立一個RDD,並將RDD內容收集到Driver端打印

1)建立一個RDD

scala> val rdd = sc.parallelize(1 to 10)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

2)將結果收集到Driver

scala> rdd.collect

res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)   

2.4.3 count()案例

1. 做用:返回RDD中元素的個數

2. 需求:建立一個RDD,統計該RDD的條數

1)建立一個RDD

scala> val rdd = sc.parallelize(1 to 10)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

2)統計該RDD的條數

scala> rdd.count

res1: Long = 10

2.4.4 first()案例

1. 做用:返回RDD中的第一個元素

2. 需求:建立一個RDD,返回該RDD中的第一個元素

1)建立一個RDD

scala> val rdd = sc.parallelize(1 to 10)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

2)統計該RDD的條數

scala> rdd.first

res2: Int = 1

2.4.5 take(n)案例

1. 做用:返回一個由RDD的前n個元素組成的數組

2. 需求:建立一個RDD,統計該RDD的條數

1)建立一個RDD

scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

2)統計該RDD的條數

scala> rdd.take(3)

res10: Array[Int] = Array(2, 5, 4)

2.4.6 takeOrdered(n)案例

1. 做用:返回該RDD排序後的前n個元素組成的數組

2. 需求:建立一個RDD,統計該RDD的條數

1)建立一個RDD

scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

2)統計該RDD的條數

scala> rdd.takeOrdered(3)

res18: Array[Int] = Array(2, 3, 4)

2.4.7 aggregate案例

1. 參數:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

2. 做用:aggregate函數將每一個分區裏面的元素經過seqOp和初始值進行聚合,而後用combine函數將每一個分區的結果和初始值(zeroValue)進行combine操做。這個函數最終返回的類型不須要和RDD中元素類型一致。

3. 需求:建立一個RDD,將全部元素相加獲得結果

1)建立一個RDD

scala> var rdd1 = sc.makeRDD(1 to 10,2)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24

2)將該RDD全部元素相加獲得結果

scala> rdd.aggregate(0)(_+_,_+_)

res22: Int = 55

2.4.8 fold(num)(func)案例

1. 做用:摺疊操做,aggregate的簡化操做,seqopcombop同樣。

2. 需求:建立一個RDD,將全部元素相加獲得結果

1)建立一個RDD

scala> var rdd1 = sc.makeRDD(1 to 10,2)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24

2)將該RDD全部元素相加獲得結果

scala> rdd.fold(0)(_+_)

res24: Int = 55

2.4.9 saveAsTextFile(path)

做用:將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本

2.4.10 saveAsSequenceFile(path) 

做用:將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。

2.4.11 saveAsObjectFile(path) 

做用:用於將RDD中的元素序列化成對象,存儲到文件中。

2.4.12 countByKey()案例

1. 做用:針對(K,V)類型的RDD,返回一個(K,Int)map,表示每個key對應的元素個數。

2. 需求:建立一個PairRDD,統計每種key的個數

1)建立一個PairRDD

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24

2)統計每種key的個數

scala> rdd.countByKey

res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)

2.4.13 foreach(func)案例

1. 做用:在數據集的每個元素上,運行函數func進行更新。

2. 需求:建立一個RDD,對每一個元素進行打印

1)建立一個RDD

scala> var rdd = sc.makeRDD(1 to 5,2)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24

2)對該RDD每一個元素進行打印

scala> rdd.foreach(println(_))

3

4

5

1

2

2.5 RDD中的函數傳遞

在實際開發中咱們每每須要本身定義一些對於RDD的操做,那麼此時須要主要的是,初始化工做是在Driver端進行的,而實際運行程序是在Executor端進行的,這就涉及到了跨進程通訊,是須要序列化的。下面咱們看幾個例子:

2.5.1 傳遞一個方法

1.建立一個類

class Search(s:String){

 

//過濾出包含字符串的數據

  def isMatch(s: String): Boolean = {

    s.contains(query)

  }

 

//過濾出包含字符串的RDD

  def getMatch1 (rdd: RDD[String]): RDD[String] = {

    rdd.filter(isMatch)

  }

 

  //過濾出包含字符串的RDD

  def getMatche2(rdd: RDD[String]): RDD[String] = {

    rdd.filter(x => x.contains(query))

  }

 

}

2.建立Spark主程序

object SeriTest {

 

  def main(args: Array[String]): Unit = {

 

    //1.初始化配置信息及SparkContext

    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")

    val sc = new SparkContext(sparkConf)

 

//2.建立一個RDD

    val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))

 

//3.建立一個Search對象

    val search = new Search()

 

//4.運用第一個過濾函數並打印結果

    val match1: RDD[String] = search.getMatche1(rdd)

    match1.collect().foreach(println)

    }

}

3.運行程序

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)

    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)

    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)

    at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)

    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)

    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)

    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

    at org.apache.spark.rdd.RDD.filter(RDD.scala:386)

    at com.atguigu.Search.getMatche1(SeriTest.scala:39)

    at com.atguigu.SeriTest$.main(SeriTest.scala:18)

    at com.atguigu.SeriTest.main(SeriTest.scala)

Caused by: java.io.NotSerializableException: com.atguigu.Search

4.問題說明

//過濾出包含字符串的RDD

  def getMatch1 (rdd: RDD[String]): RDD[String] = {

    rdd.filter(isMatch)

  }

在這個方法中所調用的方法isMatch()是定義在Search這個類中的,實際上調用的是this. isMatch()this表示Search這個類的對象,程序在運行過程當中須要將Search對象序列化之後傳遞到Executor端。

5.解決方案

使類繼承scala.Serializable便可。

class Search() extends Serializable{...}

2.5.2 傳遞一個屬性

1.建立Spark主程序

object TransmitTest {

 

  def main(args: Array[String]): Unit = {

 

    //1.初始化配置信息及SparkContext

    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")

    val sc = new SparkContext(sparkConf)

 

//2.建立一個RDD

    val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))

 

//3.建立一個Search對象

    val search = new Search()

 

//4.運用第一個過濾函數並打印結果

    val match1: RDD[String] = search.getMatche2(rdd)

    match1.collect().foreach(println)

    }

}

2.運行程序

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)

    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)

    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)

    at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)

    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)

    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)

    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

    at org.apache.spark.rdd.RDD.filter(RDD.scala:386)

    at com.atguigu.Search.getMatche1(SeriTest.scala:39)

    at com.atguigu.SeriTest$.main(SeriTest.scala:18)

    at com.atguigu.SeriTest.main(SeriTest.scala)

Caused by: java.io.NotSerializableException: com.atguigu.Search

3.問題說明

  //過濾出包含字符串的RDD

  def getMatche2(rdd: RDD[String]): RDD[String] = {

    rdd.filter(x => x.contains(query))

  }

在這個方法中所調用的方法query是定義在Search這個類中的字段,實際上調用的是this. querythis表示Search這個類的對象,程序在運行過程當中須要將Search對象序列化之後傳遞到Executor端。

4.解決方案

1)使類繼承scala.Serializable便可。

class Search() extends Serializable{...}

2)將類變量query賦值給局部變量

修改getMatche2爲

  //過濾出包含字符串的RDD

  def getMatche2(rdd: RDD[String]): RDD[String] = {

    val query_ : String = this.query//將類變量賦值給局部變量

    rdd.filter(x => x.contains(query_))

  }

2.6 RDD依賴關係

2.6.1 Lineage

RDD只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立RDD的一系列Lineage(血統)記錄下來,以便恢復丟失的分區。RDDLineage會記錄RDD的元數據信息和轉換行爲,當該RDD的部分分區數據丟失時,它能夠根據這些信息來從新運算和恢復丟失的數據分區。

 

1)讀取一個HDFS文件並將其中內容映射成一個個元組

scala> val wordAndOne = sc.textFile("/fruit.tsv").flatMap(_.split("\t")).map((_,1))

wordAndOne: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:24

2)統計每一種key對應的個數

scala> val wordAndCount = wordAndOne.reduceByKey(_+_)

wordAndCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:26

(3)查看「wordAndOne」的Lineage

scala> wordAndOne.toDebugString

res5: String =

(2) MapPartitionsRDD[22] at map at <console>:24 []

 |  MapPartitionsRDD[21] at flatMap at <console>:24 []

 |  /fruit.tsv MapPartitionsRDD[20] at textFile at <console>:24 []

 |  /fruit.tsv HadoopRDD[19] at textFile at <console>:24 []

4)查看「wordAndCount」的Lineage

scala> wordAndCount.toDebugString

res6: String =

(2) ShuffledRDD[23] at reduceByKey at <console>:26 []

 +-(2) MapPartitionsRDD[22] at map at <console>:24 []

    |  MapPartitionsRDD[21] at flatMap at <console>:24 []

    |  /fruit.tsv MapPartitionsRDD[20] at textFile at <console>:24 []

    |  /fruit.tsv HadoopRDD[19] at textFile at <console>:24 []

5)查看「wordAndOne」的依賴類型

scala> wordAndOne.dependencies

res7: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@5d5db92b)

(6)查看「wordAndCount」的依賴類型

scala> wordAndCount.dependencies

res8: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@63f3e6a8)

注意:RDD和它依賴的父RDDs)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

2.6.2 窄依賴

窄依賴指的是每個父RDDPartition最多被子RDD的一個Partition使用,窄依賴咱們形象的比喻爲獨生子女

 

2.6.3 寬依賴

寬依賴指的是多個子RDDPartition會依賴同一個父RDDPartition,會引發shuffle,總結:寬依賴咱們形象的比喻爲超生

 

2.6.4 DAG

DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據。

 

2.6.5 任務劃分(面試重點)

RDD任務切分中間分爲:ApplicationJobStageTask

1Application:初始化一個SparkContext即生成一個Application

2Job:一個Action算子就會生成一個Job

3Stage:根據RDD之間的依賴關係的不一樣將Job劃分紅不一樣的Stage,遇到一個寬依賴則劃分一個Stage

 

4TaskStage是一個TaskSet,將Stage劃分的結果發送到不一樣的Executor執行即爲一個Task

注意:Application->Job->Stage->Task每一層都是1n的關係。

2.7 RDD緩存

RDD經過persist方法或cache方法能夠將前面的計算結果緩存,默認狀況下 persist() 會把數據以序列化的形式緩存在 JVM 的堆空間中。

可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。

 

經過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。

 

在存儲級別的末尾加上「_2」來把持久化數據存爲兩份 

緩存有可能丟失,或者存儲存儲於內存的數據因爲內存不足而被刪除,RDD的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有Partition

1)建立一個RDD

scala> val rdd = sc.makeRDD(Array("atguigu"))

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at makeRDD at <console>:25

2)將RDD轉換爲攜帶當前時間戳不作緩存

scala> val nocache = rdd.map(_.toString+System.currentTimeMillis)

nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at map at <console>:27

(3)屢次打印結果

scala> nocache.collect

res0: Array[String] = Array(atguigu1538978275359)

 

scala> nocache.collect

res1: Array[String] = Array(atguigu1538978282416)

 

scala> nocache.collect

res2: Array[String] = Array(atguigu1538978283199)

(4)將RDD轉換爲攜帶當前時間戳並作緩存

scala> val cache =  rdd.map(_.toString+System.currentTimeMillis).cache

cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at map at <console>:27

5)屢次打印作了緩存的結果

scala> cache.collect

res3: Array[String] = Array(atguigu1538978435705)                                   

 

scala> cache.collect

res4: Array[String] = Array(atguigu1538978435705)

 

scala> cache.collect

res5: Array[String] = Array(atguigu1538978435705)

2.8 RDD CheckPoint

Spark中對於數據的保存除了持久化操做以外,還提供了一種檢查點的機制,檢查點(本質是經過將RDD寫入Disk作檢查點)是爲了經過lineage作容錯的輔助,lineage過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的RDD開始重作Lineage,就會減小開銷。檢查點經過將數據寫入到HDFS文件系統實現了RDD的檢查點功能。

爲當前RDD設置檢查點。該函數將會建立一個二進制的文件,並存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設置的。在checkpoint的過程當中,該RDD的全部依賴於父RDD中的信息將所有被移除。對RDD進行checkpoint操做並不會立刻被執行,必須執行Action操做才能觸發。

案例實操:

1)設置檢查點

scala> sc.setCheckpointDir("hdfs://hadoop102:9000/checkpoint")

2)建立一個RDD

scala> val rdd = sc.parallelize(Array("atguigu"))

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:24

3)將RDD轉換爲攜帶當前時間戳並作checkpoint

scala> val ch = rdd.map(_+System.currentTimeMillis)

ch: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:26

 

scala> ch.checkpoint

4)屢次打印結果

scala> ch.collect

res55: Array[String] = Array(atguigu1538981860336)

 

scala> ch.collect

res56: Array[String] = Array(atguigu1538981860504)

 

scala> ch.collect

res57: Array[String] = Array(atguigu1538981860504)

 

scala> ch.collect

res58: Array[String] = Array(atguigu1538981860504)

3章 鍵值對RDD數據分區

Spark目前支持Hash分區和Range分區,用戶也能夠自定義分區,Hash分區爲當前的默認分區,Spark中分區器直接決定了RDD中分區的個數、RDD中每條數據通過Shuffle過程屬於哪一個分區和Reduce的個數

注意:

(1)只有Key-Value類型的RDD纔有分區的,非Key-Value類型的RDD分區的值是None
(2)每一個RDD的分區ID範圍:0~numPartitions-1,決定這個值是屬於那個分區的。

3.1 獲取RDD分區

能夠經過使用RDDpartitioner 屬性來獲取 RDD 的分區方式。它會返回一個 scala.Option 對象, 經過get方法獲取其中的值。相關源碼以下:

def getPartition(key: Any): Int = key match {
  case null => 0
  case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}

def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

1)建立一個pairRDD

scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))

pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24

2)查看RDD的分區器

scala> pairs.partitioner

res1: Option[org.apache.spark.Partitioner] = None

3)導入HashPartitioner

scala> import org.apache.spark.HashPartitioner

import org.apache.spark.HashPartitioner

(4)使用HashPartitionerRDD進行從新分區

scala> val partitioned = pairs.partitionBy(new HashPartitioner(2))

partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at partitionBy at <console>:27

5)查看從新分區後RDD的分區器

scala> partitioned.partitioner

res2: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

3.2 Hash分區

HashPartitioner分區的原理:對於給定的key,計算其hashCode,併除以分區的個數取餘,若是餘數小於0,則用餘數+分區的個數(不然加0),最後返回的值就是這個key所屬的分區ID

使用Hash分區的實操

scala> nopar.partitioner

res20: Option[org.apache.spark.Partitioner] = None

 

scala> val nopar = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)

nopar: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24

 

scala>nopar.mapPartitionsWithIndex((index,iter)=>{ Iterator(index.toString+" : "+iter.mkString("|")) }).collect

res0: Array[String] = Array("0 : ", 1 : (1,3), 2 : (1,2), 3 : (2,4), "4 : ", 5 : (2,3), 6 : (3,6), 7 : (3,8))

scala> val hashpar = nopar.partitionBy(new org.apache.spark.HashPartitioner(7))

hashpar: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[12] at partitionBy at <console>:26

 

scala> hashpar.count

res18: Long = 6

 

scala> hashpar.partitioner

res21: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@7)

 

scala> hashpar.mapPartitions(iter => Iterator(iter.length)).collect()

res19: Array[Int] = Array(0, 3, 1, 2, 0, 0, 0)

3.3 Ranger分區

HashPartitioner分區弊端:可能致使每一個分區中數據量的不均勻,極端狀況下會致使某些分區擁有RDD的所有數據。

RangePartitioner做用:將必定範圍內的數映射到某一個分區內,儘可能保證每一個分區中數據量的均勻,並且分區與分區之間是有序的,一個分區中的元素確定都是比另外一個分區內的元素小或者大,可是分區內的元素是不能保證順序的。簡單的說就是將必定範圍內的數映射到某一個分區內。實現過程爲:

第一步:先重整個RDD中抽取出樣本數據,將樣本數據排序,計算出每一個分區的最大key值,造成一個Array[KEY]類型的數組變量rangeBounds

第二步:判斷keyrangeBounds中所處的範圍,給出該key值在下一個RDD中的分區id下標;該分區器要求RDD中的KEY類型必須是能夠排序的

3.4 自定義分區

要實現自定義的分區器,你須要繼承 org.apache.spark.Partitioner 類並實現下面三個方法。

1numPartitions: Int:返回建立出來的分區數。

2getPartition(key: Any): Int:返回給定鍵的分區編號(0numPartitions-1)

3equals():Java 判斷相等性的標準方法。這個方法的實現很是重要,Spark 須要用這個方法來檢查你的分區器對象是否和其餘分區器實例相同,這樣 Spark 才能夠判斷兩個 RDD 的分區方式是否相同。

需求:將相同後綴的數據寫入相同的文件,經過將相同後綴的數據分區到相同的分區並保存輸出來實現。

1)建立一個pairRDD

scala> val data = sc.parallelize(Array((1,1),(2,2),(3,3),(4,4),(5,5),(6,6)))

data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24

2)定義一個自定義分區類

scala> :paste

// Entering paste mode (ctrl-D to finish)

class CustomerPartitioner(numParts:Int) extends org.apache.spark.Partitioner{

 

  //覆蓋分區數

  override def numPartitions: Int = numParts

 

  //覆蓋分區號獲取函數

  override def getPartition(key: Any): Int = {

    val ckey: String = key.toString

    ckey.substring(ckey.length-1).toInt%numParts

  }

}

 

// Exiting paste mode, now interpreting.

 

defined class CustomerPartitioner

3)將RDD使用自定義的分區類進行從新分區

scala> val par = data.partitionBy(new CustomerPartitioner(2))

par: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at partitionBy at <console>:27

4)查看從新分區後的數據分佈

scala> par.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect

res3: Array[(Int, (Int, Int))] = Array((0,(2,2)), (0,(4,4)), (0,(6,6)), (1,(1,1)), (1,(3,3)), (1,(5,5)))

使用自定義的 Partitioner 是很容易的:只要把它傳給 partitionBy() 方法便可。Spark 中有許多依賴於數據混洗的方法,好比 join() groupByKey(),它們也能夠接收一個可選的 Partitioner 對象來控制輸出數據的分區方式。

4章 數據讀取與保存

Spark的數據讀取及數據保存能夠從兩個維度來做區分:文件格式以及文件系統。

文件格式分爲:Text文件、Json文件Csv文件、Sequence文件以及Object文件;

文件系統分爲:本地文件系統、HDFS、HBASE以及數據庫。

4.1 文件類數據讀取與保存

4.1.1 Text文件

1)數據讀取:textFile(String)

scala> val hdfsFile = sc.textFile("hdfs://hadoop102:9000/fruit.txt")

hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24

 

2)數據保存: saveAsTextFile(String)

scala> hdfsFile.saveAsTextFile("/fruitOut")

 

4.1.2 Json文件

若是JSON文件中每一行就是一個JSON記錄,那麼能夠經過將JSON文件當作文本文件來讀取,而後利用相關的JSON庫對每一條數據進行JSON解析。

注意:使用RDD讀取JSON文件處理很複雜,同時SparkSQL集成了很好的處理JSON文件的方式,因此應用中可能是採用SparkSQL處理JSON文件。

1)導入解析json所需的包

scala> import scala.util.parsing.json.JSON

2)上傳json文件到HDFS

[atguigu@hadoop102 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /

3)讀取文件

scala> val json = sc.textFile("/people.json")

json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at <console>:24

4)解析json數據

scala> val result  = json.map(JSON.parseFull)

result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at <console>:27

5)打印

scala> result.collect

res11: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))

4.1.3 Sequence文件

 SequenceFile文件是Hadoop用來存儲二進制形式的key-value對而設計的一種平面文件(Flat File)。Spark 有專門用來讀取 SequenceFile 的接口。在 SparkContext 中,能夠調用 sequenceFile[ keyClass, valueClass](path)

注意:SequenceFile文件只針對PairRDD

1)建立一個RDD

scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24

2)將RDD保存爲Sequence文件

scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")

3)查看該文件

[atguigu@hadoop102 seqFile]$ pwd

/opt/module/spark/seqFile

 

[atguigu@hadoop102 seqFile]$ ll

總用量 8

-rw-r--r-- 1 atguigu atguigu 108 10月  9 10:29 part-00000

-rw-r--r-- 1 atguigu atguigu 124 10月  9 10:29 part-00001

-rw-r--r-- 1 atguigu atguigu   0 10月  9 10:29 _SUCCESS

 

[atguigu@hadoop102 seqFile]$ cat part-00000

SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritableط

4)讀取Sequence文件

scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")

seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at <console>:24

5)打印讀取後的Sequence文件

scala> seq.collect

res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))

4.1.4 對象文件

對象文件是將對象序列化後保存的文件,採用Java的序列化機制。能夠經過objectFile[k,v](path) 函數接收一個路徑,讀取對象文件,返回對應的 RDD也能夠經過調用saveAsObjectFile() 實現對對象文件的輸出。由於是序列化因此要指定類型。

1)建立一個RDD

scala> val rdd = sc.parallelize(Array(1,2,3,4))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24

2)將RDD保存爲Object文件

scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")

3)查看該文件

[atguigu@hadoop102 objectFile]$ pwd

/opt/module/spark/objectFile

 

[atguigu@hadoop102 objectFile]$ ll

總用量 8

-rw-r--r-- 1 atguigu atguigu 142 10月  9 10:37 part-00000

-rw-r--r-- 1 atguigu atguigu 142 10月  9 10:37 part-00001

-rw-r--r-- 1 atguigu atguigu   0 10月  9 10:37 _SUCCESS

 

[atguigu@hadoop102 objectFile]$ cat part-00000

SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableW@`l

4)讀取Object文件

scala> val objFile = sc.objectFile[(Int)]("file:///opt/module/spark/objectFile")

objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at <console>:24

5)打印讀取後的Sequence文件

scala> objFile.collect

res19: Array[Int] = Array(1, 2, 3, 4)

4.2 文件系統類數據讀取與保存

4.2.1 HDFS

Spark的整個生態系統與Hadoop是徹底兼容的,因此對於Hadoop所支持的文件類型或者數據庫類型,Spark也一樣支持.另外,因爲HadoopAPI有新舊兩個版本,因此Spark爲了可以兼容Hadoop全部的版本,也提供了兩套建立操做接口.對於外部存儲建立操做而言,hadoopRDDnewHadoopRDD是最爲抽象的兩個函數接口,主要包含如下四個參數.

1輸入格式(InputFormat): 制定數據輸入的類型,TextInputFormat,新舊兩個版本所引用的版本分別是org.apache.hadoop.mapred.InputFormatorg.apache.hadoop.mapreduce.InputFormat(NewInputFormat)

2)鍵類型: 指定[K,V]鍵值對中K的類型

3)值類型: 指定[K,V]鍵值對中V的類型

4)分區值: 指定由外部存儲生成的RDDpartition數量的最小值,若是沒有指定,系統會使用默認值defaultMinSplits

注意:其餘建立操做的API接口都是爲了方便最終的Spark程序開發者而設置的,是這兩個接口的高效實現版本.例如,對於textFile而言,只有path這個指定文件路徑的參數,其餘參數在系統內部指定了默認值

1.Hadoop中以壓縮形式存儲的數據,不須要指定解壓方式就可以進行讀取,由於Hadoop自己有一個解壓器會根據壓縮文件的後綴推斷解壓算法進行解壓.

2.若是用SparkHadoop中讀取某種類型的數據不知道怎麼讀取的時候,上網查找一個使用map-reduce的時候是怎麼讀取這種這種數據的,而後再將對應的讀取方式改寫成上面的hadoopRDDnewAPIHadoopRDD兩個類就好了

4.2.2 MySQL數據庫鏈接

支持經過Java JDBC訪問關係型數據庫。須要經過JdbcRDD進行,示例以下:

1)添加依賴

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

2Mysql讀取:

package com.atguigu

 

import java.sql.DriverManager

 

import org.apache.spark.rdd.JdbcRDD

import org.apache.spark.{SparkConf, SparkContext}

 

object MysqlRDD {

 

 def main(args: Array[String]): Unit = {

 

   //1.建立spark配置信息

   val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")

 

   //2.建立SparkContext

   val sc = new SparkContext(sparkConf)

 

   //3.定義鏈接mysql的參數

   val driver = "com.mysql.jdbc.Driver"

   val url = "jdbc:mysql://hadoop102:3306/rdd"

   val userName = "root"

   val passWd = "000000"

 

   //建立JdbcRDD

   val rdd = new JdbcRDD(sc, () => {

     Class.forName(driver)

     DriverManager.getConnection(url, userName, passWd)

   },

     "select * from `rddtable` where `id`>=?;",

     1,

     10,

     1,

     r => (r.getInt(1), r.getString(2))

   )

 

   //打印最後結果

   println(rdd.count())

   rdd.foreach(println)

 

   sc.stop()

 }

}

Mysql寫入:

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
  val sc = new SparkContext(sparkConf)
  val data = sc.parallelize(List("Female", "Male","Female"))

  data.foreachPartition(insertData)
}

def insertData(iterator: Iterator[String]): Unit = {

Class.forName ("com.mysql.jdbc.Driver").newInstance()
  val conn = java.sql.DriverManager.getConnection("jdbc:mysql://master01:3306/rdd", "root", "hive")
  iterator.foreach(data => {
    val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
    ps.setString(1, data)
    ps.executeUpdate()
  })
}

4.2.3 HBase數據庫

因爲 org.apache.hadoop.hbase.mapreduce.TableInputFormat 類的實現,Spark 能夠經過Hadoop輸入格式訪問HBase。這個輸入格式會返回鍵值對數據,其中鍵的類型爲org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的類型爲org.apache.hadoop.hbase.client.

Result

1)添加依賴

<dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-server</artifactId>

<version>1.3.1</version>

</dependency>

 

<dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-client</artifactId>

<version>1.3.1</version>

</dependency>

2)從HBase讀取數據

package com.atguigu

 

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.hbase.HBaseConfiguration

import org.apache.hadoop.hbase.client.Result

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.hadoop.hbase.util.Bytes

 

object HBaseSpark {

 

  def main(args: Array[String]): Unit = {

 

    //建立spark配置信息

    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")

 

    //建立SparkContext

    val sc = new SparkContext(sparkConf)

 

    //構建HBase配置信息

    val conf: Configuration = HBaseConfiguration.create()

    conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")

    conf.set(TableInputFormat.INPUT_TABLE, "rddtable")

 

    //HBase讀取數據造成RDD

    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(

      conf,

      classOf[TableInputFormat],

      classOf[ImmutableBytesWritable],

      classOf[Result])

 

    val count: Long = hbaseRDD.count()

    println(count)

 

    //hbaseRDD進行處理

    hbaseRDD.foreach {

      case (_, result) =>

        val key: String = Bytes.toString(result.getRow)

        val name: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))

        val color: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("color")))

        println("RowKey:" + key + ",Name:" + name + ",Color:" + color)

    }

 

    //關閉鏈接

    sc.stop()

  }

 

}

3)往HBase寫入

def main(args: Array[String]) {

//獲取Spark配置信息並建立與spark的鏈接
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseApp")
  val sc = new SparkContext(sparkConf)

//建立HBaseConf
  val conf = HBaseConfiguration.create()
  val jobConf = new JobConf(conf)
  jobConf.setOutputFormat(classOf[TableOutputFormat])
  jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")

//構建Hbase表描述器

  val fruitTable = TableName.valueOf("fruit_spark")
  val tableDescr = new HTableDescriptor(fruitTable)
  tableDescr.addFamily(new HColumnDescriptor("info".getBytes))

//建立Hbase
  val admin = new HBaseAdmin(conf)
  if (admin.tableExists(fruitTable)) {
    admin.disableTable(fruitTable)
    admin.deleteTable(fruitTable)
  }
  admin.createTable(tableDescr)

//定義往Hbase插入數據的方法
  def convert(triple: (Int, String, Int)) = {
    val put = new Put(Bytes.toBytes(triple._1))
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
    (new ImmutableBytesWritable, put)
  }

 

//建立一個RDD
  val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))

 

//RDD內容寫到HBase
  val localData = initialRDD.map(convert)

  localData.saveAsHadoopDataset(jobConf)
}

5RDD編程進階

5.1 累加器

累加器用來對信息進行聚合,一般在向 Spark傳遞函數時,好比使用 map() 函數或者用 filter() 傳條件時,可使用驅動器程序中定義的變量,可是集羣中運行的每一個任務都會獲得這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。若是咱們想實現全部分片處理時更新共享變量的功能,那麼累加器能夠實現咱們想要的效果。

5.1.1 系統累加器

針對一個輸入的日誌文件,若是咱們想計算文件中全部空行的數量,咱們能夠編寫如下程序:

scala> val notice = sc.textFile("./NOTICE")

notice: org.apache.spark.rdd.RDD[String] = ./NOTICE MapPartitionsRDD[40] at textFile at <console>:32

 

scala> val blanklines = sc.accumulator(0)

warning: there were two deprecation warnings; re-run with -deprecation for details

blanklines: org.apache.spark.Accumulator[Int] = 0

 

scala> val tmp = notice.flatMap(line => {

     |    if (line == "") {

     |       blanklines += 1

     |    }

     |    line.split(" ")

     | })

tmp: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at flatMap at <console>:36

 

scala> tmp.count()

res31: Long = 3213

 

scala> blanklines.value

res32: Int = 171

累加器的用法以下所示。

經過在驅動器中調用SparkContext.accumulator(initialValue)方法,建立出存有初始值的累加器。返回值爲 org.apache.spark.Accumulator[T] 對象,其中 T 是初始值 initialValue 的類型。Spark閉包裏的執行器代碼可使用累加器的 += 方法(在Java中是 add)增長累加器的值。 驅動器程序能夠調用累加器的value屬性(Java中使用value()setValue())來訪問累加器的值。

注意:工做節點上的任務不能訪問累加器的值。從這些任務的角度來看,累加器是一個只寫變量。

對於要在行動操做中使用的累加器,Spark只會把每一個任務對各累加器的修改應用一次。所以,若是想要一個不管在失敗仍是重複計算時都絕對可靠的累加器,咱們必須把它放在 foreach() 這樣的行動操做中。轉化操做中累加器可能會發生不止一次更新

5.1.2 自定義累加器

自定義累加器類型的功能在1.X版本中就已經提供了,可是使用起來比較麻煩,在2.0版本後,累加器的易用性有了較大的改進,並且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實現方式。實現自定義類型累加器須要繼承AccumulatorV2並至少覆寫下例中出現的方法,下面這個累加器能夠用於在程序運行過程當中收集一些文本類信息,最終以Set[String]的形式返回。

package com.atguigu.spark

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions._

class LogAccumulator extends org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] {
  private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()

  override def isZero: Boolean = {
    _logArray.isEmpty
  }

  override def reset(): Unit = {
    _logArray.clear()
  }

  override def add(v: String): Unit = {
    _logArray.add(v)
  }

  override def merge(other: org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]]): Unit = {
    other match {
      case o: LogAccumulator => _logArray.addAll(o.value)
    }

  }

  override def value: java.util.Set[String] = {
    java.util.Collections.unmodifiableSet(_logArray)
  }

  override def copy():org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] = {
    val newAcc = new LogAccumulator()
    _logArray.synchronized{
      newAcc._logArray.addAll(_logArray)
    }
    newAcc
  }
}

// 過濾掉帶字母的
object LogAccumulator {
  def main(args: Array[String]) {
    val conf=new SparkConf().setAppName("LogAccumulator")
    val sc=new SparkContext(conf)

    val accum = new LogAccumulator
    sc.register(accum, "logAccum")
    val sum = sc.parallelize(Array("1", "2a", "3", "4b", "5", "6", "7cd", "8", "9"), 2).filter(line => {
      val pattern = """^-?(\d+)"""
      val flag = line.matches(pattern)
      if (!flag) {
        accum.add(line)
      }
      flag
    }).map(_.toInt).reduce(_ + _)

    println("sum: " + sum)
    for (v <- accum.value) print(v + "")
    println()
    sc.stop()
  }
}

5.2 廣播變量(調優策略)

廣播變量用來高效分發較大的對象。向全部工做節點發送一個較大的只讀值,以供一個或多個Spark操做使用。好比,若是你的應用須要向全部節點發送一個較大的只讀查詢表,甚至是機器學習算法中的一個很大的特徵向量,廣播變量用起來都很順手。 在多個並行操做中使用同一個變量,可是 Spark會爲每一個任務分別發送。

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)

 

scala> broadcastVar.value

res33: Array[Int] = Array(1, 2, 3)

使用廣播變量的過程以下:

(1) 經過對一個類型 T 的對象調用 SparkContext.broadcast 建立出一個 Broadcast[T] 對象。 任何可序列化的類型均可以這麼實現。

(2) 經過 value 屬性訪問該對象的值(Java 中爲 value() 方法)

(3) 變量只會被髮到各個節點一次,應做爲只讀值處理(修改這個值不會影響到別的節點)

6章 擴展

6.1 RDD相關概念關係

 

輸入可能以多個文件的形式存儲在HDFS上,每一個File都包含了不少塊,稱爲Block。當Spark讀取這些文件做爲輸入時,會根據具體數據格式對應的InputFormat進行解析,通常是將若干個Block合併成一個輸入分片,稱爲InputSplit,注意InputSplit不能跨越文件。隨後將爲這些輸入分片生成具體的TaskInputSplitTask是一一對應的關係。隨後這些具體的Task每一個都會被分配到集羣上的某個節點的某個Executor去執行。

1) 每一個節點能夠起一個或多個Executor

2) 每一個Executor由若干core組成,每一個Executor的每一個core一次只能執行一個Task

3) 每一個Task執行的結果就是生成了目標RDD的一個partiton

注意這裏的core是虛擬的core而不是機器的物理CPU核,能夠理解爲就是Executor的一個工做線程。而 Task被執行的併發度 = Executor數目 * 每一個Executor核數。至於partition的數目:

1) 對於數據讀入階段,例如sc.textFile,輸入文件被劃分爲多少InputSplit就會須要多少初始Task

2) Map階段partition數目保持不變。

3) Reduce階段,RDD的聚合會觸發shuffle操做,聚合後的RDDpartition數目跟具體操做有關,例如repartition操做會聚合成指定分區數,還有一些算子是可配置的。

RDD在計算的時候,每一個分區都會起一個task,因此rdd的分區數目決定了總的的task數目。申請的計算節點(Executor)數目和每一個計算節點核數,決定了你同一時刻能夠並行執行的task

好比的RDD100個分區,那麼計算的時候就會生成100task,你的資源配置爲10個計算節點,每一個兩2個核,同一時刻能夠並行的task數目爲20,計算這個RDD就須要5個輪次。若是計算資源不變,你有101task的話,就須要6個輪次,在最後一輪中,只有一個task在執行,其他核都在空轉。若是資源不變,你的RDD只有2個分區,那麼同一時刻只有2task運行,其他18個核空轉,形成資源浪費。這就是在spark調優中,增大RDD分區數目,增大任務並行度的作法。

相關文章
相關標籤/搜索