Spark的Python編程-初步入門

原文地址:http://spark.apache.org/docs/latest/programming-guide.htmlhtml

Spark提供了Python腳本編程接口,這裏簡單介紹其使用。java

相關連接:East 2015 (Nov 26, 2014)ArchiveDownload Sparkpython

Spark比Hadoop運行程序快達100倍,在磁盤運行也快達10倍。得益於其內存計算架構和DAG任務調度機制,儘管底層一樣使用的是Hadoop的HDFS分佈式存儲系統。shell

在最頂層,Spark包含一個起始程序來運行用戶編寫的main 方法,而且執行多樣的並行操做在一個集羣上。apache

其中最主要的抽象就是Spark提供一個 RDD (彈性集羣數據集),是一些能夠經過節點訪問執行並行任務的集羣的集合。RDD 經過一個Hadoop文件系統中的文件被建立(或者其它支持Hadoop文件系統的系統)。用戶常常會讓Spark去保留一個RDD在內存裏。容許它在並 行操做時被高效重用。最後,Rdd將自動從節點錯誤中恢復。編程

第二個被抽象是,在並行操做時,變量將被共享。默認的,當Spark 在不一樣節點上並行運行任務集合中的一個方法,它爲任務中每一個方法的變量都保留一個備份。有時候,一些變量須要被跨任務共享,或者兩個任務間共享,或者給起 始程序共享。Spark提供兩種形式的變量:廣播變量,能夠在全部節點上緩存變量在內存裏。另外一個是儲蓄變量,只能被增長,例如數量,或者和。數組

本文展現這些功能,使用每一個Spark支持的語言,本文是python。若是你打開 Spark的交互shell,bin/spark-shell, 或者python的shell, bin/pyspark. 很容易學的。緩存

Spark 所使用的數據集RDDs 是一個能夠被並行操做的容錯數據集。有兩種方式去建立它:並行一個你本地存在的集合,或者引用一個額外的儲存系統。例如: 一個共享的文件系統,如HDFS、HBase, 或者任何提供Hadoop輸入格式的資源。架構

並行集合經過Spark 上下文 SparkContext’s parallelize 方法被建立,基於你磁盤上一個存在的能夠迭代的集合。集合中的元素經過經過複製造成一個分佈式數據集,這個數據集能夠被並行使用。例如。下面是一個怎樣建立一個併發數據集用數字1-5:併發

data = [1, 2, 3, 4, 5] 
distData = sc.parallelize(data)

一旦被建立。這個分佈式數據集能夠被併發使用。例如,咱們能夠 distData.reduce(lambda a, b: a + b) 去相加他們。

併發數據集的一個重要的參數是數據集被分割的子集數量。Spark 將爲每一個子集運行一個任務。表明性的,你想要沒給CPU運行2-4個子集。正常狀況下: Spark 將試圖設計你分區的數量分局你的集羣。可是,你依然能夠經過傳遞第二個參數來手動設置它。 

parallelize (e.g. sc.parallelize(data, 10))

注意:你代碼中一些地方會用到 slices (子集的別名),爲了上下兼容性而存在的。

其它數據集合:

Pyspark 能夠從任何被hadoop 支持的存儲資源中建立分佈式數據集。包括本地文件系統,HDFS, Cassandra, HBase, Amazon S3 等。Sparks 支持文本文件,hadoop 輸入格式文件。

文本文件RDDs 能夠經過 SparkContext’s textFile 方法建立這個方法使用一個URI 爲這個文件,不論是本地路徑,仍是hdfs://s3n://, URI。將它讀做行的集合。下面是一個例子:

distFile = sc.textFile("data.txt")

一旦被建立。文件能夠經過集合操做使用。例如,咱們能夠相加行的數目經過使用 map 和 reduce 方法以下:

distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

在用Spark讀取文件時候的一些注意項:

  • 若是使用一個本地文件系統,這個文件的權限必須被開放。能夠被複制或者被共享。

  • 全部的Spark文件輸入方法,包括 textFile, 也支持運行目錄,壓縮文件,和通配符。例如,你可使用 textFile("/my/directory") ,textFile("/my/directory/*.txt") 和  textFile("/my/directory/*.gz");

  • textFile 方法一樣使用第二個參數去控制文件被分割的子集。默認,Spark爲這個文件的每一個Block (在 HDFS中是64M )建立一個子集,可是你能夠經過傳遞一個參數來設定更高的值。注意,這個值不能比默認 blocks分割的更小。

除了 textFile , Spark的Python API 提供了其它的數據格式:

  • SparkContext.wholeTextFiles 讓你讀一個包含不少小文件的目錄。而後返回以鍵值對的方式返回它們(文件名,內容)。這個方法相對 textFile ,後者返回文件的每一行。

  • RDD.saveAsPickleFile and SparkContext.pickleFile  支持以簡單python對象格式存儲一個RDD 。

  • SequenceFile and Hadoop Input/Output Formats

