fold 操做 區別 與 co 1.mapValus 2.flatMapValues 3.comineByKey 4.foldByKey 5.reduceByKey 6.groupByKey 7.sortByKey 8.cogroup 9.join 10.LeftOutJoin 11.RightOutJoin 1.map(func) 2.flatMap(func) 3.mapPartitions(func) 4.mapPartitionsWithIndex(func) 5.simple(withReplacement,fraction,seed) 6.union(ortherDataset) 7.intersection(otherDataset) 8.distinct([numTasks]) 9.cartesian(otherDataset) 10.coalesce(numPartitions,shuffle) 11.repartition(numPartition) 12.glom() 13.randomSplit(weight:Array[Double],seed)
RDD簡介 在集羣背後,有一個很是重要的分佈式數據架構,即彈性分佈式數據集(Resilient Distributed Dataset,RDD)。RDD是Spark的最基本抽象,是對分佈式內存的抽象使用,實現了以操做本地集合的方式來操做分佈式數據集的抽象實現。RDD是Spark最核心的東西,它表示已被分區,不可變的並可以被並行操做的數據集合,不一樣的數據集格式對應不一樣的RDD實現。RDD必須是可序列化的。RDD能夠cache到內存中,每次對RDD數據集的操做以後的結果,均可以存放到內存中,下一個操做能夠直接從內存中輸入,省去了MapReduce大量的磁盤IO操做。這對於迭代運算比較常見的機器學習算法, 交互式數據挖掘來講,效率提高比較大。 (1)RDD的特色 1)建立:只能經過轉換 ( transformation ,如map/filter/groupBy/join 等,區別於動做 action) 從兩種數據源中建立 RDD 1 )穩定存儲中的數據; 2 )其餘 RDD。 2)只讀:狀態不可變,不能修改。 3)分區:支持使 RDD 中的元素根據那個 key 來分區 ( partitioning ) ,保存到多個結點上。還原時只會從新計算丟失分區的數據,而不會影響整個系統。 4)路徑:在 RDD 中叫世族或血統 ( lineage ) ,即 RDD 有充足的信息關於它是如何從其餘 RDD 產生而來的。 5)持久化:支持將會被重用的 RDD 緩存 ( 如 in-memory 或溢出到磁盤 )。 6)延遲計算: Spark 也會延遲計算 RDD ,使其可以將轉換管道化 (pipeline transformation)。 7)操做:豐富的轉換(transformation)和動做 ( action ) , count/reduce/collect/save 等。 執行了多少次transformation操做,RDD都不會真正執行運算(記錄lineage),只有當action操做被執行時,運算纔會觸發。 (2)RDD的好處 1)RDD只能從持久存儲或經過Transformations操做產生,相比於分佈式共享內存(DSM)能夠更高效實現容錯,對於丟失部分數據分區只需根據它的lineage就可從新計算出來,而不須要作特定的Checkpoint。 2)RDD的不變性,能夠實現類Hadoop MapReduce的推測式執行。 3)RDD的數據分區特性,能夠經過數據的本地性來提升性能,這不Hadoop MapReduce是同樣的。 4)RDD都是可序列化的,在內存不足時可自動降級爲磁盤存儲,把RDD存儲於磁盤上,這時性能會有大的降低但不會差於如今的MapReduce。 5)批量操做:任務可以根據數據本地性 (data locality) 被分配,從而提升性能。 (3)RDD的內部屬性 經過RDD的內部屬性,用戶能夠獲取相應的元數據信息。經過這些信息能夠支持更復雜的算法或優化。 1)分區列表:經過分區列表能夠找到一個RDD中包含的全部分區及其所在地址。 2)計算每一個分片的函數:經過函數能夠對每一個數據塊進行RDD須要進行的用戶自定義函數運算。 3)對父RDD的依賴列表,依賴還具體分爲寬依賴和窄依賴,但並非全部的RDD都有依賴。 4)可選:key-value型的RDD是根據哈希來分區的,相似於mapreduce當中的Paritioner接口,控制key分到哪一個reduce。 5)可選:每個分片的優先計算位置(preferred locations),好比HDFS的block的所在位置應該是優先計算的位置。(存儲的是一個表,能夠將處理的分區「本地化」) [java] view plain copy //只計算一次 protected def getPartitions: Array[Partition] //對一個分片進行計算,得出一個可遍歷的結果 def compute(split: Partition, context: TaskContext): Iterator[T] //只計算一次,計算RDD對父RDD的依賴 protected def getDependencies: Seq[Dependency[_]] = deps //可選的,分區的方法,針對第4點,相似於mapreduce當中的Paritioner接口,控制key分到哪一個reduce @transient val partitioner: Option[Partitioner] = None //可選的,指定優先位置,輸入參數是split分片,輸出結果是一組優先的節點位置 protected def getPreferredLocations(split: Partition): Seq[String] = Nil (4)RDD的存儲與分區 1)用戶能夠選擇不一樣的存儲級別存儲RDD以便重用。 2)當前RDD默認是存儲於內存,但當內存不足時,RDD會spill到disk。 3)RDD在須要進行分區把數據分佈於集羣中時會根據每條記錄Key進行分區(如Hash 分區),以此保證兩個數據集在Join時能高效。 RDD根據useDisk、useMemory、useOffHeap、deserialized、replication參數的組合定義瞭如下存儲級別: [java] view plain copy //存儲等級定義: val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) (5)RDD的容錯機制 RDD的容錯機制實現分佈式數據集容錯方法有兩種:數據檢查點和記錄更新,RDD採用記錄更新的方式:記錄全部更新點的成本很高。因此,RDD只支持粗顆粒變換,即只記錄單個塊(分區)上執行的單個操做,而後建立某個RDD的變換序列(血統 lineage)存儲下來;變換序列指,每一個RDD都包含了它是如何由其餘RDD變換過來的以及如何重建某一塊數據的信息。所以RDD的容錯機制又稱「血統」容錯。 要實現這種「血統」容錯機制,最大的難題就是如何表達父RDD和子RDD之間的依賴關係。實際上依賴關係能夠分兩種,窄依賴和寬依賴。窄依賴:子RDD中的每一個數據塊只依賴於父RDD中對應的有限個固定的數據塊;寬依賴:子RDD中的一個數據塊能夠依賴於父RDD中的全部數據塊。例如:map變換,子RDD中的數據塊只依賴於父RDD中對應的一個數據塊;groupByKey變換,子RDD中的數據塊會依賴於多塊父RDD中的數據塊,由於一個key可能分佈於父RDD的任何一個數據塊中, 將依賴關係分類的兩個特性:第一,窄依賴能夠在某個計算節點上直接經過計算父RDD的某塊數據計算獲得子RDD對應的某塊數據;寬依賴則要等到父RDD全部數據都計算完成以後,而且父RDD的計算結果進行hash並傳到對應節點上以後才能計算子RDD。第二,數據丟失時,對於窄依賴只須要從新計算丟失的那一塊數據來恢復;對於寬依賴則要將祖先RDD中的全部數據塊所有從新計算來恢復。因此在「血統」鏈特別是有寬依賴的時候,須要在適當的時機設置數據檢查點。也是這兩個特性要求對於不一樣依賴關係要採起不一樣的任務調度機制和容錯恢復機制。 (6)Spark計算工做流 圖1-5中描述了Spark的輸入、運行轉換、輸出。在運行轉換中經過算子對RDD進行轉換。算子是RDD中定義的函數,能夠對RDD中的數據進行轉換和操做。 ·輸入:在Spark程序運行中,數據從外部數據空間(例如,HDFS、Scala集合或數據)輸入到Spark,數據就進入了Spark運行時數據空間,會轉化爲Spark中的數據塊,經過BlockManager進行管理。 ·運行:在Spark數據輸入造成RDD後,即可以經過變換算子fliter等,對數據操做並將RDD轉化爲新的RDD,經過行動(Action)算子,觸發Spark提交做業。若是數據須要複用,能夠經過Cache算子,將數據緩存到內存。 ·輸出:程序運行結束數據會輸出Spark運行時空間,存儲到分佈式存儲中(如saveAsTextFile輸出到HDFS)或Scala數據或集合中(collect輸出到Scala集合,count返回Scala Int型數據)。 Spark的核心數據模型是RDD,但RDD是個抽象類,具體由各子類實現,如MappedRDD、ShuffledRDD等子類。Spark將經常使用的大數據操做都轉化成爲RDD的子類。 RDD編程模型 來看一段代碼:textFile算子從HDFS讀取日誌文件,返回「file」(RDD);filter算子篩出帶「ERROR」的行,賦給 「errors」(新RDD);cache算子把它緩存下來以備將來使用;count算子返回「errors」的行數。RDD看起來與Scala集合類型 沒有太大差異,但它們的數據和運行模型大相迥異。 上圖給出了RDD數據模型,並將上例中用到的四個算子映射到四種算子類型。Spark程序工做在兩個空間中:Spark RDD空間和Scala原生數據空間。在原生數據空間裏,數據表現爲標量(scalar,即Scala基本類型,用橘色小方塊表示)、集合類型(藍色虛線 框)和持久存儲(紅色圓柱)。 下圖描述了Spark運行過程當中經過算子對RDD進行轉換, 算子是RDD中定義的函數,能夠對RDD中的數據進行轉換和操做。 圖1 兩個空間的切換,四類不一樣的RDD算子 輸入算子(橘色箭頭)將Scala集合類型或存儲中的數據吸入RDD空間,轉爲RDD(藍色實線框)。輸入算子的輸入大體有兩類:一類針對 Scala集合類型,如parallelize;另外一類針對存儲數據,如上例中的textFile。輸入算子的輸出就是Spark空間的RDD。 由於函數語義,RDD通過變換(transformation)算子(藍色箭頭)生成新的RDD。變換算子的輸入和輸出都是RDD。RDD會被劃分 成不少的分區 (partition)分佈到集羣的多個節點中,圖1用藍色小方塊表明分區。注意,分區是個邏輯概念,變換先後的新舊分區在物理上多是同一塊內存或存 儲。這是很重要的優化,以防止函數式不變性致使的內存需求無限擴張。有些RDD是計算的中間結果,其分區並不必定有相應的內存或存儲與之對應,若是須要 (如以備將來使用),能夠調用緩存算子(例子中的cache算子,灰色箭頭表示)將分區物化(materialize)存下來(灰色方塊)。 一部分變換算子視RDD的元素爲簡單元素,分爲以下幾類: 輸入輸出一對一(element-wise)的算子,且結果RDD的分區結構不變,主要是map、flatMap(map後展平爲一維RDD); 輸入輸出一對一,但結果RDD的分區結構發生了變化,如union(兩個RDD合爲一個)、coalesce(分區減小); 從輸入中選擇部分元素的算子,如filter、distinct(去除冗餘元素)、subtract(本RDD有、它RDD無的元素留下來)和sample(採樣)。 另外一部分變換算子針對Key-Value集合,又分爲: 對單個RDD作element-wise運算,如mapValues(保持源RDD的分區方式,這與map不一樣); 對單個RDD重排,如sort、partitionBy(實現一致性的分區劃分,這個對數據本地性優化很重要,後面會講); 對單個RDD基於key進行重組和reduce,如groupByKey、reduceByKey; 對兩個RDD基於key進行join和重組,如join、cogroup。 後三類操做都涉及重排,稱爲shuffle類操做。 從RDD到RDD的變換算子序列,一直在RDD空間發生。這裏很重要的設計是lazy evaluation:計算並不實際發生,只是不斷地記錄到元數據。元數據的結構是DAG(有向無環圖),其中每個「頂點」是RDD(包括生產該RDD 的算子),從父RDD到子RDD有「邊」,表示RDD間的依賴性。Spark給元數據DAG取了個很酷的名字,Lineage(世系)。這個 Lineage也是前面容錯設計中所說的日誌更新。 Lineage一直增加,直到趕上行動(action)算子(圖1中的綠色箭頭),這時 就要evaluate了,把剛纔累積的全部算子一次性執行。行動算子的輸入是RDD(以及該RDD在Lineage上依賴的全部RDD),輸出是執行後生 成的原生數據,多是Scala標量、集合類型的數據或存儲。當一個算子的輸出是上述類型時,該算子必然是行動算子,其效果則是從RDD空間返回原生數據空間。 RDD運行邏輯 如圖所示,在Spark應用中,整個執行流程在邏輯上運算之間會造成有向無環圖。Action算子觸發以後會將全部累積的算子造成一個有向無環圖,而後由調度器調度該圖上的任務進行運算。Spark的調度方式與MapReduce有所不一樣。Spark根據RDD之間不一樣的依賴關係切分造成不一樣的階段(Stage),一個階段包含一系列函數進行流水線執行。圖中的A、B、C、D、E、F、G,分別表明不一樣的RDD,RDD內的一個方框表明一個數據塊。數據從HDFS輸入Spark,造成RDD A和RDD C,RDD C上執行map操做,轉換爲RDD D,RDD B和RDD F進行join操做轉換爲G,而在B到G的過程當中又會進行Shuffle。最後RDD G經過函數saveAsSequenceFile輸出保存到HDFS中。 RDD依賴關係 RDD的依賴關係以下圖所示: 窄依賴 (narrowdependencies) 和寬依賴 (widedependencies) 。窄依賴是指 父 RDD 的每一個分區都只被子 RDD 的一個分區所使用,例如map、filter。相應的,那麼寬依賴就是指父 RDD 的分區被多個子 RDD 的分區所依賴,例如groupByKey、reduceByKey等操做。若是父RDD的一個Partition被一個子RDD的Partition所使用就是窄依賴,不然的話就是寬依賴。 這種劃分有兩個用處。首先,窄依賴支持在一個結點上管道化執行。例如基於一對一的關係,能夠在 filter 以後執行 map 。其次,窄依賴支持更高效的故障還原。由於對於窄依賴,只有丟失的父 RDD 的分區須要從新計算。而對於寬依賴,一個結點的故障可能致使來自全部父 RDD 的分區丟失,所以就須要徹底從新執行。所以對於寬依賴,Spark 會在持有各個父分區的結點上,將中間數據持久化來簡化故障還原,就像 MapReduce 會持久化 map 的輸出同樣。 特別說明:對於join操做有兩種狀況,若是join操做的使用每一個partition僅僅和已知的Partition進行join,此時的join操做就是窄依賴;其餘狀況的join操做就是寬依賴;由於是肯定的Partition數量的依賴關係,因此就是窄依賴,得出一個推論,窄依賴不只包含一對一的窄依賴,還包含一對固定個數的窄依賴(也就是說對父RDD的依賴的Partition的數量不會隨着RDD數據規模的改變而改變) 如何劃分Stage以下圖所示: Stage劃分的依據就是寬依賴,何時產生寬依賴呢?例如reduceByKey,groupByKey等Action。 1.從後往前推理,遇到寬依賴就斷開,遇到窄依賴就把當前的RDD加入到Stage中; 2.每一個Stage裏面的Task的數量是由該Stage中最後一個RDD的Partition數量決定的; 3.最後一個Stage裏面的任務的類型是ResultTask,前面全部其餘Stage裏面的任務類型都是ShuffleMapTask; 4.表明當前Stage的算子必定是該Stage的最後一個計算步驟; 補充:Hadoop中的MapReduce操做中的Mapper和Reducer在Spark中基本等量算子是:map、reduceByKey;在一個Stage內部,首先是算子合併,也就是所謂的函數式編程的執行的時候最終進行函數的展開從而把一個Stage內部的多個算子合併成爲一個大算子(其內部包含了當前Stage中全部算子對數據的計算邏輯);其次是因爲Transformation操做的Lazy特性!!在具體算子交給集羣的Executor計算以前,首先會經過Spark Framework(DAGScheduler)進行算子的優化。 RDD如何操做 (1)RDD的建立方式 1)從Hadoop文件系統(或與Hadoop兼容的其餘持久化存儲系統,如Hive、Cassandra、HBase)輸入(例如HDFS)建立。 2)從父RDD轉換獲得新RDD。 3)經過parallelize或makeRDD將單機數據建立爲分佈式RDD。 (2)RDD的兩種操做算子 對於RDD能夠有兩種操做算子:轉換(Transformation)與行動(Action)。 1)轉換(Transformation):Transformation操做是延遲計算的,也就是說從一個RDD轉換生成另外一個RDD的轉換操做不是立刻執行,須要等到有Action操做的時候纔會真正觸發運算。 2)行動(Action):Action算子會觸發Spark提交做業(Job),並將數據輸出Spark系統。 1.Transformation具體內容: 2.Action具體內容: 總結 相比MapReduce,Spark提供了更加優化和複雜的執行流。讀者還能夠深刻了解Spark的運行機制與Spark算子,這樣能更加直觀地瞭解API的使用。Spark提供了更加豐富的函數式算子,這樣就爲Spark上層組件的開發奠基了堅實的基礎。後續文章將詳細介紹Spark算子源代碼及示例。
最近在閱讀源碼,發現這篇博客內容很是好,有助於快速理解代碼。 一、什麼是RDD? 上一章講了Spark提交做業的過程,這一章咱們要講RDD。簡單的講,RDD就是Spark的input,知道input是啥吧,就是輸入的數據。 RDD的全名是Resilient Distributed Dataset,意思是容錯的分佈式數據集,每個RDD都會有5個特徵: 一、有一個分片列表。就是能被切分,和hadoop同樣的,可以切分的數據才能並行計算。 二、有一個函數計算每個分片,這裏指的是下面會提到的compute函數。 三、對其餘的RDD的依賴列表,依賴還具體分爲寬依賴和窄依賴,但並非全部的RDD都有依賴。 四、可選:key-value型的RDD是根據哈希來分區的,相似於mapreduce當中的Paritioner接口,控制key分到哪一個reduce。 五、可選:每個分片的優先計算位置(preferred locations),好比HDFS的block的所在位置應該是優先計算的位置。 對應着上面這幾點,咱們在RDD裏面能找到這4個方法和1個屬性,彆着急,下面咱們會慢慢展開說這5個東東。 //只計算一次 protected def getPartitions: Array[Partition] //對一個分片進行計算,得出一個可遍歷的結果 def compute(split: Partition, context: TaskContext): Iterator[T] //只計算一次,計算RDD對父RDD的依賴 protected def getDependencies: Seq[Dependency[_]] = deps //可選的,分區的方法,針對第4點,相似於mapreduce當中的Paritioner接口,控制key分到哪一個reduce @transient val partitioner: Option[Partitioner] = None //可選的,指定優先位置,輸入參數是split分片,輸出結果是一組優先的節點位置 protected def getPreferredLocations(split: Partition): Seq[String] = Nil 二、多種RDD之間的轉換 下面用一個實例講解一下吧,就拿咱們經常使用的一段代碼來說吧,而後會把咱們經常使用的RDD都會講到。 val hdfsFile = sc.textFile(args(1)) val flatMapRdd = hdfsFile.flatMap(s => s.split(" ")) val filterRdd = flatMapRdd.filter(_.length == 2) val mapRdd = filterRdd.map(word => (word, 1)) val reduce = mapRdd.reduceByKey(_ + _) 這裏涉及到不少個RDD,textFile是一個HadoopRDD通過map後的MappredRDD,通過flatMap是一個FlatMappedRDD,通過filter方法以後生成了一個FilteredRDD,通過map函數以後,變成一個MappedRDD,經過隱式轉換成 PairRDD,最後通過reduceByKey。 咱們首先看textFile的這個方法,進入SparkContext這個方法,找到它。 def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString) } 看它的輸入參數,path,TextInputFormat,LongWritable,Text,同志們聯想到什麼?寫過mapreduce的童鞋都應該知道哈。 一、hdfs的地址 二、InputFormat的類型 三、Mapper的第一個類型 四、Mapper的第二類型 這就不難理解爲何立馬就對hadoopFile後面加了一個map方法,取pair的第二個參數了,最後在shell裏面咱們看到它是一個MappredRDD了。 那麼如今若是你們要用的不是textFile,而是一個別的hadoop文件類型,你們會不會使用hadoopFile來獲得本身要獲得的類型呢,不要告訴我不會哈,不會的趕忙回去複習mapreduce。 言歸正傳,默認的defaultMinPartitions的2過小了,咱們用的時候仍是設置大一點吧。 2.1 HadoopRDD 咱們繼續追殺下去,看看hadoopFile方法,裏面咱們看到它作了3個操做。 一、把hadoop的配置文件保存到廣播變量裏。 二、設置路徑的方法 三、new了一個HadoopRDD返回 好,咱們接下去看看HadoopRDD這個類吧,咱們重點看看它的getPartitions、compute、getPreferredLocations。 先看getPartitions,它的核心代碼以下: val inputSplits = inputFormat.getSplits(jobConf, minPartitions) val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } 它調用的是inputFormat自帶的getSplits方法來計算分片,而後把分片HadoopPartition包裝到到array裏面返回。 這裏順便順帶提一下,由於1.0又出來一個NewHadoopRDD,它使用的是mapreduce新api的inputformat,getSplits就不要有minPartitions了,別的邏輯都是同樣的,只是使用的類有點區別。 咱們接下來看compute方法,它的輸入值是一個Partition,返回是一個Iterator[(K, V)]類型的數據,這裏面咱們只須要關注2點便可。 一、把Partition轉成HadoopPartition,而後經過InputSplit建立一個RecordReader 二、重寫Iterator的getNext方法,經過建立的reader調用next方法讀取下一個值。 // 轉換成HadoopPartition val split = theSplit.asInstanceOf[HadoopPartition] logInfo("Input split: " + split.inputSplit) var reader: RecordReader[K, V] = null val jobConf = getJobConf() val inputFormat = getInputFormat(jobConf) context.stageId, theSplit.index, context.attemptId.toInt, jobConf) // 經過Inputform的getRecordReader來建立這個InputSpit的Reader reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) // 調用Reader的next方法 val key: K = reader.createKey() val value: V = reader.createValue() override def getNext() = { try { finished = !reader.next(key, value) } catch { case eof: EOFException => finished = true } (key, value) } 從這裏咱們能夠看得出來compute方法是經過分片來得到Iterator接口,以遍歷分片的數據。 getPreferredLocations方法就更簡單了,直接調用InputSplit的getLocations方法得到所在的位置。 2.2 依賴 下面咱們看RDD裏面的map方法 def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f)) 直接new了一個MappedRDD,還把匿名函數f處理了再傳進去,咱們繼續追殺到MappedRDD。 private[spark] class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U) extends RDD[U](prev) { override def getPartitions: Array[Partition] = firstParent[T].partitions override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(split, context).map(f) } MappedRDD把getPartitions和compute給重寫了,並且都用到了firstParent[T],這個firstParent是何必人也?咱們能夠先點擊進入RDD[U](prev)這個構造函數裏面去。 def this(@transient oneParent: RDD[_]) = this(oneParent.context , List(new OneToOneDependency(oneParent))) 就這樣你會發現它把RDD複製給了deps,HadoopRDD成了MappedRDD的父依賴了,這個OneToOneDependency是一個窄依賴,子RDD直接依賴於父RDD,繼續看firstParent。 protected[spark] def firstParent[U: ClassTag] = { dependencies.head.rdd.asInstanceOf[RDD[U]] } 由此咱們能夠得出兩個結論: 一、getPartitions直接沿用了父RDD的分片信息 二、compute函數是在父RDD遍歷每一行數據時套一個匿名函數f進行處理 好吧,如今咱們能夠理解compute函數真正是在幹嗎的了 它的兩個顯著做用: 一、在沒有依賴的條件下,根據分片的信息生成遍歷數據的Iterable接口 二、在有前置依賴的條件下,在父RDD的Iterable接口上給遍歷每一個元素的時候再套上一個方法 咱們看看點擊進入map(f)的方法進去看一下 def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] { def hasNext = self.hasNext def next() = f(self.next()) } 看黃色的位置,看它的next函數,不得不說,寫得真的很妙! 咱們接着看RDD的flatMap方法,你會發現它和map函數幾乎沒什麼區別,只是RDD變成了FlatMappedRDD,可是flatMap和map的效果仍是差異挺大的。 好比((1,2),(3,4)), 若是是調用了flatMap函數,咱們訪問到的就是(1,2,3,4)4個元素;若是是map的話,咱們訪問到的就是(1,2),(3,4)兩個元素。 有興趣的能夠去看看FlatMappedRDD和FilteredRDD這裏就不講了,和MappedRDD相似。 2.3 reduceByKey 前面的RDD轉換都簡單,但是到了reduceByKey可就不簡單了哦,由於這裏有一個同相同key的內容聚合的一個過程,因此它是最複雜的那一類。 那reduceByKey這個方法在哪裏呢,它在PairRDDFunctions裏面,這是個隱式轉換,因此比較隱蔽哦,你在RDD裏面是找不到的。 def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { combineByKey[V]((v: V) => v, func, func, partitioner) } 它調用的是combineByKey方法,過程過程蠻複雜的,摺疊起來,喜歡看的人看看吧。 def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] = { val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) if (self.partitioner == Some(partitioner)) { // 通常的RDD的partitioner是None,這個條件不成立,即便成立只須要對這個數據作一次按key合併value的操做便可 self.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else if (mapSideCombine) { // 默認是走的這個方法,須要map端的combinber. val combined = self.mapPartitionsWithContext((context, iter) => { aggregator.combineValuesByKey(iter, context) }, preservesPartitioning = true) val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner) .setSerializer(serializer) partitioned.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context)) }, preservesPartitioning = true) } else { // 不須要map端的combine,直接就來shuffle val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializer) values.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } } 按照一個比較標準的流程來看的話,應該是走的中間的這條路徑,它幹了三件事: 一、給每一個分片的數據在外面套一個combineValuesByKey方法的MapPartitionsRDD。 二、用MapPartitionsRDD來new了一個ShuffledRDD出來。 三、對ShuffledRDD作一次combineCombinersByKey。 下面咱們先看MapPartitionsRDD,我把和別的RDD有別的兩行給拿出來了,很明顯的區別,f方法是套在iterator的外邊,這樣才能對iterator的全部數據作一個合併。 override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None override def compute(split: Partition, context: TaskContext) = f(context, split.index, firstParent[T].iterator(split, context)) } 接下來咱們看Aggregator的combineValuesByKey的方法吧。 def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = { // 是否使用外部排序,是由參數spark.shuffle.spill,默認是true if (!externalSorting) { val combiners = new AppendOnlyMap[K,C] var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } // 用map來去重,用update方法來更新值,若是沒值的時候,返回值,若是有值的時候,經過mergeValue方法來合併 // mergeValue方法就是咱們在reduceByKey裏面寫的那個匿名函數,在這裏就是(_ + _) while (iter.hasNext) { kv = iter.next() combiners.changeValue(kv._1, update) } combiners.iterator } else { // 用了一個外部排序的map來去重,就不停的往裏面插入值便可,基本原理和上面的差很少,區別在於須要外部排序 val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) while (iter.hasNext) { val (k, v) = iter.next() combiners.insert(k, v) } combiners.iterator } 這個就是一個很典型的按照key來作合併的方法了,咱們繼續看ShuffledRDD吧。 ShuffledRDD和以前的RDD很明顯的特徵是 一、它的依賴傳了一個Nil(空列表)進去,表示它沒有依賴。 二、它的compute計算方式比較特別,這個在以後的文章說,過程比較複雜。 三、它的分片默認是採用HashPartitioner,數量和前面的RDD的分片數量同樣,也能夠不同,咱們能夠在reduceByKey的時候多傳一個分片數量便可。 在new完ShuffledRDD以後又來了一遍mapPartitionsWithContext,不過調用的匿名函數變成了combineCombinersByKey。 combineCombinersByKey和combineValuesByKey的邏輯基本相同,只是輸入輸出的類型有區別。combineCombinersByKey只是作單純的合併,不會對輸入輸出的類型進行改變,combineValuesByKey會把iter[K, V]的V值變成iter[K, C]。 case class Aggregator[K, V, C] ( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) ...... } 這個方法會根據咱們傳進去的匿名方法的參數的類型作一個自動轉換。 到這裏,做業都沒有真正執行,只是將RDD各類嵌套,咱們經過RDD的id和類型的變化觀測到這一點,RDD[1]->RDD[2]->RDD[3]...... 三、其它RDD 日常咱們除了從hdfs上面取數據以後,咱們還可能從數據庫裏面取數據,那怎麼辦呢?不要緊,有個JdbcRDD! val rdd = new JdbcRDD( sc, () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") }, "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?", 1, 100, 3, (r: ResultSet) => { r.getInt(1) } ).cache() 前幾個參數你們都懂,咱們重點說一下後面1, 100, 3是咋回事? 在這個JdbcRDD裏面它默認咱們是會按照一個long類型的字段對數據進行切分,(1,100)分別是最小值和最大值,3是分片的數量。 好比咱們要一次查ID爲1-1000,000的的用戶,分紅10個分片,咱們就填(1, 1000,000, 10)便可,在sql語句裏面還必須有"? <= ID AND ID <= ?"的句式,別嘗試着本身造句哦! 最後是怎麼處理ResultSet的方法,本身愛怎麼處理怎麼處理去吧。不過確實覺着用得不方便的能夠本身重寫一個RDD。 小結: 這一章重點介紹了各類RDD那5個特徵,以及RDD之間的轉換,但願你們能夠對RDD有更深刻的瞭解,下一章咱們將要講做業的運行過程,敬請關注! 岑玉海 轉載請註明出處,謝謝!