原文地址:http://spark.apache.org/docs/latest/programming-guide.htmlhtml
Spark提供了Python腳本編程接口,這裏簡單介紹其使用。java
相關連接:East 2015 (Nov 26, 2014) 、Archive 、Download Sparkpython
Spark比Hadoop運行程序快達100倍,在磁盤運行也快達10倍。得益於其內存計算架構和DAG任務調度機制,儘管底層一樣使用的是Hadoop的HDFS分佈式存儲系統。shell
在最頂層,Spark包含一個起始程序來運行用戶編寫的main 方法,而且執行多樣的並行操做在一個集羣上。apache
其中最主要的抽象就是Spark提供一個 RDD (彈性集羣數據集),是一些能夠經過節點訪問執行並行任務的集羣的集合。RDD 經過一個Hadoop文件系統中的文件被建立(或者其它支持Hadoop文件系統的系統)。用戶常常會讓Spark去保留一個RDD在內存裏。容許它在並 行操做時被高效重用。最後,Rdd將自動從節點錯誤中恢復。編程
第二個被抽象是,在並行操做時,變量將被共享。默認的,當Spark 在不一樣節點上並行運行任務集合中的一個方法,它爲任務中每一個方法的變量都保留一個備份。有時候,一些變量須要被跨任務共享,或者兩個任務間共享,或者給起 始程序共享。Spark提供兩種形式的變量:廣播變量,能夠在全部節點上緩存變量在內存裏。另外一個是儲蓄變量,只能被增長,例如數量,或者和。數組
本文展現這些功能,使用每一個Spark支持的語言,本文是python。若是你打開 Spark的交互shell,bin/spark-shell, 或者python的shell, bin/pyspark. 很容易學的。緩存
Spark 所使用的數據集RDDs 是一個能夠被並行操做的容錯數據集。有兩種方式去建立它:並行一個你本地存在的集合,或者引用一個額外的儲存系統。例如: 一個共享的文件系統,如HDFS、HBase, 或者任何提供Hadoop輸入格式的資源。架構
並行集合經過Spark 上下文 SparkContext
’s parallelize
方法被建立,基於你磁盤上一個存在的能夠迭代的集合。集合中的元素經過經過複製造成一個分佈式數據集,這個數據集能夠被並行使用。例如。下面是一個怎樣建立一個併發數據集用數字1-5:併發
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
一旦被建立。這個分佈式數據集能夠被併發使用。例如,咱們能夠 distData.reduce(lambda a, b: a + b) 去相加他們。
併發數據集的一個重要的參數是數據集被分割的子集數量。Spark 將爲每一個子集運行一個任務。表明性的,你想要沒給CPU運行2-4個子集。正常狀況下: Spark 將試圖設計你分區的數量分局你的集羣。可是,你依然能夠經過傳遞第二個參數來手動設置它。
parallelize (e.g. sc.parallelize(data, 10))
注意:你代碼中一些地方會用到 slices (子集的別名),爲了上下兼容性而存在的。
其它數據集合:
Pyspark 能夠從任何被hadoop 支持的存儲資源中建立分佈式數據集。包括本地文件系統,HDFS, Cassandra, HBase, Amazon S3 等。Sparks 支持文本文件,hadoop 輸入格式文件。
文本文件RDDs 能夠經過 SparkContext
’s textFile 方法建立這個方法使用一個URI 爲這個文件,不論是本地路徑,仍是
hdfs://
, s3n://
, URI。將它讀做行的集合。下面是一個例子:
distFile = sc.textFile("data.txt")
一旦被建立。文件能夠經過集合操做使用。例如,咱們能夠相加行的數目經過使用 map 和 reduce 方法以下:
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
在用Spark讀取文件時候的一些注意項:
若是使用一個本地文件系統,這個文件的權限必須被開放。能夠被複制或者被共享。
全部的Spark文件輸入方法,包括 textFile, 也支持運行目錄,壓縮文件,和通配符。例如,你可使用 textFile("/my/directory") ,textFile("/my/directory/*.txt") 和 textFile("/my/directory/*.gz");
textFile 方法一樣使用第二個參數去控制文件被分割的子集。默認,Spark爲這個文件的每一個Block (在 HDFS中是64M )建立一個子集,可是你能夠經過傳遞一個參數來設定更高的值。注意,這個值不能比默認 blocks分割的更小。
除了 textFile , Spark的Python API 提供了其它的數據格式:
SparkContext.wholeTextFiles
讓你讀一個包含不少小文件的目錄。而後返回以鍵值對的方式返回它們(文件名,內容)。這個方法相對 textFile ,後者返回文件的每一行。
RDD.saveAsPickleFile
and SparkContext.
pickleFile
支持以簡單python對象格式存儲一個RDD 。
SequenceFile and Hadoop Input/Output Formats
注意,這個功能目前在實驗階段。用於高級用戶。它或許被替代在未來。新的方式將採用基於 Spark SQL 讀寫。那時候,Spark SQL 將是最優先的方式。
寫支持:
PySpark SequenceFile 支持加載一個java 鍵值組成的RDD ,根據java 類型轉換成可寫的。當保存一個鍵值對組成的RDD 到 SequenceFile中, PySpark 進行這個轉換。它把Python對象轉成 Java對象,而後把它們轉變成可寫的。下面這些類型自動轉換:
數組不被處理,用戶須要去指定自定義的 ArrayWritable 子類型 當讀寫的時候。在寫的時候,用戶還須要指定自定義的轉換把 數組轉換成 自動以的 ArrayWritalbe 子類型。在讀的時候,默認的轉換將自定義的ArrayWritable 子類型轉成Java 對象。而後轉成Python元組。
保存和加載 SequenceFiles
和文本文件相似。SequenceFiles 經過指定路徑被保存和加載。key 和 value 能夠被分開。可是對於標準寫來說,不須要如此。
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) rdd.saveAsSequenceFile("path/to/file") sorted(sc.sequenceFile("path/to/file").collect()) [(1, u'a'), (2, u'aa'), (3, u'aaa')]
Rdd 操做:
Rdd 支持兩種形式的操做:
一、轉換,根據現有的數據集建立一個新的。
二、動做,在數據集上運行一些計算後返回值。例如,map 是一個轉換,經過一個方法將數據集的每一個元素返回到一個rdd結果中,另外一邊,reduce 是一個動做,經過方法將RDDd的元素合併成一個最終結果(reduceByKey 返回一個分佈式數據集)。
Spark中全部的轉換都是懶執行的,它們並不立刻計算它們的結果。代替的,它們記着這些用於基礎數據集的轉換,當一個動做要求一個結果被返回來,那麼才執行這些計算。這個設計可使Spark 運行更高效。例如。咱們能夠實現一個經過map建立的數據集被reduce 使用而後返回一個reduce結果,而不是一個大的map過的集合。
默認,每個被轉換的RDD或許被從新計算當你每次對它使用action時。可是,你能夠緩存 一個RDD在內存裏,經過使用 persist或者cache方法。此時,Spark將保存這個元素在集羣上,爲了你下次更快使用它。也支持緩存rdds 在磁盤上。或者複製在多節點上。
lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)
第一行根據一個外部文件定義了一個基礎rdd。數據集不被加載到內存,除非被動做執行。 lines 僅僅是一個文件的指針,第二行定義一個 lineLength 做爲一個map 的轉換結果。一樣的,lineLength不是當即被計算。由於懶執行。最後,我麼能夠reduce, 它是一個動做。 此時,Spark 分割計算成不少任務。在各個機器上運行,每一個機器運行它本身部分的map和一個reduce 的結果。僅僅返回這個結果到起始程序中。
若是咱們以後須要屢次使用lineLength,咱們能夠緩存它:
lineLengths.persist()
在reduce 以前,lineLengths 將再第一次被計算出來後緩存到內存。
傳遞方法給Spark:
Spark 的API 特別支持傳遞方法給起始程序,而後在集羣上運行。有三種推薦方式:
Lambda表達式, 對於簡單的方法能夠寫成表達式,Lambda不支持多行方法或者不返回值的方法。
對於長一點的代碼。在方法內部定義。
做爲一個模塊。
例如,傳遞一個長一點的方法而不是使用lomba。看下面代碼:
"""MyScript.py""" def myFunc(s): words = s.split(" ") return len(words) if __name__ == "__main__": sc = SparkContext(...) sc.textFile("file.txt").map(myFunc)
注意,當一個對象實例方法容許傳入引用時(和單例對象相反)。須要傳遞這個class和方法一塊兒:例如:
class MyClass(object): def func(self, s): return s def doStuff(self, rdd): return rdd.map(self.func)
建立了一個新的MyClass, 而後調用doStuff。內部的map引用這個MyClass實例的func,因此整個對象須要被傳遞到集羣。
相似的,外部對象的可訪問領域,將引用真個對象。
class MyClass(object): def __init__(self): self.field = "Hello" def doStuff(self, rdd): return rdd.map(lambda s: self.field + x)
爲了防止出錯,最簡單的方式是複製 field到一個本地變量代替訪問它:
def doStuff(self, rdd): field = self.field return rdd.map(lambda s: field + x)