spark RDD的原理

RDD詳解

RDD(Resilient Distributed Datasets彈性分佈式數據集),是spark中最重要的概念,能夠簡單的把RDD理解成一個提供了許多操做接口的數據集合,和通常數據集不一樣的是,其實際數據分佈存儲於一批機器中(內存或磁盤中)。固然,RDD確定不會這麼簡單,它的功能還包括容錯、集合內的數據能夠並行處理等。圖1是RDD類的視圖。
這裏寫圖片描述
圖1html

一個簡單的例子

下面是一個實用scala語言編寫的spark應用(摘自Apache Spark 社區https://spark.apache.org/docs/latest/quick-start.html)。apache

/* SimpleApp.scala */

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.SparkConf


object SimpleApp {

def main(args: Array[String]) {

val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system

val conf = new SparkConf().setAppName("Simple Application") //設置程序名字

val sc = new SparkContext(conf)

val logData = sc.textFile(logFile, 2).cache() //加載文件爲RDD,並緩存

val numAs = logData.filter(line => line.contains("a")).count()//包含a的行數

val numBs = logData.filter(line => line.contains("b")).count()//包含b的行數

println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

    }

}

這個程序只是簡單的對輸入文件README.md包含’a’和’b’的行分別計數。固然若是你想運行這個程序,須要把YOUR_SPARK_HOME替換爲Spark的安裝目錄。程序中定義了一個RDD:logData,並調用cache,把RDD數據緩存在內存中,這樣能防止重複加載文件。filter是RDD提供的一種操做,它能過濾出符合條件的數據,count是RDD提供的另外一個操做,它能返回RDD數據集中的記錄條數。編程

RDD操做類型

上述例子介紹了兩種RDD的操做:filter與count;事實上,RDD還提供了許多操做方法,如map,groupByKey,reduce等操做。RDD的操做類型分爲兩類,轉換(transformations),它將根據原有的RDD建立一個新的RDD;行動(actions),對RDD操做後把結果返回給driver。例如,map是一個轉換,它把數據集中的每一個元素通過一個方法處理後返回一個新的RDD;而reduce則是一個action,它收集RDD的全部數據後通過一些方法的處理,最後把結果返回給driver。

RDD的全部轉換操做都是lazy模式,即Spark不會馬上計算結果,而只是簡單的記住全部對數據集的轉換操做。這些轉換隻有遇到action操做的時候纔會開始計算。這樣的設計使得Spark更加的高效,例如,對一個輸入數據作一次map操做後進行reduce操做,只有reduce的結果返回給driver,而不是把數據量更大的map操做後的數據集傳遞給driver。

下面分別是transformations和action類型的操做。

  • Transformations類型的操做
    這裏寫圖片描述緩存

  • Action類型的操做
    這裏寫圖片描述markdown

更多RDD的操做描述和編程方法請參考社區文檔:https://spark.apache.org/docs/latest/programming-guide.html架構

RDD底層實現原理

RDD是一個分佈式數據集,顧名思義,其數據應該分部存儲於多臺機器上。事實上,每一個RDD的數據都以Block的形式存儲於多臺機器上,下圖是Spark的RDD存儲架構圖,其中每一個Executor會啓動一個BlockManagerSlave,並管理一部分Block;而Block的元數據由Driver節點的BlockManagerMaster保存。BlockManagerSlave生成Block後向BlockManagerMaster註冊該Block,BlockManagerMaster管理RDD與Block的關係,當RDD再也不須要存儲的時候,將向BlockManagerSlave發送指令刪除相應的Block。分佈式

這裏寫圖片描述
圖2 RDD存儲原理ide

RDD cache的原理

RDD的轉換過程當中,並非每一個RDD都會存儲,若是某個RDD會被重複使用,或者計算其代價很高,那麼能夠經過顯示調用RDD提供的cache()方法,把該RDD存儲下來。那RDD的cache是如何實現的呢?性能

RDD中提供的cache()方法只是簡單的把該RDD放到cache列表中。當RDD的iterator被調用時,經過CacheManager把RDD計算出來,並存儲到BlockManager中,下次獲取該RDD的數據時即可直接經過CacheManager從BlockManager讀出。ui

RDD dependency與DAG

RDD提供了許多轉換操做,每一個轉換操做都會生成新的RDD,這是新的RDD便依賴於原有的RDD,這種RDD之間的依賴關係最終造成了DAG(Directed Acyclic Graph)。

RDD之間的依賴關係分爲兩種,分別是NarrowDependency與ShuffleDependency,其中ShuffleDependency爲子RDD的每一個Partition都依賴於父RDD的全部Partition,而NarrowDependency則只依賴一個或部分的Partition。下圖的groupBy與join操做是ShuffleDependency,map和union是NarrowDependency。

這裏寫圖片描述
圖3 RDD dependency

RDD partitioner與並行度

每一個RDD都有Partitioner屬性,它決定了該RDD如何分區,固然Partition的個數還將決定每一個Stage的Task個數。當前Spark須要應用設置Stage的並行Task個數(配置項爲:spark.default.parallelism),在未設置的狀況下,子RDD會根據父RDD的Partition決定,如map操做下子RDD的Partition與父Partition徹底一致,Union操做時子RDD的Partition個數爲父Partition個數之和。

如何設置spark.default.parallelism對用戶是一個挑戰,它會很大程度上決定Spark程序的性能。
相關文章
相關標籤/搜索