Action類算子也是一類算子(函數)叫作行動算子,如foreach,collect,count等。Transformations類算子是延遲執行,Action類算子是觸發執行。一個application應用程序(就是咱們編寫的一個應用程序)中有幾個Action類算子執行,就有幾個job運行。es6
經過函數func彙集數據集中的全部元素,這個函數必須是關聯性的,確保能夠被正確的併發執行 apache
scala> val rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24 scala> rdd1.reduce(_+_) res3: Int = 55
在driver的程序中,以數組的形式,返回數據集的全部元素,這一般會在使用filter或者其它操做後,返回一個足夠小的數據子集再使用 數組
scala> var rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24 scala> rdd1.collect res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
返回數據集的元素個數 併發
scala> val rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:24 scala> rdd1.count res4: Long = 10
返回數據集的第一個元素(相似於take(1)) app
scala> val rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24 scala> rdd1.first res5: Int = 1
返回一個數組,由數據集的前n個元素組成。注意此操做目前並不是並行執行的,而是driver程序所在機器 ide
scala> val rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at <console>:24 scala> rdd1.take(3) res6: Array[Int] = Array(1, 2, 3)
withReplacement:結果中是否可重複
num:取多少個
seed:隨機種子
返回一個數組,在數據集中隨機採樣num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定的隨機數生成器種子
原理
takeSample()函數和sample函數是一個原理,可是不使用相對比例採樣,而是按設定的採樣個數進行採樣,同時返回結果再也不是RDD,而是至關於對採樣後的數據進行collect(),返回結果的集合爲單機的數組函數
scala> val rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at makeRDD at <console>:24 scala> rdd1.takeSample(true,4,10) res19: Array[Int] = Array(10, 10, 2, 3)
takeOrdered和top相似,只不過以和top相反的順序返回元素。
top默認倒序,taskOrdered默認正序
top方法其實就是調用的taskOrdered,而後反轉的結果es5
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { takeOrdered(num)(ord.reverse) }
scala> val rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at makeRDD at <console>:24 scala> rdd1.top(5) res22: Array[Int] = Array(10, 9, 8, 7, 6) scala> rdd1.takeOrdered(5) res23: Array[Int] = Array(1, 2, 3, 4, 5)
saveAsTextFile用於將RDD以文本文件的格式存儲到文件系統中spa
val conf = new SparkConf() .setAppName("saveFile") .setMaster("local[*]") val sc = new SparkContext(conf) val rdd1: RDD[Int] = sc.parallelize(1 to 10) rdd1.repartition(1).saveAsTextFile("/tmp/fff")
saveAsSequenceFile用於將RDD以SequenceFile的文件格式保存到HDFS上。使用方法和saveAsTextFile相似scala
saveAsObjectFile用於將RDD中的元素序列化成對象,存儲到文件中。使用方法和saveAsTextFile相似
對(K,V)類型的RDD有效,返回一個(K,Int)對的map,表示每個能夠對應的元素個數
scala> val rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",3))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at makeRDD at <console>:24 scala> rdd1.countByKey res1: scala.collection.Map[String,Long] = Map(B -> 2, A -> 2, C -> 1)
在數據集的每個元素上,運行函數func,t一般用於更新一個累加器變量,或者和外部存儲系統作交互
scala> val rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",3))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[9] at makeRDD at <console>:24 scala> rdd1.collect.foreach(println(_)) (A,0) (A,2) (B,1) (B,2) (C,3)