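collectAsMap returns the RDD's key-value pairs as a Map. Keys are unique, so if the same key maps to several values in the RDD, only one of them is kept.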
/**
* Return the key-value pairs in this RDD to the master as a Map.
*
* Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
* one value per key is preserved in the map returned)
*
* @note this method should only be used if the resulting data is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collectAsMap(): Map[K, V]
scala> val rdd = sc.parallelize(List(("A",1),("A",2),("A",3),("B",1),("B",2),("C",3)),3)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.collectAsMap
res0: scala.collection.Map[String,Int] = Map(A -> 3, C -> 3, B -> 2)
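The "one value per key" behaviour can be reproduced without Spark. The following is a minimal sketch using plain Scala collections; the object name `CollectAsMapSketch` and its fields are illustrative and not part of Spark. It assumes the pairs arrive in list order, in which case the last value for each key wins, matching the transcript above. It also shows how grouping first keeps every value.

```scala
object CollectAsMapSketch {
  // Same data as the transcript above, held in a local List instead of an RDD.
  val pairs = List(("A", 1), ("A", 2), ("A", 3), ("B", 1), ("B", 2), ("C", 3))

  // Converting to a Map keeps one value per key; later pairs overwrite earlier ones.
  val asMap: Map[String, Int] = pairs.toMap

  // Grouping by key first keeps every value for each key (a multimap-like result).
  val asMultimap: Map[String, List[Int]] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
}
```

In Spark itself, the value that survives depends on the order in which elements are collected, so it is safer not to rely on a particular one.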
countByKey counts how many elements there are for each key and returns the counts to the driver as a local Map.
/**
* Count the number of elements for each key, collecting the results to a local Map.
*
* Note that this method should only be used if the resulting map is expected to be small, as
* the whole thing is loaded into the driver's memory.
* To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
* returns an RDD[T, Long] instead of a map.
*/
def countByKey(): Map[K, Long] = self.withScope {
self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}
scala> val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> rdd.countByKey
res5: scala.collection.Map[Int,Long] = Map(1 -> 3, 2 -> 3)
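The source of countByKey replaces every value with 1L and then sums per key. A rough local equivalent, written with plain Scala collections, might look like the sketch below; `CountByKeySketch` and its `countByKey` helper are hypothetical names, not Spark API.

```scala
object CountByKeySketch {
  // Same data as the transcript above, held in a local List instead of an RDD.
  val pairs = List((1, 1), (1, 2), (1, 3), (2, 1), (2, 2), (2, 3))

  // Local analogue of mapValues(_ => 1L).reduceByKey(_ + _):
  // replace each value with 1L, group by key, then sum the ones.
  def countByKey[K, V](kvs: Seq[(K, V)]): Map[K, Long] =
    kvs
      .map { case (k, _) => (k, 1L) }
      .groupBy(_._1)
      .map { case (k, ones) => k -> ones.map(_._2).sum }

  val counts: Map[Int, Long] = countByKey(pairs) // Map(1 -> 3, 2 -> 3)
}
```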
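countByValue counts how many times each distinct value occurs. It first uses map to turn every element into a (value, null) key-value pair, where the element becomes the key and null is only a placeholder value,
and then calls countByKey to do the counting.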
/**
* Return the count of each unique value in this RDD as a local map of (value, count) pairs.
*
* Note that this method should only be used if the resulting map is expected to be small, as
* the whole thing is loaded into the driver's memory.
* To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which
* returns an RDD[T, Long] instead of a map.
*/
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
map(value => (value, null)).countByKey()
}
scala> val rdd = sc.parallelize(List(1,2,3,4,5,4,4,3,2,1))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> rdd.countByValue
res12: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 2, 2 -> 2, 3 -> 2, 4 -> 3)
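The two-step shape of countByValue (wrap each element as a key, then count by key) can be mirrored locally. This is only a sketch with plain Scala collections; `CountByValueSketch` and both helpers are illustrative names, not Spark's implementation.

```scala
object CountByValueSketch {
  // Hypothetical local stand-in for countByKey: the group size is the count for that key.
  def countByKey[K, V](kvs: Seq[(K, V)]): Map[K, Long] =
    kvs.groupBy(_._1).map { case (k, group) => k -> group.size.toLong }

  // Same shape as the Spark source: wrap each element as (value, null), then count by key.
  def countByValue[T](values: Seq[T]): Map[T, Long] =
    countByKey(values.map(v => (v, null)))

  val counts: Map[Int, Long] = countByValue(List(1, 2, 3, 4, 5, 4, 4, 3, 2, 1))
}
```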
lookup returns all the values stored under the given key.
/**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to.
*/
def lookup(key: K): Seq[V]
scala> val rdd = sc.parallelize(List(("A",1),("A",2),("A",3),("B",1),("B",2),("C",3)),3)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.lookup("A")
res2: Seq[Int] = WrappedArray(1, 2, 3)
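The Scaladoc says lookup is efficient when the RDD has a known partitioner, because only the partition the key maps to is searched. The sketch below models that idea with plain Scala collections. `LookupSketch`, `partitionOf`, and the hash-modulo rule are assumptions made for illustration, not Spark's actual partitioner code. An RDD created directly by parallelize, as in the transcript above, has no partitioner, so Spark has to scan every partition in that case.

```scala
object LookupSketch {
  val numPartitions = 3

  // Hypothetical stand-in for a hash partitioner: non-negative hashCode modulo partition count.
  def partitionOf(key: Any): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  val pairs = List(("A", 1), ("A", 2), ("A", 3), ("B", 1), ("B", 2), ("C", 3))

  // Bucket the pairs by partition index, as a partitioned dataset would be laid out.
  val partitions: Map[Int, List[(String, Int)]] =
    pairs.groupBy { case (k, _) => partitionOf(k) }

  // Only the single bucket that the key hashes to is searched.
  def lookup(key: String): Seq[Int] =
    partitions.getOrElse(partitionOf(key), Nil).collect { case (k, v) if k == key => v }
}
```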
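checkpoint marks the RDD so that its data is saved to disk under the configured checkpoint directory. The data is written when the first job runs on the RDD, not when checkpoint itself is called.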
/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
def checkpoint(): Unit
/* After creating /home/check with a Linux command, set it as the checkpoint directory. */
scala> sc.setCheckpointDir("/home/check")

scala> val rdd = sc.parallelize(List(("A",1),("A",2),("A",3),("B",1),("B",2),("C",3)),3)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24

/*
 * Before the first action runs, /home/check contains only an empty directory,
 * /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49.
 * checkpoint only marks the RDD and writes no data.
 */
scala> rdd.checkpoint

/* Running count creates an rdd directory (rdd-6 here) under that directory; it holds the data files. */
scala> rdd.count
res5: Long = 6
[root@localhost ~]# ll -a /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49/
total 8
drwxr-xr-x. 2 root root 4096 Sep 4 10:29 .
drwxr-xr-x. 3 root root 4096 Sep 4 10:29 ..

[root@localhost ~]# ll -a /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49/
total 12
drwxr-xr-x. 3 root root 4096 Sep 4 10:30 .
drwxr-xr-x. 3 root root 4096 Sep 4 10:29 ..
drwxr-xr-x. 2 root root 4096 Sep 4 10:30 rdd-6

[root@localhost ~]# ll -a /home/check/5545e4ca-d53d-4d93-aaf4-fd3c74f1ea49/rdd-6/
total 32
drwxr-xr-x. 2 root root 4096 Sep 4 10:30 .
drwxr-xr-x. 3 root root 4096 Sep 4 10:30 ..
-rw-r--r--. 1 root root 171 Sep 4 10:30 part-00000
-rw-r--r--. 1 root root 12 Sep 4 10:30 .part-00000.crc
-rw-r--r--. 1 root root 170 Sep 4 10:30 part-00001
-rw-r--r--. 1 root root 12 Sep 4 10:30 .part-00001.crc
-rw-r--r--. 1 root root 170 Sep 4 10:30 part-00002
-rw-r--r--. 1 root root 12 Sep 4 10:30 .part-00002.crc
collect returns all the elements of the RDD as an array.
/**
* Return an array that contains all of the elements in this RDD.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collect(): Array[T]
scala> val rdd = sc.parallelize(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> rdd.collect
res8: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
toLocalIterator returns an iterator over all the elements of the RDD.
/**
* Return an iterator that contains all of the elements in this RDD.
*
* The iterator will consume as much memory as the largest partition in this RDD.
*
* Note: this results in multiple Spark jobs, and if the input RDD is the result
* of a wide transformation (e.g. join with different partitioners), to avoid
* recomputing the input RDD should be cached first.
*/
def toLocalIterator: Iterator[T]
scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val it = rdd.toLocalIterator
it: Iterator[Int] = non-empty iterator

scala> while(it.hasNext){
     | println(it.next)
     | }
1
2
3
4
5
6
7
8
9
10
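The Scaladoc notes that the iterator needs only as much memory as the largest partition. One way to picture this is an iterator that pulls in one partition at a time, only when the consumer reaches it. The sketch below models that with plain Scala collections; `LocalIteratorSketch` and the `fetched` counter are illustrative assumptions and say nothing about how Spark schedules its jobs internally.

```scala
object LocalIteratorSketch {
  // Two "partitions" of five elements each, standing in for sc.parallelize(1 to 10, 2).
  val partitions: Vector[Vector[Int]] = Vector((1 to 5).toVector, (6 to 10).toVector)

  // Counts how many partitions have been pulled in so far.
  var fetched = 0

  // Iterator.flatMap is lazy: a partition is only touched when the consumer reaches it.
  def localIterator: Iterator[Int] =
    partitions.iterator.flatMap { p =>
      fetched += 1
      p.iterator
    }

  // Reading three elements only needs the first partition.
  val firstThree: List[Int] = localIterator.take(3).toList
  val fetchedAfterThree: Int = fetched
}
```

By contrast, collect brings every partition to the driver at once, which is why its Scaladoc warns about driver memory.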
count returns the number of elements in the RDD.
/**
* Return the number of elements in the RDD.
*/
def count(): Long
scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.count
res1: Long = 10
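dependencies returns the list of Dependency objects describing the parent RDDs that this RDD depends on. Only direct parents are listed: in the example below, rdd2 has a single OneToOneDependency, on rdd1.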
/**
* Get the list of dependencies of this RDD, taking into account whether the
* RDD is checkpointed or not.
*/
final def dependencies: Seq[Dependency[_]]
scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd1 = rdd.filter(_>3)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:26

scala> val rdd2 = rdd1.filter(_<6)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:28

scala> rdd2.dependencies
res2: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@21c882b5)
partitions returns the RDD's partitions as an array of Partition objects.
/**
* Get the array of partitions of this RDD, taking into account whether the
* RDD is checkpointed or not.
*/
final def partitions: Array[Partition]
scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.partitions
res4: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@70c, org.apache.spark.rdd.ParallelCollectionPartition@70d)
first returns the first element of the RDD.
/**
* Return the first element in this RDD.
*/
def first(): T
scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.first
res5: Int = 1
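fold aggregates the elements of each partition starting from zeroValue, then combines the per-partition results, again starting from zeroValue. A zeroValue that is not the neutral element is therefore counted once per partition plus once more in the final merge.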
/**
* @param zeroValue the initial value for the accumulated result of each partition for the `op`
* operator, and also the initial value for the combine results from different
* partitions for the `op` operator - this will typically be the neutral
* element (e.g. `Nil` for list concatenation or `0` for summation)
* @param op an operator used to both accumulate results within a partition and combine results
* from different partitions
*/
def fold(zeroValue: T)(op: (T, T) => T): T
scala> val rdd = sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> rdd.fold(10)(_+_)
res13: Int = 35
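The arithmetic behind that result is easier to see in a small model. The sketch below represents an RDD as a sequence of partitions in plain Scala; `FoldSketch` and `foldLikeSpark` are hypothetical names, not Spark API. The result of 35 in the transcript is consistent with the RDD having a single partition (10 + 15 inside the partition, then 10 + 25 in the merge). That is an inference from the numbers, not something the transcript states.

```scala
object FoldSketch {
  // Hypothetical helper: models an RDD as a sequence of partitions.
  def foldLikeSpark[T](partitions: Seq[Seq[T]], zeroValue: T)(op: (T, T) => T): T = {
    // Step 1: fold each partition on its own, starting from zeroValue.
    val perPartition = partitions.map(_.foldLeft(zeroValue)(op))
    // Step 2: merge the per-partition results, starting from zeroValue again.
    perPartition.foldLeft(zeroValue)(op)
  }

  // One partition: (10 + 1+2+3+4+5) = 25, then 10 + 25 = 35.
  val onePartition: Int = foldLikeSpark(Seq(Seq(1, 2, 3, 4, 5)), 10)(_ + _)

  // Two partitions: (10 + 1+2) = 13 and (10 + 3+4+5) = 22, then 10 + 13 + 22 = 45.
  val twoPartitions: Int = foldLikeSpark(Seq(Seq(1, 2), Seq(3, 4, 5)), 10)(_ + _)

  // With the neutral element 0, the partition count no longer matters.
  val neutral: Int = foldLikeSpark(Seq(Seq(1, 2), Seq(3, 4, 5)), 0)(_ + _)
}
```

Because the result changes with the number of partitions, the zeroValue should normally be the neutral element of the operator, as the Scaladoc suggests.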