Spark學習(二)——RDD基礎

其餘更多java基礎文章:
java基礎學習(目錄)java


RDD的建立與方法(代碼詳細)算法

1. RDD概述

RDD 是 Spark 的計算模型。RDD(Resilient Distributed Dataset)叫作彈性的分佈式數據集合,是 Spark 中最基本的數據抽象,它表明一個不可變、只讀的,被分區的數據集。操做 RDD 就像操做本地集合同樣,有不少的方法能夠調用,使用方便,而無需關心底層的調度細節。緩存

2. RDD的建立

Spark Core爲咱們提供了三種建立RDD的方式,包括:安全

  1. 使用程序中的集合建立RDD
  2. 使用HDFS文件建立RDD

2.1 Spark初始化

spark程序須要作的第一件事情,就是建立一個SparkContext對象,它將告訴spark如何訪問一個集羣,而要建立一個SparkContext對象,你首先要建立一個SparkConf對象,該對象訪問了你的應用程序的信息,好比下面的代碼:bash

SparkConf conf=new SparkConf();
        conf.set("參數", "參數值");     //由於jvm沒法得到足夠的資源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
複製代碼

2.2 使用程序中的集合建立RDD

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        //並行集合,是經過對於驅動程序中的集合調用JavaSparkContext.parallelize來構建的RDD
        JavaRDD<Integer> distData = sc.parallelize(data);
複製代碼

2.3 使用HDFS文件建立RDD

//經過hdfs上的文件定義一個RDD 這個數據暫時尚未加載到內存,也沒有在上面執行動做,lines僅僅指向這個文件
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
複製代碼

3. RDD的兩種算子

RDD支持兩種類型的操做算子:Transformation與Action。jvm

3.1 Transformation

Transformation操做會由一個RDD生成一個新的 RDD。Transformation操做是延遲計算的,也就是說從一個RDD轉換生成另外一個RDD的轉換操做不是立刻執行,須要等到Actions操做時,才真正開始運算。分佈式

3.2 Action

Action操做會對 RDD 計算出一個結果,並把結果返回到驅動器程序中,或把結果存儲到外部存儲系統(如 HDFS)中。函數

例如,first() 就是的一個行動操做,它會返回 RDD 的第一個元素。post

result = testlines.first()
複製代碼

transformations操做和Action操做的區別在於Spark計算RDD 的方式不一樣。對於在任什麼時候候transformations獲得的新的RDD,Spark只會惰性計算。只有在一個Action操做中用到時,纔會真正計算。這種策略也是spark性能高的部分緣由。性能

好比,咱們讀取一個文本文件建立一個RDD,而後把其中包含spark的行篩選出來。若是Spark在咱們運行lines = sc.textFile(test.txt) 時就把文件中全部的行都讀取到內存中並存儲起來,內存開銷會很大,而咱們接下來的操做會篩選掉其中的不少數據。相反, 若是Spark 在知道了完整的轉化操做鏈以後,它就能夠只計算求結果時真正須要的數據。

事實上,在執行行動操做 first()時,Spark也只是掃描文件直到找到第一個匹配的行爲止,而不是讀取整個文件。

3.3 RDD經常使用算子

Spark32個經常使用算子總結
Spark經常使用算子詳解

4. RDD的持久化機制

blog.csdn.net/weixin_3560…

RDD還有一個叫持久化的機制,就是在不一樣操做間,持久化(或緩存)一個數據集在內存中。當你持久化一個RDD,每個結點都將把它的計算分塊結果保存在內存中,並在對此數據集(或者衍生出的數據集)進行的其它動做中重用。這將使得後續的動做(action)變得更加迅速。緩存是用Spark構建迭代算法的關鍵。RDD的緩存可以在第一次計算完成後,將計算結果保存到內存、本地文件系統或者Tachyon(分佈式內存文件系統)中。經過緩存,Spark避免了RDD上的重複計算,可以極大地提高計算速度。在Spark應用程序的調優中就會考慮到RDD的持久化的機制。

