Spark Core
1、什麼是Spark?(官網:http://spark.apache.org)
1、什麼是Spark?

個人翻譯:Spark是一個針對大規模數據處理的快速通用引擎。html
Spark是一種快速、通用、可擴展的大數據分析引擎,2009年誕生於加州大學伯克利分校AMPLab,2010年開源,2013年6月成爲Apache孵化項目,2014年2月成爲Apache頂級項目。目前,Spark生態系統已經發展成爲一個包含多個子項目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項目,Spark是基於內存計算的大數據並行計算框架。Spark基於內存計算,提升了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,容許用戶將Spark部署在大量廉價硬件之上,造成集羣。Spark獲得了衆多大數據公司的支持,這些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、騰訊、京東、攜程、優酷土豆。當前百度的Spark已應用於鳳巢、大搜索、直達號、百度大數據等業務;阿里利用GraphX構建了大規模的圖計算和圖挖掘系統,實現了不少生產系統的推薦算法;騰訊Spark集羣達到8000臺的規模,是當前已知的世界上最大的Spark集羣。es6
2、爲何要學習Spark?
(*)Hadoop的MapReduce計算模型存在的問題:算法
學習過Hadoop的MapReduce的學員都知道,MapReduce的核心是Shuffle(洗牌)。在整個Shuffle的過程當中,至少會產生6次的I/O。下圖是咱們在講MapReduce的時候,畫的Shuffle的過程。shell
中間結果輸出:基於MapReduce的計算引擎一般會將中間結果輸出到磁盤上,進行存儲和容錯。另外,當一些查詢(如:Hive)翻譯到MapReduce任務時,每每會產生多個Stage(階段),而這些串聯的Stage又依賴於底層文件系統(如HDFS)來存儲每個Stage的輸出結果,而I/O的效率每每較低,從而影響了MapReduce的運行速度。apache
(*)Spark的最大特色:基於內存編程
Spark是MapReduce的替代方案,並且兼容HDFS、Hive,可融入Hadoop的生態系統,以彌補MapReduce的不足。數組
3、Spark的特色:快、易用、通用、兼容性
(*)快緩存
與Hadoop的MapReduce相比,Spark基於內存的運算速度要快100倍以上,即便,Spark基於硬盤的運算也要快10倍。Spark實現了高效的DAG執行引擎,從而能夠經過內存來高效處理數據流。 網絡
(*)易用架構
Spark支持Java、Python和Scala的API,還支持超過80種高級算法,使用戶能夠快速構建不一樣的應用。並且Spark支持交互式的Python和Scala的shell,能夠很是方便地在這些shell中使用Spark集羣來驗證解決問題的方法。
(*)通用
Spark提供了統一的解決方案。Spark能夠用於批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不一樣類型的處理均可以在同一個應用中無縫使用。Spark統一的解決方案很是具備吸引力,畢竟任何公司都想用統一的平臺去處理遇到的問題,減小開發和維護的人力成本和部署平臺的物力成本。
另外Spark還能夠很好的融入Hadoop的體系結構中能夠直接操做HDFS,並提供Hive on Spark、Pig on Spark的框架集成Hadoop。
(*)兼容性
Spark能夠很是方便地與其餘的開源產品進行融合。好比,Spark可使用Hadoop的YARN和Apache Mesos做爲它的資源管理和調度器,器,而且能夠處理全部Hadoop支持的數據,包括HDFS、HBase和Cassandra等。這對於已經部署Hadoop集羣的用戶特別重要,由於不須要作任何數據遷移就可使用Spark的強大處理能力。Spark也能夠不依賴於第三方的資源管理和調度器,它實現了Standalone做爲其內置的資源管理和調度框架,這樣進一步下降了Spark的使用門檻,使得全部人均可以很是容易地部署和使用Spark。此外,Spark還提供了在EC2上部署Standalone的Spark集羣的工具。
2、Spark的體系結構與安裝部署
1、Spark集羣的體系結構
官方的一張圖:
我本身的一張圖:
2、Spark的安裝與部署
Spark的安裝部署方式有如下幾種模式:
Standalone
YARN
Mesos
Amazon EC2
(*)Spark Standalone僞分佈的部署
l 配置文件:conf/spark-env.sh
export JAVA_HOME=/root/training/jdk1.7.0_75
export SPARK_MASTER_HOST=spark81
export SPARK_MASTER_PORT=7077
下面的能夠不寫,默認
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1024m
l 配置文件:conf/slave
spark81
(*)Spark Standalone全分佈的部署
l 配置文件:conf/spark-env.sh
export JAVA_HOME=/root/training/jdk1.7.0_75
export SPARK_MASTER_HOST=spark82
export SPARK_MASTER_PORT=7077
下面的能夠不寫,默認
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1024m
l 配置文件:conf/slave
spark83
spark84
(*)啓動Spark集羣:start-all.sh(會和hadoop的start-all.sh有衝突,能夠設置他們的環境變量爲不一樣的名字)
3、Spark HA的實現
(*)基於文件系統的單點恢復
主要用於開發或測試環境。當spark提供目錄保存spark Application和worker的註冊信息,並將他們的恢復狀態寫入該目錄中,這時,一旦Master發生故障,就能夠經過從新啓動Master進程(sbin/start-master.sh),恢復已運行的spark Application和worker的註冊信息。
基於文件系統的單點恢復,主要是在spark-en.sh裏對SPARK_DAEMON_JAVA_OPTS設置
配置參數 |
參考值 |
spark.deploy.recoveryMode |
設置爲FILESYSTEM開啓單點恢復功能,默認值:NONE |
spark.deploy.recoveryDirectory |
Spark 保存恢復狀態的目錄 |
參考:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/training/spark-2.1.0-bin-hadoop2.7/recovery"
測試:
1、在spark82上啓動Spark集羣
2、在spark83上啓動spark shell
MASTER=spark://spark82:7077 spark-shell
3、在spark82上中止master
stop-master.sh
4、觀察spark83上的輸出:
5、在spark82上重啓master
start-master.sh
(*)基於Zookeeper的Standby Masters
ZooKeeper提供了一個Leader Election機制,利用這個機制能夠保證雖然集羣存在多個Master,可是隻有一個是Active的,其餘的都是Standby。當Active的Master出現故障時,另外的一個Standby Master會被選舉出來。因爲集羣的信息,包括Worker, Driver和Application的信息都已經持久化到ZooKeeper,所以在切換的過程當中只會影響新Job的提交,對於正在進行的Job沒有任何的影響。加入ZooKeeper的集羣總體架構以下圖所示。
|
參考值 |
spark.deploy.recoveryMode |
設置爲ZOOKEEPER開啓單點恢復功能,默認值:NONE |
spark.deploy.zookeeper.url |
ZooKeeper集羣的地址 |
spark.deploy.zookeeper.dir |
Spark信息在ZK中的保存目錄,默認:/spark |
l 參考:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata12:2181,bigdata13:2181,bigdata14:2181 -Dspark.deploy.zookeeper.dir=/spark"
l 另外:每一個節點上,須要將如下兩行註釋掉。
l ZooKeeper中保存的信息
3、執行Spark Demo程序
1、執行Spark Example程序
(*)示例程序:$SPARK_HOME/examples/jars/spark-examples_2.11-2.1.0.jar
(*)全部的示例程序:$EXAMPLE_HOME/examples/src/main
有Java、Scala等等等
(*)Demo:蒙特卡羅求PI
命令:
spark-submit --master spark://spark81:7077 --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.1.0.jar 100
2、使用Spark Shell
spark-shell是Spark自帶的交互式Shell程序,方便用戶進行交互式編程,用戶能夠在該命令行下用scala編寫spark程序。
(*)啓動Spark Shell:spark-shell
也可使用如下參數:
參數說明:
--master spark://spark81:7077 指定Master的地址
--executor-memory 2g 指定每一個worker可用內存爲2G
--total-executor-cores 2 指定整個集羣使用的cup核數爲2個
例如:
spark-shell --master spark://spark81:7077 --executor-memory 2g --total-executor-cores 2
(*)注意:
若是啓動spark shell時沒有指定master地址,可是也能夠正常啓動spark shell和執行spark shell中的程序,實際上是啓動了spark的local模式,該模式僅在本機啓動一個進程,沒有與集羣創建聯繫。
請注意local模式和集羣模式的日誌區別:
(*)在Spark Shell中編寫WordCount程序
程序以下:
sc.textFile("hdfs://192.168.88.111:9000/data/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")
說明:
sc是SparkContext對象,該對象時提交spark程序的入口
textFile("hdfs://192.168.88.111:9000/data/data.txt")是hdfs中讀取數據
flatMap(_.split(" "))先map在壓平
map((_,1))將單詞和1構成元組
reduceByKey(_+_)按照key進行reduce,並將value累加
saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")將結果寫入到hdfs中
3、在IDEA中編寫WordCount程序
(*)須要的jar包:$SPARK_HOME/jars/*.jar
(*)建立Scala Project,並建立Scala Object、或者Java Class
(*)書寫源代碼,並打成jar包,上傳到Linux
==========================Scala版本==========================
(*)運行程序:
spark-submit --master spark://spark81:7077 --class mydemo.WordCount jars/wc.jar hdfs://192.168.88.111:9000/data/data.txt hdfs://192.168.88.111:9000/output/spark/wc1
====================Java版本(直接輸出在屏幕)====================
(*)運行程序:
spark-submit --master spark://spark81:7077 --class mydemo.JavaWordCount jars/wc.jar hdfs://192.168.88.111:9000/data/data.txt
4、Spark運行機制及原理分析
1、WordCount執行的流程分析
須要看源碼一步步看。
2、Spark提交任務的流程

3.Spark工做機制


5、Spark的算子
1、RDD基礎
- 什麼是RDD?
RDD(Resilient Distributed Dataset)叫作彈性分佈式數據集,是Spark中最基本的數據抽象,它表明一個不可變、可分區、裏面的元素可並行計算的集合。RDD具備數據流模型的特色:自動容錯、位置感知性調度和可伸縮性。RDD容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。
- RDD的屬性(源碼中的一段話)
² 一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。
² 一個計算每一個分區的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。
² RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。
² 一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,纔會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
² 一個列表,存儲存取每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。
RDD的建立方式
- 經過外部的數據文件建立,如HDFS
val rdd1 = sc.textFile(「hdfs://192.168.88.111:9000/data/data.txt」)
- 經過sc.parallelize進行建立
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
- RDD的類型:Transformation和Action
RDD 的基本原理
2、Transformation
RDD中的全部轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個要求返回結果給Driver的動做時,這些轉換纔會真正運行。這種設計讓Spark更加有效率地運行。
轉換 |
含義 |
map(func) |
返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成 |
filter(func) |
返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成 |
flatMap(func) |
相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素) |
mapPartitions(func) |
相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) |
相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) |
根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 |
union(otherDataset) |
對源RDD和參數RDD求並集後返回一個新的RDD |
intersection(otherDataset) |
對源RDD和參數RDD求交集後返回一個新的RDD |
distinct([numTasks])) |
對源RDD進行去重後返回一個新的RDD |
groupByKey([numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似,reduce任務的個數能夠經過第二個可選的參數來設置 |
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]) |
|
sortByKey([ascending], [numTasks]) |
在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) |
與sortByKey相似,可是更靈活 |
join(otherDataset, [numTasks]) |
在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) |
在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD |
cartesian(otherDataset) |
笛卡爾積 |
pipe(command, [envVars]) |
|
coalesce(numPartitions) |
|
repartition(numPartitions) |
|
repartitionAndSortWithinPartitions(partitioner) |
|
3、Action
動做 |
含義 |
reduce(func) |
經過func函數彙集RDD中的全部元素,這個功能必須是課交換且可並聯的 |
collect() |
在驅動程序中,以數組的形式返回數據集的全部元素 |
count() |
返回RDD的元素個數 |
first() |
返回RDD的第一個元素(相似於take(1)) |
take(n) |
返回一個由數據集的前n個元素組成的數組 |
takeSample(withReplacement,num, [seed]) |
返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
takeOrdered(n, [ordering]) |
|
saveAsTextFile(path) |
將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本 |
saveAsSequenceFile(path) |
將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。 |
saveAsObjectFile(path) |
|
countByKey() |
針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數。 |
foreach(func) |
在數據集的每個元素上,運行函數func進行更新。 |
4、RDD的緩存機制
RDD經過persist方法或cache方法能夠將前面的計算結果緩存,可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。
經過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
緩存有可能丟失,或者存儲存儲於內存的數據因爲內存不足而被刪除,RDD的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有Partition。
Demo示例:
經過UI進行監控:
5、RDD的Checkpoint(檢查點)機制:容錯機制
檢查點(本質是經過將RDD寫入Disk作檢查點)是爲了經過lineage(血統)作容錯的輔助,lineage過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的RDD開始重作Lineage,就會減小開銷。
設置checkpoint的目錄,能夠是本地的文件夾、也能夠是HDFS。通常是在具備容錯能力,高可靠的文件系統上(好比HDFS, S3等)設置一個檢查點路徑,用於保存檢查點數據。
分別舉例說明:
l 本地目錄
注意:這種模式,須要將spark-shell運行在本地模式上
l HDFS的目錄
注意:這種模式,須要將spark-shell運行在集羣模式上
l 源碼中的一段話
6、RDD的依賴關係和Spark任務中的Stage
l RDD的依賴關係
RDD和它依賴的父RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
- 窄依賴指的是每個父RDD的Partition最多被子RDD的一個Partition使用
總結:窄依賴咱們形象的比喻爲獨生子女
- 寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition
總結:窄依賴咱們形象的比喻爲超生
l Spark任務中的Stage
DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據。
7、RDD基礎練習
- 練習1:
//經過並行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//對rdd1裏的每個元素乘2而後排序
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//過濾出大於等於十的元素
val rdd3 = rdd2.filter(_ >= 10)
//將元素以數組的方式在客戶端顯示
rdd3.collect
- 練習2:
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//將rdd1裏面的每個元素先切分在壓平
val rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect
- 練習3:
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求並集
val rdd3 = rdd1.union(rdd2)
//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect
- 練習4:
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求jion
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//求並集
val rdd4 = rdd1 union rdd2
//按key進行分組
rdd4.groupByKey
rdd4.collect
- 練習5:
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup與groupByKey的區別
rdd3.collect
- 練習6:
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _)
rdd2.collect
- 練習7:
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key進行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
6、Spark RDD的高級算子
1、mapPartitionsWithIndex
把每一個partition中的分區號和對應的值拿出來
接收一個函數參數:
l 第一個參數:分區號
l 第二個參數:分區中的元素
示例:將每一個分區中的元素和分區號打印出來。
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
建立一個函數返回RDD中的每一個分區號和元素:
def func1(index:Int, iter:Iterator[Int]):Iterator[String] ={
iter.toList.map( x => "[PartID:" + index + ", value=" + x + "]" ).iterator
}
調用:rdd1.mapPartitionsWithIndex(func1).collect
2、aggregate
先對局部聚合,再對全局聚合
示例:val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
查看每一個分區中的元素:
將每一個分區中的最大值求和,注意:初始值是0;
若是初始值時候10,則結果爲:30
若是是求和,注意:初始值是0:
若是初始值是10,則結果是:45
一個字符串的例子:
val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
修改一下剛纔的查看分區元素的函數
def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
兩個分區中的元素:
[partID:0, val: a], [partID:0, val: b], [partID:0, val: c],
[partID:1, val: d], [partID:1, val: e], [partID:1, val: f]
運行結果:
更復雜一點的例子
val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
結果多是:」24」,也多是:」42」
val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
結果是:」10」,也多是」01」,
緣由:注意有個初始值」」,其長度0,而後0.toString變成字符串
val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
結果是:」11」,緣由同上。
3、aggregateByKey
準備數據:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
兩個分區中的元素:
示例:
將每一個分區中的動物最多的個數求和
scala> pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res69: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
將每種動物個數求和
scala> pairRDD.aggregateByKey(0)(_+_, _ + _).collect
res71: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
這個例子也可使用:reduceByKey
scala> pairRDD.reduceByKey(_+_).collect
res73: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
4、coalesce與repartition
都是將RDD中的分區進行重分區。
區別是:coalesce默認不會進行shuffle(false);而repartition會進行shuffle(true),即:會將數據真正經過網絡進行重分區。
示例:
def func4(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
下面兩句話是等價的:
val rdd2 = rdd1.repartition(3)
val rdd3 = rdd1.coalesce(3,true) --->若是是false,查看RDD的length依然是2
5、其餘高級算子
參考:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html