開發工程師和數據科學家都會受益於本章的部份內容。工程師可能但願探索更多的輸出格式,看看有沒有一些適合他們下游用戶的格式。數據科學家可能會更關注他們已經使用的數據格式。<br>數據庫
Motivation
咱們已經介紹了大量分佈式程序使用的Spark操做。目前爲止,咱們的例子都是從一個本地集合和規整文件中加載數據,可是有可能你的數據不是規整的或者不在一臺機器上,那麼就跟着我一塊兒探索加載和保存數據的操做用法。<br> Spark支持普遍的輸出輸入源,部分緣由是由於Spark構建在Haddoop生態環境之上。Spark能夠經過Hadoop中MapReduce的InputFormat
和OutPutForamt
接口來訪問數據,這些接口可用於不少經常使用的文件格式和存儲系統(如,S3,HDFS,Cassadra,HBase等等)。第84頁上的「Hadoop輸入和輸出格式」展現瞭如何直接使用這些格式。<br> 更常見的狀況是,你會想使用對原始接口封裝的更高級API。Spark和它的生態系統提供了不少使用方式,這一章,咱們會詳細介紹三個經常使用數據源集合:<br>json
-
文件格式和文件系統<br> 對於數據存儲在本地或分佈式的文件系統,如NFS,HDFS,或亞馬遜的S3,Spark能夠訪問各類文件格式,包括文本,JSON,SequnceFile和 Protocol buffer。咱們會介紹如何使用這幾種經常使用文件格式,以及Spark如何與不一樣的文件系統對接和配置壓縮。<br>數組
-
經過Spark SQL構建結構化數據源<br> 第九章介紹的Spark SQL模塊,提供了很是高效的結構化數據源API,包括JSON和Apache的Hive。咱們會簡要介紹一些Spark SQL,可是大部份內容留在第九章<br>app
-
數據庫和鍵值對存儲<br> 咱們會簡要了解如何鏈接Cassandra,HBase,Elasticsearch和JDBC數據庫的內置或第三方庫。<br>分佈式
Spark支持的語言可使用咱們選擇的大多數方法,但有些庫只有Java和Scala可使用。咱們遇到這樣的例子會進行提示。<br>函數
文件格式<br>
對於大多數文件格式,Spark加載和保存的方法都很是簡單。從非結構化的格式,如text,到半結構化數據,如JSON,再到結構化數據,如SequenceFile(見表5-1)。The input formats that Spark wraps all transparently handle compressed formats based on the file extension.<br> (我翻不出來:(,對於輸入格式,Spark會對其包裝,包裝了的輸入格式會基於文件擴展名透明地處理壓縮格式)。<br>oop
格式名 | 是否結構化 | 評價 |
---|---|---|
文本文件 | 非 | 普通文本文件。假定每行一條記錄 |
JSON | 半結構 | 常見的基於文本格式,半結構;大多數庫須要每行一條記錄 |
CSV | 是 | 很是經常使用的基於文本的格式,一般用於電子表格應用 |
SequnceFiles | 是 | Hadoop中一種經常使用的鍵值對數據格式 |
Protocol buffer | 是 | 一種快速的,空間利用率高的多語言格式 |
Object file | 是 | Spark job用來共享代碼的保存數據格式,很是有用 |
除了Spark直接支持的輸出機制以外,咱們能夠將Hadoop的新老文件API用於鍵值對數據。能夠在鍵值對上使用這些API,由於Hadoop的接口要求鍵值對數據,即便是忽略鍵的數據。在忽略鍵的狀況下,一般使用虛擬鍵(如null)。性能
Text Files(文本文件)
Spark中加載保存文本文件很是簡單。當咱們加載一個簡單的文本文件做爲RDD時,每一行輸入變成了RDD的一個基本元素。咱們能夠把多個文本文件同時加載到一個鍵值對RDD中,鍵做爲文件名,值做爲文件內容。<br>大數據
loading text files(加載文本文件)<br>
加載文本文件就和在SparkContext
調用textFile(文件路徑)
同樣簡單,Example5-1到5-3展現了例子。若是咱們想控制分區的數量咱們也能夠明確設定minPartitions
。<br>編碼
Example 5-1. Loading a text file in Python input = sc.textFile("file:///home/holden/repos/spark/README.md") Example 5-2. Loading a text file in Scala val input = sc.textFile("file:///home/holden/repos/spark/README.md") Example 5-3. Loading a text file in Java JavaRDD<String> input = sc.textFile("file:///home/holden/repos/spark/README.md")
參數是目錄形式的多文件輸入可使用兩種方式處理。咱們能夠直接使用textFile
方法並把目錄地址做爲參數傳入,這就會把目錄中全部文件傳進咱們的RDD。有時候須要知道某個文件的某一部分的來源(例如時間和文件的鍵結合做爲區分標誌)或者咱們須要一次處理整個文件。若是咱們的文件足夠小,咱們可使用SparkContext.wholeTextFiles()
方法,這會返回一個以輸入文件的名稱做爲鍵的鍵值對RDD。<br>
當每一個文件能表示一個肯定的時間數據,wholeTextFiles()
就會變得頗有用。若是咱們有可以表示不一樣時間段銷售數據的文件,咱們能夠很輕易地計算出每一個時間段的平均值。示例:<br>
Example 5-4. Average value per file in Scala val input = sc.wholeTextFiles("file://home/holden/salesFiles") val result = input.mapValues{y => val nums = y.split(" ").map(x => x.toDouble) nums.sum / nums.size.toDouble }
Spark支持讀取指定目錄的全部文件,而且也支持通配符輸入(如,part-*.txt)。這個用處很是大,由於較大數據集一般分佈在多個文件中,特別是若是其餘文件(如成功標誌)和要處理的文件在同一目錄中。
Saving text files(保存文本文件)<br>
輸出文本文件也很簡單。saveAsTextFile()
方法,如Example5-5所示,以一個輸出的路徑做爲參數,並將RDD內容輸入到該路徑之中。這個路徑會做爲一個目錄而且Spark會把多個文件輸入到該目錄之下。Spark能夠把輸出結果寫入多個節點。這個方法不能控制輸出數據分段的開始和結束位置。但有其餘的方法能夠控制。<br>
Example 5-5. Saving as a text file in Python result.saveAsTextFile(outputFile)
JSON
JSON是一個很流行的半結構化數據格式。加載JSON數據最簡單的方式就是把JSON當作文本文件而後用JSON解析器解析映射值。一樣的,咱們可使用偏好的JSON序列化庫把值寫入字符串,而後再給寫出。在Java和Scala中,咱們可使用定製的Hadoop格式來處理JSON。172也也介紹了Spark SQL怎麼加載JSON數據。<br>
loading JSON(加載JSON)<br>
像文本文件同樣加載而後轉換JSON數據是Spark全部支持的語言均可以使用的一種方法。這是假定你的JSON數據每條記錄都在一行之中,若是你的JSON數據是多行的,你可能必須加載整個文件而後轉換每一個文件。若是你使用的語言構建JSON解析器的代價比較大,你可使用mapPartitions()
來重用解析器;107頁"Working on a Per-Partition Basis"有這方面的細節。 咱們經常使用的這三種語言有大量的JSON庫可使用,但爲了簡單起見,每種語言只介紹一種庫。在Python中咱們使用內置庫(Example5-6),Java和Scala中咱們使用Jackson(Examples5-7和5-8)。選擇這幾個庫是由於他們運行效果很好而且使用相對簡單。若是你在解析過程當中花了大量時間,那能夠看一下Java和Scala其餘的JSON庫。<br>
Example 5-6. Loading unstructured JSON in Python import json data = input.map(lambda x: json.loads(x))
在Scala和Java中,一般把JSON數據轉化爲表示JSON格式的類。解析時,咱們可能但願跳過無效的記錄。下面展現了一個把JSON數據轉換成Person實例的一個例子。<br>
Example 5-7. Loading JSON in Scala import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.DeserializationFeature ... case class Person(name: String, lovesPandas: Boolean) // Must be a top-level class ... // Parse it into a specific case class. We use flatMap to handle errors // by returning an empty list (None) if we encounter an issue and a // list with one element if everything is ok (Some(_)). //把JSON轉化爲樣例類。使用flatMap轉化,若是遇到錯誤就返回list(None),不然就返回一條記錄的Some(_) val result = input.flatMap(record => { try { Some(mapper.readValue(record, classOf[Person])) } catch { case e: Exception => None }}) Example 5-8. Loading JSON in Java class ParseJson implements FlatMapFunction<Iterator<String>, Person> { public Iterable<Person> call(Iterator<String> lines) throws Exception { ArrayList<Person> people = new ArrayList<Person>(); ObjectMapper mapper = new ObjectMapper(); while (lines.hasNext()) { String line = lines.next(); try { people.add(mapper.readValue(line, Person.class)); } catch (Exception e) { // skip records on failure } } return people; } } JavaRDD<String> input = sc.textFile("file.json"); JavaRDD<Person> result = input.mapPartitions(new ParseJson());
處理格式不正確的記錄可能很麻煩,特別是半結構化的數據,如JSON。對於小數據集因格式不正確的輸入致使程序崩潰還能夠接受,可是,處理大數據集時遇到格式畸形輸入也是屢見不鮮。若是選擇跳過錯誤格式的數據,你可能會想使用累加器來記錄並追蹤錯誤的數量。<br>
Saving JSON(保存JSON)<br>
輸出JSON比加載JSON簡單不少,由於不用擔憂錯誤格式的數據,而且知道咱們輸出數據的類型。咱們能夠直接使用把字符串RDD轉換成JSON的相同的庫,把對象轉換成JSON後再使用Spark文本文件API將其輸出。<br> 假如咱們正在舉行一個對喜好熊貓的人的促銷。使用第一步中的輸入,過濾出喜歡熊貓的人,示例以下:<br>
Example 5-9. Saving JSON in Python (data.filter(lambda x: x['lovesPandas']).map(lambda x: json.dumps(x)) .saveAsTextFile(outputFile)) Example 5-10. Saving JSON in Scala result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_)) .saveAsTextFile(outputFile) Example 5-11. Saving JSON in Java class WriteJson implements FlatMapFunction<Iterator<Person>, String> { public Iterable<String> call(Iterator<Person> people) throws Exception { ArrayList<String> text = new ArrayList<String>(); ObjectMapper mapper = new ObjectMapper(); while (people.hasNext()) { Person person = people.next(); text.add(mapper.writeValueAsString(person)); } return text; } } JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter( new LikesPandas()); JavaRDD<String> formatted = result.mapPartitions(new WriteJson()); formatted.saveAsTextFile(outfile)
使用已有的文本文件的機制再添加上須要的JSON庫,在Spark中加載和保存JSON變得多麼簡單。<br>
Comma-Separated Values and Tab-Separated Values(逗號分割和Tab分割)
逗號分割值文件要求每行字段數量固定,而且這些字段被逗號分割(TSV文件是根據tab鍵進行分割)。一般是每行儲存一條記錄,可是實際上並不是老是如此,由於記錄常常超過一行。CSV和TSV文件有時可能不一致,常常出如今處理換行符,轉義,顯示非ASCII字符,或非整數數字時。CSV天生不能處理嵌套類型字段,因此咱們必須手動處理。<br>
不像JSON的字段,每條記錄都沒有字段名字與其關聯,只取行號就好了。在CSV文件中有個不成文的規定,第一行的每一列的值就是每一個字段的名稱。<br>
Loading CSV(加載CSV)<br>
加載CSV/TSV數據和加載JSON很像,按照文本文件那樣加載而後再進行處理。若是格式缺乏標準化,會致使相同庫的不一樣版本之間處理的方式不一樣。<br>
和JSON同樣,有不少CSV的庫可使用,不過咱們每一個語言只介紹一種。在Python中咱們使用csv庫。在Scala和Java中咱們使用opencsv。<br>
固然也有Hadoop的輸入格式,CSVInputFormat,能夠用來在Scala或Java中加載CSV數據,它也不支持換行符的記錄。<br>
若是你的CSV數據的任何字段都不包含換行符,你能夠直接使用textFile()
加載你的數據並進行轉換。示例以下:<br>
Example 5-12. Loading CSV with textFile() in Python import csv import StringIO ... def loadRecord(line): """Parse a CSV line""" input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"]) return reader.next() input = sc.textFile(inputFile).map(loadRecord) Example 5-13. Loading CSV with textFile() in Scala import Java.io.StringReader import au.com.bytecode.opencsv.CSVReader ... val input = sc.textFile(inputFile) val result = input.map{ line => val reader = new CSVReader(new StringReader(line)); reader.readNext(); } Example 5-14. Loading CSV with textFile() in Java import au.com.bytecode.opencsv.CSVReader; import Java.io.StringReader; ... public static class ParseLine implements Function<String, String[]> { public String[] call(String line) throws Exception { CSVReader reader = new CSVReader(new StringReader(line)); return reader.readNext(); } } JavaRDD<String> csvFile1 = sc.textFile(inputFile); JavaPairRDD<String[]> csvData = csvFile1.map(new ParseLine());
若是字段中內嵌了換行符,咱們須要加載整個文件而且解析整段,就如Examples5-15到5-17中所展現。很不幸的是若是每一個文件都很是大,那加載解析過程可能出現性能瓶頸。文本文件不一樣的加載方法在73頁"Loading text files"有描述。<br>
Example 5-15. Loading CSV in full in Python def loadRecords(fileNameContents): """Load all the records in a given file""" input = StringIO.StringIO(fileNameContents[1]) reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"]) return reader fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords) Example 5-16. Loading CSV in full in Scala case class Person(name: String, favoriteAnimal: String) val input = sc.wholeTextFiles(inputFile) val result = input.flatMap{ case (_, txt) => val reader = new CSVReader(new StringReader(txt)); reader.readAll().map(x => Person(x(0), x(1))) } Example 5-17. Loading CSV in full in Java public static class ParseLine implements FlatMapFunction<Tuple2<String, String>, String[]> { public Iterable<String[]> call(Tuple2<String, String> file) throws Exception { CSVReader reader = new CSVReader(new StringReader(file._2())); return reader.readAll(); } } JavaPairRDD<String, String> csvData = sc.wholeTextFiles(inputFile); JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());
若是隻有少許的輸入文件,而且你須要使用
wholeFile()
方法,則可能須要從新分區輸入使得Spark高效並行化執行你的後續操做。<br>
Saving CSV(保存CSV)
就像JSON同樣,輸出CSV/TSV數據也很簡單而且重用輸出的編碼對象也有不少優勢。由於在CSV中咱們不用輸出每一個記錄的字段名,咱們須要建立一個映射來確保輸出的一致性。一個簡單的方法就是寫一個函數,把字段轉換到數組的指定位置。在Python中,若是咱們想輸出字典,CSV writer
按照咱們提供的字段名順序構建writer
,而後將字典輸出。<br>
咱們使用的CSV庫輸出到文件或writer
,因此咱們可使用StringWriter/StringIO
來把結果輸入到RDD之中,示例以下:<br>
Example 5-18. Writing CSV in Python def writeRecords(records): """Write out CSV lines""" output = StringIO.StringIO() writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"]) for record in records: writer.writerow(record) return [output.getvalue()] pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile) Example 5-19. Writing CSV in Scala pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray) .mapPartitions{people => val stringWriter = new StringWriter(); val csvWriter = new CSVWriter(stringWriter); csvWriter.writeAll(people.toList) Iterator(stringWriter.toString) }.saveAsTextFile(outFile)
你可能注意到了,例子中咱們知道要輸出內容的全部的字段。若是有些輸入的字段是在運行時肯定的,那咱們只能換個方式了。最簡單的辦法就是先遍歷全部的數據來提取不一樣的字段,字段肯定以後就能夠輸出數據了。<br>