Learning Spark中文版--第五章--加載保存數據(1)

  開發工程師和數據科學家都會受益於本章的部份內容。工程師可能但願探索更多的輸出格式,看看有沒有一些適合他們下游用戶的格式。數據科學家可能會更關注他們已經使用的數據格式。<br>數據庫

Motivation

  咱們已經介紹了大量分佈式程序使用的Spark操做。目前爲止,咱們的例子都是從一個本地集合和規整文件中加載數據,可是有可能你的數據不是規整的或者不在一臺機器上,那麼就跟着我一塊兒探索加載和保存數據的操做用法。<br>   Spark支持普遍的輸出輸入源,部分緣由是由於Spark構建在Haddoop生態環境之上。Spark能夠經過Hadoop中MapReduce的InputFormatOutPutForamt接口來訪問數據,這些接口可用於不少經常使用的文件格式和存儲系統(如,S3,HDFS,Cassadra,HBase等等)。第84頁上的「Hadoop輸入和輸出格式」展現瞭如何直接使用這些格式。<br>   更常見的狀況是,你會想使用對原始接口封裝的更高級API。Spark和它的生態系統提供了不少使用方式,這一章,咱們會詳細介紹三個經常使用數據源集合:<br>json

  • 文件格式和文件系統<br> 對於數據存儲在本地或分佈式的文件系統,如NFS,HDFS,或亞馬遜的S3,Spark能夠訪問各類文件格式,包括文本,JSON,SequnceFile和 Protocol buffer。咱們會介紹如何使用這幾種經常使用文件格式,以及Spark如何與不一樣的文件系統對接和配置壓縮。<br>數組

  • 經過Spark SQL構建結構化數據源<br> 第九章介紹的Spark SQL模塊,提供了很是高效的結構化數據源API,包括JSON和Apache的Hive。咱們會簡要介紹一些Spark SQL,可是大部份內容留在第九章<br>app

  • 數據庫和鍵值對存儲<br> 咱們會簡要了解如何鏈接Cassandra,HBase,Elasticsearch和JDBC數據庫的內置或第三方庫。<br>分佈式

  Spark支持的語言可使用咱們選擇的大多數方法,但有些庫只有Java和Scala可使用。咱們遇到這樣的例子會進行提示。<br>函數

