spark 基礎

scala版 ,基本名詞概念及 rdd的基本建立及使用apache

 

var conf = new SparkConf()網絡

var sc: SparkContext = new SparkContext(conf)多線程

val rawRDDA = sc.parallelize(List("!! bb ## cc","%% cc bb %%","cc && ++ aa"),3)  app

           

# sc.parallelize(,3)  將數據並行加載到三臺機器上框架

 

var tmpRDDA1 = rawRDDA.flatMap(line=>line.split(" "))spa

var tmpRDDA2 = tmpRDDA1.filter(allWord=>{allWord.contains("aa") || allWord.contains("bb")})線程

var tmpRDDA3 = tmpRDDA2.map(word=>(word,1))scala

import org.apache.spark.HashPartitioner對象

var tmpRDDA4 = tmpRDDA.partitionBy(new HashPartitioner(2)).groupByKey()接口

 

#partitionBy(new HashPartitioner(2)).groupByKey  將以前的3臺機器Shuffle成兩臺機器

 

var tmpResultRDDA = tmpRDDA4.map((P:(String,Iterable[Int]))=>(P._1,P._2.sum))

#對相同的key的value進行求和

Partition :某機上一個固定數據塊 , 一系列相關Partition組合爲一個RDD  。

                如tmpRDDA2擁有3個Partition ,而 tmpResultRDDA擁有兩個Partition

 

RDD :數據統一操做所在地, 代碼中任意一個操做(如faltMap,filter,map), RDD內的全部Partition都會執行

          如在rawRDDA->tmpRDDA1時 ,執行flatMap(line=>line.split(" ")),則rawRDD 的三個Partition (分別爲 cslave0上的「!! bb ## cc」,

          cslave1上的「-- cc bb $$」和cslave2上的「cc ^^ ++ aa」都要執行flatMap操做)

 

RDD 是數據並行化所在地 ,隸屬於某RDD的全部Partition都要執行相同操做,當這些Partition存在於不一樣機器,就會由不一樣機器同時執 

        行,也就是並行執行

 

RDD並行化範式主要有Map和Shuffle

        Map 範式 :只對本Partition上的數據進行操做, 操做的數據對象不跨越多個Partition,即不跨越網絡 。

        Shuffle範式 : 對不一樣Partition上的數據進行重組,其操做的數據對象跨越多個甚至是全部Partition ,即跨越網絡

 

 

 

場景 :多輸入源

兩個原始文件rawFile1 和 rawFile2,要求將rawFile1的內容均勻加載到cslave3,cslave4上,接着對rawFile1進行數據去重,

要求將rawFile2加載到cslave5,而後將rawFile1的處理結果中 去掉rawFile2中所含的條目 

var conf = new SparkConf()

var sc: SparkContext = new SparkContext(conf)

var rawRDDB = sc.parallelize(List(("xx",99),("yy",88),("xx",99),("zz",99)),2)

var rawRDDC = sc.parallelize(List(("yy",88)),1)

var tmpResultRDDBC = rawRDDB.distinct.subtract(rawRDDC)

 

subtract()就是兩個RDD相減,而這兩個RDD來自不一樣的輸入文件

       

 

場景:複雜狀況

初始化多個rdd,相互取並集或差集

多輸入源,去重,裝換,再合併

var conf = new SparkConf()

var sc:SparkContext = new SparkContext(conf)

var rawRDDA = sc.parallelize(List("!! bb ## cc","%% cc bb %%","cc && ++ aa"),3)

var rawRDDB = sc.paralleliz(List(("xx,99),("yy",88),("xx",99),("zz",99)),2)

var rawRDDC = sc.parallelize(List(("yy",88)),1)

import org.apache.spark.HashPartitioner

var tmpResultRDDA = rawRDDA.flatMap(line=>line.split(" ")).filter(allWord=>{allWord.contains("aa")||allWord.contains("bb")}).map(word=>(word,1)).partitionBy(new HashPartitioner(2)).groupByKey().map((P:String,Iterable[Int]))=>(P._1,P._2.sum))

var tmpResultRDDBC = rawRDDB.distinct.subtract(rawRDDC)

var resultRDDABC = tmpResultRDDA.union(tmpResultRDDBC)

resultRDDABC.saveAsTextFile("HDFS路徑")

 

map範式做用於RDD時,不會改變先後兩個RDD內Partition數量, 當partitionBy,union做用於RDD時,會改變先後兩個RDD內Partition數量

 

RDD持久化到HDFS時,RDD對應一個文件夾,屬於該RDD的每一個Partition對應一個獨立文件

RDD之間的中間數據不存入本地磁盤或HDFS

RDD的多個操做能夠用點‘.’鏈接,如 RDD1.map().filter().groupBy()

 

RDD能夠對指定的某個Partition進行操做,而不更改其餘Partition

 

Spark-app執行流程:
1.用戶調用RDD API接口,編寫rdd轉換應用代碼

2.使用spark提交job到Master

3.Master收到job,通知各個Worker啓動Executor

4.各個Executor向Driver註冊 (用戶編寫的代碼和提交任務的客戶端統一稱Driver)

5.RDD Graph將用戶的RDD串組織成DAG-RDD

6.DAGSchedule 以Shuffle爲原則(即遇Shuffle就拆分)將DAG-RDD拆分紅一系列StageDAG-RDD(StageDAG-RDD0->StageDAG-RDD1->StageDAG-rdd2->...)

7.RDD經過訪問NameNode,將DataNode上的數據塊裝入RDD的Partition

8.TaskSchedule將StageDAG-RDD0發往隸屬於本RDD的全部Partition執行,在Partition執行過程當中,Partition上的Executor優先執行本Partition.

9.TaskSchedule將StageDAG-RDD1發往隸屬於本RDD(已改變)的全部Partition執行

10.重複上面8,9步的步驟,直至執行完全部Stage-DAG-RDD

 

 

資源隔離性

每一個執行的Spark-APP都有本身一系列的Executor進程(分佈在不一樣的機器上或內核上),這些Executor會協做完成該任務。

單個Executor會以多線程複用方式運行該Spark-APP分配來的全部Task .

一個Executor只屬於一個Spark-APP,一個Spark-APP能夠有多個Executor

這與MapReduce不一樣。  好比某個由Map->Reduce1->Reduce2構成的ML-App,有十個Slave同時執行該任務,從某一個slave機器上來看,

MapReduce框架執行時會啓動Map進程,Reduce1進程,Reduce2進程,三個進程順序執行該任務

而Spark則使用一個Executor進程完成這四個操做。

 

spark-APP自己感知不到集羣的存在

相關文章
相關標籤/搜索