Spark的RDD編程實戰案例java
做者:尹正傑apache
版權聲明:原創做品,謝絕轉載!不然將追究法律責任。編程
RDD體現了裝飾者設計模式,將數據處理的邏輯進行封裝,接下來讓咱們一塊兒來體驗一下吧。設計模式
一.RDD概述數組
1>.什麼是RDD緩存
RDD全稱爲"Resilient Distributed Dataset",叫作彈性分佈式數據集,是Spark中最基本的數據抽象。
代碼中是一個抽象類,它表明一個不可變、可分區、裏面的元素可並行計算的集合。
2>.RDD的屬性網絡
Internally, each RDD is characterized by five main properties: A list of partitions: 一組分區(Partition),即數據集的基本組成單位; A function for computing each split: 一個計算每一個分區的函數,換句話說,是計算數據放在哪一個分區中; A list of dependencies on other RDDs: RDD之間的依賴關係; Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned): 一個Partitioner,即RDD的分片函數; Optionally, a list of preferred locations to compute each split on (e.g. block locations foran HDFS file): 一個列表,存儲存取每一個Partition的優先位置(preferred location),即數據所存儲的節點;
3>.RDD的特色 dom
RDD表示只讀的分區的數據集,對RDD進行改動,只能經過RDD的轉換操做,由一個RDD獲得一個新的RDD,新的RDD包含了從其餘RDD衍生所必需的信息。
RDDs之間存在依賴,RDD的執行是按照血緣關係延時計算的。若是血緣關係較長,能夠經過持久化RDD來切斷血緣關係。
RDD有如下幾個顯著特色:
分區 RDD邏輯上是分區的,每一個分區的數據是抽象存在的,計算的時候會經過一個compute函數獲得每一個分區的數據;
若是RDD是經過已有的文件系統構建,則compute函數是讀取指定文件系統中的數據,若是RDD是經過其餘RDD轉換而來,則compute函數是執行轉換邏輯將其餘RDD的數據進行轉換; 只讀 RDD是隻讀的,要想改變RDD中的數據,只能在現有的RDD基礎上建立新的RDD; 由一個RDD轉換到另外一個RDD,能夠經過豐富的操做算子實現,再也不像MapReduce那樣只能寫map和reduce了; RDD的操做算子(Operate)包括兩類:
transformations(轉換算子):
它是用來將RDD進行轉化,構建RDD的血緣關係;
actions(行動算子):
它是用來觸發RDD的計算,獲得RDD的相關計算結果或者將RDD保存的文件系統中;
依賴 RDDs經過操做算子進行轉換,轉換獲得的新RDD包含了從其餘RDDs衍生所必需的信息,RDDs之間維護着這種血緣關係,也稱之爲依賴。以下所示,依賴包括兩種:
窄依賴:
RDDs之間分區是一一對應的;
寬依賴:
下游RDD的每一個分區與上游RDD(也稱之爲父RDD)的每一個分區都有關,是多對多的關係。 緩存 若是在應用程序中屢次使用同一個RDD,能夠將該RDD緩存起來,該RDD只有在第一次計算的時候會根據血緣關係獲得分區的數據,在後續其餘地方用到該RDD的時候,會直接從緩存處取而不用再根據血緣關係計算,這樣就加速後期的重用;
CheckPoint 雖然RDD的血緣關係自然地能夠實現容錯,當RDD的某個分區數據失敗或丟失,能夠經過血緣關係重建。
可是對於長時間迭代型應用來講,隨着迭代的進行,RDDs之間的血緣關係會愈來愈長,一旦在後續迭代過程當中出錯,則須要經過很是長的血緣關係去重建,勢必影響性能。
爲此,RDD支持checkpoint將數據保存到持久化的存儲中,這樣就能夠切斷以前的血緣關係,由於checkpoint後的RDD不須要知道它的父RDDs了,它能夠從checkpoint處拿到數據。
二.RDD的建立分佈式
1>.編程模型ide
在Spark中,RDD被表示爲對象,經過對象上的方法調用來對RDD進行轉換。
通過一系列的transformations定義RDD以後,就能夠調用actions觸發RDD的計算,action能夠是嚮應用程序返回結果(count, collect等),或者是向存儲系統保存數據(saveAsTextFile等)。
在Spark中,只有遇到action,纔會執行RDD的計算(即延遲計算),這樣在運行時能夠經過管道的方式傳輸多個轉換。
要使用Spark,開發者須要編寫一個Driver程序,它被提交到集羣以調度運行Worker,
2>.RDD的建立
package com.yinzhengjie.bigdata.spark import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CreateRDD { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) /** * RDD的建立: * 在Spark中建立RDD的建立方式能夠分爲三種: * (1)從集合(內存)中建立RDD; * 從集合中建立RDD,Spark主要提供了兩種函數:parallelize和makeRDD * (2)從外部存儲建立RDD; * 包括本地的文件系統,還有全部Hadoop支持的數據集,好比HDFS、Cassandra、HBase等 * (3)從其餘RDD建立。 */ //使用SparkContext對象的parallelize方法能夠在內存中建立RDD val arrayRDD:RDD[String] = sc.parallelize(Array("yinzhengjie","JasonYin2020")) arrayRDD.collect().foreach(println) //使用SparkContext對象的makeRDD方法也能夠在內存中建立RDD,其底層實現就是parallelize方法 val listRDD:RDD[Int] = sc.makeRDD(List(100,200,300)) listRDD.collect().foreach(println) /** * 使用SparkContext對象的textFile方法從外部存儲中建立RDD * * 舒適提示: * 默認狀況下能夠讀取項目路徑,也能夠讀取其它路徑,好比HDFS,HBase對應的路徑等 * 默認從文件中讀取的數據都是字符串類型 */ val fileRDD:RDD[String] = sc.textFile("E:\\yinzhengjie\\bigdata\\spark\\data") fileRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object RDDPartition { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) /** * 使用SparkContext對象的makeRDD函數簽名以下: * def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism) : RDD[T] = withScope * * 舒適提示: * seq: * 傳入一個序列集合類型,好比List,Array * Int = defaultParallelism: * 指定分區數的並行度,傳入一個整形,不傳也能夠,即便用defaultParallelism,該值默認是您的操做系統對應的總核心數。 * */ val listRDD:RDD[String] = sc.makeRDD(List("yinzhengjie","JasonYin2020"),6) //使用6個自定義分區 //將RDD的數據保存到文件中 listRDD.saveAsTextFile("E:/yinzhengjie/bigdata/spark/output") /** * 使用SparkContext對象的textFile函數簽名以下: * def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope * * 舒適提示: * path: * 指定文件的路徑,能夠是本地路徑,也能夠是hdfs,hbase等路徑 * minPartitions: * 指定最小的分區數據,可是不必定是這個分區數,取決於Hadoop讀取文件時分片規則。 * */ val fileRDD:RDD[String] = sc.textFile("E:\\yinzhengjie\\bigdata\\spark\\data",2) //自定義2個分區(但實際上可能比2要大,這取決於Hadoop的分片機制) //保存文件時建議不要和源文件在同一個目錄,不然可能會出錯喲~ fileRDD.saveAsTextFile("E:/yinzhengjie/bigdata/spark/output2") } }
三.RDD經常使用的算子(Operate)
RDD的操做算子(Operate)包括兩類,即轉換算子(transformations operate)和actions(行動算子)。
transformations(轉換算子):
它是用來將RDD進行轉化,構建RDD的血緣關係。
actions(行動算子):
它是用來觸發RDD的計算,獲得RDD的相關計算結果或者將RDD保存的文件系統中。
舒適提示:
轉換算子只是對業務邏輯的封裝並無真正執行代碼,而行動算子就會真正觸發代碼的執行操做。換句話說,行動算子就是用來觸發RDD計算操做的,一旦使用了行動算子,那麼在行動算子以前的轉換算子會被觸發執行。
1>.Value類型
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object MapOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(20 to 30) //遍歷listRDD listRDD.collect().foreach(println) //使用map算子(Operate),將listRDD的全部元素乘以5獲得新的RDD val mapRDD:RDD[Int] = listRDD.map(x => x * 5) //該行可簡寫爲"val mapRDD:RDD[Int] = listRDD.map(_ * 5)" //遍歷mapRDD mapRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object MapPartitionsOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(20 to 30) //遍歷listRDD listRDD.collect().foreach(println) /** * 使用mapPartitions算子(Operate),將listRDD的全部元素乘以5獲得新的RDD * * mapPartitionsk能夠對一個RDD中全部的分區進行遍歷,假設有N個元素,有M個分區,那麼map的函數的將被調用N次,而mapPartitions被調用M次,一個函數一次處理全部分區。 * * map()和mapPartition()的區別以下: * map(): * 每次處理一條數據。 * mapPartition(): * 每次處理一個分區的數據,這個分區的數據處理完後,原RDD中分區的數據才能釋放,可能致使OOM。 * 開發指導: * 當內存空間較大的時候建議使用mapPartition(),以提升處理效率。 * * 舒適提示: * mapPartitions效率優於map算子(Operate),減小了發送執行器(Executor)執行交互次數(mapPartitions的Operate是基於分區爲單位發送一次任務調度到Executor,而map的Operate是每處理一條數據就發送一次任務調度給Executor) * 若是分區的數據比執行器(Executor)的內存大,則使用mapPartitions可能會出現內存溢出(OOM),好比一個分區有12G數據,但Executor僅有10G大小,就會出現OOM現象。 * 綜上所述,到底使用map仍是mapPartitions算子(Operate)根據實際狀況而定。 */ val mapPartitionsRDD:RDD[Int] = listRDD.mapPartitions(datas => { datas.map(_ * 5) }) //遍歷mapRDD mapPartitionsRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object mapPartitionsWithIndexOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD並指定分區數爲3 val listRDD:RDD[Int] = sc.makeRDD(20 to 30,3) //遍歷listRDD listRDD.collect().foreach(println) //使用mapPartitionsWithIndex算子(Operate),將listRDD的全部元素跟所在分區造成一個元組組成一個新的RDD val tupleRDD:RDD[(Int,String)] = listRDD.mapPartitionsWithIndex{ case (numPartition,datas) => { datas.map((_,"分區編號: " + numPartition)) } } //遍歷tupleRDD tupleRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object FlatMapOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD並指定分區數爲3 val listRDD:RDD[List[Int]] = sc.makeRDD(Array(List(10,20),List(60,80))) //遍歷listRDD listRDD.collect().foreach(println) //使用flatMap算子(Operate),將listRDD的全部元素扁平化,它相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素) val flatMapRDD:RDD[Int] = listRDD.flatMap(x =>x) //遍歷flatMapRDD flatMapRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object GlomOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(100 to 161,4) //遍歷listRDD listRDD.collect().foreach(println) //將一個分區的數據放到一個數組中,這樣咱們能夠對其進行操做,好比求和,求最值等。 val glomRDD:RDD[Array[Int]] = listRDD.glom() //遍歷glomRDD glomRDD.collect().foreach( array =>{ println(array.mkString(",")) } ) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object GroupByOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(10 to 20) //遍歷listRDD listRDD.collect().foreach(println) /** * 使用groupBy算子(Operate)進行分組,按照傳入函數(指定規則)的返回值進行分組,將相同的key對應的值放入一個迭代器。 * * 分組後的數據造成了對偶元組(K,V),K表示分組的key,V表示分組的數據集合。 * * 下面的案例就是按照元素模以2的值進行分組。 */ val groupByRDD:RDD[(Int,Iterable[Int])] = listRDD.groupBy(i => i % 2) //遍歷groupByRDD groupByRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object FilterOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(10 to 20) //遍歷listRDD listRDD.collect().foreach(println) /** * 使用filter算子(Operate)進行過濾。返回一個新的RDD,該RDD由通過func函數(按照指定的規則)計算後返回值爲true的輸入元素組成。 * * 下面的案例就是按照元素模以2的值進行過濾,即僅保留偶數。 */ val filterRDD:RDD[Int] = listRDD.filter(x => x % 2 == 0) //遍歷filterRDD filterRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SampleOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(10 to 20) //遍歷listRDD listRDD.collect().foreach(println) /** * sample算子(Operate)用以指定的隨機種子隨機抽樣出數量爲fraction的數據。 * * sample的函數簽名以下: * def sample( withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T] * * 如下是相關參數說明: * withReplacement: * 表示是抽出的數據是否放回,true爲有放回的抽樣,false爲無放回的抽樣, * fraction: * 表示sample的打分,是一個Double類型。 * seed: * 用於指定隨機數生成器種子。 * */ val sampleRDD:RDD[Int] = listRDD.sample(false,0.7,1) //遍歷sampleRDD sampleRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object DistinctOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(List(6,6,8,1,2,1,6,9,5,6,1,8,2,7,0,7,6,3,5,4,6,0,7,1)) //遍歷listRDD listRDD.collect().foreach(println) /** * 使用distinct算子(Operate)對數據去重,可是由於去重後會致使數據減小,因此能夠自定義分區數量,默認分區數是你操做系統的真實core數量。 * */ // val distinctRDD:RDD[Int] = listRDD.distinct() val distinctRDD:RDD[Int] = listRDD.distinct(3) //爲了了看到試驗效果,建議將結果以文件的形式保存,直接打印到控制檯終端可能看不出效果喲~ // distinctRDD.collect().foreach(println) distinctRDD.saveAsTextFile("E:\\yinzhengjie\\bigdata\\spark\\output") } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CoalesceOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD,指定分區數切片爲4 val listRDD:RDD[Int] = sc.makeRDD(1 to 16,4) //遍歷listRDD // listRDD.collect().foreach(println) println("縮減分區前分區數量: " + listRDD.partitions.size) //使用coalesce算子(Operate)縮減分區數,用於大數據集過濾後,提升小數據集的執行效率。能夠簡單理解爲合併分區 val coalesceRDD:RDD[Int] = listRDD.coalesce(3) println("縮減分區後分區數量: " + coalesceRDD.partitions.size) coalesceRDD.saveAsTextFile("E:\\yinzhengjie\\bigdata\\spark\\output") } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object RepartitionsOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD,指定分區數切片爲4 val listRDD:RDD[Int] = sc.makeRDD(1 to 16,4) //遍歷listRDD,發現數據是有序的 listRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) println("Rebalance前分區數量: " + listRDD.partitions.size) /** * 使用repartition算子(Operate)是根據分區數,從新經過網絡隨機洗牌全部數據。 * * coalesce和repartition的區別 * 1>.coalesce從新分區,能夠選擇是否進行shuffle過程。由參數shuffle: Boolean = false/true決定。 * 2>.repartition其實是調用的coalesce,默認是進行shuffle的。 * * 下面的案例就是對listRDD進行從新分區(將listRDD的4個分區數從新分區爲2個),生成一個新的RDD對象rebalanceRDD。 */ val rebalanceRDD:RDD[Int] = listRDD.repartition(2) println("Rebalance後分區數量:" + rebalanceRDD.partitions.size) //遍歷rebalanceRDD,此時你會發現數據並非有序的,而是被打亂啦~ rebalanceRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SortByOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val listRDD:RDD[Int] = sc.parallelize(List(2,1,7,6,9,3,8,5)) //遍歷listRDD listRDD.collect().foreach(println) /** * sortBy算子的函數參數列表簽名以下: * def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length) * * 經過函數簽名能夠知道咱們使用時只須要傳入一個參數便可, 其它2個參數均有默認值, * * 使用func先對數據進行處理,按照處理後的數據比較結果排序,默認爲升序(ascending: Boolean = true)。 * * 下面的案例按照自身大小進行排序,默認是升序。 */ val sortByRDD:RDD[Int] = listRDD.sortBy(x => x) //遍歷sortByRDD sortByRDD.collect().foreach( x =>{ println(x) } ) //下面的案例按照自身大小進行排序,咱們指定ascending的值爲false,排序則爲降序。 val sortByRDD2:RDD[Int] = listRDD.sortBy(x => x,false) //遍歷sortByRDD2 sortByRDD2.collect().foreach(println) } }
2>.雙Value類型交互
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object UnionOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立rdd1 val rdd1:RDD[Int] = sc.parallelize(List(1,3,5,7,9)) //建立rdd2 val rdd2:RDD[Int] = sc.makeRDD(List(2,4,6,8,10)) /** * union算子(Operate)能夠對源RDD和參數RDD求並集後返回一個新的RDD。 * * 下面的案例就是將rdd1和rdd2進行合併爲sumrdd, */ val sumRDD:RDD[Int] =rdd1.union(rdd2) //遍歷sumRDD sumRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SubtractOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立rdd1 val rdd1:RDD[Int] = sc.parallelize(10 to 20) //建立rdd2 val rdd2:RDD[Int] = sc.makeRDD(15 to 30) /** * subtract算子是用來計算差的一種函數,去除兩個RDD中相同的元素,不一樣的RDD將保留下來。 * * 下面的案例就是計算第一個RDD與第二個RDD的差集並打印 */ val subtractRDD:RDD[Int] =rdd1.subtract(rdd2) //遍歷subtractRDD subtractRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object IntersectionOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立rdd1 val rdd1:RDD[Int] = sc.parallelize(10 to 20) //建立rdd2 val rdd2:RDD[Int] = sc.makeRDD(15 to 30) //使用計算兩個RDD的交集 val intersectionRDD:RDD[Int] = rdd1.intersection(rdd2) //遍歷intersectionRDD intersectionRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CartesianOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立rdd1 val rdd1:RDD[Int] = sc.parallelize(10 to 20) //建立rdd2 val rdd2:RDD[Int] = sc.makeRDD(15 to 30) //計算兩個RDD的笛卡爾積並打印,生產環境中應該儘可能避免使用! val cartesian:RDD[(Int,Int)] = rdd1.cartesian(rdd2) //遍歷cartesian cartesian.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object ZipOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立rdd1 val rdd1:RDD[Int] = sc.parallelize(Array(100,200,300),3) //建立rdd2 val rdd2:RDD[String] = sc.makeRDD(Array("storm","spark","flink"),3) //zip算子能夠將兩個RDD組合成Key/Value形式的RDD,這裏默認兩個RDD的partition數量以及元素數量都相同,不然會拋出異常。 val zipRDD:RDD[(Int,String)] = rdd1.zip(rdd2) //遍歷zipRDD zipRDD.collect().foreach(println) } }
3>.Key-Value類型
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object PartitionByOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD val rdd1:RDD[(Int,String)] = sc.makeRDD(Array((1,"hdfs"),(2,"yarn"),(3,"mapreduce"),(4,"spark")),4) //遍歷rdd2 rdd1.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) println("rdd1分區數是: " + rdd1.partitions.size) /** * 對rdd1進行重分區(對rdd1進行分區操做,若是原有的rdd1和現有的rdd2分區數是一致的話就不進行分區, 不然會生成ShuffleRDD,即會產生shuffle過程。) * * 須要注意的是,partitionBy算子屬於PairRDDFunctions類,所以這裏設計到了隱式轉換喲~ * */ val rdd2:RDD[(Int,String)] = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2)) println("rdd2分區數是: " + rdd2.partitions.size) //遍歷rdd2 rdd2.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object GroupByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立一個數組 val words = Array("HDFS","YARN","HDFS","STORM","HDFS","SPARK","YARN","FLLINK","HDFS") //建立RDD並將上面的words映射爲二元組便於後面使用grooupByKey算子 val mapRDD:RDD[(String,Int)] = sc.makeRDD(words).map(word => (word,1)) //groupByKey也是對每一個key進行操做,但只生成一個sequence。 val groupByKeyRDD:RDD[(String,Iterable[Int])] = mapRDD.groupByKey() //遍歷groupByKeyRDD groupByKeyRDD.collect().foreach(println) //對每一個單詞進行統計 groupByKeyRDD.map(word => (word._1, word._2.sum)).collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object ReduceByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立一個數組 val words = Array("HDFS","YARN","HDFS","STORM","HDFS","SPARK","YARN","FLLINK","HDFS") //建立RDD並將上面的words映射爲二元組便於後面使用reduceByKey算子 val mapRDD:RDD[(String,Int)] = sc.makeRDD(words).map(word => (word,1)) /** * 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,reduce任務的個數能夠經過第二個可選的參數來設置。 * * reduceByKey和groupByKey的區別以下: * reduceByKey: * 按照key進行聚合,在shuffle以前有combine(預聚合)操做,返回結果是RDD[k,v]. * groupByKey: * 按照key進行分組,直接進行shuffle。 * 開發指導: * reduceByKey比groupByKey建議使用,由於預聚合操做會節省帶寬傳輸,可是須要注意是否會影響業務邏輯。 */ val reduceByKeyRDD:RDD[(String,Int)] = mapRDD.reduceByKey(_+_) //遍歷reduceByKeyRDD reduceByKeyRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object AggregateByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",30),("A",21),("C",40),("B",13),("C",61),("C",18)),2) //遍歷listRDD各個分區的元素 listRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) /** * aggregateByKey的函數簽名以下: * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] = self.withScope * * 做用: * 在kv對的RDD中,按key將value進行分組合並,合併時,將每一個value和初始值做爲seq函數的參數,進行計算,返回的結果做爲一個新的kv對,而後再將結果按照key進行合併; * 最後將每一個分組的value傳遞給combine函數進行計算(先將前兩個value進行計算,將返回結果和下一個value傳給combine函數,以此類推),將key與計算結果做爲一個新的kv對輸出。 * 參數描述: * zeroValue: * 給每個分區中的每個key一個初始值; * seqOp: * 函數用於在每個分區中用初始值逐步迭代value; * combOp: * 函數用於合併每一個分區中的結果。 * * 下面的案例爲建立一個pairRDD,取出每一個分區相同key對應值的最大值,而後相加 */ val aggregateByKeyRDD:RDD[(String, Int)] = listRDD.aggregateByKey(0)(math.max(_,_),_+_) //遍歷aggregateByKeyRDD各個分區的元素 aggregateByKeyRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) //使用aggregateByKey也能夠實現相似於WordCount的功能 val wcRDD:RDD[(String, Int)] = listRDD.aggregateByKey(0)(_+_,_+_) //遍歷wcRDD wcRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object FoldByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",30),("A",21),("C",40),("B",13),("C",61),("C",18)),2) //遍歷listRDD各個分區的元素 listRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) /** * aggregateByKey與foldByKey的區別: * aggregateByKey的簡化操做,seqop和combop相同。 * 咱們會發現aggregateByKey須要傳遞2個參數,分別用於分區內和分區間的操做; * 而foldByKey只須要傳遞一個參數,由於分區內和分區間的操做相同,所以只須要傳遞一個參數便可. * * 下面的案例是計算相同key對應值的相加結果 */ val foldByKeyRDD:RDD[(String,Int)] = listRDD.foldByKey(0)(_+_) //遍歷foldByKeyRDD各個分區的元素 foldByKeyRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CombineByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2) //遍歷listRDD各個分區的元素 listRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) /** * 參數: * (createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) * 做用: * 對相同K,把V合併成一個集合。 * 參數描述: * createCombiner: * combineByKey() 會遍歷分區中的全部元素,所以每一個元素的鍵要麼尚未遇到過,要麼就和以前的某個元素的鍵相同。 * 若是這是一個新的元素,combineByKey()會使用一個叫做createCombiner()的函數來建立那個鍵對應的累加器的初始值 * mergeValue: * 若是這是一個在處理當前分區以前已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合併 * mergeCombiners: * 因爲每一個分區都是獨立處理的, 所以對於同一個鍵能夠有多個累加器。 * 若是有兩個或者更多的分區都有對應同一個鍵的累加器, 就須要使用用戶提供的 mergeCombiners() 方法將各個分區的結果進行合併。 * * * 下面的案例就是根據key計算每種key的均值。(先計算每一個key出現的次數以及能夠對應值的總和,再相除獲得結果) */ val combineByKeyRDD:RDD[(String,(Int,Int))] = listRDD.combineByKey( (_,1), //轉換結構,一個key第一次出現對其計數爲1 (acc:(Int,Int),v)=>(acc._1+v,acc._2+1), //定義分區內的計算規則,即相同key的vlaue相加,並將計數器加1 (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2) //定義分區間的計算規則,即將各個分區相同key的計算結果進行累加操做。 ) //遍歷combineByKeyRDD各個分區的元素 combineByKeyRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) //計算平均值 val averageValueRDD:RDD[(String,Double)] = combineByKeyRDD.map{case (key,value) => (key,value._1/value._2.toDouble)} //遍歷averageValueRDD各個分區的元素(能夠查看對應key的平均值) averageValueRDD.glom().collect().foreach( array =>{ println(array.mkString(",")) } ) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SortByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val arrayRDD:RDD[(Int,String)] = sc.parallelize(Array((3,"Hadoop"),(8,"storm"),(2,"spark"),(6,"flink"))) /** * sortByKey算子在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD,ascending的值默認爲true */ val positiveSequenceRDD:RDD[(Int,String)] = arrayRDD.sortByKey() positiveSequenceRDD.collect().foreach(println) /** * sortByKey算子在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD,ascending的值爲false時,順序爲倒敘。 */ val ReverseOrderRDD:RDD[(Int,String)] = arrayRDD.sortByKey(false) ReverseOrderRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object MapValuesOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val arrayRDD:RDD[(Int,String)] = sc.parallelize(Array((3,"Hadoop"),(8,"storm"),(2,"spark"),(6,"flink"),(1,"mapreduce"))) arrayRDD.collect().foreach(println) /** * 針對於(K,V)形式的類型只對V進行操做 * * 下面的案例就是對value添加字符串"*****" */ val mapValuesRDD:RDD[(Int,String)] = arrayRDD.mapValues(_ + "*****") mapValuesRDD.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object JoinOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val rdd1:RDD[(Int,String)] = sc.parallelize(Array((1,"MapReduce"),(2,"Spark"),(3,"Flink"))) val rdd2:RDD[(Int,Int)] = sc.parallelize(Array((1,30),(2,60),(3,90))) /** * 在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD. */ val rdd3:RDD[(Int,(String,Int))] = rdd1.join(rdd2) rdd3.collect().foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CogroupOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val rdd1:RDD[(Int,String)] = sc.parallelize(Array((1,"MapReduce"),(2,"Spark"),(3,"Flink"))) val rdd2:RDD[(Int,Int)] = sc.makeRDD(Array((1,30),(2,60),(3,90))) /** * 在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD.. */ val rdd3:RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd1.cogroup(rdd2) rdd3.collect().foreach(println) } }
4>.Actions
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object ReduceOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val rdd1:RDD[Int] = sc.makeRDD(1 to 100,2) val rdd2 = sc.parallelize(Array(("Hadoop",100),("Spark",300),("Flink",500),("MapReduce",700))) /** * 經過func函數彙集RDD中的全部元素,先聚合分區內數據,再聚合分區間數據。 */ val res1:Int = rdd1.reduce(_+_) val res2:(String,Int) = rdd2.reduce((x,y)=>(x._1 + "-" + y._1,x._2 + y._2)) println(res1) println(res2) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CollectOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD val rdd1:RDD[Int] = sc.parallelize(1 to 100) /** * 在驅動程序中,以數組的形式返回數據集的全部元素。 */ val res:Array[Int] = rdd1.collect() res.foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CountOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD val rdd1:RDD[Int] = sc.makeRDD(1 to 100) /** * 返回RDD中元素的個數 */ val count:Long = rdd1.count println(count) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object FirstOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD val rdd1:RDD[Int] = sc.makeRDD(50 to 100) /** * 返回RDD中的第一個元素 */ val res1:Int = rdd1.first() println(res1) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object TakeOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD val rdd1:RDD[Int] = sc.makeRDD(50 to 100) /** * 返回一個由RDD的前n個元素組成的數組 */ val res1:Array[Int] = rdd1.take(5) res1.foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object TakeOrderedOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) //建立RDD val listRDD:RDD[Int] = sc.makeRDD(List(9,5,20,7,10,4,8,30,6)) /** * 返回該RDD排序後的前n個元素組成的數組 */ val res:Array[Int] = listRDD.takeOrdered(5) res.foreach(println) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object AggregateOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val rdd1:RDD[Int] = sc.parallelize(1 to 100,2) /** * aggregate函數將每一個分區裏面的元素經過seqOp和初始值進行聚合,而後用combine函數將每一個分區的結果和初始值(zeroValue)進行combine操做。這個函數最終返回的類型不須要和RDD中元素類型一致。 * * 須要注意的是,aggregate算子在計算時,各分區內部計算須要加上初始值(zeroValue),分區間計算也會加上該初始值(zeroValue) */ val res1:Int = rdd1.aggregate(0)(_+_,_+_) val res2:Int = rdd1.aggregate(100)(_+_,_+_) println(res1) println(res2) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object FoldOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val rdd1:RDD[Int] = sc.parallelize(1 to 100,2) /** * 摺疊操做,aggregate的簡化操做,seqop和combop同樣。 */ val res1:Int = rdd1.fold(0)(_+_) val res2:Int = rdd1.fold(100)(_+_) println(res1) println(res2) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SaveAsTextFileOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2) /** * 將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本 */ listRDD.saveAsTextFile("E:\\yinzhengjie\\bigdata\\spark\\text") } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SaveAsSequenceFileOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config:SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2) /** * 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。 */ listRDD.saveAsSequenceFile("E:\\yinzhengjie\\bigdata\\spark\\sequence") } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SaveAsObjectFileOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config: SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val listRDD: RDD[(String, Int)] = sc.parallelize(List(("A", 130), ("B", 121), ("A", 140), ("B", 113), ("A", 127)), 2) /** * 用於將RDD中的元素序列化成對象,存儲到文件中。 */ listRDD.saveAsObjectFile("E:\\yinzhengjie\\bigdata\\spark\\object") } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import scala.collection.Map object CountByKeyOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config: SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val listRDD: RDD[(String, Int)] = sc.parallelize(List(("A", 130), ("B", 121), ("A", 140), ("B", 113), ("A", 127)), 2) /** * 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數。 */ val res:Map[String,Long] = listRDD.countByKey println(res) } }
package com.yinzhengjie.bigdata.spark.transformations.action import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object ForeachOperate { def main(args: Array[String]): Unit = { //建立SparkConf對象 val config: SparkConf = new SparkConf() config.setMaster("local[*]") config.setAppName("WordCount") //建立Spark上下文對象 val sc = new SparkContext(config) val listRDD: RDD[Int] = sc.makeRDD(20 to 30,2) /** * 在數據集的每個元素上,運行函數func進行更新。 */ listRDD.foreach(println) } }
5>.RDD的函數傳遞
在實際開發中咱們每每須要本身定義一些對於RDD的操做,那麼此時須要主要的是,初始化工做是在Driver端進行的,而實際運行程序是在Executor端進行的,這就涉及到了跨進程通訊,是須要序列化的。
接下來咱們看下面2個案例操做。
package com.yinzhengjie.bigdata.spark.rdd.functionTransfer import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import java.io.Serializable /** * 傳遞Search對象時,必須得先序列化後才能在網絡傳遞,不然沒法在Exector端進行反序列化。 * */ class Search(query:String) extends Serializable { //過濾出包含字符串的數據 def isMatch(s: String): Boolean = { s.contains(query) } //過濾出包含字符串的RDD def getMatch1 (rdd: RDD[String]): RDD[String] = { rdd.filter(isMatch) } //過濾出包含字符串的RDD def getMatche2(rdd: RDD[String]): RDD[String] = { rdd.filter(x => x.contains(query)) } } object SerializableableMethod { def main(args: Array[String]): Unit = { //1.初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(sparkConf) //2.建立一個RDD val rdd: RDD[String] = sc.parallelize(Array("Hadoop", "Spark", "Hive", "Storm")) //3.建立一個Search對象,該過程是在Driver端執行的 val search = new Search("S") //4.運用第一個過濾函數並打印結果,該過程是在Exector端執行的,所以須要將Driver端的Search對象傳遞給Exector,這意味着Search對象必須是序列化的,不然就會報錯喲(Caused by: java.io.NotSerializableException: com.yinzhengjie.bigdata.spark.rdd.functionTransfer.Search) val match1: RDD[String] = search.getMatch1(rdd) match1.collect().foreach(println) //5.釋放資源 sc.stop() } }
package com.yinzhengjie.bigdata.spark.rdd.functionTransfer import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD /** * 僅傳遞字符串時,無需進行序列化操做喲~ */ class Search(query:String) { //過濾出包含字符串的數據 def isMatch(s: String): Boolean = { s.contains(query) } //過濾出包含字符串的RDD def getMatch1 (rdd: RDD[String]): RDD[String] = { rdd.filter(isMatch) } //過濾出包含字符串的RDD def getMatche2(rdd: RDD[String]): RDD[String] = { // rdd.filter(x => x.contains(query)) val query_ : String = this.query //將類變量賦值給局部變量,該代碼是在Driver端執行 rdd.filter(x => x.contains(query_)) //該代碼在Exector端執行,所以query_這個成員變量屬性須要傳遞過來,而query_自己就是字符串,所以無需序列化。 } } object SerializableableAttribute { def main(args: Array[String]): Unit = { //1.初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(sparkConf) //2.建立一個RDD val rdd: RDD[String] = sc.makeRDD(Array("Hadoop", "Spark", "Hive", "Storm")) //3.建立一個Search對象 val search = new Search("o") //4.運用第一個過濾函數並打印結果 val match1: RDD[String] = search.getMatche2(rdd) match1.collect().foreach(println) //5.釋放資源 sc.stop() } }
四.RDD依賴關係
1>.Lineage(血統)
package com.yinzhengjie.bigdata.spark.dependent import org.apache.spark.rdd.RDD import org.apache.spark.{Dependency, SparkConf, SparkContext} /** * RDD只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立RDD的一系列Lineage(血統)記錄下來,以便恢復丟失的分區。 * * RDD的Lineage會記錄RDD的元數據信息和轉換行爲,當該RDD的部分分區數據丟失時,它能夠根據這些信息來從新運算和恢復丟失的數據分區。 */ object Lineage { def main(args: Array[String]): Unit = { //1.初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(sparkConf) //2.建立一個RDD val listRDD: RDD[Int] = sc.parallelize(1 to 10) //3.將listRDD射成一個個元組 val mapRDD: RDD[(Int,Int)] = listRDD.map((_,1)) //4.統計每一種key對應的個數 val reduceRDD:RDD[(Int,Int)] = mapRDD.reduceByKey(_+_) /** * 5 >.查看reduceRDD的Lineage(血統) * * RDD和它依賴的父RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。 * 窄依賴 * 窄依賴指的是每個父RDD的Partition最多被子RDD的一個Partition使用,窄依賴咱們形象的比喻爲獨生子女。 * 寬依賴 * 寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition,會引發shuffle操做,寬依賴咱們形象的比喻爲超生。 */ val lineage:String = reduceRDD.toDebugString println(lineage) //6>.查看依賴類型 val dependencies:Seq[Dependency[_]] = reduceRDD.dependencies println(dependencies) //5.釋放資源 sc.stop() } }
2>.DAG
DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage;
對於窄依賴,partition的轉換處理在Stage中完成計算;
對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據;
以下圖所示,是某個job分爲3個階段(stage),窄依賴能夠放在同一個階段(stage),而寬依賴因爲shuffle的存在所以不能放在同一個階段(state)中:
A和B:
groupBy操做是寬依賴,存在shuffle操做。
F和G:
join操做是寬依賴,存在shuffle操做。
B和G:
是窄依賴,覺得B的各個分區和G的分區惟一對應。
E和F,D和F,C和D:
map和union均沒有shuffle操做,所以均是宅依賴,所以他們能夠在同一個階段(stage)。
舒適提示:
寬依賴有shufle操做,窄依賴沒有shuffle操做,所以咱們能夠將宅依賴放在同一個階段執行,而寬依賴則須要分開不一樣的階段操做,由於寬依賴要作shuffle的前提是須要依賴上一個階段的執行結果。
因爲窄依賴不須要等待,就能夠利用並行的概念來執行數據,從而提高效率。
3>.任務規劃
RDD任務切分中間分爲:Application、Job、Stage和Task。
Application:
初始化一個SparkContext即生成一個Application
Job:
一個Action算子就會生成一個Job
Stage:
根據RDD之間的依賴關係的不一樣將Job劃分紅不一樣的Stage,遇到一個寬依賴則劃分一個Stage。
Task:
Stage是一個TaskSet,將Stage劃分的結果發送到不一樣的Executor執行即爲一個Task。
舒適提示:
Application->Job->Stage-> Task每一層都是1對n的關係。
五.RDD緩存
1>.RDD緩存概述
RDD經過persist方法或cache方法能夠將前面的計算結果緩存,默認狀況下 persist()會把數據以序列化的形式緩存在JVM的堆空間中。
可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。
經過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
以下圖所示,在存儲級別的末尾加上"_2"來把持久化數據存爲兩份。
緩存有可能丟失,或者存儲存儲於內存的數據因爲內存不足而被刪除,RDD的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有Partition。
2>.緩存代碼實現案例
package com.yinzhengjie.bigdata.spark.cache import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CacheDemo { def main(args: Array[String]): Unit = { //1.初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(sparkConf) //2.建立一個RDD val listRDD: RDD[String] = sc.parallelize(Array("yinzhengjie2020")) //3.將RDD轉換爲攜帶當前時間戳不作緩存 val nocache = listRDD.map(_.toString+System.currentTimeMillis) //4>.查看nocache的Lineage(血統)關係 System.out.println(nocache.toDebugString) //5>.屢次打印無緩存結果 nocache.collect.foreach(println) nocache.collect.foreach(println) nocache.collect.foreach(println) nocache.collect.foreach(println) nocache.collect.foreach(println) //6>.將RDD轉換爲攜帶當前時間戳並作緩存 val cache = listRDD.map(_.toString+System.currentTimeMillis).cache //7>.查看cache的Lineage(血統)關係 System.out.println(cache.toDebugString) //8>.屢次打印緩存結果 cache.collect.foreach(println) cache.collect.foreach(println) cache.collect.foreach(println) cache.collect.foreach(println) cache.collect.foreach(println) //9.釋放資源 sc.stop() } }
六.RDD 檢查點(CheckPoint)
1>.檢查點概述
Spark中對於數據的保存除了持久化操做以外,還提供了一種檢查點的機制,檢查點(本質是經過將RDD寫入Disk作檢查點)是爲了經過lineage作容錯的輔助,lineage過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的RDD開始重作Lineage,就會減小開銷。
檢查點經過將數據寫入到HDFS文件系統實現了RDD的檢查點功能。爲當前RDD設置檢查點,該函數將會建立一個二進制的文件,並存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設置的。
在checkpoint的過程當中,該RDD的全部依賴於父RDD中的信息將所有被移除。對RDD進行checkpoint操做並不會立刻被執行,必須執行Action操做才能觸發。
2>.檢查點代碼實現案例
package com.yinzhengjie.bigdata.spark.cache import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object CheckpointDemo { def main(args: Array[String]): Unit = { //初始化配置信息及SparkContext val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(sparkConf) //設置檢查點的保存目錄,實際工做中應該使用hdfs路徑,本地目錄通常用於測試使用 sc.setCheckpointDir("E:\\yinzhengjie\\bigdata\\spark\\checkpoint") //建立一個RDD val listRDD: RDD[String] = sc.parallelize(Array("yinzhengjie2020")) //將RDD轉換爲攜帶當前時間戳 val nocache = listRDD.map(_.toString+System.currentTimeMillis) //設置檢查點,數據會被持久化到sc上定義的檢查點保存目錄 nocache.checkpoint() //使用行動算子屢次打印結果 nocache.collect().foreach(println) nocache.collect().foreach(println) nocache.collect().foreach(println) nocache.collect().foreach(println) nocache.collect().foreach(println) //查看Lineage(血統) println(nocache.toDebugString) //釋放資源 sc.stop() } }