文件格式<br>

  對於大多數文件格式,Spark加載和保存的方法都很是簡單。從非結構化的格式,如text,到半結構化數據,如JSON,再到結構化數據,如SequenceFile(見表5-1)。The input formats that Spark wraps all transparently handle compressed formats based on the file extension.<br> (我翻不出來:(,對於輸入格式,Spark會對其包裝,包裝了的輸入格式會基於文件擴展名透明地處理壓縮格式)。<br>oop

格式名 是否結構化 評價
文本文件 普通文本文件。假定每行一條記錄
JSON 半結構 常見的基於文本格式,半結構;大多數庫須要每行一條記錄
CSV 很是經常使用的基於文本的格式,一般用於電子表格應用
SequnceFiles Hadoop中一種經常使用的鍵值對數據格式
Protocol buffer 一種快速的,空間利用率高的多語言格式
Object file Spark job用來共享代碼的保存數據格式,很是有用

  除了Spark直接支持的輸出機制以外,咱們能夠將Hadoop的新老文件API用於鍵值對數據。能夠在鍵值對上使用這些API,由於Hadoop的接口要求鍵值對數據,即便是忽略鍵的數據。在忽略鍵的狀況下,一般使用虛擬鍵(如null)。性能

Text Files(文本文件)

  Spark中加載保存文本文件很是簡單。當咱們加載一個簡單的文本文件做爲RDD時,每一行輸入變成了RDD的一個基本元素。咱們能夠把多個文本文件同時加載到一個鍵值對RDD中,鍵做爲文件名,值做爲文件內容。<br>大數據

loading text files(加載文本文件)<br>

  加載文本文件就和在SparkContext調用textFile(文件路徑)同樣簡單,Example5-1到5-3展現了例子。若是咱們想控制分區的數量咱們也能夠明確設定minPartitions。<br>編碼

Example 5-1. Loading a text file in Python

input = sc.textFile("file:///home/holden/repos/spark/README.md")

Example 5-2. Loading a text file in Scala

val input = sc.textFile("file:///home/holden/repos/spark/README.md")

Example 5-3. Loading a text file in Java

JavaRDD<String> input = sc.textFile("file:///home/holden/repos/spark/README.md")

  參數是目錄形式的多文件輸入可使用兩種方式處理。咱們能夠直接使用textFile方法並把目錄地址做爲參數傳入,這就會把目錄中全部文件傳進咱們的RDD。有時候須要知道某個文件的某一部分的來源(例如時間和文件的鍵結合做爲區分標誌)或者咱們須要一次處理整個文件。若是咱們的文件足夠小,咱們可使用SparkContext.wholeTextFiles()方法,這會返回一個以輸入文件的名稱做爲鍵的鍵值對RDD。<br>

   當每一個文件能表示一個肯定的時間數據,wholeTextFiles()就會變得頗有用。若是咱們有可以表示不一樣時間段銷售數據的文件,咱們能夠很輕易地計算出每一個時間段的平均值。示例:<br>

Example 5-4. Average value per file in Scala

val input = sc.wholeTextFiles("file://home/holden/salesFiles")
val result = input.mapValues{y =>
val nums = y.split(" ").map(x => x.toDouble)
nums.sum / nums.size.toDouble
}

Spark支持讀取指定目錄的全部文件,而且也支持通配符輸入(如,part-*.txt)。這個用處很是大,由於較大數據集一般分佈在多個文件中,特別是若是其餘文件(如成功標誌)和要處理的文件在同一目錄中。

Saving text files(保存文本文件)<br>

   輸出文本文件也很簡單。saveAsTextFile()方法,如Example5-5所示,以一個輸出的路徑做爲參數,並將RDD內容輸入到該路徑之中。這個路徑會做爲一個目錄而且Spark會把多個文件輸入到該目錄之下。Spark能夠把輸出結果寫入多個節點。這個方法不能控制輸出數據分段的開始和結束位置。但有其餘的方法能夠控制。<br>

Example 5-5. Saving as a text file in Python

result.saveAsTextFile(outputFile)

JSON

  JSON是一個很流行的半結構化數據格式。加載JSON數據最簡單的方式就是把JSON當作文本文件而後用JSON解析器解析映射值。一樣的,咱們可使用偏好的JSON序列化庫把值寫入字符串,而後再給寫出。在Java和Scala中,咱們可使用定製的Hadoop格式來處理JSON。172也也介紹了Spark SQL怎麼加載JSON數據。<br>

loading JSON(加載JSON)<br>

  像文本文件同樣加載而後轉換JSON數據是Spark全部支持的語言均可以使用的一種方法。這是假定你的JSON數據每條記錄都在一行之中,若是你的JSON數據是多行的,你可能必須加載整個文件而後轉換每一個文件。若是你使用的語言構建JSON解析器的代價比較大,你可使用mapPartitions()來重用解析器;107頁"Working on a Per-Partition Basis"有這方面的細節。   咱們經常使用的這三種語言有大量的JSON庫可使用,但爲了簡單起見,每種語言只介紹一種庫。在Python中咱們使用內置庫(Example5-6),Java和Scala中咱們使用Jackson(Examples5-7和5-8)。選擇這幾個庫是由於他們運行效果很好而且使用相對簡單。若是你在解析過程當中花了大量時間,那能夠看一下Java和Scala其餘的JSON庫。<br>

Example 5-6. Loading unstructured JSON in Python

import json
data = input.map(lambda x: json.loads(x))

在Scala和Java中,一般把JSON數據轉化爲表示JSON格式的類。解析時,咱們可能但願跳過無效的記錄。下面展現了一個把JSON數據轉換成Person實例的一個例子。<br>

Example 5-7. Loading JSON in Scala
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
...
case class Person(name: String, lovesPandas: Boolean) // Must be a top-level class
...
// Parse it into a specific case class. We use flatMap to handle errors
// by returning an empty list (None) if we encounter an issue and a
// list with one element if everything is ok (Some(_)).
//把JSON轉化爲樣例類。使用flatMap轉化,若是遇到錯誤就返回list(None),不然就返回一條記錄的Some(_)
val result = input.flatMap(record => {
    try {
        Some(mapper.readValue(record, classOf[Person]))
    } catch {
        case e: Exception => None
    }})
    
Example 5-8. Loading JSON in Java

class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
    public Iterable<Person> call(Iterator<String> lines) throws Exception {
        ArrayList<Person> people = new ArrayList<Person>();
        ObjectMapper mapper = new ObjectMapper();
        while (lines.hasNext()) {
            String line = lines.next();
            try {
                people.add(mapper.readValue(line, Person.class));
            } catch (Exception e) {
            // skip records on failure
            }
        }
        return people;
    }
}
JavaRDD<String> input = sc.textFile("file.json");
JavaRDD<Person> result = input.mapPartitions(new ParseJson());

