Spark系列文章(四):Spark之RDD

Spark系列文章(四):Spark之RDD

做者:studytime
原文: https://www.studytime.xin

1、RDD的概述

1.一、RDD是什麼?數據庫

RDD(Resilient Distributed Dataset)叫作彈性分佈式數據集,是Spark中最基本的數據抽象,表明一個不可變、可分區、裏面的元素可並行計算的集合。編程

1.二、RDD的主要屬性?數組

RDD 是Spark 中最基本的數據抽象,是一個邏輯概念,它可能並不對應次磁盤或內存中的物理數據,而僅僅是記錄了RDD的由來,父RDD是誰,以及怎樣從父RDD計算而來。緩存

spark 源碼裏面對 RDD 的描述:併發

Internally, each RDD is characterized by five main properties:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

能夠知道,每一個 RDD 有如下五部分構成:app

  • 一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。
  • 每一個Partition的計算函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。
  • RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。
  • (可選的)對於key-value類型的RDD,則包含一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,纔會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
  • (可選的)計算每一個Parition所傾向的節點位置。存儲存取每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。

RDD是一個應用層面的邏輯概念。一個RDD多個分片。RDD就是一個元數據記錄集,記錄了RDD內存全部的關係數據。分佈式

2、Spark編程接口

Spark 程序設計流程通常以下:ide

步驟一:實例化 sparkContent 對象。sparkContent 封裝了程序運行的上下文環境,包括配置信息、數據庫管理器、任務調度等。
步驟二:構造 RDD。可經過 sparkContent 提供的函數構造 RDD,常見的 RDD 構造方式分爲:將 Scala集合轉換爲 RDD 和將 Hadoop 文件轉換爲 RDD。
步驟三:在 RDD 基礎上,經過 Spark 提供的 transformation 算子完成數據處理步驟。
步驟四:經過 action 算子將最終 RDD 做爲結果直接返回或者保存到文件中。函數

Spark 提供了兩大類編程接口,分別爲 RDD 操做符以及共享變量。
其中 RDD 操做符包括 transformation 和 action 以及 control API 三類;共享變量包括廣播變量和累加器兩種。oop

3、建立 sparkContent 對象,封裝了 Spark 執行環境信息

一、 建立conf ,封裝了spark配置信息

SparkConf conf = new SparkConf().setAppName(appName); 
conf.set(「spark.master」, 「local」); 
conf.set(「spark.yarn.queue」, 「infrastructure」);

二、 建立 SparkContext,封裝了調度器等信息

JavaSparkContext jsc = new JavaSparkContext(conf);

4、構建RDD

一、Java 集合構建

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); 
JavaRDD<Integer> rdd = jsc.parallelize(data, 3)

二、 將文本文件轉換爲 RDD