4.1 RDD持久化機制

  • Spark很是重要的一個功能特性就是能夠將RDD 持久化在內存中,當對RDD執行持久化操做時,每一個節點都會將本身操做的RDD的partition持久化到內存中,而且在以後對該RDD的反覆使用中,直接使用內存緩存的partition,這樣的話,對於針對一個RDD反覆執行多個操做的場景,就只要對RDD計算一次便可,後面直接使用該RDD ,而不須要計算屢次該RDD
  • 要持久化一個RDD,只要調用其cache()或者persist()方法便可。可是並非這兩個方法被調用時當即緩存,在該RDD第一次被計算出來時(觸發後面的action時),該RDD將會被緩存在計算節點的內存中,並供後面重用。並且Spark的持久化機制仍是自動容錯的,若是持久化的RDD的任何partition丟失了,那麼Spark會自動經過其源RDD,使用transformation操做從新計算該partition。
  • cache()和persist()的區別在於,cache()是persist()的一種簡化方式,cache()的底層就是調用的persist()的無參版本,同時就是調用persist(MEMORY_ONLY),將數據持久化到內存中。若是須要從內存中去除緩存,那麼可使用unpersist()方法。

4.2 RDD的持久化的級別

順便看一下RDD都有哪些緩存級別,查看 StorageLevel 類的源碼:

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  ......
}
複製代碼

查看其構造參數

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  ......
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  ......
}
複製代碼

能夠看到StorageLevel類的主構造器包含了5個參數:

  • useDisk:使用硬盤(外存)
  • useMemory:使用內存
  • useOffHeap:使用堆外內存,這是Java虛擬機裏面的概念,堆外內存意味着把內存對象分配在Java虛擬機的堆之外的內存,這些內存直接受操做系統管理(而不是虛擬機)。這樣作的結果就是能保持一個較小的堆,以減小垃圾收集對應用的影響。
  • deserialized:反序列化,其逆過程序列化(Serialization)是java提供的一種機制,將對象表示成一連串的字節;而反序列化就表示將字節恢復爲對象的過程。序列化是對象永久化的一種機制,能夠將對象及其屬性保存起來,並能在反序列化後直接恢復這個對象。序列化方式存儲對象能夠節省磁盤或內存的空間,通常 序列化:反序列化=1:3
  • replication:備份數(在多個節點上備份)

理解了這5個參數,StorageLevel 的12種緩存級別就不難理解了。例如:

val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
複製代碼

就表示使用這種緩存級別的RDD將存儲在硬盤以及內存中,使用序列化(在硬盤中),而且在多個節點上備份2份(正常的RDD只有一份)

注意: 持久化的單位爲Partition
注意: 當使用RDD的MEMORY_ONLY進行持久化的時候,當內存空間不夠的時候,不會報OOM,它會選擇最小的partiton來持久化在內存,當從新的使用RDD時候,其餘的partition會根據依賴關係從新計算

4.3 選擇持久化級別

Spark的多個存儲級別意味着在內存利用率和cpu利用效率間的不一樣權衡。咱們推薦經過下面的過程選擇一個合適的存儲級別:

  1. 若是你的RDD適合默認的存儲級別(MEMORY_ONLY),就選擇默認的存儲級別。由於這是cpu利用率最高的選項,會使RDD上的操做盡量的快。
  2. 若是不適合用默認的級別,選擇MEMORY_ONLY_SER。選擇一個更快的序列化庫提升對象的空間使用率,可是仍可以至關快的訪問。
  3. 除非函數計算RDD的花費較大或者它們須要過濾大量的數據,不要將RDD存儲到磁盤上,不然,重複計算一個分區就會和重磁盤上讀取數據同樣慢。
  4. 若是你但願更快的錯誤恢復,能夠利用重複存儲級別。全部的存儲級別均可以經過重複計算丟失的數據來支持完整的容錯,可是重複的數據可以使你在RDD上繼續運行任務,而不須要重複計算丟失的數據。
  5. 在擁有大量內存的環境中或者多應用程序的環境中,OFF_HEAP具備以下優點:
    • 它運行多個執行者共享Tachyon中相同的內存池
    • 它顯著地減小垃圾回收的花費
    • 若是單個的執行者崩潰,緩存的數據不會丟失

