Spark原理及Java操做

Spark與MapReduce

Spark 運算比 Hadoop 的 MapReduce 框架快的緣由是由於 Hadoop 在一次 MapReduce 運算以後,會將數據的運算結果從內存寫入到磁盤中,第二次 Mapredue 運算時在從磁盤中讀取數據,因此其瓶頸在2次運算間的多餘 IO 消耗. Spark 則是將數據一直緩存在內存中,直到計算獲得最後的結果,再將結果寫入到磁盤,因此屢次運算的狀況下, Spark 是比較快的. 其優化了迭代式工做負載。html

網上看到的圖片,抄過來的:java

Spark的Master節點與Worker節點

  • Master

在集羣中master節點上運行着master進程以及Driver,master進程將用戶提交的job轉爲可並行執行的任務集Tasks。並將這些Tasks分配給集羣上的Worker,Driver進程負責執行用戶編寫的application。算法

  • Worker

在集羣中,除了master節點負責管理集羣的調度之外,還有worker,worker中存在executer進程,每一個worker能夠存在一個或多個executer進程,每一個executer都擁有一個線程池,線程池中的每一個線程都負責一個Task的執行。 每一個executer可並行執行的Task數量是根據cpu core來的。executer擁有多少cpu core就能同時並行執行多少Task。數組

  • Master掛了怎麼辦?

若是master掛了想恢復集羣的正常運行,須要依靠zookeeper,zookeeper記錄了集羣中全部的worker、driver、application。當master掛掉後,zookeeper根據自己的選舉算法,在集羣中選出一個worker做爲新的master。在恢復master的這段時間內,用戶沒法提交新的job。緩存

  • 某個worker掛了怎麼辦?

在集羣中若是某個正在執行task任務的worker掛掉了,master會從新把該worker負責的task分配給其餘worker。在worker掛掉的這段時間內,若是worker長時間(默認是60s)沒有上報master心跳。master則會將該worker從集羣中移除,並標識DEADapp

Spark RDD

RDD(Resilient Distributed Dataset): 一個可並行操做的有容錯機制的數據集合,有 2 種方式建立 RDDs:第一種是在你的驅動程序中並行化一個已經存在的集合;另一種是引用一個外部存儲系統的數據集,例如共享的文件系統,HDFS,HBase或其餘 Hadoop 數據格式的數據源。框架

以上描述在實操中會更好理解。在未理解它以前把它看作是一種數據集合便可。jvm

RDD 特色

  • 它是在集羣節點上的不可變的、已分區的集合對象;
  • 經過並行轉換的方式來建立(如 Map、 filter、join 等);
  • 失敗自動重建;
  • 能夠控制存儲級別(內存、磁盤等)來進行重用;
  • 必須是可序列化的;
  • 是靜態類型的(只讀)。

RDD的操做函數主要分爲兩種Transformation和Action

Transformation: 返回值爲RDD,不會立刻提交任務去集羣執行函數

Action: 返回值不是RDD,造成DAG圖後,將任務提交到集羣執行,並返回結果。oop

使用RDD操做數據

統計某個文件的總字數:

// 設置appname和master,這裏設置了local[2]的意思是在本地以2個CPU核心執行這個任務。或者使用:"spark://master_hostname:7077"指定任務到遠程的機器執行。
SparkConf sparkConf = new SparkConf().setAppName("firstDemo").setMaster("local[2]");
// 建立SoarkContext對象
JavaSparkContext sc = new JavaSparkContext(sparkConf);

// 讀取文件
final JavaRDD<String> distFile = sc.textFile("/usr/test/test.txt");
// 統計字數
Integer wordCount = distFile.map(String::length).reduce((a, b) -> a + b)

建立RDD的兩種方式:

第一種,使用RDD並行化一個已有的集合:

List<Integer> integers = Arrays.asList(1, 1, 2, 2, 3, 4, 5);
// 把數據並行化爲RDD
JavaRDD<Integer> parallelize = sc.parallelize(integers);
List<Integer> collect = parallelize.distinct().collect();

