Spark學習筆記總結html
Spark能夠用於批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。
Spark是MapReduce的替代方案,並且兼容HDFS、Hive,可融入Hadoop的生態系統,以彌補MapReduce的不足。shell
RDD(Resilient Distributed Dataset)叫作分佈式數據集,是Spark中最基本的數據抽象,它表明一個不可變(建立了內容不可變)、可分區、裏面的元素可並行計算的集合。機器學習
查看該rdd的分區數量
rdd1.partitions.length分佈式
RDD中兩種算子:
transformation轉換,是延遲加載的函數
經常使用的transformation:
(1)map、flatMap、filter
(2)intersection求交集、union求並集:注意類型要一致
distinct:去重
(3)join:類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD
(4)groupByKey:在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD
可是效率reduceByKey較高,由於有一個本地combiner的過程。
(5)cartesian笛卡爾積oop
經常使用的action
(1)collect()、count()
(2)reduce:經過func函數彙集RDD中的全部元素
(3)take(n):取前n個;top(2):排序取前兩個
(4)takeOrdered(n),排完序後取前n個學習
參考《http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html》spa
(1)mapPartitions(func)和
mapPartitions(func):
獨立地在RDD的每個分片上運行,可是返回值;foreachPartition(func)也經常使用,不須要返回值命令行
mapPartitionsWithIndex(func):
能夠看到分區的編號,以及該分區數據。
相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,func的函數類型必須是
(Int, Interator[T]) => Iterator[U]scala
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2) val func = (index: Int, iter: Iterator[(Int)]) => {iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator} rdd1.mapPartitionsWithIndex(func).collect
(2)aggregate
action操做,
第一個參數是初始值,
第二個參數:是2個函數[每一個函數都是2個參數(第一個參數:先對個個分區進行的操做, 第二個:對個個分區合併後的結果再進行合併), 輸出一個參數]
例子:
rdd1.aggregate(0)(_+_, _+_) //前一個是對每個分區進行的操做,第二個是對各分區結果進行的結果 rdd1.aggregate(5)(math.max(_, _), _ + _) //結果:5 + (5+9) = 19 val rdd3 = sc.parallelize(List("12","23","345","4567"),2) rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y) //結果:24或者42 val rdd4 = sc.parallelize(List("12","23","345",""),2) rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) //結果01或者10
(3)aggregateByKey
將key值相同的,先局部操做,再總體操做。。和reduceByKey內部實現差很少
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2) pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect //結果:Array((dog,12), (cat,17), (mouse,6))
PS:
和reduceByKey(+)調用的都是同一個方法,只是aggregateByKey要底層一些,能夠先局部再總體操做。
(4)combineByKey
和reduceByKey是相同的效果,是reduceByKey的底層。
第一個參數x:原封不動取出來, 第二個參數:是函數, 局部運算, 第三個:是函數, 對局部運算後的結果再作運算
每一個分區中每一個key中value中的第一個值,
val rdd1 = sc.textFile("hdfs://master:9000/wordcount/input/").flatMap(_.split(" ")).map((_, 1)) val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) rdd2.collect
第一個參數的含義:
每一個分區中相同的key中value中的第一個值
如:
(hello,1)(hello,1)(good,1)-->(hello(1,1),good(1))-->x就至關於hello的第一個1, good中的1
val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) rdd3.collect //每一個會多加3個10 val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3) val rdd6 = rdd5.zip(rdd4) val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n) //將key相同的數據,放入一個集合中
(5)collectAsMap
Action
Map(b -> 2, a -> 1)//將Array的元祖轉換成Map,之後能夠經過key取值
val rdd = sc.parallelize(List(("a", 1), ("b", 2))) rdd.collectAsMap //能夠下一步使用
(6)countByKey
根據key計算key的數量
Action
val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1))) rdd1.countByKey rdd1.countByValue//將("a", 1)當作一個元素,統計其出現的次數
(7)flatMapValues 對每個value進行操做後壓平