We produce about 1 TB of data per day. Since the volume keeps growing and the cluster's disk space is limited, we moved cold data to OSS. The cold data is only needed occasionally, but pulling it back from OSS every time we need it wastes a lot of time, so we later worked out a way to read the data on OSS directly from Spark.
Example:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object OssWc {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println(
        """
          |Arguments:
          |  inputPath      Input OSS object path, like oss://accessKeyId:accessKeySecret@bucket.endpoint/input/words.txt
          |  outputPath     Output OSS object path, like oss://accessKeyId:accessKeySecret@bucket.endpoint/output
          |  numPartitions  The number of RDD partitions.
          |
        """.stripMargin)
      System.exit(1)
    }
    Logger.getLogger("org").setLevel(Level.WARN)

    val conf = new SparkConf().setAppName("OssWc") //.setMaster("local[4]")
    // Use the Aliyun OSS FileSystem implementation so Spark can read oss:// paths directly.
    conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
    conf.set("spark.hadoop.mapreduce.job.run-local", "true")
    conf.set("spark.hadoop.fs.oss.accessKeyId", "SLFNEWKBG")
    conf.set("spark.hadoop.fs.oss.accessKeySecret", "SDFSFSS")
    val sc = new SparkContext(conf)

    val inputPath = args(0)
    val outputPath = args(1)
    val numPartitions = args(2).toInt

    // Word count over the OSS input; keep the RDD so it can be printed and written back to OSS.
    val input = sc.textFile(inputPath, numPartitions)
    val output = input.flatMap(_.split(",")).map(x => (x, 1)).reduceByKey(_ + _)
    output.foreach(println(_))
    output.saveAsTextFile(outputPath)

    sc.stop()
  }
}
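As the usage message above shows, the AccessKey pair can also be embedded directly in the OSS URI instead of being set on the SparkConf. A minimal sketch, using a hypothetical bucket name and endpoint purely for illustration:

// Hypothetical bucket/endpoint and placeholder keys, for illustration only.
val inputPath  = "oss://accessKeyId:accessKeySecret@my-bucket.oss-cn-hangzhou.aliyuncs.com/input/words.txt"
val outputPath = "oss://accessKeyId:accessKeySecret@my-bucket.oss-cn-hangzhou.aliyuncs.com/output"

Either style works with NativeOssFileSystem; keeping the keys out of the compiled source (in the URI or in external configuration) is usually the safer choice.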
Note for local testing: uncomment
//.setMaster("local[4]")
otherwise you will get the following error:
2018-01-25 12:04:38 [ main:1 ] - [ ERROR ] org.apache.spark.internal.Logging$class.logError(Logging.scala:91) Error initializing SparkContext.
org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
at text.OssWc$.main(OssWc.scala:32)
at text.OssWc.main(OssWc.scala)
Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
at text.OssWc$.main(OssWc.scala:32)
at text.OssWc.main(OssWc.scala)
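For a quick local run, the fix is simply to restore the commented-out setMaster call. A minimal sketch:

// Only for local testing; remove or comment out before submitting to the cluster,
// where the master URL is normally supplied by spark-submit instead.
val conf = new SparkConf()
  .setAppName("OssWc")
  .setMaster("local[4]")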