其餘更多java基礎文章:
java基礎學習(目錄)java
RDD 是 Spark 的計算模型。RDD(Resilient Distributed Dataset)叫作彈性的分佈式數據集合,是 Spark 中最基本的數據抽象,它表明一個不可變、只讀的,被分區的數據集。操做 RDD 就像操做本地集合同樣,有不少的方法能夠調用,使用方便,而無需關心底層的調度細節。緩存
Spark Core爲咱們提供了三種建立RDD的方式,包括:安全
spark程序須要作的第一件事情,就是建立一個SparkContext對象,它將告訴spark如何訪問一個集羣,而要建立一個SparkContext對象,你首先要建立一個SparkConf對象,該對象訪問了你的應用程序的信息,好比下面的代碼:bash
SparkConf conf=new SparkConf();
conf.set("參數", "參數值"); //由於jvm沒法得到足夠的資源
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
複製代碼
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
//並行集合,是經過對於驅動程序中的集合調用JavaSparkContext.parallelize來構建的RDD
JavaRDD<Integer> distData = sc.parallelize(data);
複製代碼
//經過hdfs上的文件定義一個RDD 這個數據暫時尚未加載到內存,也沒有在上面執行動做,lines僅僅指向這個文件
JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
複製代碼
RDD支持兩種類型的操做算子:Transformation與Action。jvm
Transformation操做會由一個RDD生成一個新的 RDD。Transformation操做是延遲計算的,也就是說從一個RDD轉換生成另外一個RDD的轉換操做不是立刻執行,須要等到Actions操做時,才真正開始運算。分佈式
Action操做會對 RDD 計算出一個結果,並把結果返回到驅動器程序中,或把結果存儲到外部存儲系統(如 HDFS)中。函數
例如,first() 就是的一個行動操做,它會返回 RDD 的第一個元素。post
result = testlines.first()
複製代碼
transformations操做和Action操做的區別在於Spark計算RDD 的方式不一樣。對於在任什麼時候候transformations獲得的新的RDD,Spark只會惰性計算。只有在一個Action操做中用到時,纔會真正計算。這種策略也是spark性能高的部分緣由。性能
好比,咱們讀取一個文本文件建立一個RDD,而後把其中包含spark的行篩選出來。若是Spark在咱們運行lines = sc.textFile(test.txt) 時就把文件中全部的行都讀取到內存中並存儲起來,內存開銷會很大,而咱們接下來的操做會篩選掉其中的不少數據。相反, 若是Spark 在知道了完整的轉化操做鏈以後,它就能夠只計算求結果時真正須要的數據。
事實上,在執行行動操做 first()時,Spark也只是掃描文件直到找到第一個匹配的行爲止,而不是讀取整個文件。
RDD還有一個叫持久化的機制,就是在不一樣操做間,持久化(或緩存)一個數據集在內存中。當你持久化一個RDD,每個結點都將把它的計算分塊結果保存在內存中,並在對此數據集(或者衍生出的數據集)進行的其它動做中重用。這將使得後續的動做(action)變得更加迅速。緩存是用Spark構建迭代算法的關鍵。RDD的緩存可以在第一次計算完成後,將計算結果保存到內存、本地文件系統或者Tachyon(分佈式內存文件系統)中。經過緩存,Spark避免了RDD上的重複計算,可以極大地提高計算速度。在Spark應用程序的調優中就會考慮到RDD的持久化的機制。
順便看一下RDD都有哪些緩存級別,查看 StorageLevel 類的源碼:
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
......
}
複製代碼
查看其構造參數
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {
......
def useDisk: Boolean = _useDisk
def useMemory: Boolean = _useMemory
def useOffHeap: Boolean = _useOffHeap
def deserialized: Boolean = _deserialized
def replication: Int = _replication
......
}
複製代碼
能夠看到StorageLevel類的主構造器包含了5個參數:
理解了這5個參數,StorageLevel 的12種緩存級別就不難理解了。例如:
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
複製代碼
就表示使用這種緩存級別的RDD將存儲在硬盤以及內存中,使用序列化(在硬盤中),而且在多個節點上備份2份(正常的RDD只有一份)
注意: 持久化的單位爲Partition
注意: 當使用RDD的MEMORY_ONLY進行持久化的時候,當內存空間不夠的時候,不會報OOM,它會選擇最小的partiton來持久化在內存,當從新的使用RDD時候,其餘的partition會根據依賴關係從新計算
Spark的多個存儲級別意味着在內存利用率和cpu利用效率間的不一樣權衡。咱們推薦經過下面的過程選擇一個合適的存儲級別:
當業務場景很是的複雜的時候,RDD的lineage(血統)依賴會很是的長,一旦血統較後面的RDD數據丟失的時候,Spark會根據血統依賴從新的計算丟失的RDD,這樣會形成計算的時間過長,Spark提供了一個叫checkPoint的算子來解決這樣的業務場景。
Spark兩種共享變量:廣播變量(broadcast variable)與累加器(accumulator)
累加器用來對信息進行聚合,而廣播變量用來高效分發較大的對象。
有時會變量是在driver端建立的,可是由於須要在excutor端使用,因此driver會把變量以task的形式發送到excutor端,若是有不少個task,就會有不少給excutor端攜帶不少個變量,若是這個變量很是大的時候,就可能會形成內存溢出(以下圖所示)。這個時候就引出了廣播變量。
int factor = 3;
final Broadcast<Integer> factorBroadcast = sc.broadcast(factor);
...
//在RDD計算中
int factor = factorBroadcast.value();
複製代碼
另外,爲了確保全部的節點得到相同的變量,對象factorBroadcast廣播後只讀不可以被修改。
注意事項:
Spark提供的Accumulator,主要用於多個節點對一個變量進行共享性的操做。Accumulator只提供了累加的功能,可是卻給咱們提供了多個task對一個變量並行操做的功能。task只能對Accumulator進行累加操做,不能讀取它的值。只有Driver程序能夠讀取Accumulator的值。
final Accumulator<Integer> sum = sc.accumulator(0);
...
//RDD計算中
sum.add(1);
...
//Driver端
System.out.println(sum.value());
複製代碼
累加器只能由Spark內部進行更新,並保證每一個任務在累加器的更新操做僅執行一次,也就是說重啓任務也不該該更新。在轉換操做中,用戶必須意識到任務和做業的調度過程從新執行會形成累加器的屢次更新。
累加器一樣具備Spark懶加載的求值模型。若是它們在RDD的操做中進行更新,它們的值只在RDD進行行動操做時才進行更新。