做者:studytime
原文: https://www.studytime.xin
1.一、RDD是什麼?數據庫
RDD(Resilient Distributed Dataset)叫作彈性分佈式數據集,是Spark中最基本的數據抽象,表明一個不可變、可分區、裏面的元素可並行計算的集合。編程
1.二、RDD的主要屬性?數組
RDD 是Spark 中最基本的數據抽象,是一個邏輯概念,它可能並不對應次磁盤或內存中的物理數據,而僅僅是記錄了RDD的由來,父RDD是誰,以及怎樣從父RDD計算而來。緩存
spark 源碼裏面對 RDD 的描述:併發
Internally, each RDD is characterized by five main properties: A list of partitions A function for computing each split A list of dependencies on other RDDs Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
能夠知道,每一個 RDD 有如下五部分構成:app
RDD是一個應用層面的邏輯概念。一個RDD多個分片。RDD就是一個元數據記錄集,記錄了RDD內存全部的關係數據。分佈式
Spark 程序設計流程通常以下:ide
步驟一:實例化 sparkContent 對象。sparkContent 封裝了程序運行的上下文環境,包括配置信息、數據庫管理器、任務調度等。
步驟二:構造 RDD。可經過 sparkContent 提供的函數構造 RDD,常見的 RDD 構造方式分爲:將 Scala集合轉換爲 RDD 和將 Hadoop 文件轉換爲 RDD。
步驟三:在 RDD 基礎上,經過 Spark 提供的 transformation 算子完成數據處理步驟。
步驟四:經過 action 算子將最終 RDD 做爲結果直接返回或者保存到文件中。函數
Spark 提供了兩大類編程接口,分別爲 RDD 操做符以及共享變量。
其中 RDD 操做符包括 transformation 和 action 以及 control API 三類;共享變量包括廣播變量和累加器兩種。oop
一、 建立conf ,封裝了spark配置信息
SparkConf conf = new SparkConf().setAppName(appName); conf.set(「spark.master」, 「local」); conf.set(「spark.yarn.queue」, 「infrastructure」);
二、 建立 SparkContext,封裝了調度器等信息
JavaSparkContext jsc = new JavaSparkContext(conf);
一、Java 集合構建
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> rdd = jsc.parallelize(data, 3)
二、 將文本文件轉換爲 RDD
jsc.textFile(「/data」, 1) jsc.textFile(「/data/file.txt」, 1) jsc.textFile(「/data/*.txt」, 1) jsc.textFile(「hdfs://bigdata:9000/data/」, 1) jsc.sequenceFile(「/data」, 1) jsc.wholeTextFiles(「/data」, 1)
transformation API 是惰性的,調用這些API比不會觸發實際的分佈式數據計算,而僅僅是將相關信息記錄下來,直到action API纔會開始數據計算。
Spark 提供了大量的 transformation API,下面列舉了一些經常使用的API:
API | 功能 |
---|---|
map(func) | 將 RDD 中的元素,經過 func 函數逐一映射成另一個值,造成一個新的 RDD |
filter(func) | 將 RDD 中使用 func 函數返回 true 的元素過濾出來,造成一個新的 RDD |
flatMap(func | 相似於map,但每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素) |
mapPartitions(func) | 相似於 map,但獨立地在 RDD 的每個分片上運行,所以在類型爲T的 RDD 上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
sample(withReplacement, fraction, seed) | 數據採樣函數。根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed 用於指定隨機數生成器種子 |
union(otherDataset) | 求兩個 RDD (目標 RDD 與指定 RDD)的並集,並以 RDD 形式返回 |
intersection(otherDataset) | 求兩個 RDD (目標 RDD 與指定 RDD )的交集,並以 RDD 形式返回 |
distinct([numTasks])) | 對目標 RDD 進行去重後返回一個新的 RDD |
groupByKey([numTasks]) | 針對 key/value 類型的 RDD,將 key 相同的 value 彙集在一塊兒。默認任務併發度與父 RDD 相同,可顯示設置 [numTasks]大小 |
reduceByKey(func, [numTasks]) | 針對 key/value 類型的 RDD,將 key 相同的 value 彙集在一塊兒,將對每組value,按照函數 func 規約,產生新的 RDD |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 與 reduceByKey 相似,但目標 key/value 的類型與最終產生的 RDD 可能不一樣 |
sortByKey([ascending], [numTasks]) | 針對 key/value 類型的 RDD,按照 key 進行排序,若 ascending 爲 true,則爲升序,反之爲降序 |
join(otherDataset, [numTasks]) | 針對 key/value 類型的 RDD,對 (K,V) 類型的 RDD 和(K,W)類型的RDD上調用,按照 key 進行等值鏈接,返回一個相同key對應的全部元素在一塊兒的(K,(V,W))的RDD 至關於內鏈接(求交集) |
cogroup(otherDataset, [numTasks]) | 分組函數,對(K,V)類型的RDD和(K,W)類型的RD按照key進行分組,產生新的 (K,(Iterable<V>,Iterable<W>)) 類型的RDD |
cartesian(otherDataset) | 求兩個 RDD 的笛卡爾積 |
coalesce(numPartitions) | 從新分區, 縮減分區數,用於大數據集過濾後,提升小數據集的執行效率 |
repartition(numPartitions) | 從新分區,將目標 RDD 的 partition 數量從新調整爲 numPartitions, 少變多 |
glom() | 將RDD中每一個partition中元素轉換爲數組,並生 成新的rdd2 |
mapValues() | 針對於(K,V)形式的類型只對V進行操做 |
cache | RDD緩存,能夠避免重複計算從而減小時間,cache 內部調用了 persist 算子,cache 默認就一個緩存級別 MEMORY-ONLY |
persist | persist 能夠選擇緩存級別 |
transformation 算子具備惰性執行的特性,他僅僅是記錄一些原信息,知道遇到action算子纔會觸發相關transformation 算子的執行,
Spark 提供了大量的 action API,下面列舉了一些經常使用的API:
API | 功能 |
---|---|
reduce(func) | 將RDD中元素前兩個傳給輸入函數,產生一個新的return值,新產生的return值與RDD中下一個元素(第三個元素)組成兩個元素,再被傳給輸入函數,直到最後只有一個值爲止 |
collect() | 將 RDD 以數組的形式返回給 Driver,經過將計算後的較小結果集返回 |
count() | 計算 RDD 中的元素個數 |
first() | 返回 RDD 中第一個元素 |
take(n) | 以數組的形式返回 RDD 前 n 個元素 |
takeSample(withReplacement,num, [seed]) | 返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
saveAsTextFile(path) | 將 RDD 存儲到文本文件中,並一次調用每一個元素的toString方法將之轉換成字符串保存成一行 |
saveAsSequenceFile(path) | 針對 key/value 類型的 RDD,保存成 SequenceFile 格式文件 |
countByKey() | 針對 key/value 類型的 RDD,統計每一個 key出現的次數,並以 hashmap 形式返回 |
foreach(func) | 將 RDD 中的元素一次交給 func 處理 |
aggregate | 先對分區進行操做,再整體操做 |
aggregateByKey | |
lookup(key: K) | 針對 key/value 類型的 RDD, 指定key值,返回RDD中該K對應的全部V值。 |
foreachPartition | 相似於 foreach,但獨立地在 RDD 的每個分片上運行,其中可嵌入foreach算子 |
因爲 RDD 是粗粒度的操做數據集,每一個 Transformation 操做都會生成一個新的 RDD,因此 RDD 之間就會造成相似流水線的先後依賴關係;RDD 和它依賴的父 RDD(s)的關係有兩種不一樣的類型,即窄依賴(Narrow Dependency)和寬依賴(Wide Dependency)。
寬依賴和窄依賴深度剖析圖:
窄依賴:指的是子 RDD 只依賴於父 RDD 中一個固定數量的分區。
寬依賴:指的是子 RDD 的每個分區都依賴於父 RDD 的全部分區。
RDD Stage:
在 Spark 中,Spark 會將每個 Job 分爲多個不一樣的 Stage, 而 Stage 之間的依賴關係則造成了有向無環圖,Spark 會根據 RDD 之間的依賴關係將 DAG 圖(有向無環圖)劃分爲不一樣的階段,對於窄依賴,因爲 Partition 依賴關係的肯定性,Partition 的轉換處理就能夠在同一個線程裏完成,窄依賴就被 Spark 劃分到同一個 stage 中,而對於寬依賴,只能等父 RDD shuffle 處理完成後,下一個 stage 才能開始接下來的計算。
RDD 持久化是 Spark 很是重要的特性之一。用戶可顯式將一個 RDD 持久化到內存或磁盤中,以便重用該RDD。RDD 持久化是一個分佈式的過程,其內部的每一個 Partition 各自緩存到所在的計算節點上。RDD 持久化存儲能大大加快數據計算效率,尤爲適合迭代式計算和交互式計算。
Spark 提供了 persist 和 cache 兩個持久化函數,其中 cache 將 RDD 持久化到內存中,而 persist 則支持多種存儲級別。
persist RDD 存儲級別:
持久化級別 | 含義 | |
---|---|---|
MEMORY_ONLY | 以非序列化的Java對象的方式持久化在JVM內存中。若是內存沒法徹底存儲RDD全部的partition,那麼那些沒有持久化的partition就會在下一次須要使用它的時候,從新被計算。 | |
MEMORY_AND_DISK | 同上,可是當某些partition沒法存儲在內存中時,會持久化到磁盤中。下次須要使用這些partition時,須要從磁盤上讀取。 | |
MEMORY_ONLY_SER | 同MEMORY_ONLY,可是會使用Java序列化方式,將Java對象序列化後進行持久化。能夠減小內存開銷,可是須要進行反序列化,所以會加大CPU開銷 | |
MEMORY_AND_DSK_SER | 同MEMORY_AND_DSK。可是使用序列化方式持久化Java對象。 | |
DISK_ONLY | 使用非序列化Java對象的方式持久化,徹底存儲到磁盤上。 | |
MEMORY_ONLY_2 | ||
MEMORY_AND_DISK_2 | 若是是尾部加了2的持久化級別,表示會將持久化數據複用一份,保存到其餘節點,從而在數據丟失時,不須要再次計算,只須要使用備份數據便可。 |
如何選擇RDD持久化策略:
Spark 提供的多種持久化級別,主要是爲了在 CPU 和內存消耗之間進行取捨。下面是一些通用的持久化級別的選擇建議:
除了 cache 和 persist 以外,Spark 還提供了另一種持久化:checkpoint, 它能將 RDD 寫入文件系統,提供相似於數據庫快照的功能。
於 cache 和 persist, 區別:
代碼實例:
sc.checkpoint("hdfs://spark/rdd"); // 設置存放目錄 val data = sc.testFile("hdfs://bigdata:9000/input"); val rdd = data.map(..).reduceByKey(...) rdd.checkpoint rdd.count()