Spark 算子

==> RDD是什麼?
shell

    ---> RDD(Resilient Distributed Dataset) 彈性分佈式數據集 , 是 Spark 中最基本的數據抽象,它表明一個不可變,可分區,裏面的元素可並行計算的集合數組

    ---> 特色:緩存

        ---- 自動容錯
分佈式

        ---- 位置感知性高度ide

        ---- 可伸縮性函數

        ---- 容許用戶在執行多個查詢時顯示的將工做集緩存在內存中,後續的查詢可以重用工做集,極大的提高了查詢速度oop

    ---> RDD 的屬性this

        ---- A list of partitionsspa


一個組分片,即數據集的基本組成單位
對於 RDD 來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度,用戶能夠在建立 RDD 時指定 RDD的分片個數,若是沒有指定,那麼就會採用默認值,默認值就是程序所分配 到的 CPU  Core 的數目 


        ---- A function for computing each split

scala

一個計算每一個分區的函數
Spark 中 RDD 的計算是以分片爲單位的,每一個 RDD 都會實現 compute 函數以達到這個目的, compute 函數會對迭代器進行復合,不須要保存每次計算的結果


        ---- A list of dependencies on other RDDs

RDD 之間的依賴關係
RDD 每次轉換都會生成一個新的RDD, 因此 RDD 之間就會造成相似於流水線同樣的先後依賴關係。在部分數據丟失時, Spark 能夠經過這個依賴關係從新計算丟失 的分區數據,而不是對 RDD的全部分區進行從新計算


        ---- Optionally, a Partitioner for key-value RDDs(e.g. to say that the RDD is hash-partitioned)

一個 Partitioner, 即 RDD的分片函數
Spark 中實現 了兩種類型的分片函數, 一個是基於哈希的 HashPartitioner, 另一個是基於 RangePartitioner, 只有對於 key-value 的 RDD, 纔會有 Partitioner, 非 key-value的 RDD的 Partitioner的值是None, Partitioner函數不但決定 了 RDD 自己的分片數量,也決定了 parents RDD Shuffle 輸出時的分片數量


        ---- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file )

一個列表,存儲存取每一個 Partion 的優先位置(preferred location)

對於一個 HDFS 文件來講,這個列表 保存的就是每一個Partition 所在的塊的位置

按照「移動數據不如移動計算」的理念, Spark 在進行任務調度的時候,會盡量的將計算任務分配到其所要處理數據塊的存儲位置


==> RDD 的建立方式

    ---> 經過外部的數據文件建立 (HDFS)

val rdd1 = sc.textFile("hdfs://192.168.10.210:9000/data/data.txt")

    ---> 經過 sc.parallelize 進行建立

val rdd2 = sc.parallelize(Array(1,2,3,4,5,6))

==> RDD的基本原理

    ---> 建立一個 RDD: 

//                          3表明分三個分區
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8), 3)

    ---> 一個分區運行在一個Worker 節點上, 一個 Worker 上能夠運行多個分區

==> RDD  的類型

    ---> Trasformation

RDD 中的全部轉換都是延遲加載的,即,不會返回計算結果,只記住這些應用到基礎數據集(如,一個文件,一個列表等)上的轉換動做,只有當發生一個要求返回結果給 Driver 時,這些轉換纔會執行(我的理解,與 Scala 中的 lazy (懶值)比較類似)

轉換 含義
map(func)
返回一個新的 RDD,該 RDD 由每個輸入元素通過 func 函數轉換後組成
filter(func) 返回一個新的RDD,該 RDD 由通過 func 函數計算後返回值爲 true 的輸入元素組成
flatMap(func) 相似於 map ,可是每一個輸入元素能夠被映射爲 0 或多個輸出元素(返回一個序列)
mapPartitions(func) 相似於 map, 可是獨立的在RDD 的每個分片上運行,所以在類型爲T  的 RDD 上運行時,func 的函數類型必須 是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 相似於 mapPartitions,但 func 帶有一個整數參數表示分片的索引值,所以在類型爲T 的 RDD 上運行時, func 的函數類型必須是(Int, Interator[T])= > Iterator[U]
sample(withReplacement, fraction, seed) 根據 fraction 指定的比例對數據進行採樣, 能夠選擇是否使用隨機數進行替換, seed 用於指定隨機數生成器種子
union(otherDataset) 對源RDD 和 參數 RDD求並集後返回一個新的RDD
intersection(otherDataset) 對源RDD 和參數 RDD 求交集後返回一個新的RDD
distinct([numTasks]) 對源RDD 去重後返回一個新的RDD
groupByKey([numTasks]) 在 (k, v) 的RDD 上調用,返回一個(K, iterator[V]) 的 RDD
reduceByKey(func, [numTasks]) 在(k, v) 的RDD上調用,返回一個(k, v) 的 RDD, 使用指定的reduce函數,將相同key 的值聚合到一塊兒,與 groupByKey 相似,reduce 任務的個數能夠經過第二個可選的參數來設置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一個(k, v)上調用,k 必須實現 Ordered 接口,返回一個按照 key 進行排序的(k, v) 的RDD
sortBy(func, [ascending], [numTasks]) 與 sortByKey 相似,可是更靈活
join(otherDataset, [numTasks]) 在類型爲(k, v)和(k, w) 的RDD 上調用,返回一個相同key 對應的全部元素堆在一塊兒的(k, (v, w)) 的 RDD
cogroup(otherDataset, [numTasks]) 在類型爲(k, v)和(k, w)的 RDD 上調用 ,返回一個(k, (Iterable<v>, Iterable<w>)) 類型的 RDD
cartesian(otherDataset) 笛卡爾積
pipe(command, [envVars])
coalesce(numPartitions)
repartitionAndSortWithinPartitions(partitions)

    ---> Action