4.4 checkPoint

當業務場景很是的複雜的時候,RDD的lineage(血統)依賴會很是的長,一旦血統較後面的RDD數據丟失的時候,Spark會根據血統依賴從新的計算丟失的RDD,這樣會形成計算的時間過長,Spark提供了一個叫checkPoint的算子來解決這樣的業務場景。

  • 爲當前RDD設置檢查點。該函數將會建立一個二進制的文件,並存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設置的。在checkpoint的過程當中,該RDD的全部依賴於父RDD中的信息將所有被移出。對RDD進行checkpoint操做並不會立刻被執行,必須執行Action操做才能觸發。

4.4.1 checkPoint優勢

  1. 持久化在HDFS上,HDFS默認的3副本備份使得持久化的備份數據更加的安全
  2. 切斷RDD的依賴關係:當業務場景複雜的時候,RDD的依賴關係很是的長的時候,當靠後的RDD數據丟失的時候,會經歷較長的從新計算的過程,採用checkPoint會轉爲依賴checkPointRDD,能夠避免長的lineage從新計算。

4.4.2 checkPoint的原理

  1. 當finalRDD執行Action類算子計算job任務的時候,Spark會從finalRDD從後往前回溯查看哪些RDD使用了checkPoint算子
  2. 將使用了checkPoint的算子標記
  3. Spark會自動的啓動一個job來從新計算標記了的RDD,並將計算的結果存入HDFS,而後切斷RDD的依賴關係
  4. 建議在執行checkpoint()方法以前先對rdd進行persisted操做。 在checkPoint的RDD以前先cache RDD,那麼Spark就不用啓動一個job來計算checkPoint的RDD,而是將持久化到內存的數據直接拷貝到HDFS,進而提升Spark的計算速度,提升應用程序的性能

5. 共享變量

Spark共享變量(廣播變量、累加器)

Spark兩種共享變量:廣播變量(broadcast variable)與累加器(accumulator)

累加器用來對信息進行聚合,而廣播變量用來高效分發較大的對象。

5.1 廣播變量

有時會變量是在driver端建立的,可是由於須要在excutor端使用,因此driver會把變量以task的形式發送到excutor端,若是有不少個task,就會有不少給excutor端攜帶不少個變量,若是這個變量很是大的時候,就可能會形成內存溢出(以下圖所示)。這個時候就引出了廣播變量。

使用廣播變量後:

int factor = 3;
final Broadcast<Integer> factorBroadcast = sc.broadcast(factor);
...
//在RDD計算中
int factor = factorBroadcast.value();
複製代碼

另外,爲了確保全部的節點得到相同的變量,對象factorBroadcast廣播後只讀不可以被修改。

注意事項:

  • 能不能將一個RDD使用廣播變量廣播出去?
    不能,由於RDD是不存儲數據的。能夠將RDD的結果廣播出去。 廣播變量只能在Driver端定義,不能在Executor端定義。

5.2 累加器

Spark提供的Accumulator,主要用於多個節點對一個變量進行共享性的操做。Accumulator只提供了累加的功能,可是卻給咱們提供了多個task對一個變量並行操做的功能。task只能對Accumulator進行累加操做,不能讀取它的值。只有Driver程序能夠讀取Accumulator的值。

final Accumulator<Integer> sum = sc.accumulator(0);
...
//RDD計算中
sum.add(1);
...
//Driver端
System.out.println(sum.value());
複製代碼

累加器只能由Spark內部進行更新,並保證每一個任務在累加器的更新操做僅執行一次,也就是說重啓任務也不該該更新。在轉換操做中,用戶必須意識到任務和做業的調度過程從新執行會形成累加器的屢次更新。

累加器一樣具備Spark懶加載的求值模型。若是它們在RDD的操做中進行更新,它們的值只在RDD進行行動操做時才進行更新。

相關文章
相關標籤/搜索