注意,這個功能目前在實驗階段。用於高級用戶。它或許被替代在未來。新的方式將採用基於 Spark SQL 讀寫。那時候,Spark SQL 將是最優先的方式。

寫支持:

PySpark SequenceFile 支持加載一個java 鍵值組成的RDD ,根據java 類型轉換成可寫的。當保存一個鍵值對組成的RDD 到 SequenceFile中, PySpark 進行這個轉換。它把Python對象轉成 Java對象,而後把它們轉變成可寫的。下面這些類型自動轉換:

 

數組不被處理,用戶須要去指定自定義的 ArrayWritable 子類型 當讀寫的時候。在寫的時候,用戶還須要指定自定義的轉換把 數組轉換成 自動以的 ArrayWritalbe 子類型。在讀的時候,默認的轉換將自定義的ArrayWritable 子類型轉成Java 對象。而後轉成Python元組。

保存和加載 SequenceFiles

和文本文件相似。SequenceFiles 經過指定路徑被保存和加載。key 和 value 能夠被分開。可是對於標準寫來說,不須要如此。

rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
rdd.saveAsSequenceFile("path/to/file")
sorted(sc.sequenceFile("path/to/file").collect()) 
[(1, u'a'), (2, u'aa'), (3, u'aaa')]

Rdd 操做:

Rdd 支持兩種形式的操做:

一、轉換,根據現有的數據集建立一個新的。

二、動做,在數據集上運行一些計算後返回值。例如,map 是一個轉換,經過一個方法將數據集的每一個元素返回到一個rdd結果中,另外一邊,reduce 是一個動做,經過方法將RDDd的元素合併成一個最終結果(reduceByKey 返回一個分佈式數據集)。

Spark中全部的轉換都是懶執行的,它們並不立刻計算它們的結果。代替的,它們記着這些用於基礎數據集的轉換,當一個動做要求一個結果被返回來,那麼才執行這些計算。這個設計可使Spark 運行更高效。例如。咱們能夠實現一個經過map建立的數據集被reduce 使用而後返回一個reduce結果,而不是一個大的map過的集合。

默認,每個被轉換的RDD或許被從新計算當你每次對它使用action時。可是,你能夠緩存 一個RDD在內存裏,經過使用 persist或者cache方法。此時,Spark將保存這個元素在集羣上,爲了你下次更快使用它。也支持緩存rdds 在磁盤上。或者複製在多節點上。

lines = sc.textFile("data.txt") 
lineLengths = lines.map(lambda s: len(s)) 
totalLength = lineLengths.reduce(lambda a, b: a + b)

第一行根據一個外部文件定義了一個基礎rdd。數據集不被加載到內存,除非被動做執行。 lines 僅僅是一個文件的指針,第二行定義一個 lineLength 做爲一個map 的轉換結果。一樣的,lineLength不是當即被計算。由於懶執行。最後,我麼能夠reduce, 它是一個動做。 此時,Spark 分割計算成不少任務。在各個機器上運行,每一個機器運行它本身部分的map和一個reduce 的結果。僅僅返回這個結果到起始程序中。

若是咱們以後須要屢次使用lineLength,咱們能夠緩存它:

lineLengths.persist()

在reduce 以前,lineLengths 將再第一次被計算出來後緩存到內存。

傳遞方法給Spark:

Spark 的API 特別支持傳遞方法給起始程序,而後在集羣上運行。有三種推薦方式:

  • Lambda表達式, 對於簡單的方法能夠寫成表達式,Lambda不支持多行方法或者不返回值的方法。

  • 對於長一點的代碼。在方法內部定義。

  • 做爲一個模塊。

例如,傳遞一個長一點的方法而不是使用lomba。看下面代碼:

"""MyScript.py""" 
def myFunc(s):         
    words = s.split(" ")         
    return len(words)
        
if __name__ == "__main__":     
    sc = SparkContext(...)     
    sc.textFile("file.txt").map(myFunc)

注意,當一個對象實例方法容許傳入引用時(和單例對象相反)。須要傳遞這個class和方法一塊兒:例如:

class MyClass(object):     
    def func(self, s):         
        return s     
    def doStuff(self, rdd):         
        return rdd.map(self.func)

建立了一個新的MyClass, 而後調用doStuff。內部的map引用這個MyClass實例的func,因此整個對象須要被傳遞到集羣。

相似的,外部對象的可訪問領域,將引用真個對象。

class MyClass(object):     
    def __init__(self):         
        self.field = "Hello"     
    def doStuff(self, rdd):         
        return rdd.map(lambda s: self.field + x)

爲了防止出錯,最簡單的方式是複製 field到一個本地變量代替訪問它:

def doStuff(self, rdd):     
    field = self.field     
    return rdd.map(lambda s: field + x)
相關文章
相關標籤/搜索