Spark 運算比 Hadoop 的 MapReduce 框架快的緣由是由於 Hadoop 在一次 MapReduce 運算以後,會將數據的運算結果從內存寫入到磁盤中,第二次 Mapredue 運算時在從磁盤中讀取數據,因此其瓶頸在2次運算間的多餘 IO 消耗. Spark 則是將數據一直緩存在內存中,直到計算獲得最後的結果,再將結果寫入到磁盤,因此屢次運算的狀況下, Spark 是比較快的. 其優化了迭代式工做負載。html
網上看到的圖片,抄過來的:java
在集羣中master節點上運行着master進程以及Driver,master進程將用戶提交的job轉爲可並行執行的任務集Tasks。並將這些Tasks分配給集羣上的Worker,Driver進程負責執行用戶編寫的application。算法
在集羣中,除了master節點負責管理集羣的調度之外,還有worker,worker中存在executer進程,每一個worker能夠存在一個或多個executer進程,每一個executer都擁有一個線程池,線程池中的每一個線程都負責一個Task的執行。 每一個executer可並行執行的Task數量是根據cpu core來的。executer擁有多少cpu core就能同時並行執行多少Task。數組
若是master掛了想恢復集羣的正常運行,須要依靠zookeeper,zookeeper記錄了集羣中全部的worker、driver、application。當master掛掉後,zookeeper根據自己的選舉算法,在集羣中選出一個worker做爲新的master。在恢復master的這段時間內,用戶沒法提交新的job。緩存
在集羣中若是某個正在執行task任務的worker掛掉了,master會從新把該worker負責的task分配給其餘worker。在worker掛掉的這段時間內,若是worker長時間(默認是60s)沒有上報master心跳。master則會將該worker從集羣中移除,並標識DEADapp
RDD(Resilient Distributed Dataset): 一個可並行操做的有容錯機制的數據集合,有 2 種方式建立 RDDs:第一種是在你的驅動程序中並行化一個已經存在的集合;另一種是引用一個外部存儲系統的數據集,例如共享的文件系統,HDFS,HBase或其餘 Hadoop 數據格式的數據源。框架
以上描述在實操中會更好理解。在未理解它以前把它看作是一種數據集合便可。jvm
Transformation: 返回值爲RDD,不會立刻提交任務去集羣執行函數
Action: 返回值不是RDD,造成DAG圖後,將任務提交到集羣執行,並返回結果。oop
統計某個文件的總字數:
// 設置appname和master,這裏設置了local[2]的意思是在本地以2個CPU核心執行這個任務。或者使用:"spark://master_hostname:7077"指定任務到遠程的機器執行。 SparkConf sparkConf = new SparkConf().setAppName("firstDemo").setMaster("local[2]"); // 建立SoarkContext對象 JavaSparkContext sc = new JavaSparkContext(sparkConf); // 讀取文件 final JavaRDD<String> distFile = sc.textFile("/usr/test/test.txt"); // 統計字數 Integer wordCount = distFile.map(String::length).reduce((a, b) -> a + b)
第一種,使用RDD並行化一個已有的集合:
List<Integer> integers = Arrays.asList(1, 1, 2, 2, 3, 4, 5); // 把數據並行化爲RDD JavaRDD<Integer> parallelize = sc.parallelize(integers); List<Integer> collect = parallelize.distinct().collect();
第二種,使用外部數據建立RDD。好比本地磁盤、HDFS、HBase等
// Spark能夠根據本地文件系統、HDFS、Hbase等做爲數據源,而後進行操做。 // 指定不一樣的數據源,只須要有對應的uri便可好比hdfs的:"hdfs://" JavaRDD<String> distFile = sc.textFile("/usr/test/test.txt"); Integer wordCount = distFile.map(String::length).reduce((a, b) -> a + b);
spark的懶加載與scala的lazy val是有關係的。scala Lazy vals介紹
咱們把一個文件讀取出來:
JavaRDD<String> lazyLoad = sc.textFile("/usr/test/test.txt");
按照通常程序的執行流程,執行這種操做spark會立馬把數據從磁盤中讀取出來放到內存中。可是事實卻不是這樣的,這裏的操做,只是把地址映射了起來,並無去把它加載到內存中去。
對數據進行map操做,轉換爲另一個RDD:
JavaRDD<Integer> lineLengths = lazyLoad.map(String::length)
這裏的操做也沒有真正去執行,只是定義了把
JavaRDD<String>
轉換爲JavaRDD<Integer>
的操做。
最後統計出test.txt的字數:
Integer count = lineLengths.reduce((a, b) -> a + b)
該操做是真正的計算,當spark的操做到這步時,纔會真正將計算分解在集羣中的機器上運行。
將RDD操做保存到內存中,供下次使用:
lineLengths.persist(StorageLevel.MEMORY_ONLY())
spark支持將某次的RDD操做保存到內存中,以便以後其餘操做複用該RDD的數據。這樣使得以後的操做更快,由於複用的數據不須要從新計算,直接從緩存中取便可。若是在內存分區中,緩存的RDD數據丟失,spark會執行RDD從新計算,並放到緩存中。
當咱們在代碼中執行了cache/persist等持久化操做時,spark會根據咱們設置的緩存級別不一樣,每一個task計算出來的數據會保存到task所在節點的內存或磁盤中。
主要分爲三塊:
Storage Level | description |
---|---|
MEMORY_ONLY | 默認級別,將RDD操做做爲序列化的java對象存儲在jvm中,若是內存放不下所有的RDD操做。那麼沒法緩存的RDD操做在下次須要時再從新計算,而已經緩存的部分就直接使用。該級別只使用內存,不使用磁盤。效率很是高。 |
MEMORY_AND_DISK | 將RDD操做做爲序列化的java對象存儲在jvm中,若是內存放不下所有的RDD操做。那麼沒法緩存的RDD操做會持久化到磁盤上,並在須要時從磁盤中取出來。該級別須要使用的內存和磁盤。效率中等 |
MEMORY_ONLY_SER | 將RDD存儲爲序列化Java對象(每一個分區一個字節的數組),與反序列化對象相比,它更節省空間,特別是當它使用快速序列化器時。但它增長了CPU的開銷。在此級別中,存儲空間較小,CPU計算時間較長,數據存儲在內存中。它不使用磁盤。 |
MEMORY_AND_DISK_SER | MEMORY_ONLY_SER,但它會將不適合內存的分區丟棄到磁盤,而不是每次須要時從新計算。在此存儲級別中,用於存儲的空間較低,CPU計算時間較長,它使用內存和磁盤存儲。 |
DISK_ONLY | 在此存儲級別中,RDD僅存儲在磁盤上。用於存儲的空間很小,CPU計算時間很長,而且它利用磁盤存儲。 |