scala版 ,基本名詞概念及 rdd的基本建立及使用apache
var conf = new SparkConf()網絡
var sc: SparkContext = new SparkContext(conf)多線程
val rawRDDA = sc.parallelize(List("!! bb ## cc","%% cc bb %%","cc && ++ aa"),3) app
# sc.parallelize(,3) 將數據並行加載到三臺機器上框架
var tmpRDDA1 = rawRDDA.flatMap(line=>line.split(" "))spa
var tmpRDDA2 = tmpRDDA1.filter(allWord=>{allWord.contains("aa") || allWord.contains("bb")})線程
var tmpRDDA3 = tmpRDDA2.map(word=>(word,1))scala
import org.apache.spark.HashPartitioner對象
var tmpRDDA4 = tmpRDDA.partitionBy(new HashPartitioner(2)).groupByKey()接口
#partitionBy(new HashPartitioner(2)).groupByKey 將以前的3臺機器Shuffle成兩臺機器
var tmpResultRDDA = tmpRDDA4.map((P:(String,Iterable[Int]))=>(P._1,P._2.sum))
#對相同的key的value進行求和
Partition :某機上一個固定數據塊 , 一系列相關Partition組合爲一個RDD 。
如tmpRDDA2擁有3個Partition ,而 tmpResultRDDA擁有兩個Partition
RDD :數據統一操做所在地, 代碼中任意一個操做(如faltMap,filter,map), RDD內的全部Partition都會執行
如在rawRDDA->tmpRDDA1時 ,執行flatMap(line=>line.split(" ")),則rawRDD 的三個Partition (分別爲 cslave0上的「!! bb ## cc」,
cslave1上的「-- cc bb $$」和cslave2上的「cc ^^ ++ aa」都要執行flatMap操做)
RDD 是數據並行化所在地 ,隸屬於某RDD的全部Partition都要執行相同操做,當這些Partition存在於不一樣機器,就會由不一樣機器同時執
行,也就是並行執行
RDD並行化範式主要有Map和Shuffle
Map 範式 :只對本Partition上的數據進行操做, 操做的數據對象不跨越多個Partition,即不跨越網絡 。
Shuffle範式 : 對不一樣Partition上的數據進行重組,其操做的數據對象跨越多個甚至是全部Partition ,即跨越網絡
場景 :多輸入源
兩個原始文件rawFile1 和 rawFile2,要求將rawFile1的內容均勻加載到cslave3,cslave4上,接着對rawFile1進行數據去重,
要求將rawFile2加載到cslave5,而後將rawFile1的處理結果中 去掉rawFile2中所含的條目
var conf = new SparkConf()
var sc: SparkContext = new SparkContext(conf)
var rawRDDB = sc.parallelize(List(("xx",99),("yy",88),("xx",99),("zz",99)),2)
var rawRDDC = sc.parallelize(List(("yy",88)),1)
var tmpResultRDDBC = rawRDDB.distinct.subtract(rawRDDC)
subtract()就是兩個RDD相減,而這兩個RDD來自不一樣的輸入文件
場景:複雜狀況
初始化多個rdd,相互取並集或差集
多輸入源,去重,裝換,再合併
var conf = new SparkConf()
var sc:SparkContext = new SparkContext(conf)
var rawRDDA = sc.parallelize(List("!! bb ## cc","%% cc bb %%","cc && ++ aa"),3)
var rawRDDB = sc.paralleliz(List(("xx,99),("yy",88),("xx",99),("zz",99)),2)
var rawRDDC = sc.parallelize(List(("yy",88)),1)
import org.apache.spark.HashPartitioner
var tmpResultRDDA = rawRDDA.flatMap(line=>line.split(" ")).filter(allWord=>{allWord.contains("aa")||allWord.contains("bb")}).map(word=>(word,1)).partitionBy(new HashPartitioner(2)).groupByKey().map((P:String,Iterable[Int]))=>(P._1,P._2.sum))
var tmpResultRDDBC = rawRDDB.distinct.subtract(rawRDDC)
var resultRDDABC = tmpResultRDDA.union(tmpResultRDDBC)
resultRDDABC.saveAsTextFile("HDFS路徑")
map範式做用於RDD時,不會改變先後兩個RDD內Partition數量, 當partitionBy,union做用於RDD時,會改變先後兩個RDD內Partition數量
RDD持久化到HDFS時,RDD對應一個文件夾,屬於該RDD的每一個Partition對應一個獨立文件
RDD之間的中間數據不存入本地磁盤或HDFS
RDD的多個操做能夠用點‘.’鏈接,如 RDD1.map().filter().groupBy()
RDD能夠對指定的某個Partition進行操做,而不更改其餘Partition
Spark-app執行流程:
1.用戶調用RDD API接口,編寫rdd轉換應用代碼
2.使用spark提交job到Master
3.Master收到job,通知各個Worker啓動Executor
4.各個Executor向Driver註冊 (用戶編寫的代碼和提交任務的客戶端統一稱Driver)
5.RDD Graph將用戶的RDD串組織成DAG-RDD
6.DAGSchedule 以Shuffle爲原則(即遇Shuffle就拆分)將DAG-RDD拆分紅一系列StageDAG-RDD(StageDAG-RDD0->StageDAG-RDD1->StageDAG-rdd2->...)
7.RDD經過訪問NameNode,將DataNode上的數據塊裝入RDD的Partition
8.TaskSchedule將StageDAG-RDD0發往隸屬於本RDD的全部Partition執行,在Partition執行過程當中,Partition上的Executor優先執行本Partition.
9.TaskSchedule將StageDAG-RDD1發往隸屬於本RDD(已改變)的全部Partition執行
10.重複上面8,9步的步驟,直至執行完全部Stage-DAG-RDD
資源隔離性
每一個執行的Spark-APP都有本身一系列的Executor進程(分佈在不一樣的機器上或內核上),這些Executor會協做完成該任務。
單個Executor會以多線程複用方式運行該Spark-APP分配來的全部Task .
一個Executor只屬於一個Spark-APP,一個Spark-APP能夠有多個Executor
這與MapReduce不一樣。 好比某個由Map->Reduce1->Reduce2構成的ML-App,有十個Slave同時執行該任務,從某一個slave機器上來看,
MapReduce框架執行時會啓動Map進程,Reduce1進程,Reduce2進程,三個進程順序執行該任務
而Spark則使用一個Executor進程完成這四個操做。
spark-APP自己感知不到集羣的存在