jsc.textFile(「/data」, 1) 
jsc.textFile(「/data/file.txt」, 1) 
jsc.textFile(「/data/*.txt」, 1) 
jsc.textFile(「hdfs://bigdata:9000/data/」, 1) 
jsc.sequenceFile(「/data」, 1) 
jsc.wholeTextFiles(「/data」, 1)

5、RDD transformation

transformation API 是惰性的,調用這些API比不會觸發實際的分佈式數據計算,而僅僅是將相關信息記錄下來,直到action API纔會開始數據計算。

Spark 提供了大量的 transformation API,下面列舉了一些經常使用的API:

API 功能
map(func) 將 RDD 中的元素,經過 func 函數逐一映射成另一個值,造成一個新的 RDD
filter(func) 將 RDD 中使用 func 函數返回 true 的元素過濾出來,造成一個新的 RDD
flatMap(func 相似於map,但每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素)
mapPartitions(func) 相似於 map,但獨立地在 RDD 的每個分片上運行,所以在類型爲T的 RDD 上運行時,func的函數類型必須是Iterator[T] => Iterator[U]
sample(withReplacement, fraction, seed) 數據採樣函數。根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed 用於指定隨機數生成器種子
union(otherDataset) 求兩個 RDD (目標 RDD 與指定 RDD)的並集,並以 RDD 形式返回
intersection(otherDataset) 求兩個 RDD (目標 RDD 與指定 RDD )的交集,並以 RDD 形式返回
distinct([numTasks])) 對目標 RDD 進行去重後返回一個新的 RDD
groupByKey([numTasks]) 針對 key/value 類型的 RDD,將 key 相同的 value 彙集在一塊兒。默認任務併發度與父 RDD 相同,可顯示設置 [numTasks]大小
reduceByKey(func, [numTasks]) 針對 key/value 類型的 RDD,將 key 相同的 value 彙集在一塊兒,將對每組value,按照函數 func 規約,產生新的 RDD
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 與 reduceByKey 相似,但目標 key/value 的類型與最終產生的 RDD 可能不一樣
sortByKey([ascending], [numTasks]) 針對 key/value 類型的 RDD,按照 key 進行排序,若 ascending 爲 true,則爲升序,反之爲降序
join(otherDataset, [numTasks]) 針對 key/value 類型的 RDD,對 (K,V) 類型的 RDD 和(K,W)類型的RDD上調用,按照 key 進行等值鏈接,返回一個相同key對應的全部元素在一塊兒的(K,(V,W))的RDD 至關於內鏈接(求交集)
cogroup(otherDataset, [numTasks]) 分組函數,對(K,V)類型的RDD和(K,W)類型的RD按照key進行分組,產生新的 (K,(Iterable<V>,Iterable<W>)) 類型的RDD
cartesian(otherDataset) 求兩個 RDD 的笛卡爾積
coalesce(numPartitions) 從新分區, 縮減分區數,用於大數據集過濾後,提升小數據集的執行效率
repartition(numPartitions) 從新分區,將目標 RDD 的 partition 數量從新調整爲 numPartitions, 少變多
glom() 將RDD中每一個partition中元素轉換爲數組,並生 成新的rdd2
mapValues() 針對於(K,V)形式的類型只對V進行操做
cache RDD緩存,能夠避免重複計算從而減小時間,cache 內部調用了 persist 算子,cache 默認就一個緩存級別 MEMORY-ONLY
persist persist 能夠選擇緩存級別

6、RDD action

transformation 算子具備惰性執行的特性,他僅僅是記錄一些原信息,知道遇到action算子纔會觸發相關transformation 算子的執行,

Spark 提供了大量的 action API,下面列舉了一些經常使用的API:

API 功能
reduce(func) 將RDD中元素前兩個傳給輸入函數,產生一個新的return值,新產生的return值與RDD中下一個元素(第三個元素)組成兩個元素,再被傳給輸入函數,直到最後只有一個值爲止
collect() 將 RDD 以數組的形式返回給 Driver,經過將計算後的較小結果集返回
count() 計算 RDD 中的元素個數
first() 返回 RDD 中第一個元素
take(n) 以數組的形式返回 RDD 前 n 個元素
takeSample(withReplacement,num, [seed]) 返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子
saveAsTextFile(path) 將 RDD 存儲到文本文件中,並一次調用每一個元素的toString方法將之轉換成字符串保存成一行
saveAsSequenceFile(path) 針對 key/value 類型的 RDD,保存成 SequenceFile 格式文件
countByKey() 針對 key/value 類型的 RDD,統計每一個 key出現的次數,並以 hashmap 形式返回
foreach(func) 將 RDD 中的元素一次交給 func 處理
aggregate 先對分區進行操做,再整體操做
aggregateByKey
lookup(key: K) 針對 key/value 類型的 RDD, 指定key值,返回RDD中該K對應的全部V值。
foreachPartition 相似於 foreach,但獨立地在 RDD 的每個分片上運行,其中可嵌入foreach算子

7、RDD 的寬依賴和窄依賴

因爲 RDD 是粗粒度的操做數據集,每一個 Transformation 操做都會生成一個新的 RDD,因此 RDD 之間就會造成相似流水線的先後依賴關係;RDD 和它依賴的父 RDD(s)的關係有兩種不一樣的類型,即窄依賴(Narrow Dependency)和寬依賴(Wide Dependency)。

寬依賴和窄依賴深度剖析圖:

窄依賴:指的是子 RDD 只依賴於父 RDD 中一個固定數量的分區。
寬依賴:指的是子 RDD 的每個分區都依賴於父 RDD 的全部分區。

RDD Stage:

在 Spark 中,Spark 會將每個 Job 分爲多個不一樣的 Stage, 而 Stage 之間的依賴關係則造成了有向無環圖,Spark 會根據 RDD 之間的依賴關係將 DAG 圖(有向無環圖)劃分爲不一樣的階段,對於窄依賴,因爲 Partition 依賴關係的肯定性,Partition 的轉換處理就能夠在同一個線程裏完成,窄依賴就被 Spark 劃分到同一個 stage 中,而對於寬依賴,只能等父 RDD shuffle 處理完成後,下一個 stage 才能開始接下來的計算。

8、RDD 持久化

persist 和 cache 算子

RDD 持久化是 Spark 很是重要的特性之一。用戶可顯式將一個 RDD 持久化到內存或磁盤中,以便重用該RDD。RDD 持久化是一個分佈式的過程,其內部的每一個 Partition 各自緩存到所在的計算節點上。RDD 持久化存儲能大大加快數據計算效率,尤爲適合迭代式計算和交互式計算。

Spark 提供了 persist 和 cache 兩個持久化函數,其中 cache 將 RDD 持久化到內存中,而 persist 則支持多種存儲級別。

persist RDD 存儲級別:

持久化級別 含義
MEMORY_ONLY 以非序列化的Java對象的方式持久化在JVM內存中。若是內存沒法徹底存儲RDD全部的partition,那麼那些沒有持久化的partition就會在下一次須要使用它的時候,從新被計算。
MEMORY_AND_DISK 同上,可是當某些partition沒法存儲在內存中時,會持久化到磁盤中。下次須要使用這些partition時,須要從磁盤上讀取。
MEMORY_ONLY_SER 同MEMORY_ONLY,可是會使用Java序列化方式,將Java對象序列化後進行持久化。能夠減小內存開銷,可是須要進行反序列化,所以會加大CPU開銷
MEMORY_AND_DSK_SER 同MEMORY_AND_DSK。可是使用序列化方式持久化Java對象。
DISK_ONLY 使用非序列化Java對象的方式持久化,徹底存儲到磁盤上。
MEMORY_ONLY_2
MEMORY_AND_DISK_2 若是是尾部加了2的持久化級別,表示會將持久化數據複用一份,保存到其餘節點,從而在數據丟失時,不須要再次計算,只須要使用備份數據便可。

如何選擇RDD持久化策略:

Spark 提供的多種持久化級別,主要是爲了在 CPU 和內存消耗之間進行取捨。下面是一些通用的持久化級別的選擇建議:

  • 優先使用 MEMORY_ONLY,若是能夠緩存全部數據的話,那麼就使用這種策略。由於純內存速度最快,並且沒有序列化,不須要消耗CPU進行反序列化操做。
  • 若是MEMORY_ONLY策略,沒法存儲的下全部數據的話,那麼使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操做仍是很是快,只是要消耗CPU進行反序列化。
  • 若是須要進行快速的失敗恢復,那麼就選擇帶後綴爲_2的策略,進行數據的備份,這樣在失敗時,就不須要從新計算了。
  • 能不使用DISK相關的策略,就不用使用,有的時候,從磁盤讀取數據,還不如從新計算一次。

checkpoint

除了 cache 和 persist 以外,Spark 還提供了另一種持久化:checkpoint, 它能將 RDD 寫入文件系統,提供相似於數據庫快照的功能。

於 cache 和 persist, 區別:

  • Spark 自動管理(建立和回收)cache 和 persist 持久化的數據,而checkpoint持久化的數據須要有由戶本身管理
  • checkpoint 會清楚 RDD 的血統,避免血統過長致使序列化開銷增大,而 cache 和 persist 不會清楚 RDD 的血統

代碼實例:

sc.checkpoint("hdfs://spark/rdd"); // 設置存放目錄
val data = sc.testFile("hdfs://bigdata:9000/input");
val rdd = data.map(..).reduceByKey(...)

rdd.checkpoint
rdd.count()
相關文章
相關標籤/搜索