快速,通用,可擴展的分佈式計算引擎。算法
RDD(Resilient Distributed Dataset)彈性分佈式數據集,是Spark中最基本的數據(邏輯)抽象,它表明一個不可變、可分區、裏面的元素可並行計算的集合。 RDD具備數據流模型的特色:自動容錯、位置感知性調度和可伸縮性。RDD容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。數組
Spark 是類Hadoop MapReduce的通用並行框架, 專門用於大數據量下的迭代式計算.是爲了跟 Hadoop 配合而開發出來的,不是爲了取代 Hadoop, Spark 運算比 Hadoop 的 MapReduce 框架快的緣由是由於 Hadoop 在一次 MapReduce 運算以後,會將數據的運算結果從內存寫入到磁盤中,第二次 Mapredue 運算時在從磁盤中讀取數據,因此其瓶頸在2次運算間的多餘 IO 消耗. Spark 則是將數據一直緩存在內存中,直到計算獲得最後的結果,再將結果寫入到磁盤,因此屢次運算的狀況下, Spark 是比較快的. 其優化了迭代式工做負載。緩存
Hadoop的侷限 | Spark的改進 |
---|---|
抽象層次低,編碼難以上手。 | 經過使用RDD的統一抽象,實現數據處理邏輯的代碼很是簡潔。 |
只提供Map和Reduce兩個操做,欠缺表達力。 | 經過RDD提供了許多轉換和動做,實現了不少基本操做,如sort、join等。 |
一個job只有map和reduce兩個階段,複雜的程序須要大量的job來完成。且job之間的依賴關係須要應用開發者自行管理。 | 一個job能夠包含多個RDD的轉換操做,只須要在調度時生成多個stage。一個stage中也能夠包含多個map操做,只須要map操做所使用的RDD分區保持不變。 |
處理邏輯隱藏在代碼細節中,缺乏總體邏輯視圖。 | RDD的轉換支持流式API,提供處理邏輯的總體視圖。 |
對迭代式數據的處理性能比較差,reduce與下一步map的中間結果只能存放在HDFS的文件系統中。 | 經過內存緩存數據,可大大提升迭代式計算的性能,內存不足時可溢寫到磁盤上。 |
reduce task須要等全部的map task所有執行完畢才能開始執行。 | 分區相同的轉換能夠在一個task中以流水線的形式執行。只有分區不一樣的轉換須要shuffle操做。 |
時延高,只適合批數據處理,對交互式數據處理和實時數據處理支持不夠。 | 將流拆成小的batch,提供discretized stream處理流數據 |
兩種類型: transformation和action網絡
主要作的是就是將一個已有的RDD生成另一個RDD。Transformation具備lazy特性(延遲加載)。
Transformation算子的代碼不會真正被執行。只有當咱們的程序裏面遇到一個action算子的時候,代碼纔會真正的被執行。這種設計讓Spark更加有效率地運行。
經常使用的Transformation:併發
動做 | 說明 | 示例 |
---|---|---|
map(func) | 返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成 (每個輸入元素只能被映射爲一個) | var rdd = sc.parallelize(List(「hello world」, 「hello spark」, 「hello hdfs」)) var rdd2 = rdd.map(x => x + 「_1」) rdd2.foreach(println) |
filter(func) | 返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成 | var rdd3 = rdd2.filter(x => x.contains(「world」)) rdd3.foreach(println) |
flatMap(func) | 相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素) | var rdd4 = rdd2.flatMap(x => x.split(" ")) rdd4.foreach(println) |
sample(withReplacement, fraction, seed) | 根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 | |
groupByKey([numTasks]) | 在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD | var rdd5 = rdd4.map(x => (x, 1)) var rdd6 = rdd5.groupByKey() rdd6.foreach(println) |
sample(withReplacement, fraction, seed) | 根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 | var rdd = sc.parallelize(1 to 10)rdd.sample(false,0.4).collect() rdd.sample(false,0.4, 9).collect() |
combineByKey | 合併相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) | jake 80.0 jake 90.0 jake 85.0 mike 86.0 mike 90 求分數的平均值 |
單Value類型算子補充:app
1. mapPartitions: 將待處理的數據以分區爲單位發送到計算節點進行處理;
框架
2. mapPartintions: 將待處理的數據以分區爲單位發送到計算節點進行處理 ;
分佈式
3. glom: 將同一個分區的數據直接轉換爲相同類型的內存數組進行處理,分區不變 ;
ide
4. groupBy: 將數據根據指定的規則進行分組, 分區默認不變,可是數據會被打亂從新組合 ;
函數
5. distinct: 將數據集中重複的數據去重 ;
6. coalesce: 根據數據量縮減分區,用於大數據集過濾後,提升小數據集的執行效率
當 spark 程序中,存在過多的小任務的時候,能夠經過 coalesce 方法,收縮合並分區,減小
分區的個數,減少任務調度成本 ;
7. repartition: 該操做內部其實執行的是 coalesce 操做,參數 shuffle 的默認值爲 true。
8. sortBy: 該操做用於排序數據。在排序以前,能夠將數據經過 f 函數進行處理,以後按照 f 函數處理
的結果進行排序,默認爲升序排列
雙Value類型算子補充:
1. intersection: 對源 RDD 和參數 RDD 求交集後返回一個新的 RDD
2. union: 對源 RDD 和參數 RDD 求並集後返回一個新的 RDD
3. subtract: 以一個 RDD 元素爲主, 去除兩個 RDD 中重複元素,將其餘元素保留下來
觸發代碼的運行,咱們一段spark代碼裏面至少須要有一個action操做。
經常使用的Action:
動做 | 含義 | 示例 |
---|---|---|
reduce(func) | 經過func函數彙集RDD中的全部元素,能夠實現,RDD中元素的累加,計數和其餘類型的彙集操做 | var rdd = sc.parallelize(1 to 10) rdd.reduce((x, y) => x+y) |
reduceByKey(func) | 按key進行reduce,讓key合併 | wordcount示例: var rdd = sc.parallelize(List(「hello world」, 「hello spark」, 「hello hdfs」)) rdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((x,y) => x+y).collect() |
collect() | 在驅動程序中,以數組的形式返回數據集的全部元素 | |
count() | 返回RDD的元素個數 | |
first() | 返回RDD的第一個元素(相似於take(1)) | |
take(n) | 返回一個由數據集的前n個元素組成的數組 | |
saveAsTextFile(path) | 將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本 | rdd.saveAsTextFile("/user/jd_ad/ads_platform/outergd/0124/demo2.csv") |
foreach(func) | 在數據集的每個元素上,運行函數func進行更新。 | |
takeSample | 抽樣返回一個dateset中的num個元素 | var rdd = sc.parallelize(1 to 10) rdd.takeSample(false,10) |
輸入可能以多個文件的形式存儲在HDFS上,每一個File都包含了不少塊,稱爲Block。
當Spark讀取這些文件做爲輸入時,會根據具體數據格式對應的InputFormat進行解析,通常是將若干個Block合併成一個輸入分片,稱爲InputSplit,注意InputSplit不能跨越文件。
隨後將爲這些輸入分片生成具體的Task。InputSplit與Task是一一對應的關係。
隨後這些具體的Task每一個都會被分配到集羣上的某個節點的某個Executor去執行。
注意: 這裏的core是虛擬的core而不是機器的物理CPU核,能夠理解爲就是Executor的一個工做線程。
而 Task被執行的併發度 = Executor數目 * 每一個Executor核數。
至於partition的數目:
RDD之間有一系列的依賴關係,依賴關係又分爲窄依賴和寬依賴。簡單的區分發,能夠看一下父RDD中的數據是否進入不一樣的子RDD,若是隻進入到一個子RDD則是窄依賴,不然就是寬依賴。以下圖
窄依賴( narrow dependencies )
寬依賴( wide dependencies )
Spark任務會根據RDD之間的依賴關係,造成一個DAG有向無環圖,DAG會提交給DAGScheduler,DAGScheduler會把DAG劃分相互依賴的多個stage,劃分stage的依據就是RDD之間的寬窄依賴。遇到寬依賴就劃分stage,每一個stage包含一個或多個task任務。而後將這些task以taskSet的形式提交給TaskScheduler運行。 stage是由一組並行的task組成。切割規則:從後往前,遇到寬依賴就切割stage,遇到窄依賴就將這個RDD加入該stage中。 以下圖
理解YARN-Client和YARN-Cluster深層次的區別以前先清楚一個概念:Application Master。在YARN中,每一個Application實例都有一個ApplicationMaster進程,它是Application啓動的第一個容器。它負責和ResourceManager打交道並請求資源,獲取資源以後告訴NodeManager爲其啓動Container。從深層次的含義講YARN-Cluster和YARN-Client模式的區別其實就是ApplicationMaster進程的區別
YARN-Cluster模式下,Driver運行在AM(Application Master)中,它負責向YARN申請資源,並監督做業的運行情況。當用戶提交了做業以後,就能夠關掉Client,做業會繼續在YARN上運行,於是YARN-Cluster模式不適合運行交互類型的做業
YARN-Client模式下,Application Master僅僅向YARN請求Executor,Client會和請求的Container通訊來調度他們工做,也就是說Client不能離開
下圖是幾種模式下的比較:
Spark在DAG調度階段會將一個Job劃分爲多個Stage,上游Stage作map工做,下游Stage作reduce工做,其本質上仍是MapReduce計算框架。Shuffle是鏈接map和reduce之間的橋樑,它將map的輸出對應到reduce輸入中,這期間涉及到序列化反序列化、跨節點網絡IO以及磁盤讀寫IO等,因此說Shuffle是整個應用程序運行過程當中很是昂貴的一個階段,理解Spark Shuffle原理有助於優化Spark應用程序。
注:
1.什麼是大數據處理的Shuffle?
不管是Hadoop仍是Spark,都要實現Shuffle。Shuffle描述數據從map tasks的輸出到reduce tasks輸入的這段過程。
2.爲何須要進行Shuffle呢?
map tasks的output向着reduce tasks的輸入input映射的時候,並不是節點一一對應的,在節點A上作map任務的輸出結果,可能要分散跑到reduce節點A、B、C、D ,就好像shuffle的字面意思「洗牌」同樣,這些map的輸出數據要打散而後根據新的路由算法(好比對key進行某種hash算法),發送到不一樣的reduce節點上去。
MapReduce 是 sort-based,進入 combine() 和 reduce() 的 records 必須先partition、key對中間結果進行排序合併。這樣的好處在於 combine/reduce() 能夠處理大規模的數據,由於其輸入數據能夠經過外排獲得(mapper 對每段數據先作排序,reducer 的 shuffle 對排好序的每段數據作歸併)。
前面已經提到,在DAG調度的過程當中,Stage階段的劃分是根據是否有shuffle過程,也就是存在ShuffleDependency寬依賴的時候,須要進行shuffle,這時候會將做業job劃分紅多個Stage;
Spark的Shuffle實現大體以下圖所示,在DAG階段以shuffle爲界,劃分stage,上游stage作map task,每一個map task將計算結果數據分紅多份,每一份對應到下游stage的每一個partition中,並將其臨時寫到磁盤,該過程叫作shuffle write;下游stage作reduce task,每一個reduce task經過網絡拉取上游stage中全部map task的指定分區結果數據,該過程叫作shuffle read,最後完成reduce的業務邏輯。
下圖是spark shuffle實現的一個版本演進。
RDD持久化級別
持久化級別 | 含義解釋 |
---|---|
MEMORY_ONLY | 使用未序列化的Java對象格式,將數據保存在內存中。若是內存不夠存放全部的數據,則數據可能就不會進行持久化。那麼下次對這個RDD執行算子操做時,那些沒有被持久化的數據,須要從源頭處從新計算一遍。這是默認的持久化策略,使用cache()方法時,實際就是使用的這種持久化策略。 |
DISK_ONLY | 使用未序列化的Java對象格式,將數據所有寫入磁盤文件中。 |
MEMORY_ONLY_SER | 基本含義同MEMORY_ONLY。惟一的區別是,會將RDD中的數據進行序列化,RDD的每一個partition會被序列化成一個字節數組。這種方式更加節省內存,從而能夠避免持久化的數據佔用過多內存致使頻繁GC。 |
MEMORY_AND_DISK | 使用未序列化的Java對象格式,優先嚐試將數據保存在內存中。若是內存不夠存放全部的數據,會將數據寫入磁盤文件中,下次對這個RDD執行算子時,持久化在磁盤文件中的數據會被讀取出來使用。 |
MEMORY_AND_DISK_SER | 基本含義同MEMORY_AND_DISK。惟一的區別是,會將RDD中的數據進行序列化,RDD的每一個partition會被序列化成一個字節數組。這種方式更加節省內存,從而能夠避免持久化的數據佔用過多內存致使頻繁GC。 |
MEMORY_ONLY
,而persist能夠根據狀況設置其它的緩存級別
。checkpoint的兩大做用:
一是spark程序長期駐留,過長的依賴會佔用不少的系統資源,按期checkpoint能夠有效的節省資源;
二是維護過長的依賴關係可能會出現問題,一旦spark程序運行失敗,RDD的容錯成本會很高。
(注:checkpoint執行前要先進行cache,避免兩次計算。)