==> RDD是什麼?
shell
---> RDD(Resilient Distributed Dataset) 彈性分佈式數據集 , 是 Spark 中最基本的數據抽象,它表明一個不可變,可分區,裏面的元素可並行計算的集合數組
---> 特色:緩存
---- 自動容錯
分佈式
---- 位置感知性高度ide
---- 可伸縮性函數
---- 容許用戶在執行多個查詢時顯示的將工做集緩存在內存中,後續的查詢可以重用工做集,極大的提高了查詢速度oop
---> RDD 的屬性this
---- A list of partitionsspa
一個組分片,即數據集的基本組成單位 |
對於 RDD 來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度,用戶能夠在建立 RDD 時指定 RDD的分片個數,若是沒有指定,那麼就會採用默認值,默認值就是程序所分配 到的 CPU Core 的數目 |
---- A function for computing each split
scala
一個計算每一個分區的函數 |
Spark 中 RDD 的計算是以分片爲單位的,每一個 RDD 都會實現 compute 函數以達到這個目的, compute 函數會對迭代器進行復合,不須要保存每次計算的結果 |
---- A list of dependencies on other RDDs
RDD 之間的依賴關係 |
RDD 每次轉換都會生成一個新的RDD, 因此 RDD 之間就會造成相似於流水線同樣的先後依賴關係。在部分數據丟失時, Spark 能夠經過這個依賴關係從新計算丟失 的分區數據,而不是對 RDD的全部分區進行從新計算 |
---- Optionally, a Partitioner for key-value RDDs(e.g. to say that the RDD is hash-partitioned)
一個 Partitioner, 即 RDD的分片函數 |
Spark 中實現 了兩種類型的分片函數, 一個是基於哈希的 HashPartitioner, 另一個是基於 RangePartitioner, 只有對於 key-value 的 RDD, 纔會有 Partitioner, 非 key-value的 RDD的 Partitioner的值是None, Partitioner函數不但決定 了 RDD 自己的分片數量,也決定了 parents RDD Shuffle 輸出時的分片數量 |
---- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file )
一個列表,存儲存取每一個 Partion 的優先位置(preferred location) |
對於一個 HDFS 文件來講,這個列表 保存的就是每一個Partition 所在的塊的位置 按照「移動數據不如移動計算」的理念, Spark 在進行任務調度的時候,會盡量的將計算任務分配到其所要處理數據塊的存儲位置 |
==> RDD 的建立方式
---> 經過外部的數據文件建立 (HDFS)
val rdd1 = sc.textFile("hdfs://192.168.10.210:9000/data/data.txt")
---> 經過 sc.parallelize 進行建立
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6))
==> RDD的基本原理
---> 建立一個 RDD:
// 3表明分三個分區 val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8), 3)
---> 一個分區運行在一個Worker 節點上, 一個 Worker 上能夠運行多個分區
==> RDD 的類型
---> Trasformation
RDD 中的全部轉換都是延遲加載的,即,不會返回計算結果,只記住這些應用到基礎數據集(如,一個文件,一個列表等)上的轉換動做,只有當發生一個要求返回結果給 Driver 時,這些轉換纔會執行(我的理解,與 Scala 中的 lazy (懶值)比較類似) |
轉換 | 含義 |
map(func) |
返回一個新的 RDD,該 RDD 由每個輸入元素通過 func 函數轉換後組成 |
filter(func) | 返回一個新的RDD,該 RDD 由通過 func 函數計算後返回值爲 true 的輸入元素組成 |
flatMap(func) | 相似於 map ,可是每一個輸入元素能夠被映射爲 0 或多個輸出元素(返回一個序列) |
mapPartitions(func) | 相似於 map, 可是獨立的在RDD 的每個分片上運行,所以在類型爲T 的 RDD 上運行時,func 的函數類型必須 是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 相似於 mapPartitions,但 func 帶有一個整數參數表示分片的索引值,所以在類型爲T 的 RDD 上運行時, func 的函數類型必須是(Int, Interator[T])= > Iterator[U] |
sample(withReplacement, fraction, seed) | 根據 fraction 指定的比例對數據進行採樣, 能夠選擇是否使用隨機數進行替換, seed 用於指定隨機數生成器種子 |
union(otherDataset) | 對源RDD 和 參數 RDD求並集後返回一個新的RDD |
intersection(otherDataset) | 對源RDD 和參數 RDD 求交集後返回一個新的RDD |
distinct([numTasks]) | 對源RDD 去重後返回一個新的RDD |
groupByKey([numTasks]) | 在 (k, v) 的RDD 上調用,返回一個(K, iterator[V]) 的 RDD |
reduceByKey(func, [numTasks]) | 在(k, v) 的RDD上調用,返回一個(k, v) 的 RDD, 使用指定的reduce函數,將相同key 的值聚合到一塊兒,與 groupByKey 相似,reduce 任務的個數能夠經過第二個可選的參數來設置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | |
sortByKey([ascending], [numTasks]) | 在一個(k, v)上調用,k 必須實現 Ordered 接口,返回一個按照 key 進行排序的(k, v) 的RDD |
sortBy(func, [ascending], [numTasks]) | 與 sortByKey 相似,可是更靈活 |
join(otherDataset, [numTasks]) | 在類型爲(k, v)和(k, w) 的RDD 上調用,返回一個相同key 對應的全部元素堆在一塊兒的(k, (v, w)) 的 RDD |
cogroup(otherDataset, [numTasks]) | 在類型爲(k, v)和(k, w)的 RDD 上調用 ,返回一個(k, (Iterable<v>, Iterable<w>)) 類型的 RDD |
cartesian(otherDataset) | 笛卡爾積 |
pipe(command, [envVars]) | |
coalesce(numPartitions) | |
repartitionAndSortWithinPartitions(partitions) |
---> Action
reduce(fun) | 經過 func 函數彙集RDD中的全部元素,這個功能必須是可交換且可並聯的 |
collect() | 在驅動程序中,以數組的形式返回數據集的全部元素 |
count() | 返回元素個數 |
first() | 反回 RDD 的第一個元素(相似於 take(1)) |
take(n) | 返回一個由數據集的前 n 個元素組成的數組 |
takeSample(withReplacement, num, [seed]) | 返回一個數組,該 數組由從數據集中隨機採樣的num 個元素組成,能夠選擇是否用隨機數替換不足的部分, seed用於指定隨機數生成器種子 |
takeOrdered(n, [ordering]) | |
saveAsTextFile(path) | 將數據集的元素以 textfile 的形式保存到HDFS文件系統或者其它支持的文件系統,對每一個元素,Spark 將會調用 toString 方法將它轉換爲文件中的文本 |
saveAsSequenceFile(path) | 將數據集中的元素以 Hadoop sequencefile 的格式 保存到指定的目錄下,可使HDFS 或者其它 Hadoop 支持的文件系統 |
saveAsObjectFile(path) | |
countByKey() | 針對(k, v ) 類型的RDD, 返回一個(k, Int) 的 map, 表示每個key 對應的元素個數 |
foreach(func) | 在數據集的每個元素上運行函數 func 進行更新 |
==> RDD 的緩存機制
---> 做用:緩存有可能丟失,或因爲存儲於內存中的數據因爲內存不足而被刪除,緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行
---> 實現原理:經過基於 RDD 的一系列轉換,丟失的數據會被重算,因爲 RDD 的各個 Partition 是相對獨立的,所以只須要計算丟失的部分便可, 不用所有從新計算
---> 運行方式:RDD經過 persist方法或 cache方法能夠將前面的計算結果緩存,但並不會調用時便立緩存,而是觸發後面的action 時,此RDD會被緩存到計算機內存中,供後面重用
---> 經過查看源碼能夠發現,cache 最終調用的也是 parsist
def persist():this.type = persist(StorageLevel.MEMORY_ONLY) def cache():this.type = persist()
---> 緩存使用:
val rdd1 = sc.textFile("hdfs://192.168.10.210:9000/data/data.txt") rdd1.count // 沒有緩存,直接執行 rdd1.cache rdd1.count // 第一次執行會慢一些 rdd1.count // 第二次會很快
---> 存儲級別在 object StorageLevel 中定義
object StorageLevel{ val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true) val MEMORY_ONLY_SET = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SET_2 = new StorageLevel(false, true, false, false) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_SET = new StorageLevel(true, true, false, false) val MEMORY_ADN_DISK_SET_2 = new StorageLevel(true, true, false, false) val OFF_HEAP = new StorageLevel = new StorageLevel(true, true, true, false) }
==> RDD的 Checkpoint(檢查點)機制: 容錯機制
---> 檢查點本質是經過將 RDD 寫入 Disk 作檢查點
---> 做用: 經過作 lineage 作容錯的輔助
---> 運行機制: 在RDD 的中間階段作檢查點容錯,以後若是有節點出現問題而丟失分區,從作檢查點的 RDD 開始從新作 Lineage,以達到減小開銷的目的
---> 設置檢查點的方式: 本地目錄, HDFS
---- 本地目錄(須要將 spark-shell 運行在本地模式上)
// 設置檢查點目錄 sc.setCheckpointDir("/data/checkpoint") // 建立一個RDD val rdd1 = sc.textFile("hdfs://192.168.10.210:9000/data/data.txt") // 設置檢查點 rdd1.checkpoint // 執行,觸發 Action ,會在檢查點目錄生成檢查點 rdd1.count
---- HDFS(須要將 Spark-shell 運行在集羣模式上)
// 設置檢查點目錄 sc.setCheckpointDir("hdfs://192.168.10.210:9000/data/checkpoint") // 建立一個RDD val rdd1 = sc.textFile("hdfs://192.168.10.210:9000/data/data.txt") // 設置檢查點 rdd1.checkpoint // 執行,觸發 Action ,會在檢查點目錄生成檢查點 rdd1.count
==> RDD 的依賴關係 和 Spark 任務中的 Stage
---> RDD 依賴關係 RDD和它的父 RDD(s)的關係有兩種不一樣的類型
---- 窄依賴 每一個 父 RDD 的 partition 只能被子 RDD 的一個 partition 使用 一個子RDD
---- 寬依賴 多個子RDD 的 partition 會依賴同一個父 RDD 多個子RDD
---> Stage 劃分Stage 的依據是:寬依賴