處理格式不正確的記錄可能很麻煩,特別是半結構化的數據,如JSON。對於小數據集因格式不正確的輸入致使程序崩潰還能夠接受,可是,處理大數據集時遇到格式畸形輸入也是屢見不鮮。若是選擇跳過錯誤格式的數據,你可能會想使用累加器來記錄並追蹤錯誤的數量。<br>

Saving JSON(保存JSON)<br>

  輸出JSON比加載JSON簡單不少,由於不用擔憂錯誤格式的數據,而且知道咱們輸出數據的類型。咱們能夠直接使用把字符串RDD轉換成JSON的相同的庫,把對象轉換成JSON後再使用Spark文本文件API將其輸出。<br>   假如咱們正在舉行一個對喜好熊貓的人的促銷。使用第一步中的輸入,過濾出喜歡熊貓的人,示例以下:<br>

Example 5-9. Saving JSON in Python

(data.filter(lambda x: x['lovesPandas']).map(lambda x: json.dumps(x))
    .saveAsTextFile(outputFile))
Example 5-10. Saving JSON in Scala

result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_))
    .saveAsTextFile(outputFile)
Example 5-11. Saving JSON in Java
class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
    public Iterable<String> call(Iterator<Person> people) throws Exception {
        ArrayList<String> text = new ArrayList<String>();
        ObjectMapper mapper = new ObjectMapper();
        while (people.hasNext()) {
            Person person = people.next();
            text.add(mapper.writeValueAsString(person));
        }
        return text;
    }
}
JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(
    new LikesPandas());
JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
formatted.saveAsTextFile(outfile)

  使用已有的文本文件的機制再添加上須要的JSON庫,在Spark中加載和保存JSON變得多麼簡單。<br>

Comma-Separated Values and Tab-Separated Values(逗號分割和Tab分割)

  逗號分割值文件要求每行字段數量固定,而且這些字段被逗號分割(TSV文件是根據tab鍵進行分割)。一般是每行儲存一條記錄,可是實際上並不是老是如此,由於記錄常常超過一行。CSV和TSV文件有時可能不一致,常常出如今處理換行符,轉義,顯示非ASCII字符,或非整數數字時。CSV天生不能處理嵌套類型字段,因此咱們必須手動處理。<br>

  不像JSON的字段,每條記錄都沒有字段名字與其關聯,只取行號就好了。在CSV文件中有個不成文的規定,第一行的每一列的值就是每一個字段的名稱。<br>

Loading CSV(加載CSV)<br>

  加載CSV/TSV數據和加載JSON很像,按照文本文件那樣加載而後再進行處理。若是格式缺乏標準化,會致使相同庫的不一樣版本之間處理的方式不一樣。<br>

  和JSON同樣,有不少CSV的庫可使用,不過咱們每一個語言只介紹一種。在Python中咱們使用csv庫。在Scala和Java中咱們使用opencsv。<br>