第二種,使用外部數據建立RDD。好比本地磁盤、HDFS、HBase等

// Spark能夠根據本地文件系統、HDFS、Hbase等做爲數據源,而後進行操做。
// 指定不一樣的數據源,只須要有對應的uri便可好比hdfs的:"hdfs://"
JavaRDD<String> distFile = sc.textFile("/usr/test/test.txt");
Integer wordCount = distFile.map(String::length).reduce((a, b) -> a + b);

Spark的懶加載

spark的懶加載與scala的lazy val是有關係的。scala Lazy vals介紹

咱們把一個文件讀取出來:

JavaRDD<String> lazyLoad = sc.textFile("/usr/test/test.txt");

按照通常程序的執行流程,執行這種操做spark會立馬把數據從磁盤中讀取出來放到內存中。可是事實卻不是這樣的,這裏的操做,只是把地址映射了起來,並無去把它加載到內存中去。

對數據進行map操做,轉換爲另一個RDD:

JavaRDD<Integer> lineLengths = lazyLoad.map(String::length)

這裏的操做也沒有真正去執行,只是定義了把JavaRDD<String>轉換爲JavaRDD<Integer>的操做。

最後統計出test.txt的字數:

Integer count = lineLengths.reduce((a, b) -> a + b)

該操做是真正的計算,當spark的操做到這步時,纔會真正將計算分解在集羣中的機器上運行。

將RDD操做保存到內存中,供下次使用:

lineLengths.persist(StorageLevel.MEMORY_ONLY())

Spark 內存分配及緩存機制

spark支持將某次的RDD操做保存到內存中,以便以後其餘操做複用該RDD的數據。這樣使得以後的操做更快,由於複用的數據不須要從新計算,直接從緩存中取便可。若是在內存分區中,緩存的RDD數據丟失,spark會執行RDD從新計算,並放到緩存中。

當咱們在代碼中執行了cache/persist等持久化操做時,spark會根據咱們設置的緩存級別不一樣,每一個task計算出來的數據會保存到task所在節點的內存或磁盤中。

主要分爲三塊:

  1. task在執行咱們寫的代碼時佔用到的內存,默認佔總內存的20%
  2. Task經過shuffle過程拉取了上一個stage的Task的輸出後,進行聚合等操做時使用的內存,默認也是佔總內存的20%
  3. RDD持久化使用到的內存總共佔60%
  • spark RDD一共有如下幾種緩存級別
Storage Level description
MEMORY_ONLY 默認級別,將RDD操做做爲序列化的java對象存儲在jvm中,若是內存放不下所有的RDD操做。那麼沒法緩存的RDD操做在下次須要時再從新計算,而已經緩存的部分就直接使用。該級別只使用內存,不使用磁盤。效率很是高。
MEMORY_AND_DISK 將RDD操做做爲序列化的java對象存儲在jvm中,若是內存放不下所有的RDD操做。那麼沒法緩存的RDD操做會持久化到磁盤上,並在須要時從磁盤中取出來。該級別須要使用的內存和磁盤。效率中等
MEMORY_ONLY_SER 將RDD存儲爲序列化Java對象(每一個分區一個字節的數組),與反序列化對象相比,它更節省空間,特別是當它使用快速序列化器時。但它增長了CPU的開銷。在此級別中,存儲空間較小,CPU計算時間較長,數據存儲在內存中。它不使用磁盤。
MEMORY_AND_DISK_SER MEMORY_ONLY_SER,但它會將不適合內存的分區丟棄到磁盤,而不是每次須要時從新計算。在此存儲級別中,用於存儲的空間較低,CPU計算時間較長,它使用內存和磁盤存儲。
DISK_ONLY 在此存儲級別中,RDD僅存儲在磁盤上。用於存儲的空間很小,CPU計算時間很長,而且它利用磁盤存儲。
相關文章
相關標籤/搜索