Spark Operators: RDD Creation Operations

Creating an RDD from a collection

  • parallelize

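def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]

Creates an RDD from a Seq collection.

Parameter 1: the Seq collection; required.

Parameter 2: the number of partitions; defaults to defaultParallelism, which is normally the number of CPU cores allocated to the application.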

 
    scala> var rdd = sc.parallelize(1 to 10)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21

    scala> rdd.collect
    res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    scala> rdd.partitions.size
    res4: Int = 15

    // Create the RDD with 3 partitions
    scala> var rdd2 = sc.parallelize(1 to 10, 3)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:21

    scala> rdd2.collect
    res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    scala> rdd2.partitions.size
    res6: Int = 3

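The partition counts above say how many slices exist, but not which elements land in each one. The following is a minimal plain-Scala sketch, with no Spark dependency, of the even, index-based slicing that parallelize is understood to apply to a local collection. The object name SliceSketch and the helper slice are hypothetical and only model the behavior; they are not part of Spark's API.

```scala
// Hypothetical helper that models how a local Seq is cut into numSlices
// contiguous partitions. Illustration only; not Spark's API.
object SliceSketch {
  def slice[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "numSlices must be at least 1")
    val length = seq.length.toLong
    (0 until numSlices).map { i =>
      // Integer division keeps slice sizes within one element of each other.
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      seq.slice(start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    // Mirrors sc.parallelize(1 to 10, 3) from the session above.
    slice(1 to 10, 3).zipWithIndex.foreach { case (part, idx) =>
      println(s"partition $idx: ${part.mkString(", ")}")
    }
  }
}
```

For 1 to 10 and 3 slices, the sketch groups the elements as (1, 2, 3), (4, 5, 6) and (7, 8, 9, 10). On a real cluster, rdd2.glom.collect is one way to inspect the actual contents of each partition.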
  • makeRDD

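def makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]

This form behaves exactly like parallelize.

def makeRDD[T](seq: Seq[(T, Seq[String])])(implicit arg0: ClassTag[T]): RDD[T]

This form lets you specify the preferredLocations of each partition. Each element of the input Seq becomes one partition, paired with its list of preferred hosts.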

 
    scala> var collect = Seq((1 to 10, Seq("slave007.lxw1234.com", "slave002.lxw1234.com")),
         |   (11 to 15, Seq("slave013.lxw1234.com", "slave015.lxw1234.com")))
    collect: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),List(slave007.lxw1234.com, slave002.lxw1234.com)), (Range(11, 12, 13, 14, 15),List(slave013.lxw1234.com, slave015.lxw1234.com)))

    scala> var rdd = sc.makeRDD(collect)
    rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at makeRDD at <console>:23

    scala> rdd.partitions.size
    res33: Int = 2

    scala> rdd.preferredLocations(rdd.partitions(0))
    res34: Seq[String] = List(slave007.lxw1234.com, slave002.lxw1234.com)

    scala> rdd.preferredLocations(rdd.partitions(1))
    res35: Seq[String] = List(slave013.lxw1234.com, slave015.lxw1234.com)

Specifying the preferred locations of partitions helps the scheduler place later tasks more efficiently.
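Note that in the session above the RDD holds two elements (each a whole Range), one per partition, rather than the fifteen individual numbers. The following plain-Scala sketch, with no Spark dependency, models that pairing of partition index and host list. The object PreferredLocationsSketch and its helpers partitionValues and indexToPrefs are hypothetical names used only for illustration.

```scala
// Hypothetical model of the (value, hosts) form of makeRDD: element i of the
// input becomes partition i, and its host list becomes that partition's
// preferred locations. Illustration only; not Spark's API.
object PreferredLocationsSketch {
  // The values that end up in the RDD, one per partition.
  def partitionValues[T](seq: Seq[(T, Seq[String])]): Seq[T] =
    seq.map(_._1)

  // Partition index -> preferred hosts for that partition.
  def indexToPrefs[T](seq: Seq[(T, Seq[String])]): Map[Int, Seq[String]] =
    seq.zipWithIndex.map { case ((_, hosts), idx) => idx -> hosts }.toMap

  def main(args: Array[String]): Unit = {
    val input = Seq(
      (1 to 10, Seq("slave007.lxw1234.com", "slave002.lxw1234.com")),
      (11 to 15, Seq("slave013.lxw1234.com", "slave015.lxw1234.com"))
    )
    println(s"number of partitions: ${partitionValues(input).size}")
    indexToPrefs(input).toSeq.sortBy(_._1).foreach { case (idx, hosts) =>
      println(s"partition $idx prefers ${hosts.mkString(", ")}")
    }
  }
}
```

Preferred locations are hints: the scheduler tries to run a partition's task on one of the listed hosts but may fall back to another node.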

 

Creating an RDD from external storage

  • textFile


 
    // Create from a file on HDFS
    scala> var rdd = sc.textFile("hdfs:///tmp/lxw1234/1.txt")
    rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at textFile at <console>:21

    scala> rdd.count
    res48: Long = 4

    // Create from a local file
    scala> var rdd = sc.textFile("file:///etc/hadoop/conf/core-site.xml")
    rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[28] at textFile at <console>:21

    scala> rdd.count
    res49: Long = 97

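Note that a local file path like this one must exist, at the same path, on the Driver and on every Executor node.

  • Creating from other HDFS file formats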

hadoopFile

sequenceFile

objectFile

newAPIHadoopFile
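The methods listed above are given without usage, so here is a minimal sketch of two of them, sequenceFile and objectFile, as a write-then-read round trip. It assumes spark-core is on the classpath and runs with a local master against a temporary directory instead of HDFS; the object name FileFormatsSketch, the helper roundTrip, and the sample data are hypothetical. hadoopFile and newAPIHadoopFile additionally take InputFormat, key and value classes and are not shown.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical round trip: write a small pair RDD as a SequenceFile and as an
// object file, then read both back. Local mode and temp paths are assumptions.
object FileFormatsSketch {
  def roundTrip(): (Seq[(String, Int)], Seq[(String, Int)]) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("file-formats-sketch")
    val sc = new SparkContext(conf)
    try {
      val base = Files.createTempDirectory("rdd-sketch")
      val seqPath = base.resolve("seq").toUri.toString // file:// URI
      val objPath = base.resolve("obj").toUri.toString

      val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)), 2)

      // SequenceFile: keys and values are converted to Hadoop Writables.
      pairs.saveAsSequenceFile(seqPath)
      val fromSeq = sc.sequenceFile[String, Int](seqPath).collect().sorted.toSeq

      // Object file: elements are stored with Java serialization.
      pairs.saveAsObjectFile(objPath)
      val fromObj = sc.objectFile[(String, Int)](objPath).collect().sorted.toSeq

      (fromSeq, fromObj)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (fromSeq, fromObj) = roundTrip()
    println(s"sequenceFile: $fromSeq")
    println(s"objectFile:   $fromObj")
  }
}
```

On a cluster, the same calls take hdfs:// paths like the textFile example earlier in this section.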

  • Creating through the Hadoop API

hadoopRDD

newAPIHadoopRDD

For example, creating an RDD from HBase:

 
    scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}

    scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    scala> import org.apache.hadoop.hbase.client.HBaseAdmin
    import org.apache.hadoop.hbase.client.HBaseAdmin

    scala> val conf = HBaseConfiguration.create()
    scala> conf.set(TableInputFormat.INPUT_TABLE, "lxw1234")
    scala> var hbaseRDD = sc.newAPIHadoopRDD(
         |   conf,
         |   classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
         |   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
         |   classOf[org.apache.hadoop.hbase.client.Result])

    scala> hbaseRDD.count
    res52: Long = 1


When reposting, please credit: lxw的大數據田地 » Spark Operators: RDD Creation Operations
