概念緩存
RDD具備如下一些特色:架構
建立:只能經過轉換( transformation,如map/filter/groupBy/join等,區別於動做action)從兩種數據源中建立RDD:1)穩定存儲中的數據;2)其餘RDD。分佈式
只讀:狀態不可變,不能修改函數
分區:支持使RDD中的元素根據那個key來分區( partitioning),保存到多個結點上。還原時只會從新計算丟失分區的數據,而不會影響整個系統。oop
路徑:在RDD中叫世族或血統( lineage),即RDD有充足的信息關於它是如何從其餘RDD產生而來的。spa
持久化:支持將會·被重用的RDD緩存(如in-memory或溢出到磁盤)設計
延遲計算:像DryadLINQ同樣,Spark也會延遲計算RDD,使其可以將轉換管道化(pipeline transformation)orm
操做:豐富的動做( action),count/reduce/collect/save等。對象
關於轉換(transformation)與動做(action)的區別,前者會生成新的RDD,然後者只是將RDD上某項操做的結果返回給程序,而不會生成新的RDDip
RDD底層實現原理
RDD是一個分佈式數據集,顧名思義,其數據應該分部存儲於多臺機器上。事實上,每一個RDD的數據都以Block的形式存儲於多臺機器上,下圖是Spark的RDD存儲架構圖,其中每一個Executor會啓動一個BlockManagerSlave,並管理一部分Block;而Block的元數據由Driver節點的BlockManagerMaster保存。BlockManagerSlave生成Block後向BlockManagerMaster註冊該Block,BlockManagerMaster管理RDD與Block的關係,當RDD再也不須要存儲的時候,將向BlockManagerSlave發送指令刪除相應的Block。
RDD cache的原理
RDD的轉換過程當中,並非每一個RDD都會存儲,若是某個RDD會被重複使用,或者計算其代價很高,那麼能夠經過顯示調用RDD提供的cache()方法,把該RDD存儲下來。那RDD的cache是如何實現的呢?
RDD中提供的cache()方法只是簡單的把該RDD放到cache列表中。當RDD的iterator被調用時,經過CacheManager把RDD計算出來,並存儲到BlockManager中,下次獲取該RDD的數據時即可直接經過CacheManager從BlockManager讀出。
RDD的容錯機制實現分佈式數據集容錯方法
數據檢查點和記錄更新RDD採用記錄更新的方式:記錄全部更新點的成本很高。因此,RDD只支持粗顆粒變換,即只記錄單個塊上執行的單個操做,而後建立某個RDD的變換序列(血統)存儲下來;變換序列指,每一個RDD都包含了他是如何由其餘RDD變換過來的以及如何重建某一塊數據的信息。所以RDD的容錯機制又稱「血統」容錯。 要實現這種「血統」容錯機制,最大的難題就是如何表達父RDD和子RDD之間的依賴關係。實際上依賴關係能夠分兩種,窄依賴和寬依賴:窄依賴:子RDD中的每一個數據塊只依賴於父RDD中對應的有限個固定的數據塊;寬依賴:子RDD中的一個數據塊能夠依賴於父RDD中的全部數據塊。例如:map變換,子RDD中的數據塊只依賴於父RDD中對應的一個數據塊;groupByKey變換,子RDD中的數據塊會依賴於多有父RDD中的數據塊,由於一個key可能錯在於父RDD的任何一個數據塊中 將依賴關係分類的兩個特性:第一,窄依賴能夠在某個計算節點上直接經過計算父RDD的某塊數據計算獲得子RDD對應的某塊數據;寬依賴則要等到父RDD全部數據都計算完成以後,而且父RDD的計算結果進行hash並傳到對應節點上以後才能計算子RDD。第二,數據丟失時,對於窄依賴只須要從新計算丟失的那一塊數據來恢復;對於寬依賴則要將祖先RDD中的全部數據塊所有從新計算來恢復。因此在長「血統」鏈特別是有寬依賴的時候,須要在適當的時機設置數據檢查點。也是這兩個特性要求對於不一樣依賴關係要採起不一樣的任務調度機制和容錯恢復機制。
RDD內部的設計
每一個RDD有5個主要的屬性:
1)一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。圖3-1描述了分區存儲的計算模型,每一個分配的存儲是由BlockManager實現的。每一個分區都會被邏輯映射成BlockManager的一個Block,而這個Block會被一個Task負責計算。
2)一個計算每一個分區的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。
3)RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。
4)一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於key-value的RDD,纔會有Partitioner,非key-value的RDD的Parititioner的值是None。
Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
5)一個列表,存儲存取每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。
以Spark中內建的幾個RDD舉例來講:
信息/RDD |
HadoopRDD |
FilteredRDD |
JoinedRDD |
Partitions |
每一個HDFS塊一個分區,組成集合 |
與父RDD相同 |
每一個Reduce任務一個分區 |
PreferredLoc |
HDFS塊位置 |
無(或詢問父RDD) |
無 |
Dependencies |
無(父RDD) |
與父RDD一對一 |
對每一個RDD進行混排 |
Iterator |
讀取對應的塊數據 |
過濾 |
聯接混排的數據 |
Partitioner |
無 |
無 |
HashPartitioner |
工做原理
主要分爲三步:建立RDD對象,DAG調度器建立執行計劃,Task調度器分配任務並調度Worker開始運行。
如下面一個按A-Z首字母分類,查找相同首字母下不一樣姓名總個數的例子來看一下RDD是如何運行起來的。
步驟1:建立RDD。上面的例子除去最後一個collect是個動做,不會建立RDD以外,前面四個轉換都會建立出新的RDD。所以第一步就是建立好全部RDD(內部的五項信息)。
步驟2:建立執行計劃。Spark會盡量地管道化,並基因而否要從新組織數據來劃分 階段(stage),例如本例中的groupBy()轉換就會將整個執行計劃劃分紅兩階段執行。最終會產生一個 DAG(directed acyclic graph,有向無環圖)做爲邏輯執行計劃。
步驟3:調度任務。將各階段劃分紅不一樣的 任務(task),每一個任務都是數據和計算的合體。在進行下一階段前,當前階段的全部任務都要執行完成。由於下一階段的第一個轉換必定是從新組織數據的,因此必須等當前階段全部結果數據都計算出來了才能繼續。