RDD(Resilient Distributed Dataset)叫作分佈式數據集,是Spark中最基本的數據抽象。代碼中是一個抽象類,它表明一個不可變、可分區、裏面的元素可並行計算的集合。java
1) 一組分區(Partition),即數據集的基本組成單位;mysql
2) 一個計算每一個分區的函數;es6
3) RDD之間的依賴關係;面試
4) 一個Partitioner,即RDD的分片函數;算法
5) 一個列表,存儲存取每一個Partition的優先位置(preferred location)。sql
RDD表示只讀的分區的數據集,對RDD進行改動,只能經過RDD的轉換操做,由一個RDD獲得一個新的RDD,新的RDD包含了從其餘RDD衍生所必需的信息。RDDs之間存在依賴,RDD的執行是按照血緣關係延時計算的。若是血緣關係較長,能夠經過持久化RDD來切斷血緣關係。shell
RDD邏輯上是分區的,每一個分區的數據是抽象存在的,計算的時候會經過一個compute函數獲得每一個分區的數據。若是RDD是經過已有的文件系統構建,則compute函數是讀取指定文件系統中的數據,若是RDD是經過其餘RDD轉換而來,則compute函數是執行轉換邏輯將其餘RDD的數據進行轉換。數據庫
以下圖所示,RDD是隻讀的,要想改變RDD中的數據,只能在現有的RDD基礎上建立新的RDD。apache
由一個RDD轉換到另外一個RDD,能夠經過豐富的操做算子實現,再也不像MapReduce那樣只能寫map和reduce了,以下圖所示。編程
RDD的操做算子包括兩類,一類叫作transformations,它是用來將RDD進行轉化,構建RDD的血緣關係;另外一類叫作actions,它是用來觸發RDD的計算,獲得RDD的相關計算結果或者將RDD保存的文件系統中。下圖是RDD所支持的操做算子列表。
RDDs經過操做算子進行轉換,轉換獲得的新RDD包含了從其餘RDDs衍生所必需的信息,RDDs之間維護着這種血緣關係,也稱之爲依賴。以下圖所示,依賴包括兩種,一種是窄依賴,RDDs之間分區是一一對應的,另外一種是寬依賴,下游RDD的每一個分區與上游RDD(也稱之爲父RDD)的每一個分區都有關,是多對多的關係。
若是在應用程序中屢次使用同一個RDD,能夠將該RDD緩存起來,該RDD只有在第一次計算的時候會根據血緣關係獲得分區的數據,在後續其餘地方用到該RDD的時候,會直接從緩存處取而不用再根據血緣關係計算,這樣就加速後期的重用。以下圖所示,RDD-1通過一系列的轉換後獲得RDD-n並保存到hdfs,RDD-1在這一過程當中會有個中間結果,若是將其緩存到內存,那麼在隨後的RDD-1轉換到RDD-m這一過程當中,就不會計算其以前的RDD-0了。
雖然RDD的血緣關係自然地能夠實現容錯,當RDD的某個分區數據失敗或丟失,能夠經過血緣關係重建。可是對於長時間迭代型應用來講,隨着迭代的進行,RDDs之間的血緣關係會愈來愈長,一旦在後續迭代過程當中出錯,則須要經過很是長的血緣關係去重建,勢必影響性能。爲此,RDD支持checkpoint將數據保存到持久化的存儲中,這樣就能夠切斷以前的血緣關係,由於checkpoint後的RDD不須要知道它的父RDDs了,它能夠從checkpoint處拿到數據。
在Spark中,RDD被表示爲對象,經過對象上的方法調用來對RDD進行轉換。通過一系列的transformations定義RDD以後,就能夠調用actions觸發RDD的計算,action能夠是嚮應用程序返回結果(count, collect等),或者是向存儲系統保存數據(saveAsTextFile等)。在Spark中,只有遇到action,纔會執行RDD的計算(即延遲計算),這樣在運行時能夠經過管道的方式傳輸多個轉換。
要使用Spark,開發者須要編寫一個Driver程序,它被提交到集羣以調度運行Worker,以下圖所示。Driver中定義了一個或多個RDD,並調用RDD上的action,Worker則執行RDD分區計算任務。
在Spark中建立RDD的建立方式能夠分爲三種:從集合中建立RDD;從外部存儲建立RDD;從其餘RDD建立。
從集合中建立RDD,Spark主要提供了兩種函數:parallelize和makeRDD
1)使用parallelize()從集合建立
scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
2)使用makeRDD()從集合建立
scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
|
包括本地的文件系統,還有全部Hadoop支持的數據集,好比HDFS、Cassandra、HBase等,咱們會在第4章詳細介紹。
scala> val rdd2= sc.textFile("hdfs://hadoop102:9000/RELEASE")
rdd2: org.apache.spark.rdd.RDD[String] = hdfs:// hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24
詳見2.3節
RDD總體上分爲Value類型和Key-Value類型
1. 做用:返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成
2. 需求:建立一個1-10數組的RDD,將全部元素*2造成新的RDD
(1)建立
scala> var source = sc.parallelize(1 to 10)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
(2)打印
scala> source.collect()
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3)將全部元素*2
scala> val mapadd = source.map(_ * 2)
mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26
(4)打印最終結果
scala> mapadd.collect()
res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
1. 做用:相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]。假設有N個元素,有M個分區,那麼map的函數的將被調用N次,而mapPartitions被調用M次,一個函數一次處理全部分區。
2. 需求:建立一個RDD,使每一個元素*2組成新的RDD
(1)建立一個RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(2)使每一個元素*2組成新的RDD
scala> rdd.mapPartitions(x=>x.map(_*2))
res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:27
(3)打印新的RDD
scala> res3.collect
res4: Array[Int] = Array(2, 4, 6, 8)
1. 做用:相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U];
2. 需求:建立一個RDD,使每一個元素跟所在分區造成一個元組組成一個新的RDD
(1)建立一個RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(2)使每一個元素跟所在分區造成一個元組組成一個新的RDD
scala> val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_))))
indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:26
(3)打印新的RDD
scala> indexRdd.collect
res2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))
1. 做用:相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素)
2. 需求:建立一個元素爲1-5的RDD,運用flatMap建立一個新的RDD,新的RDD爲原RDD的每一個元素的擴展(1->1,2->1,2……5->1,2,3,4,5)
(1)建立
scala> val sourceFlat = sc.parallelize(1 to 5)
sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24
(2)打印
scala> sourceFlat.collect()
res11: Array[Int] = Array(1, 2, 3, 4, 5)
(3)根據原RDD建立新RDD(1->1,2->1,2……5->1,2,3,4,5)
scala> val flatMap = sourceFlat.flatMap(1 to _)
flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26
(4)打印新RDD
scala> flatMap.collect()
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
1. map():每次處理一條數據。
2. mapPartition():每次處理一個分區的數據,這個分區的數據處理完後,原RDD中分區的數據才能釋放,可能致使OOM。
3. 開發指導:當內存空間較大的時候建議使用mapPartition(),以提升處理效率。
1. 做用:將每個分區造成一個數組,造成新的RDD類型時RDD[Array[T]]
2. 需求:建立一個4個分區的RDD,並將每一個分區的數據放到一個數組
(1)建立
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
(2)將每一個分區的數據放到一個數組並收集到Driver端打印
scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
1. 做用:分組,按照傳入函數的返回值進行分組。將相同的key對應的值放入一個迭代器。
2. 需求:建立一個RDD,按照元素模以2的值進行分組。
(1)建立
scala> val rdd = sc.parallelize(1 to 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
(2)按照元素模以2的值進行分組
scala> val group = rdd.groupBy(_%2)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26
(3)打印結果
scala> group.collect
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))
1. 做用:過濾。返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成。
2. 需求:建立一個RDD(由字符串組成),過濾出一個新RDD(包含」xiao」子串)
(1)建立
scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
(2)打印
scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)
(3)過濾出含」 xiao」子串的造成一個新的RDD
scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26
(4)打印新RDD
scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)
1. 做用:以指定的隨機種子隨機抽樣出數量爲fraction的數據,withReplacement表示是抽出的數據是否放回,true爲有放回的抽樣,false爲無放回的抽樣,seed用於指定隨機數生成器種子。例子從RDD中隨機且有放回的抽出50%的數據,隨機種子值爲3(便可能以1 2 3的其中一個起始值)
2. 需求:建立一個RDD(1-10),從中選擇放回和不放回抽樣
(1)建立RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
(2)打印
scala> rdd.collect()
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3)放回抽樣
scala> var sample1 = rdd.sample(true,0.4,2)
sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at <console>:26
(4)打印放回抽樣結果
scala> sample1.collect()
res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9)
(5)不放回抽樣
scala> var sample2 = rdd.sample(false,0.2,3)
sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at <console>:26
(6)打印不放回抽樣結果
scala> sample2.collect()
res17: Array[Int] = Array(1, 9)
1. 做用:對源RDD進行去重後返回一個新的RDD。默認狀況下,只有8個並行任務來操做,可是能夠傳入一個可選的numTasks參數改變它。
2. 需求:建立一個RDD,使用distinct()對其去重。
(1)建立一個RDD
scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1))
distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
(2)對RDD進行去重(不指定並行度)
scala> val unionRDD = distinctRdd.distinct()
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at <console>:26
(3)打印去重後生成的新RDD
scala> unionRDD.collect()
res20: Array[Int] = Array(1, 9, 5, 6, 2)
(4)對RDD(指定並行度爲2)
scala> val unionRDD = distinctRdd.distinct(2)
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at <console>:26
(5)打印去重後生成的新RDD
scala> unionRDD.collect()
res21: Array[Int] = Array(6, 2, 1, 9, 5)
1. 做用:縮減分區數,用於大數據集過濾後,提升小數據集的執行效率。
2. 需求:建立一個4個分區的RDD,對其縮減分區
(1)建立一個RDD
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24
(2)查看RDD的分區數
scala> rdd.partitions.size
res20: Int = 4
(3)對RDD從新分區
scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26
(4)查看新RDD的分區數
scala> coalesceRDD.partitions.size
res21: Int = 3
1. 做用:根據分區數,從新經過網絡隨機洗牌全部數據。
2. 需求:建立一個4個分區的RDD,對其從新分區
(1)建立一個RDD
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24
(2)查看RDD的分區數
scala> rdd.partitions.size
res22: Int = 4
(3)對RDD從新分區
scala> val rerdd = rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at <console>:26
(4)查看新RDD的分區數
scala> rerdd.partitions.size
res23: Int = 2
1. coalesce從新分區,能夠選擇是否進行shuffle過程。由參數shuffle: Boolean = false/true決定。
2. repartition其實是調用的coalesce,默認是不進行shuffle的。源碼以下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
1. 做用;使用func先對數據進行處理,按照處理後的數據比較結果排序,默認爲正序。
2. 需求:建立一個RDD,按照不一樣的規則進行排序
(1)建立一個RDD
scala> val rdd = sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
(2)按照自身大小排序
scala> rdd.sortBy(x => x).collect()
res11: Array[Int] = Array(1, 2, 3, 4)
(3)按照與3餘數的大小排序
scala> rdd.sortBy(x => x%3).collect()
res12: Array[Int] = Array(3, 4, 1, 2)
1. 做用:管道,針對每一個分區,都執行一個shell腳本,返回輸出的RDD。
注意:腳本須要放在Worker節點能夠訪問到的位置
2. 需求:編寫一個腳本,使用管道將腳本做用於RDD上。
(1)編寫一個腳本
Shell腳本
#!/bin/sh
echo "AA"
while read LINE; do
echo ">>>"${LINE}
done
(2)建立一個只有一個分區的RDD
scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24
(3)將腳本做用該RDD並打印
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
res18: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)
(4)建立一個有兩個分區的RDD
scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at <console>:24
(5)將腳本做用該RDD並打印
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
res19: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)
1. 做用:對源RDD和參數RDD求並集後返回一個新的RDD
2. 需求:建立兩個RDD,求並集
(1)建立第一個RDD
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
(2)建立第二個RDD
scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
(3)計算兩個RDD的並集
scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[25] at union at <console>:28
(4)打印並集結果
scala> rdd3.collect()
res18: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
1. 做用:計算差的一種函數,去除兩個RDD中相同的元素,不一樣的RDD將保留下來
2. 需求:建立兩個RDD,求第一個RDD與第二個RDD的差集
(1)建立第一個RDD
scala> val rdd = sc.parallelize(3 to 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at <console>:24
(2)建立第二個RDD
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:24
(3)計算第一個RDD與第二個RDD的差集並打印
scala> rdd.subtract(rdd1).collect()
res27: Array[Int] = Array(8, 6, 7)
1. 做用:對源RDD和參數RDD求交集後返回一個新的RDD
2. 需求:建立兩個RDD,求兩個RDD的交集
(1)建立第一個RDD
scala> val rdd1 = sc.parallelize(1 to 7)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24
(2)建立第二個RDD
scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24
(3)計算兩個RDD的交集
scala> val rdd3 = rdd1.intersection(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at intersection at <console>:28
(4)打印計算結果
scala> rdd3.collect()
res19: Array[Int] = Array(5, 6, 7)
1. 做用:笛卡爾積(儘可能避免使用)
2. 需求:建立兩個RDD,計算兩個RDD的笛卡爾積
(1)建立第一個RDD
scala> val rdd1 = sc.parallelize(1 to 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at parallelize at <console>:24
(2)建立第二個RDD
scala> val rdd2 = sc.parallelize(2 to 5)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[48] at parallelize at <console>:24
(3)計算兩個RDD的笛卡爾積並打印
scala> rdd1.cartesian(rdd2).collect()
res17: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5))
1. 做用:將兩個RDD組合成Key/Value形式的RDD,這裏默認兩個RDD的partition數量以及元素數量都相同,不然會拋出異常。
2. 需求:建立兩個RDD,並將兩個RDD組合到一塊兒造成一個(k,v)RDD
(1)建立第一個RDD
scala> val rdd1 = sc.parallelize(Array(1,2,3),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
(2)建立第二個RDD(與1分區數相同)
scala> val rdd2 = sc.parallelize(Array("a","b","c"),3)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
(3)第一個RDD組合第二個RDD並打印
scala> rdd1.zip(rdd2).collect
res1: Array[(Int, String)] = Array((1,a), (2,b), (3,c))
(4)第二個RDD組合第一個RDD並打印
scala> rdd2.zip(rdd1).collect
res2: Array[(String, Int)] = Array((a,1), (b,2), (c,3))
(5)建立第三個RDD(與1,2分區數不一樣)
scala> val rdd3 = sc.parallelize(Array("a","b","c"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
(6)第一個RDD組合第三個RDD並打印
scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(3, 2)
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
... 48 elided
1. 做用:對pairRDD進行分區操做,若是原有的partionRDD和現有的partionRDD是一致的話就不進行分區, 不然會生成ShuffleRDD,即會產生shuffle過程。
2. 需求:建立一個4個分區的RDD,對其從新分區
(1)建立一個RDD
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24
(2)查看RDD的分區數
scala> rdd.partitions.size
res24: Int = 4
(3)對RDD從新分區
scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26
(4)查看新RDD的分區數
scala> rdd2.partitions.size
res25: Int = 2
1. 做用:groupByKey也是對每一個key進行操做,但只生成一個sequence。
2. 需求:建立一個pairRDD,將相同key對應值聚合到一個sequence中,並計算相同key對應值的相加結果。
(1)建立一個pairRDD
scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)
scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26
(2)將相同key對應值聚合到一個sequence中
scala> val group = wordPairsRDD.groupByKey()
group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28
(3)打印結果
scala> group.collect()
res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
(4)計算相同key對應值的相加結果
scala> group.map(t => (t._1, t._2.sum))
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31
(5)打印結果
scala> res2.collect()
res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
1. reduceByKey:按照key進行聚合,在shuffle以前有combine(預聚合)操做,返回結果是RDD[k,v].
2. groupByKey:按照key進行分組,直接進行shuffle。
3. 開發指導:reduceByKey比groupByKey,建議使用。可是須要注意是否會影響業務邏輯。
1. 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,reduce任務的個數能夠經過第二個可選的參數來設置。
2. 需求:建立一個pairRDD,計算相同key對應值的相加結果
(1)建立一個pairRDD
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24
(2)計算相同key對應值的相加結果
scala> val reduce = rdd.reduceByKey((x,y) => x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26
(3)打印結果
scala> reduce.collect()
res29: Array[(String, Int)] = Array((female,6), (male,7))
參數:(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)
1. 做用:在kv對的RDD中,,按key將value進行分組合並,合併時,將每一個value和初始值做爲seq函數的參數,進行計算,返回的結果做爲一個新的kv對,而後再將結果按照key進行合併,最後將每一個分組的value傳遞給combine函數進行計算(先將前兩個value進行計算,將返回結果和下一個value傳給combine函數,以此類推),將key與計算結果做爲一個新的kv對輸出。
2. 參數描述:
(1)zeroValue:給每個分區中的每個key一個初始值;
(2)seqOp:函數用於在每個分區中用初始值逐步迭代value;
(3)combOp:函數用於合併每一個分區中的結果。
3. 需求:建立一個pairRDD,取出每一個分區相同key對應值的最大值,而後相加
4. 需求分析
圖1-aggregate案例分析
(1)建立一個pairRDD
scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2)取出每一個分區相同key對應值的最大值,而後相加
scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26
(3)打印結果
scala> agg.collect()
res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
參數:(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
(1)建立一個pairRDD
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[91] at parallelize at <console>:24
(2)計算相同key對應值的相加結果
scala> val agg = rdd.foldByKey(0)(_+_)
agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[92] at foldByKey at <console>:26
(3)打印結果
scala> agg.collect()
res61: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))
參數:(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
(1)createCombiner: combineByKey() 會遍歷分區中的全部元素,所以每一個元素的鍵要麼尚未遇到過,要麼就和以前的某個元素的鍵相同。若是這是一個新的元素,combineByKey()會使用一個叫做createCombiner()的函數來建立那個鍵對應的累加器的初始值
(2)mergeValue: 若是這是一個在處理當前分區以前已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合併
(3)mergeCombiners: 因爲每一個分區都是獨立處理的, 所以對於同一個鍵能夠有多個累加器。若是有兩個或者更多的分區都有對應同一個鍵的累加器, 就須要使用用戶提供的 mergeCombiners() 方法將各個分區的結果進行合併。
圖2- combineByKey案例分析
(1)建立一個pairRDD
scala> val input = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:26
(2)將相同key對應的值相加,同時記錄該key出現的次數,放入一個二元組
scala> val combine = input.combineByKey((_,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2))
combine: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[5] at combineByKey at <console>:28
(3)打印合並後的結果
scala> combine.collect
res5: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3)))
(4)計算平均值
scala> val result = combine.map{case (key,value) => (key,value._1/value._2.toDouble)}
result: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[54] at map at <console>:30
(5)打印結果
scala> result.collect()
res33: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))
1. 做用:在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD
2. 需求:建立一個pairRDD,按照key的正序和倒序進行排序
(1)建立一個pairRDD
scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24
(2)按照key的正序
scala> rdd.sortByKey(true).collect()
res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))
(3)按照key的倒序
scala> rdd.sortByKey(false).collect()
res10: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))
1. 針對於(K,V)形式的類型只對V進行操做
2. 需求:建立一個pairRDD,並將value添加字符串"|||"
(1)建立一個pairRDD
scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at <console>:24
(2)對value添加字符串"|||"
scala> rdd3.mapValues(_+"|||").collect()
res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))
1. 做用:在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD
2. 需求:建立兩個pairRDD,並將key相同的數據聚合到一個元組。
(1)建立第一個pairRDD
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at <console>:24
(2)建立第二個pairRDD
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
(3)join操做並打印結果
scala> rdd.join(rdd1).collect()
res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
1. 做用:在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD
2. 需求:建立兩個pairRDD,並將key相同的數據聚合到一個迭代器。
(1)建立第一個pairRDD
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[37] at parallelize at <console>:24
(2)建立第二個pairRDD
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:24
(3)cogroup兩個RDD並打印結果
scala> rdd.cogroup(rdd1).collect()
res14: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))
1. 數據結構:時間戳,省份,城市,用戶,廣告,中間字段使用空格分割。
樣本以下:
1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
2. 需求:統計出每個省份廣告被點擊次數的TOP3
3. 實現過程:
package com.atguigu.practice
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//需求:統計出每個省份廣告被點擊次數的TOP3
object Practice {
def main(args: Array[String]): Unit = {
//1.初始化spark配置信息並創建與spark的鏈接
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Test")
val sc = new SparkContext(sparkConf)
//2.讀取數據生成RDD:TS,Province,City,User,AD
val line = sc.textFile("E:\\IDEAWorkSpace\\SparkTest\\src\\main\\resources\\agent.log")
//3.按照最小粒度聚合:((Province,AD),1)
val provinceAdAndOne = line.map { x =>
val fields: Array[String] = x.split(" ")
((fields(1), fields(3)), 1)
}
//4.計算每一個省中每一個廣告被點擊的總數:((Province,AD),sum)
val provinceAdToSum = provinceAdAndOne.reduceByKey(_ + _)
//5.將省份做爲key,廣告加點擊數爲value:(Province,(AD,sum))
val provinceToAdSum = provinceAdToSum.map(x => (x._1._1, (x._1._2, x._2)))
//6.將同一個省份的全部廣告進行聚合(Province,List((AD1,sum1),(AD2,sum2)...))
val provinceGroup = provinceToAdSum.groupByKey()
//7.對同一個省份全部廣告的集合進行排序並取前3條,排序規則爲廣告點擊總數
val provinceAdTop3 = provinceGroup.mapValues { x =>
x.toList.sortWith((x, y) => x._2 > y._2).take(3)
}
//8.將數據拉取到Driver端並打印
provinceAdTop3.collect().foreach(println)
//9.關閉與spark的鏈接
sc.stop()
}
}
1. 做用:經過func函數彙集RDD中的全部元素,先聚合分區內數據,再聚合分區間數據。
2. 需求:建立一個RDD,將全部元素聚合獲得結果
(1)建立一個RDD[Int]
scala> val rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24
(2)聚合RDD[Int]全部元素
scala> rdd1.reduce(_+_)
res50: Int = 55
(3)建立一個RDD[String]
scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24
(4)聚合RDD[String]全部數據
scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))
res51: (String, Int) = (adca,12)
1. 做用:在驅動程序中,以數組的形式返回數據集的全部元素。
2. 需求:建立一個RDD,並將RDD內容收集到Driver端打印
(1)建立一個RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2)將結果收集到Driver端
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
1. 做用:返回RDD中元素的個數
2. 需求:建立一個RDD,統計該RDD的條數
(1)建立一個RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2)統計該RDD的條數
scala> rdd.count
res1: Long = 10
1. 做用:返回RDD中的第一個元素
2. 需求:建立一個RDD,返回該RDD中的第一個元素
(1)建立一個RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2)統計該RDD的條數
scala> rdd.first
res2: Int = 1
1. 做用:返回一個由RDD的前n個元素組成的數組
2. 需求:建立一個RDD,統計該RDD的條數
(1)建立一個RDD
scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
(2)統計該RDD的條數
scala> rdd.take(3)
res10: Array[Int] = Array(2, 5, 4)
1. 做用:返回該RDD排序後的前n個元素組成的數組
2. 需求:建立一個RDD,統計該RDD的條數
(1)建立一個RDD
scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
(2)統計該RDD的條數
scala> rdd.takeOrdered(3)
res18: Array[Int] = Array(2, 3, 4)
1. 參數:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
2. 做用:aggregate函數將每一個分區裏面的元素經過seqOp和初始值進行聚合,而後用combine函數將每一個分區的結果和初始值(zeroValue)進行combine操做。這個函數最終返回的類型不須要和RDD中元素類型一致。
3. 需求:建立一個RDD,將全部元素相加獲得結果
(1)建立一個RDD
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24
(2)將該RDD全部元素相加獲得結果
scala> rdd.aggregate(0)(_+_,_+_)
res22: Int = 55
1. 做用:摺疊操做,aggregate的簡化操做,seqop和combop同樣。
2. 需求:建立一個RDD,將全部元素相加獲得結果
(1)建立一個RDD
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24
(2)將該RDD全部元素相加獲得結果
scala> rdd.fold(0)(_+_)
res24: Int = 55
做用:將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本
做用:將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。
做用:用於將RDD中的元素序列化成對象,存儲到文件中。
1. 做用:針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數。
2. 需求:建立一個PairRDD,統計每種key的個數
(1)建立一個PairRDD
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24
(2)統計每種key的個數
scala> rdd.countByKey
res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
1. 做用:在數據集的每個元素上,運行函數func進行更新。
2. 需求:建立一個RDD,對每一個元素進行打印
(1)建立一個RDD
scala> var rdd = sc.makeRDD(1 to 5,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24
(2)對該RDD每一個元素進行打印
scala> rdd.foreach(println(_))
3
4
5
1
2
在實際開發中咱們每每須要本身定義一些對於RDD的操做,那麼此時須要主要的是,初始化工做是在Driver端進行的,而實際運行程序是在Executor端進行的,這就涉及到了跨進程通訊,是須要序列化的。下面咱們看幾個例子:
1.建立一個類
class Search(s:String){
//過濾出包含字符串的數據
def isMatch(s: String): Boolean = {
s.contains(query)
}
//過濾出包含字符串的RDD
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
//過濾出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
rdd.filter(x => x.contains(query))
}
}
2.建立Spark主程序
object SeriTest {
def main(args: Array[String]): Unit = {
//1.初始化配置信息及SparkContext
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
//2.建立一個RDD
val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))
//3.建立一個Search對象
val search = new Search()
//4.運用第一個過濾函數並打印結果
val match1: RDD[String] = search.getMatche1(rdd)
match1.collect().foreach(println)
}
}
3.運行程序
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
at com.atguigu.Search.getMatche1(SeriTest.scala:39)
at com.atguigu.SeriTest$.main(SeriTest.scala:18)
at com.atguigu.SeriTest.main(SeriTest.scala)
Caused by: java.io.NotSerializableException: com.atguigu.Search
4.問題說明
//過濾出包含字符串的RDD
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
在這個方法中所調用的方法isMatch()是定義在Search這個類中的,實際上調用的是this. isMatch(),this表示Search這個類的對象,程序在運行過程當中須要將Search對象序列化之後傳遞到Executor端。
5.解決方案
使類繼承scala.Serializable便可。
class Search() extends Serializable{...}
1.建立Spark主程序
object TransmitTest {
def main(args: Array[String]): Unit = {
//1.初始化配置信息及SparkContext
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
//2.建立一個RDD
val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))
//3.建立一個Search對象
val search = new Search()
//4.運用第一個過濾函數並打印結果
val match1: RDD[String] = search.getMatche2(rdd)
match1.collect().foreach(println)
}
}
2.運行程序
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
at com.atguigu.Search.getMatche1(SeriTest.scala:39)
at com.atguigu.SeriTest$.main(SeriTest.scala:18)
at com.atguigu.SeriTest.main(SeriTest.scala)
Caused by: java.io.NotSerializableException: com.atguigu.Search
3.問題說明
//過濾出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
rdd.filter(x => x.contains(query))
}
在這個方法中所調用的方法query是定義在Search這個類中的字段,實際上調用的是this. query,this表示Search這個類的對象,程序在運行過程當中須要將Search對象序列化之後傳遞到Executor端。
4.解決方案
1)使類繼承scala.Serializable便可。
class Search() extends Serializable{...}
2)將類變量query賦值給局部變量
修改getMatche2爲
//過濾出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
val query_ : String = this.query//將類變量賦值給局部變量
rdd.filter(x => x.contains(query_))
}
RDD只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立RDD的一系列Lineage(血統)記錄下來,以便恢復丟失的分區。RDD的Lineage會記錄RDD的元數據信息和轉換行爲,當該RDD的部分分區數據丟失時,它能夠根據這些信息來從新運算和恢復丟失的數據分區。
(1)讀取一個HDFS文件並將其中內容映射成一個個元組
scala> val wordAndOne = sc.textFile("/fruit.tsv").flatMap(_.split("\t")).map((_,1))
wordAndOne: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:24
(2)統計每一種key對應的個數
scala> val wordAndCount = wordAndOne.reduceByKey(_+_)
wordAndCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:26
(3)查看「wordAndOne」的Lineage
scala> wordAndOne.toDebugString
res5: String =
(2) MapPartitionsRDD[22] at map at <console>:24 []
| MapPartitionsRDD[21] at flatMap at <console>:24 []
| /fruit.tsv MapPartitionsRDD[20] at textFile at <console>:24 []
| /fruit.tsv HadoopRDD[19] at textFile at <console>:24 []
(4)查看「wordAndCount」的Lineage
scala> wordAndCount.toDebugString
res6: String =
(2) ShuffledRDD[23] at reduceByKey at <console>:26 []
+-(2) MapPartitionsRDD[22] at map at <console>:24 []
| MapPartitionsRDD[21] at flatMap at <console>:24 []
| /fruit.tsv MapPartitionsRDD[20] at textFile at <console>:24 []
| /fruit.tsv HadoopRDD[19] at textFile at <console>:24 []
(5)查看「wordAndOne」的依賴類型
scala> wordAndOne.dependencies
res7: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@5d5db92b)
(6)查看「wordAndCount」的依賴類型
scala> wordAndCount.dependencies
res8: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@63f3e6a8)
注意:RDD和它依賴的父RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
窄依賴指的是每個父RDD的Partition最多被子RDD的一個Partition使用,窄依賴咱們形象的比喻爲獨生子女
寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition,會引發shuffle,總結:寬依賴咱們形象的比喻爲超生
DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據。
RDD任務切分中間分爲:Application、Job、Stage和Task
1)Application:初始化一個SparkContext即生成一個Application
2)Job:一個Action算子就會生成一個Job
3)Stage:根據RDD之間的依賴關係的不一樣將Job劃分紅不一樣的Stage,遇到一個寬依賴則劃分一個Stage。
4)Task:Stage是一個TaskSet,將Stage劃分的結果發送到不一樣的Executor執行即爲一個Task。
注意:Application->Job->Stage->Task每一層都是1對n的關係。
RDD經過persist方法或cache方法能夠將前面的計算結果緩存,默認狀況下 persist() 會把數據以序列化的形式緩存在 JVM 的堆空間中。
可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。
經過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
在存儲級別的末尾加上「_2」來把持久化數據存爲兩份
緩存有可能丟失,或者存儲存儲於內存的數據因爲內存不足而被刪除,RDD的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有Partition。
(1)建立一個RDD
scala> val rdd = sc.makeRDD(Array("atguigu"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at makeRDD at <console>:25
(2)將RDD轉換爲攜帶當前時間戳不作緩存
scala> val nocache = rdd.map(_.toString+System.currentTimeMillis)
nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at map at <console>:27
(3)屢次打印結果
scala> nocache.collect
res0: Array[String] = Array(atguigu1538978275359)
scala> nocache.collect
res1: Array[String] = Array(atguigu1538978282416)
scala> nocache.collect
res2: Array[String] = Array(atguigu1538978283199)
(4)將RDD轉換爲攜帶當前時間戳並作緩存
scala> val cache = rdd.map(_.toString+System.currentTimeMillis).cache
cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at map at <console>:27
(5)屢次打印作了緩存的結果
scala> cache.collect
res3: Array[String] = Array(atguigu1538978435705)
scala> cache.collect
res4: Array[String] = Array(atguigu1538978435705)
scala> cache.collect
res5: Array[String] = Array(atguigu1538978435705)
Spark中對於數據的保存除了持久化操做以外,還提供了一種檢查點的機制,檢查點(本質是經過將RDD寫入Disk作檢查點)是爲了經過lineage作容錯的輔助,lineage過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的RDD開始重作Lineage,就會減小開銷。檢查點經過將數據寫入到HDFS文件系統實現了RDD的檢查點功能。
爲當前RDD設置檢查點。該函數將會建立一個二進制的文件,並存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設置的。在checkpoint的過程當中,該RDD的全部依賴於父RDD中的信息將所有被移除。對RDD進行checkpoint操做並不會立刻被執行,必須執行Action操做才能觸發。
案例實操:
(1)設置檢查點
scala> sc.setCheckpointDir("hdfs://hadoop102:9000/checkpoint")
(2)建立一個RDD
scala> val rdd = sc.parallelize(Array("atguigu"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:24
(3)將RDD轉換爲攜帶當前時間戳並作checkpoint
scala> val ch = rdd.map(_+System.currentTimeMillis)
ch: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:26
scala> ch.checkpoint
(4)屢次打印結果
scala> ch.collect
res55: Array[String] = Array(atguigu1538981860336)
scala> ch.collect
res56: Array[String] = Array(atguigu1538981860504)
scala> ch.collect
res57: Array[String] = Array(atguigu1538981860504)
scala> ch.collect
res58: Array[String] = Array(atguigu1538981860504)
Spark目前支持Hash分區和Range分區,用戶也能夠自定義分區,Hash分區爲當前的默認分區,Spark中分區器直接決定了RDD中分區的個數、RDD中每條數據通過Shuffle過程屬於哪一個分區和Reduce的個數
注意:
(1)只有Key-Value類型的RDD纔有分區的,非Key-Value類型的RDD分區的值是None
(2)每一個RDD的分區ID範圍:0~numPartitions-1,決定這個值是屬於那個分區的。
能夠經過使用RDD的partitioner 屬性來獲取 RDD 的分區方式。它會返回一個 scala.Option 對象, 經過get方法獲取其中的值。相關源碼以下:
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
(1)建立一個pairRDD
scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24
(2)查看RDD的分區器
scala> pairs.partitioner
res1: Option[org.apache.spark.Partitioner] = None
(3)導入HashPartitioner類
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
(4)使用HashPartitioner對RDD進行從新分區
scala> val partitioned = pairs.partitionBy(new HashPartitioner(2))
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at partitionBy at <console>:27
(5)查看從新分區後RDD的分區器
scala> partitioned.partitioner
res2: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
HashPartitioner分區的原理:對於給定的key,計算其hashCode,併除以分區的個數取餘,若是餘數小於0,則用餘數+分區的個數(不然加0),最後返回的值就是這個key所屬的分區ID。
使用Hash分區的實操
scala> nopar.partitioner
res20: Option[org.apache.spark.Partitioner] = None
scala> val nopar = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)
nopar: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala>nopar.mapPartitionsWithIndex((index,iter)=>{ Iterator(index.toString+" : "+iter.mkString("|")) }).collect
res0: Array[String] = Array("0 : ", 1 : (1,3), 2 : (1,2), 3 : (2,4), "4 : ", 5 : (2,3), 6 : (3,6), 7 : (3,8))
scala> val hashpar = nopar.partitionBy(new org.apache.spark.HashPartitioner(7))
hashpar: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[12] at partitionBy at <console>:26
scala> hashpar.count
res18: Long = 6
scala> hashpar.partitioner
res21: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@7)
scala> hashpar.mapPartitions(iter => Iterator(iter.length)).collect()
res19: Array[Int] = Array(0, 3, 1, 2, 0, 0, 0)
HashPartitioner分區弊端:可能致使每一個分區中數據量的不均勻,極端狀況下會致使某些分區擁有RDD的所有數據。
RangePartitioner做用:將必定範圍內的數映射到某一個分區內,儘可能保證每一個分區中數據量的均勻,並且分區與分區之間是有序的,一個分區中的元素確定都是比另外一個分區內的元素小或者大,可是分區內的元素是不能保證順序的。簡單的說就是將必定範圍內的數映射到某一個分區內。實現過程爲:
第一步:先重整個RDD中抽取出樣本數據,將樣本數據排序,計算出每一個分區的最大key值,造成一個Array[KEY]類型的數組變量rangeBounds;
第二步:判斷key在rangeBounds中所處的範圍,給出該key值在下一個RDD中的分區id下標;該分區器要求RDD中的KEY類型必須是能夠排序的
要實現自定義的分區器,你須要繼承 org.apache.spark.Partitioner 類並實現下面三個方法。
(1)numPartitions: Int:返回建立出來的分區數。
(2)getPartition(key: Any): Int:返回給定鍵的分區編號(0到numPartitions-1)。
(3)equals():Java 判斷相等性的標準方法。這個方法的實現很是重要,Spark 須要用這個方法來檢查你的分區器對象是否和其餘分區器實例相同,這樣 Spark 才能夠判斷兩個 RDD 的分區方式是否相同。
需求:將相同後綴的數據寫入相同的文件,經過將相同後綴的數據分區到相同的分區並保存輸出來實現。
(1)建立一個pairRDD
scala> val data = sc.parallelize(Array((1,1),(2,2),(3,3),(4,4),(5,5),(6,6)))
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24
(2)定義一個自定義分區類
scala> :paste
// Entering paste mode (ctrl-D to finish)
class CustomerPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
//覆蓋分區數
override def numPartitions: Int = numParts
//覆蓋分區號獲取函數
override def getPartition(key: Any): Int = {
val ckey: String = key.toString
ckey.substring(ckey.length-1).toInt%numParts
}
}
// Exiting paste mode, now interpreting.
defined class CustomerPartitioner
(3)將RDD使用自定義的分區類進行從新分區
scala> val par = data.partitionBy(new CustomerPartitioner(2))
par: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at partitionBy at <console>:27
(4)查看從新分區後的數據分佈
scala> par.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect
res3: Array[(Int, (Int, Int))] = Array((0,(2,2)), (0,(4,4)), (0,(6,6)), (1,(1,1)), (1,(3,3)), (1,(5,5)))
使用自定義的 Partitioner 是很容易的:只要把它傳給 partitionBy() 方法便可。Spark 中有許多依賴於數據混洗的方法,好比 join() 和 groupByKey(),它們也能夠接收一個可選的 Partitioner 對象來控制輸出數據的分區方式。
Spark的數據讀取及數據保存能夠從兩個維度來做區分:文件格式以及文件系統。
文件格式分爲:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;
文件系統分爲:本地文件系統、HDFS、HBASE以及數據庫。
1)數據讀取:textFile(String)
scala> val hdfsFile = sc.textFile("hdfs://hadoop102:9000/fruit.txt")
hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24
2)數據保存: saveAsTextFile(String)
scala> hdfsFile.saveAsTextFile("/fruitOut")
若是JSON文件中每一行就是一個JSON記錄,那麼能夠經過將JSON文件當作文本文件來讀取,而後利用相關的JSON庫對每一條數據進行JSON解析。
注意:使用RDD讀取JSON文件處理很複雜,同時SparkSQL集成了很好的處理JSON文件的方式,因此應用中可能是採用SparkSQL處理JSON文件。
(1)導入解析json所需的包
scala> import scala.util.parsing.json.JSON
(2)上傳json文件到HDFS
[atguigu@hadoop102 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /
(3)讀取文件
scala> val json = sc.textFile("/people.json")
json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at <console>:24
(4)解析json數據
scala> val result = json.map(JSON.parseFull)
result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at <console>:27
(5)打印
scala> result.collect
res11: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))
SequenceFile文件是Hadoop用來存儲二進制形式的key-value對而設計的一種平面文件(Flat File)。Spark 有專門用來讀取 SequenceFile 的接口。在 SparkContext 中,能夠調用 sequenceFile[ keyClass, valueClass](path)。
注意:SequenceFile文件只針對PairRDD
(1)建立一個RDD
scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24
(2)將RDD保存爲Sequence文件
scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")
(3)查看該文件
[atguigu@hadoop102 seqFile]$ pwd
/opt/module/spark/seqFile
[atguigu@hadoop102 seqFile]$ ll
總用量 8
-rw-r--r-- 1 atguigu atguigu 108 10月 9 10:29 part-00000
-rw-r--r-- 1 atguigu atguigu 124 10月 9 10:29 part-00001
-rw-r--r-- 1 atguigu atguigu 0 10月 9 10:29 _SUCCESS
[atguigu@hadoop102 seqFile]$ cat part-00000
SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritableط
(4)讀取Sequence文件
scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at <console>:24
(5)打印讀取後的Sequence文件
scala> seq.collect
res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
對象文件是將對象序列化後保存的文件,採用Java的序列化機制。能夠經過objectFile[k,v](path) 函數接收一個路徑,讀取對象文件,返回對應的 RDD,也能夠經過調用saveAsObjectFile() 實現對對象文件的輸出。由於是序列化因此要指定類型。
(1)建立一個RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
(2)將RDD保存爲Object文件
scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")
(3)查看該文件
[atguigu@hadoop102 objectFile]$ pwd
/opt/module/spark/objectFile
[atguigu@hadoop102 objectFile]$ ll
總用量 8
-rw-r--r-- 1 atguigu atguigu 142 10月 9 10:37 part-00000
-rw-r--r-- 1 atguigu atguigu 142 10月 9 10:37 part-00001
-rw-r--r-- 1 atguigu atguigu 0 10月 9 10:37 _SUCCESS
[atguigu@hadoop102 objectFile]$ cat part-00000
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableW@`l
(4)讀取Object文件
scala> val objFile = sc.objectFile[(Int)]("file:///opt/module/spark/objectFile")
objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at <console>:24
(5)打印讀取後的Sequence文件
scala> objFile.collect
res19: Array[Int] = Array(1, 2, 3, 4)
Spark的整個生態系統與Hadoop是徹底兼容的,因此對於Hadoop所支持的文件類型或者數據庫類型,Spark也一樣支持.另外,因爲Hadoop的API有新舊兩個版本,因此Spark爲了可以兼容Hadoop全部的版本,也提供了兩套建立操做接口.對於外部存儲建立操做而言,hadoopRDD和newHadoopRDD是最爲抽象的兩個函數接口,主要包含如下四個參數.
1)輸入格式(InputFormat): 制定數據輸入的類型,如TextInputFormat等,新舊兩個版本所引用的版本分別是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
2)鍵類型: 指定[K,V]鍵值對中K的類型
3)值類型: 指定[K,V]鍵值對中V的類型
4)分區值: 指定由外部存儲生成的RDD的partition數量的最小值,若是沒有指定,系統會使用默認值defaultMinSplits
注意:其餘建立操做的API接口都是爲了方便最終的Spark程序開發者而設置的,是這兩個接口的高效實現版本.例如,對於textFile而言,只有path這個指定文件路徑的參數,其餘參數在系統內部指定了默認值。
1.在Hadoop中以壓縮形式存儲的數據,不須要指定解壓方式就可以進行讀取,由於Hadoop自己有一個解壓器會根據壓縮文件的後綴推斷解壓算法進行解壓.
2.若是用Spark從Hadoop中讀取某種類型的數據不知道怎麼讀取的時候,上網查找一個使用map-reduce的時候是怎麼讀取這種這種數據的,而後再將對應的讀取方式改寫成上面的hadoopRDD和newAPIHadoopRDD兩個類就好了
支持經過Java JDBC訪問關係型數據庫。須要經過JdbcRDD進行,示例以下:
(1)添加依賴
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
(2)Mysql讀取:
package com.atguigu
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
object MysqlRDD {
def main(args: Array[String]): Unit = {
//1.建立spark配置信息
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//2.建立SparkContext
val sc = new SparkContext(sparkConf)
//3.定義鏈接mysql的參數
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop102:3306/rdd"
val userName = "root"
val passWd = "000000"
//建立JdbcRDD
val rdd = new JdbcRDD(sc, () => {
Class.forName(driver)
DriverManager.getConnection(url, userName, passWd)
},
"select * from `rddtable` where `id`>=?;",
1,
10,
1,
r => (r.getInt(1), r.getString(2))
)
//打印最後結果
println(rdd.count())
rdd.foreach(println)
sc.stop()
}
}
Mysql寫入:
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)
val data = sc.parallelize(List("Female", "Male","Female"))
data.foreachPartition(insertData)
}
def insertData(iterator: Iterator[String]): Unit = {
Class.forName ("com.mysql.jdbc.Driver").newInstance()
val conn = java.sql.DriverManager.getConnection("jdbc:mysql://master01:3306/rdd", "root", "hive")
iterator.foreach(data => {
val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
ps.setString(1, data)
ps.executeUpdate()
})
}
因爲 org.apache.hadoop.hbase.mapreduce.TableInputFormat 類的實現,Spark 能夠經過Hadoop輸入格式訪問HBase。這個輸入格式會返回鍵值對數據,其中鍵的類型爲org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的類型爲org.apache.hadoop.hbase.client.
Result。
(1)添加依賴
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
(2)從HBase讀取數據
package com.atguigu
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.util.Bytes
object HBaseSpark {
def main(args: Array[String]): Unit = {
//建立spark配置信息
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//建立SparkContext
val sc = new SparkContext(sparkConf)
//構建HBase配置信息
val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
conf.set(TableInputFormat.INPUT_TABLE, "rddtable")
//從HBase讀取數據造成RDD
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
val count: Long = hbaseRDD.count()
println(count)
//對hbaseRDD進行處理
hbaseRDD.foreach {
case (_, result) =>
val key: String = Bytes.toString(result.getRow)
val name: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
val color: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("color")))
println("RowKey:" + key + ",Name:" + name + ",Color:" + color)
}
//關閉鏈接
sc.stop()
}
}
3)往HBase寫入
def main(args: Array[String]) {
//獲取Spark配置信息並建立與spark的鏈接
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)
//建立HBaseConf
val conf = HBaseConfiguration.create()
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")
//構建Hbase表描述器
val fruitTable = TableName.valueOf("fruit_spark")
val tableDescr = new HTableDescriptor(fruitTable)
tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
//建立Hbase表
val admin = new HBaseAdmin(conf)
if (admin.tableExists(fruitTable)) {
admin.disableTable(fruitTable)
admin.deleteTable(fruitTable)
}
admin.createTable(tableDescr)
//定義往Hbase插入數據的方法
def convert(triple: (Int, String, Int)) = {
val put = new Put(Bytes.toBytes(triple._1))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
(new ImmutableBytesWritable, put)
}
//建立一個RDD
val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))
//將RDD內容寫到HBase
val localData = initialRDD.map(convert)
localData.saveAsHadoopDataset(jobConf)
}
累加器用來對信息進行聚合,一般在向 Spark傳遞函數時,好比使用 map() 函數或者用 filter() 傳條件時,可使用驅動器程序中定義的變量,可是集羣中運行的每一個任務都會獲得這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。若是咱們想實現全部分片處理時更新共享變量的功能,那麼累加器能夠實現咱們想要的效果。
針對一個輸入的日誌文件,若是咱們想計算文件中全部空行的數量,咱們能夠編寫如下程序:
scala> val notice = sc.textFile("./NOTICE")
notice: org.apache.spark.rdd.RDD[String] = ./NOTICE MapPartitionsRDD[40] at textFile at <console>:32
scala> val blanklines = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
blanklines: org.apache.spark.Accumulator[Int] = 0
scala> val tmp = notice.flatMap(line => {
| if (line == "") {
| blanklines += 1
| }
| line.split(" ")
| })
tmp: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at flatMap at <console>:36
scala> tmp.count()
res31: Long = 3213
scala> blanklines.value
res32: Int = 171
累加器的用法以下所示。
經過在驅動器中調用SparkContext.accumulator(initialValue)方法,建立出存有初始值的累加器。返回值爲 org.apache.spark.Accumulator[T] 對象,其中 T 是初始值 initialValue 的類型。Spark閉包裏的執行器代碼可使用累加器的 += 方法(在Java中是 add)增長累加器的值。 驅動器程序能夠調用累加器的value屬性(在Java中使用value()或setValue())來訪問累加器的值。
注意:工做節點上的任務不能訪問累加器的值。從這些任務的角度來看,累加器是一個只寫變量。
對於要在行動操做中使用的累加器,Spark只會把每一個任務對各累加器的修改應用一次。所以,若是想要一個不管在失敗仍是重複計算時都絕對可靠的累加器,咱們必須把它放在 foreach() 這樣的行動操做中。轉化操做中累加器可能會發生不止一次更新
自定義累加器類型的功能在1.X版本中就已經提供了,可是使用起來比較麻煩,在2.0版本後,累加器的易用性有了較大的改進,並且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實現方式。實現自定義類型累加器須要繼承AccumulatorV2並至少覆寫下例中出現的方法,下面這個累加器能夠用於在程序運行過程當中收集一些文本類信息,最終以Set[String]的形式返回。
package com.atguigu.spark
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions._
class LogAccumulator extends org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] {
private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()
override def isZero: Boolean = {
_logArray.isEmpty
}
override def reset(): Unit = {
_logArray.clear()
}
override def add(v: String): Unit = {
_logArray.add(v)
}
override def merge(other: org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]]): Unit = {
other match {
case o: LogAccumulator => _logArray.addAll(o.value)
}
}
override def value: java.util.Set[String] = {
java.util.Collections.unmodifiableSet(_logArray)
}
override def copy():org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] = {
val newAcc = new LogAccumulator()
_logArray.synchronized{
newAcc._logArray.addAll(_logArray)
}
newAcc
}
}
// 過濾掉帶字母的
object LogAccumulator {
def main(args: Array[String]) {
val conf=new SparkConf().setAppName("LogAccumulator")
val sc=new SparkContext(conf)
val accum = new LogAccumulator
sc.register(accum, "logAccum")
val sum = sc.parallelize(Array("1", "2a", "3", "4b", "5", "6", "7cd", "8", "9"), 2).filter(line => {
val pattern = """^-?(\d+)"""
val flag = line.matches(pattern)
if (!flag) {
accum.add(line)
}
flag
}).map(_.toInt).reduce(_ + _)
println("sum: " + sum)
for (v <- accum.value) print(v + "")
println()
sc.stop()
}
}
廣播變量用來高效分發較大的對象。向全部工做節點發送一個較大的只讀值,以供一個或多個Spark操做使用。好比,若是你的應用須要向全部節點發送一個較大的只讀查詢表,甚至是機器學習算法中的一個很大的特徵向量,廣播變量用起來都很順手。 在多個並行操做中使用同一個變量,可是 Spark會爲每一個任務分別發送。
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)
scala> broadcastVar.value
res33: Array[Int] = Array(1, 2, 3)
使用廣播變量的過程以下:
(1) 經過對一個類型 T 的對象調用 SparkContext.broadcast 建立出一個 Broadcast[T] 對象。 任何可序列化的類型均可以這麼實現。
(2) 經過 value 屬性訪問該對象的值(在 Java 中爲 value() 方法)。
(3) 變量只會被髮到各個節點一次,應做爲只讀值處理(修改這個值不會影響到別的節點)。
輸入可能以多個文件的形式存儲在HDFS上,每一個File都包含了不少塊,稱爲Block。當Spark讀取這些文件做爲輸入時,會根據具體數據格式對應的InputFormat進行解析,通常是將若干個Block合併成一個輸入分片,稱爲InputSplit,注意InputSplit不能跨越文件。隨後將爲這些輸入分片生成具體的Task。InputSplit與Task是一一對應的關係。隨後這些具體的Task每一個都會被分配到集羣上的某個節點的某個Executor去執行。
1) 每一個節點能夠起一個或多個Executor。
2) 每一個Executor由若干core組成,每一個Executor的每一個core一次只能執行一個Task。
3) 每一個Task執行的結果就是生成了目標RDD的一個partiton。
注意: 這裏的core是虛擬的core而不是機器的物理CPU核,能夠理解爲就是Executor的一個工做線程。而 Task被執行的併發度 = Executor數目 * 每一個Executor核數。至於partition的數目:
1) 對於數據讀入階段,例如sc.textFile,輸入文件被劃分爲多少InputSplit就會須要多少初始Task。
2) 在Map階段partition數目保持不變。
3) 在Reduce階段,RDD的聚合會觸發shuffle操做,聚合後的RDD的partition數目跟具體操做有關,例如repartition操做會聚合成指定分區數,還有一些算子是可配置的。
RDD在計算的時候,每一個分區都會起一個task,因此rdd的分區數目決定了總的的task數目。申請的計算節點(Executor)數目和每一個計算節點核數,決定了你同一時刻能夠並行執行的task。
好比的RDD有100個分區,那麼計算的時候就會生成100個task,你的資源配置爲10個計算節點,每一個兩2個核,同一時刻能夠並行的task數目爲20,計算這個RDD就須要5個輪次。若是計算資源不變,你有101個task的話,就須要6個輪次,在最後一輪中,只有一個task在執行,其他核都在空轉。若是資源不變,你的RDD只有2個分區,那麼同一時刻只有2個task運行,其他18個核空轉,形成資源浪費。這就是在spark調優中,增大RDD分區數目,增大任務並行度的作法。