An RDD is an abstraction that records some bookkeeping information. It is not a real collection, but it can be operated on as if it were one, which lowers the development effort.
RDD operators fall into two categories: Transformations (lazy, they do not execute immediately, so even errors go unnoticed at first) and Actions (which trigger job execution).
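For example (a minimal sketch to try in spark-shell), a Transformation that is bound to fail raises no error when it is defined; the error only surfaces once an Action forces it to run:
scala> val broken = sc.parallelize(List(1, 2, 3)).map(_ / 0) // lazy Transformation: no error is reported here
scala> broken.collect // only this Action triggers execution, and the ArithmeticException surfaces now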
There are three ways to create an RDD:
1. From an external storage system (e.g. Hadoop HDFS, HBase, MongoDB).
2. By parallelizing a Scala collection from the Driver (handy for testing, not suitable for production).
3. By calling a Transformation on an existing RDD, which produces a new RDD.
Method 1 was covered earlier; see the post on submitting the first Spark word-count program together with Hadoop HDFS.
Method 2:
Spark context Web UI available at http://192.168.5.182:4040
Spark context available as 'sc' (master = spark://host2:7077,host1:7077, app id = app-20181112100219-0000).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val arr = Array(1,2,3,4,5,6,7,8,9,10)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> arr.map(_ * 10)
res0: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
scala> val rdd = sc.parallelize(arr) // turn the local collection into an RDD
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> val rdds = rdd.map(_ * 10) // multiply every element by 10, forming a new RDD
rdds: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:28
scala> rdds.collect // view the new RDD; since an RDD is not a real collection, the data must first be collected from the Workers before it can be inspected
res3: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
scala> val rdd3 = rdd.filter(_ % 2 == 0) // filter out the even numbers into a new RDD
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:28
scala> rdd3.collect
res4: Array[Int] = Array(2, 4, 6, 8, 10)
Now let's look at the management web UI.
Click into this Spark shell application.
We can see that it has run two collect jobs.
Clicking further in, we can see the details of which machines the tasks ran on.
RDD operators
scala> val rdd2 = sc.parallelize(List(5,10,6,7,4,3,8,2,9,1)).map(_ * 2).sortBy(x => x,true) // multiply every element of the List by 2, then sort in ascending order
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at sortBy at <console>:24
scala> rdd2.collect
res5: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
We can see that it performed a sort and a collect.
scala> val rdd2 = sc.parallelize(List(5,10,6,7,4,3,8,2,9,1)).map(_ * 2).sortBy(x => x + "",true) // sort using string ordering; this does not change the element type, it is still an RDD of Int
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at sortBy at <console>:24
scala> rdd2.collect
res6: Array[Int] = Array(10, 12, 14, 16, 18, 2, 20, 4, 6, 8)
We can see that the sort compares the first character, then the second, i.e. the elements were ordered by string rules.
scala> val arr = Array("a b c","d e f","h i j")
arr: Array[String] = Array(a b c, d e f, h i j)
scala> arr.map(_.split(" "))
res10: Array[Array[String]] = Array(Array(a, b, c), Array(d, e, f), Array(h, i, j))
scala> arr.map(_.split(" ")).flatten //扁平化處理
res11: Array[String] = Array(a, b, c, d, e, f, h, i, j)
The above are plain Scala collection operations.
scala> val rdd4 = sc.parallelize(Array("a b c","d e f","h i j"))
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at parallelize at <console>:24
scala> rdd4.map(_.split(" ")).collect
res12: Array[Array[String]] = Array(Array(a, b, c), Array(d, e, f), Array(h, i, j))
Since an RDD has no flatten method, we must use flatMap to flatten it.
scala> rdd4.flatMap(_.split(" ")).collect
res13: Array[String] = Array(a, b, c, d, e, f, h, i, j)
-----------------------------------------------------
scala> val rdd5 = sc.parallelize(List(List("a b c","a b b"),List("e f g","a f g"),List("h i j","a a b")))
rdd5: org.apache.spark.rdd.RDD[List[String]] = ParallelCollectionRDD[21] at parallelize at <console>:24
scala> rdd5.flatMap(_.flatMap(_.split(" "))).collect // these two flatMaps are not the same thing: the outer one belongs to the RDD and distributes the work to the compute nodes, the inner one belongs to List and runs only on the node its partition was sent to
res14: Array[String] = Array(a, b, c, a, b, b, e, f, g, a, f, g, h, i, j, a, a, b)
--------------------------------------------------
scala> val rdd6 = sc.parallelize(List(5,6,4,7))
rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
scala> val rdd7 = sc.parallelize(List(1,2,3,4))
rdd7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> rdd6.union(rdd7).collect
res15: Array[Int] = Array(5, 6, 4, 7, 1, 2, 3, 4) // union
scala> rdd6.intersection(rdd7).collect
res16: Array[Int] = Array(4) // intersection
-------------------------------------------------
scala> val rdd8 = sc.parallelize(List(("tom",1),("jerry",2),("kitty",3))) // create an RDD from a List of key-value tuples
rdd8: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:24
scala> val rdd9 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7),("tom",2)))
rdd9: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> val rdd10 = rdd8.join(rdd9) // similar to SQL's inner join; it matches on the tuple Key only
rdd10: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[36] at join at <console>:28
scala> rdd10.saveAsTextFile("hdfs://192.168.5.182:8020/testjoin") // save the result into Hadoop HDFS
[root@host2 bin]# ./hdfs dfs -ls /testjoin
Found 17 items
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/_SUCCESS
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00000
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00001
-rw-r--r-- 3 root supergroup 24 2018-11-12 14:54 /testjoin/part-00002
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00003
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00004
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00005
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00006
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00007
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00008
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00009
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00010
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00011
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00012
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00013
-rw-r--r-- 3 root supergroup 14 2018-11-12 14:54 /testjoin/part-00014
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00015
[root@host2 bin]# ./hdfs dfs -cat /testjoin/part-00002
(tom,(1,2))
(tom,(1,8))
[root@host2 bin]# ./hdfs dfs -cat /testjoin/part-00014
(jerry,(2,9))
From the result, only tom and jerry satisfied the join condition and were kept.
scala> val rdd11 = rdd8.leftOuterJoin(rdd9) //left join
rdd11: org.apache.spark.rdd.RDD[(String, (Int, Option[Int]))] = MapPartitionsRDD[40] at leftOuterJoin at <console>:28
scala> rdd11.saveAsTextFile("hdfs://192.168.5.182:8020/leftjointest")
[root@host2 bin]# ./hdfs dfs -ls /leftjointest
Found 17 items
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/_SUCCESS
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00000
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00001
-rw-r--r-- 3 root supergroup 36 2018-11-12 15:15 /leftjointest/part-00002
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00003
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00004
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00005
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00006
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00007
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00008
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00009
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00010
-rw-r--r-- 3 root supergroup 17 2018-11-12 15:15 /leftjointest/part-00011
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00012
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00013
-rw-r--r-- 3 root supergroup 20 2018-11-12 15:15 /leftjointest/part-00014
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00015
[root@host2 bin]# ./hdfs dfs -cat /leftjointest/part-00002
(tom,(1,Some(8)))
(tom,(1,Some(2)))
[root@host2 bin]# ./hdfs dfs -cat /leftjointest/part-00011
(kitty,(3,None))
[root@host2 bin]# ./hdfs dfs -cat /leftjointest/part-00014
(jerry,(2,Some(9)))
Every element of rdd8 is kept; where rdd9 has a matching Key, its value is attached.
scala> rdd11.collect
res18: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (tom,(1,Some(2))), (kitty,(3,None)), (jerry,(2,Some(9))))
Viewing the result directly in the Driver gives the same data as what was saved to Hadoop HDFS.
----------------------------------------------------------
scala> val rdd12 = rdd8.union(rdd9)
rdd12: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[42] at union at <console>:28
scala> rdd12.collect
res20: Array[(String, Int)] = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2))
scala> rdd12.groupByKey.collect // group by key
res21: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(2, 1, 8)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(2, 9)))
--------------------------------------------------------
Now let's compute the WordCount of /usr/file/a.txt in Hadoop HDFS with groupByKey instead of reduceByKey.
scala> val wordAndOne = sc.textFile("hdfs://192.168.5.182:8020/usr/file/a.txt")
wordAndOne: org.apache.spark.rdd.RDD[String] = hdfs://192.168.5.182:8020/usr/file/a.txt MapPartitionsRDD[45] at textFile at <console>:24
scala> wordAndOne.flatMap(_.split(" ")).map((_, 1)).groupByKey.collect
res23: Array[(String, Iterable[Int])] = Array((him,CompactBuffer(1)), (park,CompactBuffer(1)), (fool,CompactBuffer(1)), (dinsh,CompactBuffer(1)), (fish,CompactBuffer(1)), (dog,CompactBuffer(1)), (apple,CompactBuffer(1)), (cry,CompactBuffer(1)), (my,CompactBuffer(1)), (ice,CompactBuffer(1)), (cark,CompactBuffer(1)), (balana,CompactBuffer(1)), (fuck,CompactBuffer(1)))
scala> wordAndOne.flatMap(_.split(" ")).map((_, 1)).groupByKey().mapValues(_.sum).collect // mapValues operates on the value of each tuple; _.sum adds up each group of values, giving the same result as before
res24: Array[(String, Int)] = Array((him,1), (park,1), (fool,1), (dinsh,1), (fish,1), (dog,1), (apple,1), (cry,1), (my,1), (ice,1), (cark,1), (balana,1), (fuck,1))
Although the result is the same, prefer reduceByKey when the data volume is large: reduceByKey pre-aggregates within each partition on the compute nodes before shuffling, whereas groupByKey ships every raw record across the network and gathers all the values of a key on a single node before computing, which is far more expensive.
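For comparison, here is the same word count written with reduceByKey (a quick sketch against the wordAndOne RDD defined above; the counts come out the same as in the result just shown):
scala> wordAndOne.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect // values are pre-combined inside each partition before the shuffle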
------------------------------------------------------------
scala> val rdd1 = sc.parallelize(List(("tom",1),("tom",2),("jerry",3),("kitty",2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(("jerry",2),("tom",1),("shuke",2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd3 = rdd1.cogroup(rdd2) // operates on two RDDs of key-value tuples: groups by Key, producing a new pair RDD in which each Key maps to a tuple of the Value collections drawn from each input RDD
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[3] at cogroup at <console>:28
scala> rdd3.collect
res0: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())), (jerry,(CompactBuffer(3),CompactBuffer(2))))
-----------------------------------------------------------------------------
scala> val rdd1 = sc.parallelize(List("tom","jerry"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List("tom","kitty","shuke"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val rdd3 = rdd1.cartesian(rdd2) // Cartesian product
rdd3: org.apache.spark.rdd.RDD[(String, String)] = CartesianRDD[6] at cartesian at <console>:28
scala> rdd3.collect
res1: Array[(String, String)] = Array((tom,tom), (tom,kitty), (tom,shuke), (jerry,tom), (jerry,kitty), (jerry,shuke))
---------------------------------------------------------------------
Action
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),3) // parallelize with 3 partitions specified
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd1.saveAsTextFile("hdfs://192.168.5.182:8020/testsave")
[root@host2 bin]# ./hdfs dfs -ls /
Found 4 items
drwxr-xr-x - root supergroup 0 2018-11-12 15:15 /leftjointest
drwxr-xr-x - root supergroup 0 2018-11-12 14:54 /testjoin
drwxr-xr-x - root supergroup 0 2018-11-15 11:07 /testsave
drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr
[root@host2 bin]# ./hdfs dfs -ls /testsave
Found 4 items
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:07 /testsave/_SUCCESS
-rw-r--r-- 3 root supergroup 2 2018-11-15 11:07 /testsave/part-00000
-rw-r--r-- 3 root supergroup 4 2018-11-15 11:07 /testsave/part-00001
-rw-r--r-- 3 root supergroup 4 2018-11-15 11:07 /testsave/part-00002
[root@host2 bin]# ./hdfs dfs -cat /testsave/part-00000
1
[root@host2 bin]# ./hdfs dfs -cat /testsave/part-00001
2
3
[root@host2 bin]# ./hdfs dfs -cat /testsave/part-00002
4
5
In Hadoop HDFS we can see three part files holding the result.
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5)) // no partition count specified
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> rdd1.saveAsTextFile("hdfs://192.168.5.182:8020/testsave1")
[root@host2 bin]# ./hdfs dfs -ls /
Found 5 items
drwxr-xr-x - root supergroup 0 2018-11-12 15:15 /leftjointest
drwxr-xr-x - root supergroup 0 2018-11-12 14:54 /testjoin
drwxr-xr-x - root supergroup 0 2018-11-15 11:07 /testsave
drwxr-xr-x - root supergroup 0 2018-11-15 11:09 /testsave1
drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr
[root@host2 bin]# ./hdfs dfs -ls /testsave1
Found 17 items
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/_SUCCESS
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00000
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00001
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00002
-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00003
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00004
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00005
-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00006
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00007
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00008
-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00009
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00010
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00011
-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00012
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00013
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00014
-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00015
[root@host2 bin]# ./hdfs dfs -cat /testsave1/part-00003
1
[root@host2 bin]# ./hdfs dfs -cat /testsave1/part-00006
2
[root@host2 bin]# ./hdfs dfs -cat /testsave1/part-00009
3
[root@host2 bin]# ./hdfs dfs -cat /testsave1/part-00012
4
[root@host2 bin]# ./hdfs dfs -cat /testsave1/part-00015
5
With no partition count specified, we get 16 partitions; this is tied to the number of cores used when starting spark-shell:
[root@host2 bin]# ./spark-shell --master spark://host2:7077,host1:7077 --executor-memory 1g --total-executor-cores 16
Here I started spark-shell with 16 cores and 1 GB of executor memory for this job. Note that more partitions is not automatically better: with 16 cores only 16 tasks run at the same time, the others wait, and switching between tasks wastes time.
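As a quick check (a sketch; the exact number depends on your cluster configuration), the default parallelism that parallelize falls back to can be read straight off the SparkContext:
scala> sc.defaultParallelism // with --total-executor-cores 16 on a standalone cluster this reports 16
scala> sc.parallelize(List(1,2,3,4,5)).partitions.length // the same value when no partition count is passed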
----------------------------------------------------------
scala> val rdd = sc.textFile("hdfs://192.168.5.182:8020/usr/file/wcount")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://192.168.5.182:8020/usr/file/wcount MapPartitionsRDD[12] at textFile at <console>:24
[root@host2 bin]# ./hdfs dfs -ls /usr/file/wcount
Found 3 items
-rw-r--r-- 3 root supergroup 0 2018-11-03 16:20 /usr/file/wcount/_SUCCESS
-rw-r--r-- 3 root supergroup 78 2018-11-03 16:20 /usr/file/wcount/part-00000
-rw-r--r-- 3 root supergroup 37 2018-11-03 16:20 /usr/file/wcount/part-00001
scala> rdd.partitions.length // check the RDD's partition count
res4: Int = 3
Here /usr/file/wcount in Hadoop HDFS contains 3 files, so the RDD has 3 partitions. If we upload a new file into that directory:
[root@host2 bin]# ./hdfs dfs -put /home/soft/schema.xml /usr/file/wcount
[root@host2 bin]# ./hdfs dfs -ls /usr/file/wcount
Found 4 items
-rw-r--r-- 3 root supergroup 0 2018-11-03 16:20 /usr/file/wcount/_SUCCESS
-rw-r--r-- 3 root supergroup 78 2018-11-03 16:20 /usr/file/wcount/part-00000
-rw-r--r-- 3 root supergroup 37 2018-11-03 16:20 /usr/file/wcount/part-00001
-rw-r--r-- 3 root supergroup 3320 2018-11-15 14:34 /usr/file/wcount/schema.xml
scala> val rdd = sc.textFile("hdfs://192.168.5.182:8020/usr/file/wcount")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://192.168.5.182:8020/usr/file/wcount MapPartitionsRDD[14] at textFile at <console>:24
scala> rdd.partitions.length
res5: Int = 4
The RDD's partition count becomes 4.
-------------------------------------------------------------------
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24
scala> rdd1.reduce(_+_)
res6: Int = 15
Notice that reduce does not return an RDD here; it returns a plain value directly, which shows that reduce() is an Action operator.
scala> rdd1.count
res7: Long = 5
count returns the number of elements in the collection; it is also an Action.
scala> rdd1.top(2)
res8: Array[Int] = Array(5, 4)
top sorts the elements and takes the largest n, in descending order.
scala> rdd1.take(2)
res9: Array[Int] = Array(1, 2)
take returns the first n elements without sorting.
scala> rdd1.first
res10: Int = 1
first returns the first element.
scala> rdd1.takeOrdered(3)
res11: Array[Int] = Array(1, 2, 3)
takeOrdered sorts in ascending order and takes the first n elements.
-------------------------------------------------------------------
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val func = (index: Int,it: Iterator[Int]) => {
| it.map(e => s"part: $index, ele: $e")
| }
func: (Int, Iterator[Int]) => Iterator[String] = <function2>
This defines a function that, for every element e, also reports the partition index it belongs to.
scala> val rdd2 = rdd.mapPartitionsWithIndex(func) // processes one whole partition at a time, and knows which partition that data belongs to
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:28
scala> rdd2.collect
res0: Array[String] = Array(part: 0, ele: 1, part: 0, ele: 2, part: 0, ele: 3, part: 0, ele: 4, part: 1, ele: 5, part: 1, ele: 6, part: 1, ele: 7, part: 1, ele: 8, part: 1, ele: 9)
1, 2, 3, 4 are in partition 0; 5, 6, 7, 8, 9 are in partition 1.
scala> rdd.aggregate(0)(_ + _,_ + _) // the first _ + _ sums within each partition (2 partitions here); the second _ + _ combines the partition results (aggregate locally first, then globally)
res6: Int = 45
scala> rdd.aggregate(0)(math.max(_,_),_ + _) // math.max(_,_) takes the maximum within each partition; _ + _ then adds the per-partition maxima
res7: Int = 13
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3) // 3 partitions
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> rdd.aggregate(0)(_ + _,_ + _) // sum each of the 3 partitions, then combine the results
res8: Int = 45
scala> rdd.aggregate(0)(math.max(_,_),_ + _) // add the maxima of the 3 partitions: 3 + 6 + 9
res9: Int = 18
scala> rdd.aggregate(5)(math.max(_,_),_ + _)
res10: Int = 25
Here the initial value 5 also takes part in each partition's comparison: in the first partition 1, 2, 3 are all smaller than 5, so its maximum is 5; the second partition's maximum is 6; the third's is 9. On top of that, 5 is applied once more in the final combine, so the result is 5 + 6 + 9 + 5 = 25.
---------------------------------------------------------
scala> val rdd = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> def func2(index: Int, iter: Iterator[String]): Iterator[String] = {
| iter.map(x => "[partID:" + index + ",val:" + x + "]")
| }
func2: (index: Int, iter: Iterator[String])Iterator[String]
This defines a function that, for every element x, also reports the partition index it belongs to.
scala> val rdd1 = rdd.mapPartitionsWithIndex(func2) // processes one whole partition at a time, and knows which partition that data belongs to
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at mapPartitionsWithIndex at <console>:28
scala> rdd1.collect
res3: Array[String] = Array([partID:0,val:a], [partID:0,val:b], [partID:0,val:c], [partID:1,val:d], [partID:1,val:e], [partID:1,val:f])
a, b, c are in partition 0; d, e, f are in partition 1.
scala> rdd.aggregate("")(_ + _,_ + _)
res18: String = defabc
scala> rdd.aggregate("")(_ + _,_ + _)
res19: String = abcdef
Two different results appear here because the rdd has two partitions, and the executors on the workers compute them in parallel; whichever partition's result comes back first is concatenated first.
scala> rdd.aggregate("|")(_ + _,_ + _)
res20: String = ||abc|def
scala> rdd.aggregate("|")(_ + _,_ + _)
res21: String = ||def|abc
Again there are two possible results, for the same reason. The "|" initial value is prepended once inside each partition and once more when the partition results are concatenated, hence the extra "|" characters.
---------------------------------------------------------------------
scala> val rdd = sc.parallelize(List("12","23","345","4567"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> rdd.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x + y)
res24: String = 24
scala> rdd.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x + y)
res25: String = 42
(x,y) => math.max(x.length,y.length).toString takes, within each partition, the maximum string length and turns it into a string; (x,y) => x + y concatenates the per-partition results. The first partition ("12", "23") has a maximum string length of 2, and the second ("345", "4567") a maximum of 4. Whichever result returns first comes first, so the answer is "24" or "42".
---------------------------------------------------------------
scala> val rdd = sc.parallelize(List("12","23","345",""),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> rdd.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x + y)
res28: String = 01
scala> rdd.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x + y)
res29: String = 10
This one is harder to follow. In the first partition, "" is compared with "12", giving the string "0"; then "0" is compared with "23", giving "1". In the second partition, "" compared with "345" gives "0", and then "0" compared with "" gives "0" again. So the result is "01" or "10". We can verify this with the rdd below.
scala> val rdd = sc.parallelize(List("12","23","345","67"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> rdd.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x + y)
res30: String = 11
The only difference here is that the string "0" compared with "67" gives "1", so both partitions yield "1".
-------------------------------------------------------------------------------
scala> val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> pairRDD.aggregateByKey(0)(_ + _,_ + _).collect // sum within each partition, then sum the partition results
res33: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
scala> pairRDD.aggregateByKey(100)(_ + _,_ + _).collect
res34: Array[(String, Int)] = Array((dog,112), (cat,219), (mouse,206))
With an initial value of 100, the 100 is applied once in every partition where the key appears: dog occurs only in the second partition, so it ends up as 112; cat occurs in both partitions, so 100 is added twice, giving 219; mouse likewise.
Of course, if all we want is the per-key sum of the values, we can simply use reduceByKey; no initial value is involved, and the result matches aggregateByKey with an initial value of 0.
scala> pairRDD.reduceByKey(_ + _).collect
res31: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
scala> pairRDD.aggregateByKey(100)(_ + _,_ + _).saveAsTextFile("hdfs://192.168.5.182:8020/aggbk")
[root@host2 bin]# ./hdfs dfs -ls /aggbk
Found 3 items
-rw-r--r-- 3 root supergroup 0 2018-11-16 17:22 /aggbk/_SUCCESS
-rw-r--r-- 3 root supergroup 20 2018-11-16 17:22 /aggbk/part-00000
-rw-r--r-- 3 root supergroup 12 2018-11-16 17:22 /aggbk/part-00001
[root@host2 bin]# ./hdfs dfs -cat /aggbk/part-00000
(dog,112)
(cat,219)
[root@host2 bin]# ./hdfs dfs -cat /aggbk/part-00001
(mouse,206)
[root@host2 bin]#
The result with the initial value of 100 is saved to Hadoop HDFS. Because the RDD was created with 2 partitions, there are only 2 part files, and their contents match the earlier collect.
---------------------------------------------------------------------------------
scala> val rdd = sc.parallelize(List(("a",1),("b",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[18] at parallelize at <console>:24
scala> rdd.collectAsMap // collect the result into a Map
res36: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)
scala> rdd.mapValues(_ * 100).collectAsMap // multiply each value by 100, then collect into a Map
res37: scala.collection.Map[String,Int] = Map(b -> 200, a -> 100)
-------------------------------------------------------------------------------------
How an RDD executes: List(1,2,3,4,5) is split into 3 partitions, tasks are generated and pushed to the Executors on 3 Workers, the Executors compute their partitions, and the results are collected back to the Driver and returned as an array. The partial results may come back faster or slower, but they are assembled into the Array by partition number, so the order never changes.
scala> val rdd = sc.parallelize(List(1,2,3,4,5),3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> rdd.map(_ * 10).collect
res19: Array[Int] = Array(10, 20, 30, 40, 50)
scala> rdd.map(_ * 10).collect
res20: Array[Int] = Array(10, 20, 30, 40, 50)
scala> rdd.map(_ * 10).collect
res21: Array[Int] = Array(10, 20, 30, 40, 50)
No matter how many times this is run, the order does not change.
If the results need to be saved to a database and the data volume is large, the Executors should write to the database directly instead of collecting everything to the Driver and writing from there.
-----------------------------------------------------------------------------
scala> val rdd = sc.parallelize(List(("a",1),("b",2),("b",2),("c",2),("c",1)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> rdd.countByKey() // ignores the tuple Values and only counts how many times each Key appears
res22: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)
--------------------------------------------------------------------------------
scala> val rdd = sc.parallelize(List(("a",1),("b",2),("b",2),("c",2),("c",1),("d",4),("d",2),("e",1)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> val rdd1 = rdd.filterByRange("b","d") // filter by the tuple Key, keeping only tuples whose Key falls in the range "b" to "d"
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at filterByRange at <console>:26
scala> rdd1.collect
res24: Array[(String, Int)] = Array((b,2), (b,2), (c,2), (c,1), (d,4), (d,2))
-----------------------------------------------------------------------------------------------
scala> val a = sc.parallelize(List(("a","1 2"),("b","3 4")))
a: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[12] at parallelize at <console>:24
scala> a.flatMapValues(_.split(" ")).collect // flatten the Value of each tuple
res25: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))
---------------------------------------------------------------------------------------------
scala> val rdd = sc.parallelize(List("dog","wolf","cat","bear"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:24
scala> val rdd1 = rdd.map(x => (x.length,x)) // turn each element into a (length, word) tuple, forming a new RDD rdd1
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[15] at map at <console>:26
scala> rdd1.collect
res26: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))
Now we want to concatenate, for each Key, all the Values that share it. The following three methods all work:
scala> rdd1.aggregateByKey("")(_ + _,_ + _).collect
res27: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))
scala> rdd1.aggregateByKey("")(_ + _,_ + _).collect
res28: Array[(Int, String)] = Array((4,wolfbear), (3,catdog))
scala> rdd1.reduceByKey(_ + _).collect
res40: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))
scala> rdd1.foldByKey("")(_ + _).collect
res41: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))
All three methods achieve this scatter-then-gather aggregation because they all call the same underlying method, combineByKeyWithClassTag.
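As a small illustration of that shared foundation (a sketch only; combineByKey itself is covered in the next section), the same concatenation can be written directly against combineByKey by spelling out the three functions it is built from:
scala> rdd1.combineByKey((v: String) => v, (acc: String, v: String) => acc + v, (a: String, b: String) => a + b).collect // same result as foldByKey("")(_ + _) above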
-------------------------------------------------------------------------------
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24
scala> rdd.foreach(e => println(e * 100))
This foreach returns nothing in the shell. The reason is that it runs on the executors and nothing comes back to the Driver. Let's look at the Spark web UI.
There is a foreach job with Job Id 42; clicking all the way into it we can see:
Clicking the stdout link under Tasks(2), when the index is 0 we see:
And when the index is 1 we see:
This shows that the executors simply ran the statement rdd.foreach(e => println(e * 100)) locally.
scala> rdd.foreachPartition(it => it.foreach(x => println(x * 10000))) // takes one whole partition at a time as an iterator, and the iterator does the printing
Again there is no return value; in the Spark web UI we can see:
It, too, executed the statement on the Executors and returned nothing to the Driver.
When writing Executor data out to a database, foreachPartition lets us take a whole partition at a time and write it over a single database connection, whereas foreach would open a connection for every single record, which is extremely inefficient and costly.
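A minimal sketch of that pattern using plain JDBC (the URL, credentials, and table name below are placeholders, and the JDBC driver jar must be available on the executors' classpath):
import java.sql.DriverManager
rdd.foreachPartition { it =>
  // open one connection per partition, on the executor that holds the data
  val conn = DriverManager.getConnection("jdbc:mysql://db-host:3306/test", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO numbers (value) VALUES (?)")
  try {
    it.foreach { x =>
      stmt.setInt(1, x) // the rdd in this example holds Int elements
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}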
---------------------------------------------------------------------------------------
scala> val pairRDD = sc.parallelize(List(("hello",2),("jerry",3),("hello",4),("jerry",1)),2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd = pairRDD.combineByKey(x => x,(m: Int,n: Int) => m + n,(a: Int,b: Int) => a + b)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at combineByKey at <console>:26
combineByKey is a lower-level operator: the parameter types must be declared explicitly, and shorthand such as _ + _ cannot be used. The ShuffledRDD means that tuples with the same Key are brought to the same partition on one Executor before the final computation.
scala> rdd.collect
res1: Array[(String, Int)] = Array((hello,6), (jerry,4))
咱們來看一個把各類動物按照單雙來進行分組的例子
scala> val rdd = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd1 = sc.parallelize(List(1,1,2,2,2,1,2,2,2),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd2 = rdd1.zip(rdd) // zip the two RDDs into a single RDD of key-value tuples
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[2] at zip at <console>:28
scala> rdd2.collect
res0: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer
scala> val rdd3 = rdd2.combineByKey(x => ListBuffer(x),(m: ListBuffer[String],n: String) => m += n,(a: ListBuffer[String],b: ListBuffer[String]) => a ++= b)
rdd3: org.apache.spark.rdd.RDD[(Int, scala.collection.mutable.ListBuffer[String])] = ShuffledRDD[4] at combineByKey at <console>:31
The first function, x => ListBuffer(x), takes the first Value (an animal here) seen for each Key (a number here) in a partition and puts it into its own ListBuffer; for example, the first partition only produces ListBuffer(dog) and ListBuffer(gnu), and not cat, because cat is not the first Value for key 1. The other partitions work the same way. The second function, (m: ListBuffer[String],n: String) => m += n, appends the remaining Values to the ListBuffer of the same Key; for example the first partition now holds ListBuffer(dog, cat) and ListBuffer(gnu). At this point each partition is still processed separately. The third function, (a: ListBuffer[String],b: ListBuffer[String]) => a ++= b, performs the global aggregation, merging the ListBuffers that share a Key. This step is a shuffle: ListBuffers with the same Key are moved to the same machine, and the merge is computed there.
scala> rdd3.collect
res2: Array[(Int, scala.collection.mutable.ListBuffer[String])] = Array((1,ListBuffer(dog, cat, turkey)), (2,ListBuffer(salmon, rabbit, gnu, wolf, bear, bee)))
The overall idea is illustrated in the diagram below.
Save the result to Hadoop HDFS:
scala> rdd3.saveAsTextFile("hdfs://192.168.5.182:8020/combine")
[root@host2 bin]# ./hdfs dfs -ls /combine
Found 4 items
-rw-r--r-- 3 root supergroup 0 2018-11-23 17:14 /combine/_SUCCESS
-rw-r--r-- 3 root supergroup 0 2018-11-23 17:14 /combine/part-00000
-rw-r--r-- 3 root supergroup 33 2018-11-23 17:14 /combine/part-00001
-rw-r--r-- 3 root supergroup 53 2018-11-23 17:14 /combine/part-00002
[root@host2 bin]# ./hdfs dfs -cat /combine/part-00001
(1,ListBuffer(turkey, dog, cat))
[root@host2 bin]# ./hdfs dfs -cat /combine/part-00002
(2,ListBuffer(gnu, wolf, bear, bee, salmon, rabbit))
Although there are 3 partitions, after the shuffle there are only 2 Keys (1 and 2), so only two of the three part files contain data.
We can redefine the number of partitions of rdd3:
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
scala> val rdd3 = rdd2.combineByKey(x => ListBuffer(x),(m: ListBuffer[String],n: String) => m += n,(a: ListBuffer[String],b: ListBuffer[String]) => a ++= b,new HashPartitioner(2),true,null)
rdd3: org.apache.spark.rdd.RDD[(Int, scala.collection.mutable.ListBuffer[String])] = ShuffledRDD[6] at combineByKey at <console>:32
Save it to Hadoop HDFS again:
scala> rdd3.saveAsTextFile("hdfs://192.168.5.182:8020/combine1")
[root@host2 bin]# ./hdfs dfs -ls /combine1
Found 3 items
-rw-r--r-- 3 root supergroup 0 2018-11-23 17:27 /combine1/_SUCCESS
-rw-r--r-- 3 root supergroup 53 2018-11-23 17:27 /combine1/part-00000
-rw-r--r-- 3 root supergroup 33 2018-11-23 17:27 /combine1/part-00001
Now the newly saved result has only 2 part files, and both contain data.