固然也有Hadoop的輸入格式,CSVInputFormat,能夠用來在Scala或Java中加載CSV數據,它也不支持換行符的記錄。<br>

  若是你的CSV數據的任何字段都不包含換行符,你能夠直接使用textFile()加載你的數據並進行轉換。示例以下:<br>

Example 5-12. Loading CSV with textFile() in Python

import csv
import StringIO
...
def loadRecord(line):
    """Parse a CSV line"""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
    return reader.next()
input = sc.textFile(inputFile).map(loadRecord)

Example 5-13. Loading CSV with textFile() in Scala

import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
val input = sc.textFile(inputFile)
val result = input.map{ line =>
    val reader = new CSVReader(new StringReader(line));
    reader.readNext();
}


Example 5-14. Loading CSV with textFile() in Java
import au.com.bytecode.opencsv.CSVReader;
import Java.io.StringReader;
...
public static class ParseLine implements Function<String, String[]> {
    public String[] call(String line) throws Exception {
        CSVReader reader = new CSVReader(new StringReader(line));
        return reader.readNext();
    }
}
JavaRDD<String> csvFile1 = sc.textFile(inputFile);
JavaPairRDD<String[]> csvData = csvFile1.map(new ParseLine());

  若是字段中內嵌了換行符,咱們須要加載整個文件而且解析整段,就如Examples5-15到5-17中所展現。很不幸的是若是每一個文件都很是大,那加載解析過程可能出現性能瓶頸。文本文件不一樣的加載方法在73頁"Loading text files"有描述。<br>

Example 5-15. Loading CSV in full in Python

def loadRecords(fileNameContents):
    """Load all the records in a given file"""
    input = StringIO.StringIO(fileNameContents[1])
    reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"])
    return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)

Example 5-16. Loading CSV in full in Scala

case class Person(name: String, favoriteAnimal: String)

val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{ case (_, txt) =>
    val reader = new CSVReader(new StringReader(txt));
    reader.readAll().map(x => Person(x(0), x(1)))
}

Example 5-17. Loading CSV in full in Java

public static class ParseLine
    implements FlatMapFunction<Tuple2<String, String>, String[]> {
    public Iterable<String[]> call(Tuple2<String, String> file) throws Exception {
        CSVReader reader = new CSVReader(new StringReader(file._2()));
        return reader.readAll();
    }
}
JavaPairRDD<String, String> csvData = sc.wholeTextFiles(inputFile);
JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());

若是隻有少許的輸入文件,而且你須要使用wholeFile()方法,則可能須要從新分區輸入使得Spark高效並行化執行你的後續操做。<br>

Saving CSV(保存CSV)

  就像JSON同樣,輸出CSV/TSV數據也很簡單而且重用輸出的編碼對象也有不少優勢。由於在CSV中咱們不用輸出每一個記錄的字段名,咱們須要建立一個映射來確保輸出的一致性。一個簡單的方法就是寫一個函數,把字段轉換到數組的指定位置。在Python中,若是咱們想輸出字典,CSV writer按照咱們提供的字段名順序構建writer,而後將字典輸出。<br>

  咱們使用的CSV庫輸出到文件或writer,因此咱們可使用StringWriter/StringIO來把結果輸入到RDD之中,示例以下:<br>

Example 5-18. Writing CSV in Python

def writeRecords(records):
    """Write out CSV lines"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]
    
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

Example 5-19. Writing CSV in Scala

pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
.mapPartitions{people =>
    val stringWriter = new StringWriter();
    val csvWriter = new CSVWriter(stringWriter);
    csvWriter.writeAll(people.toList)
    Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)

  你可能注意到了,例子中咱們知道要輸出內容的全部的字段。若是有些輸入的字段是在運行時肯定的,那咱們只能換個方式了。最簡單的辦法就是先遍歷全部的數據來提取不一樣的字段,字段肯定以後就能夠輸出數據了。<br>

相關文章
相關標籤/搜索