saveAsNewAPIHadoopFile
def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit
saveAsNewAPIHadoopFile saves an RDD to HDFS using the new-version Hadoop API.
Its usage is essentially the same as saveAsHadoopFile.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

// Save the pair RDD as text files under /tmp/lxw1234/ using the new-API TextOutputFormat.
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
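The first signature above can also be used by passing the output format as a type parameter, in which case the key and value classes are taken from the RDD's own element types. A minimal sketch (the output path here is hypothetical, not from the original example):

// Same rdd1 as above; the OutputFormat is given as a type parameter and
// keyClass/valueClass are inferred from rdd1's element types (String, Int).
rdd1.saveAsNewAPIHadoopFile[TextOutputFormat[String, Int]]("/tmp/lxw1234-typed/")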
saveAsNewAPIHadoopDataset
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit
Its effect is the same as saveAsHadoopDataset, except that it uses the new-version Hadoop API.
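Unlike saveAsNewAPIHadoopFile, this method takes all output settings (output format, key/value classes, output path or table) from a Hadoop Configuration, typically assembled through a new-API Job. A minimal sketch writing plain text files (the path and settings are assumptions, not from the original post):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

// Build a new-API Job only to assemble the output configuration.
val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[IntWritable])
job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
FileOutputFormat.setOutputPath(job, new Path("/tmp/lxw1234-dataset/"))

// The RDD is written according to the configuration carried by the Job.
sc.makeRDD(Array(("A", 2), ("B", 6), ("C", 7)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }
  .saveAsNewAPIHadoopDataset(job.getConfiguration)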
Take writing to HBase as an example:
Create the HBase table:
create 'lxw1234',{NAME => 'f1',VERSIONS => 1},{NAME => 'f2',VERSIONS => 1},{NAME => 'f3',VERSIONS => 1}
The complete Spark application:
package com.lxw1234.test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put

object Test {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))

    // HBase connection settings and the target table.
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")
    sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")

    // Use a new-API Job only to assemble the output configuration.
    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // Each element becomes a Put that writes column f1:c1 of the row keyed by x._1.
    rdd1.map(
      x => {
        val put = new Put(Bytes.toBytes(x._1))
        put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
        (new ImmutableBytesWritable, put)
      }
    ).saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}
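To check the result, the table can be read back with newAPIHadoopRDD, the read-side counterpart of saveAsNewAPIHadoopDataset. A minimal sketch (assumed, not part of the original post), using the same cluster settings:

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// Read the lxw1234 table back and print each row key with the f1:c1 value written above.
val hbaseConf = HBaseConfiguration.create(sc.hadoopConfiguration)
hbaseConf.set(TableInputFormat.INPUT_TABLE, "lxw1234")
val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
hbaseRDD.map { case (_, result) =>
  (Bytes.toString(result.getRow),
   Bytes.toInt(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("c1"))))
}.collect().foreach(println)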
Note: when saving to HBase, the HBase-related jars need to be added to SPARK_CLASSPATH at run time.
See also: http://lxw1234.com/archives/2015/07/332.htm