Spark Operators: Basic RDD Transformations (7) – zipWithIndex, zipWithUniqueId

zipWithIndex

def zipWithIndex(): RDD[(T, Long)]

This function zips each element of the RDD with that element's ID (its index) in the RDD, producing key/value pairs.

def main(args: Array[String]): Unit = {
  // set the default parallelism to 12
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  // build an RDD with 2 partitions
  val rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)
  val rdd3 = rdd2.zipWithIndex()
  rdd3.collect.foreach(println)
}

16/12/20 14:23:41 INFO DAGScheduler: Job 1 finished: collect at ShellTest.scala:23, took 0.050251 s
(A,0)
(B,1)
(C,2)
(D,3)
(E,4)

16/12/20 14:23:41 INFO SparkContext: Invoking stop() from shutdown hook
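
A note on how these indices line up with the data layout: zipWithIndex assigns indices globally in partition order, so Spark first needs to know how many elements sit in each earlier partition; for an RDD with more than one partition this typically costs an extra job, which is consistent with the collect above being reported as Job 1 rather than Job 0. The sketch below is only illustrative (the object name PartitionInspection is made up); it uses mapPartitionsWithIndex to print which partition each element lands in.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper, not part of the original post.
object PartitionInspection {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("inspect"))
    val rdd = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)
    // Tag every element with the index of the partition that holds it.
    rdd.mapPartitionsWithIndex { (partIdx, iter) =>
      iter.map(e => (partIdx, e))
    }.collect.foreach(println)
    // With 5 elements split over 2 partitions this prints:
    // (0,A) (0,B) (1,C) (1,D) (1,E)
    sc.stop()
  }
}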

 

zipWithUniqueId

def zipWithUniqueId(): RDD[(T, Long)]

This function zips each element of the RDD with a unique ID, producing key/value pairs. The unique ID is generated by the following rule:

the unique ID of the first element in each partition is the index of that partition;

the unique ID of the N-th element in each partition is (the unique ID of the previous element) + (the total number of partitions of the RDD).

def main(args: Array[String]): Unit = {
  // set the default parallelism to 12
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  // build an RDD with 2 partitions
  val rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)
  val rdd3 = rdd2.zipWithUniqueId()
  rdd3.collect.foreach(println)
}

16/12/20 14:25:55 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:23, took 0.568861 s
(A,0)
(B,2)
(C,1)
(D,3)
(E,5)

16/12/20 14:25:55 INFO SparkContext: Invoking stop() from shutdown hook
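
To tie the rule back to this output: partition 0 holds A and B, partition 1 holds C, D and E, so the IDs come out as 0, 2 and 1, 3, 5 respectively. Because each ID depends only on the partition index and the element's position within its own partition, zipWithUniqueId does not need to count the other partitions first, which matches the collect above being Job 0. Below is a minimal sketch (the object name ManualUniqueId is just for illustration) that reproduces the same IDs with mapPartitionsWithIndex.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example, not part of the original post.
object ManualUniqueId {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("manualUniqueId"))
    val rdd = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)
    val numPartitions = rdd.partitions.length // 2 in this example
    // The n-th element (0-based) of partition k gets the ID k + n * numPartitions,
    // which is exactly the rule described above.
    val manual = rdd.mapPartitionsWithIndex { (k, iter) =>
      iter.zipWithIndex.map { case (e, n) => (e, k.toLong + n.toLong * numPartitions) }
    }
    manual.collect.foreach(println) // (A,0) (B,2) (C,1) (D,3) (E,5)
    sc.stop()
  }
}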