reduce(fun) 經過 func 函數彙集RDD中的全部元素,這個功能必須是可交換且可並聯的
collect() 在驅動程序中,以數組的形式返回數據集的全部元素
count() 返回元素個數
first() 反回 RDD 的第一個元素(相似於 take(1))
take(n) 返回一個由數據集的前 n 個元素組成的數組
takeSample(withReplacement, num, [seed]) 返回一個數組,該 數組由從數據集中隨機採樣的num 個元素組成,能夠選擇是否用隨機數替換不足的部分, seed用於指定隨機數生成器種子
takeOrdered(n, [ordering])
saveAsTextFile(path) 將數據集的元素以 textfile 的形式保存到HDFS文件系統或者其它支持的文件系統,對每一個元素,Spark 將會調用 toString 方法將它轉換爲文件中的文本
saveAsSequenceFile(path) 將數據集中的元素以 Hadoop sequencefile 的格式 保存到指定的目錄下,可使HDFS 或者其它 Hadoop 支持的文件系統
saveAsObjectFile(path)
countByKey() 針對(k, v ) 類型的RDD, 返回一個(k, Int) 的 map, 表示每個key 對應的元素個數
foreach(func) 在數據集的每個元素上運行函數 func 進行更新


 

==> RDD  的緩存機制

    ---> 做用:緩存有可能丟失,或因爲存儲於內存中的數據因爲內存不足而被刪除,緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行

    ---> 實現原理:經過基於 RDD 的一系列轉換,丟失的數據會被重算,因爲 RDD 的各個 Partition 是相對獨立的,所以只須要計算丟失的部分便可, 不用所有從新計算

    ---> 運行方式:RDD經過 persist方法或 cache方法能夠將前面的計算結果緩存,但並不會調用時便立緩存,而是觸發後面的action 時,此RDD會被緩存到計算機內存中,供後面重用

    ---> 經過查看源碼能夠發現,cache 最終調用的也是 parsist

def persist():this.type = persist(StorageLevel.MEMORY_ONLY)

def cache():this.type = persist()

    ---> 緩存使用:

val rdd1 = sc.textFile("hdfs://192.168.10.210:9000/data/data.txt")
rdd1.count          // 沒有緩存,直接執行

rdd1.cache
rdd1.count        // 第一次執行會慢一些
rdd1.count        // 第二次會很快


    ---> 存儲級別在 object  StorageLevel 中定義

object StorageLevel{
    val NONE = new StorageLevel(false, false, false, false)
    val DISK_ONLY = new StorageLevel(true, false, false, false)
    val DISK_ONLY_2 = new StorageLevel(true, false, false, false)
    val MEMORY_ONLY = new StorageLevel(false, true, false, true)
    val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true)
    val MEMORY_ONLY_SET = new StorageLevel(false, true, false, false)
    val MEMORY_ONLY_SET_2 = new StorageLevel(false, true, false, false)
    val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
    val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true)
    val MEMORY_AND_DISK_SET = new StorageLevel(true, true, false, false)
    val MEMORY_ADN_DISK_SET_2 = new StorageLevel(true, true, false, false)
    val OFF_HEAP = new StorageLevel = new StorageLevel(true, true, true, false)

}


==> RDD的 Checkpoint(檢查點)機制: 容錯機制

    ---> 檢查點本質是經過將 RDD 寫入 Disk 作檢查點

    ---> 做用: 經過作 lineage 作容錯的輔助

    ---> 運行機制: 在RDD 的中間階段作檢查點容錯,以後若是有節點出現問題而丟失分區,從作檢查點的 RDD 開始從新作 Lineage,以達到減小開銷的目的

    ---> 設置檢查點的方式: 本地目錄, HDFS

        ---- 本地目錄(須要將 spark-shell 運行在本地模式上)

// 設置檢查點目錄 
sc.setCheckpointDir("/data/checkpoint")
// 建立一個RDD
val rdd1 = sc.textFile("hdfs://192.168.10.210:9000/data/data.txt")
// 設置檢查點
rdd1.checkpoint
// 執行,觸發 Action ,會在檢查點目錄生成檢查點
rdd1.count


        ---- HDFS(須要將 Spark-shell 運行在集羣模式上)

// 設置檢查點目錄 
sc.setCheckpointDir("hdfs://192.168.10.210:9000/data/checkpoint")
// 建立一個RDD
val rdd1 = sc.textFile("hdfs://192.168.10.210:9000/data/data.txt")
// 設置檢查點
rdd1.checkpoint
// 執行,觸發 Action ,會在檢查點目錄生成檢查點
rdd1.count


==> RDD 的依賴關係 和 Spark 任務中的 Stage

    ---> RDD 依賴關係    RDD和它的父 RDD(s)的關係有兩種不一樣的類型

        ---- 窄依賴    每一個 父 RDD 的 partition 只能被子 RDD 的一個 partition 使用    一個子RDD

        ---- 寬依賴    多個子RDD 的 partition 會依賴同一個父 RDD        多個子RDD

    ---> Stage    劃分Stage 的依據是:寬依賴

 image.png

相關文章
相關標籤/搜索