Spark算子:RDD行動Action操做(7)–saveAsNewAPIHadoopFile、saveAsNewAPIHadoopDataset

saveAsNewAPIHadoopFile

def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unitnode

def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unitapache

 

saveAsNewAPIHadoopFile用於將RDD數據保存到HDFS上,使用新版本Hadoop API。app

用法基本同saveAsHadoopFile。oop

 

[plain] view plain copyspa

print?.net

  1. import org.apache.spark.SparkConf  
  2. import org.apache.spark.SparkContext  
  3. import SparkContext._  
  4. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat  
  5. import org.apache.hadoop.io.Text  
  6. import org.apache.hadoop.io.IntWritable  
  7.    
  8. var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))  
  9. rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])  

 

 

saveAsNewAPIHadoopDataset

 

def saveAsNewAPIHadoopDataset(conf: Configuration): Unitorm

做用同saveAsHadoopDataset,只不過採用新版本Hadoop API。htm

以寫入HBase爲例:blog

 

HBase建表:ip

create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}

 

完整的Spark應用程序:

[plain] view plain copy

print?

  1. package com.lxw1234.test  
  2.    
  3. import org.apache.spark.SparkConf  
  4. import org.apache.spark.SparkContext  
  5. import SparkContext._  
  6. import org.apache.hadoop.hbase.HBaseConfiguration  
  7. import org.apache.hadoop.mapreduce.Job  
  8. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
  9. import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
  10. import org.apache.hadoop.hbase.client.Result  
  11. import org.apache.hadoop.hbase.util.Bytes  
  12. import org.apache.hadoop.hbase.client.Put  
  13.    
  14. object Test {  
  15.   def main(args : Array[String]) {  
  16.    val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")  
  17.    val sc = new SparkContext(sparkConf);  
  18.    var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))  
  19.      
  20.     sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3")  
  21.     sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")  
  22.     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")  
  23.     var job = new Job(sc.hadoopConfiguration)  
  24.     job.setOutputKeyClass(classOf[ImmutableBytesWritable])  
  25.     job.setOutputValueClass(classOf[Result])  
  26.     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])  
  27.       
  28.     rdd1.map(  
  29.       x => {  
  30.         var put = new Put(Bytes.toBytes(x._1))  
  31.         put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))  
  32.         (new ImmutableBytesWritable,put)  
  33.       }      
  34.     ).saveAsNewAPIHadoopDataset(job.getConfiguration)  
  35.       
  36.     sc.stop()     
  37.   }  
  38. }  
  39.    

注意:保存到HBase,運行時候須要在SPARK_CLASSPATH中加入HBase相關的jar包。

可參考:http://lxw1234.com/archives/2015/07/332.htm

相關文章
相關標籤/搜索