Spark對不少種文件格式的讀取和保存方式都很簡單。Spark會根據文件擴展名選擇對應的處理方式。java
Spark支持的一些常見文件格式以下:mysql
使用文件路徑做爲參數調用SparkContext中的textFile()函數,就能夠讀取一個文本文件。也能夠指定minPartitions控制分區數。傳遞目錄做爲參數,會把目錄中的各部分都讀取到RDD中。例如:sql
val input = sc.textFile("E:\\share\\new\\chapter5") input.foreach(println)
chapter目錄有三個txt文件,內容以下:數據庫
輸出結果:apache
用SparkContext.wholeTextFiles()也能夠處理多個文件,該方法返回一個pair RDD,其中鍵是輸入文件的文件名。json
例如:網絡
val input = sc.wholeTextFiles("E:\\share\\new\\chapter5") input.foreach(println)
輸出結果:app
保存文本文件用saveAsTextFile(outputFile)函數
JSON是一種使用較廣的半結構化數據格式,這裏使用json4s來解析JSON文件。oop
以下:
import org.apache.spark.{SparkConf, SparkContext} import org.json4s.ShortTypeHints import org.json4s.jackson.JsonMethods._ import org.json4s.jackson.Serialization object TestJson { case class Person(name:String,age:Int) def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("JSON") val sc = new SparkContext(conf) implicit val formats = Serialization.formats(ShortTypeHints(List())) val input = sc.textFile("E:\\share\\new\\test.json") input.collect().foreach(x => {var c = parse(x).extract[Person];println(c.name + "," + c.age)}) } }
json文件內容:
輸出結果:
保存JSON文件用saveASTextFile(outputFile)便可
以下:
val datasave = input.map { myrecord => implicit val formats = DefaultFormats val jsonObj = parse(myrecord) jsonObj.extract[Person] } datasave.saveAsTextFile("E:\\share\\spark\\savejson")
輸出結果:
讀取CSV文件和讀取JSON數據類似,都須要先把文件看成普通文本文件來讀取數據,再對數據進行處理。
以下:
import org.apache.spark.{SparkConf, SparkContext} import java.io.StringReader import au.com.bytecode.opencsv.CSVReader object DataReadAndSave { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("CSV") val sc = new SparkContext(conf) val input = sc.textFile("E:\\share\\spark\\test.csv") input.foreach(println) val result = input.map{ line => val reader = new CSVReader(new StringReader(line)) reader.readNext() } for(res <- result){ for(r <- res){ println(r) } } } }
test.csv內容:
輸出結果:
保存csv
以下:
val inputRDD = sc.parallelize(List(Person("Mike", "yes"))) inputRDD.map(person => List(person.name,person.favoriteAnimal).toArray) .mapPartitions { people => val stringWriter = new StringWriter() val csvWriter = new CSVWriter(stringWriter) csvWriter.writeAll(people.toList) Iterator(stringWriter.toString) }.saveAsTextFile("E:\\share\\spark\\savecsv")
SequenceFile是由沒有相對關係結構的鍵值對文件組成的經常使用Hadoop格式。是由實現Hadoop的Writable接口的元素組成,常見的數據類型以及它們對應的Writable類以下:
讀取SequenceFile
調用sequenceFile(path , keyClass , valueClass , minPartitions)
保存SequenceFile
調用saveAsSequenceFile(outputFile)
對象文件使用Java序列化寫出,容許存儲只包含值的RDD。對象文件一般用於Spark做業間的通訊。
保存對象文件調用 saveAsObjectFile 讀取對象文件用SparkContext的objectFile()函數接受一個路徑,返回對應的RDD
Spark能夠與任何Hadoop支持的格式交互。
讀取其餘Hadoop輸入格式,使用newAPIHadoopFile接收一個路徑以及三個類,第一個類是格式類,表明輸入格式,第二個類是鍵的類,最後一個類是值的類。
hadoopFile()函數用於使用舊的API實現的Hadoop輸入格式。
KeyValueTextInputFormat 是最簡單的 Hadoop 輸入格式之一,能夠用於從文本文件中讀取鍵值對數據。每一行都會被獨立處理,鍵和值之間用製表符隔開。
例子:
import org.apache.hadoop.io.{IntWritable, LongWritable, MapWritable, Text} import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark._ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat object HadoopFile { def main(args: Array[String]) { val conf = new SparkConf().setAppName("hadoopfile").setMaster("local") val sc = new SparkContext(conf) val job = new Job() val data = sc.newAPIHadoopFile("E:\\share\\spark\\test.json" , classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text], job.getConfiguration) data.foreach(println) data.saveAsNewAPIHadoopFile( "E:\\share\\spark\\savehadoop", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text,Text]], job.getConfiguration) } }
輸出結果:
讀取
保存
若使用舊API以下:
val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat]("E:\\share\\spark\\test.json
").map { case (x, y) => (x.toString, y.toString) } input.foreach(println)
對數據進行壓縮能夠節省存儲空間和網絡傳輸開銷,Spark原生的輸入方式(textFile和sequenFile)能夠自動處理一些類型的壓縮。在讀取壓縮後的數據時,一些壓縮編解碼器能夠推測壓縮類型。
Spark支持讀寫不少種文件系統,可使用任何咱們想要的文件格式。包括:
一、本地文件系統
要求文件在集羣中全部節點的相同路徑下均可以找到。 本地文件系統路徑使用 例如:val rdd = sc.textFile("file:///home/holden/happypandas.gz")。
二、Amazon S3
將一個以s3n://開頭的路徑以s3n://bucket/path-within-bucket的形式傳給Spark的輸入方法。
三、HDFS
在Spark中使用HDFS只須要將輸入路徑輸出路徑指定爲hdfs://master:port/path就能夠了
Apache Hive是Hadoop上一中常見的結構化數據源。Hive能夠在HDFS內或者在其餘存儲系統上存儲多種格式的表。SparkSQL能夠讀取Hive支持的任何表。
將Spark SQL鏈接到已有的Hive上,建立出HiveContext對象也就是Spark SQL入口,而後就可使用Hive查詢語言來對你的表進行查詢,並以由行組成的RDD形式返回數據。
使用HiveContext.jsonFile方法能夠從整個文件中獲取Row對象組成的RDD。例子:
import org.apache.spark.sql.hive.HiveContext import org.apache.spark.{SparkConf, SparkContext} object Sparksql { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("SparkSQL") val sc = new SparkContext(conf) val sql = new HiveContext(sc) val input = sql.jsonFile("E:\\share\\spark\\tweets.json") input.registerTempTable("tweets") val topTweets = sql.sql("select user.name,text from tweets") topTweets.foreach(println) } }
使用數據:
輸出結果:
Spark能夠從任何支持Java數據庫鏈接(JDBC)的關係型數據庫中讀取數據,包括MySQL,Postgre等系統。
Spark鏈接JDBC,經過建立SQLContext對象進行鏈接,設置鏈接參數,而後就可使用sql語句進行查詢,結果返回一個jdbcRDD。以下:
首先在MySQL裏面創建名爲info的數據庫,建表及導入數據:
sql查詢數據:
使用Spark鏈接JDBC查詢,Scala代碼以下:
import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} object JDBC { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("SparkSQL") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val mysql = sqlContext.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/info"). option("dbtable","student").option("driver","com.mysql.jdbc.Driver"). option("user","root").option("password","********").load() mysql.registerTempTable("student") mysql.sqlContext.sql("select * from student where sage >= 20").collect().foreach(println) } }
輸出結果: