East 2015 (Nov 26, 2014)html
Spark wins Daytona Gray Sort 100TB Benchmark (Nov 05, 2014)java
Archivepython
Download Sparkshell
Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.apache
外文地址:數組
http://spark.apache.org/docs/latest/programming-guide.html緩存
Spark has an advanced DAG execution engine that supports cyclic data flow and in-memory computing.併發
在最頂層,Spark包含一個起始程序來運行用戶編寫的main 方法,而且執行多樣的並行操做在一個集羣上。其中最主要的抽象就是Spark提供一個 RDD (彈性集羣數據集),是一些能夠經過節點訪問執行並行任務的集羣的集合。RDD 經過一個Hadoop文件系統中的文件被建立(或者其它支持Hadoop文件系統的系統)。用戶常常會讓Spark去保留一個RDD在內存裏。容許它在並行操做時被高效重用。最後,Rdd將自動從節點錯誤中恢復。分佈式
第二個被抽象是,在並行操做時,變量將被共享。默認的,當Spark 在不一樣節點上並行運行任務集合中的一個方法,它爲任務中每一個方法的變量都保留一個備份。有時候,一些變量須要被跨任務共享,或者兩個任務間共享,或者給起始程序共享。Spark提供兩種形式的變量:廣播變量,能夠在全部節點上緩存變量在內存裏。另外一個是儲蓄變量,只能被增長,例如數量,或者和。ide
本文展現這些功能,使用每一個Spark支持的語言,本文是python。若是你打開 Spark的交互shell,bin/spark-shell, 或者python的shell, bin/pyspark. 很容易學的、
Spark 所使用的數據集RDDs 是一個能夠被並行操做的容錯數據集。有兩種方式去建立它,1,並行一個你本地存在的集合。或者引用一個額外的儲存系統。例如: 一個共享的文件系統,HDFS, HBase, 或者任何提供Hadoop輸入格式的資源。
並行集合經過Spark 上下文 SparkContext
’s parallelize
方法被建立,基於你磁盤上一個存在的能夠迭代的集合。集合中的元素經過經過複製造成一個分佈式數據集,這個數據集能夠被並行使用。例如。下面是一個怎樣建立一個併發數據集用數字1-5:
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
一旦被建立。這個分佈式數據集能夠被併發使用。例如,咱們能夠 distData.reduce(lambda a, b: a + b) 去相加他們。咱們稍後併發操做。
併發數據集的一個重要的參數是數據集被分割的子集數量。Spark 將爲每一個子集運行一個任務。表明性的,你想要沒給CPU運行2-4個子集。正常狀況下: Spark 將試圖設計你分區的數量分局你的集羣。可是,你依然能夠經過傳遞第二個參數來手動設置它。 parallelize
(e.g. sc.parallelize(data, 10)
).
注意:你代碼中一些地方會用到 slices (子集的別名),爲了上下兼容性而存在的。
其它數據集合:
Pyspark 能夠從任何被hadoop 支持的存儲資源中 建立分佈式數據集。包括本地文件系統,HDFS, Cassandra, HBase, Amazon S3 等。Sparks 支持文本文件,hadoop 輸入格式文件。
文本文件 RDDs 能夠經過 SparkContext
’s textFile 方法建立這個方法使用一個URI 爲這個文件,不論是本地路徑,仍是
hdfs://
, s3n://
, URI。將它讀做行的集合。下面是一個例子:
>>> distFile = sc.textFile("data.txt")
一旦被建立。文件能夠經過集合操做使用。例如,咱們能夠相加行的數目經過使用 map 和 reduce 方法以下:
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
.
在用Spark讀取文件時候的一些注意項:
若是使用一個本地文件系統,這個文件的權限必須被開放。能夠被複制或者被共享。
全部的Spark 文件輸入方法,包括 textFile, 也支持運行目錄,壓縮文件,和通配符。例如,你可使用 textFile("/my/directory") ,textFile("/my/directory/*.txt") 和 textFile("/my/directory/*.gz");
textFile 方法一樣使用第二個參數去控制文件被分割的子集。默認,Spark爲這個文件的每一個Block (在 HDFS中是64M )建立一個子集,可是你能夠經過傳遞一個參數來設定更高的值。注意,這個值不能比默認 blocks分割的 更小。
除了 textFile , Spark 的Python API 提供了其它的數據格式:
SparkContext.wholeTextFiles
讓你讀一個包含不少小文件的目錄。而後返回以鍵值對的方式返回它們(文件名,內容)。這個方法相對 textFile ,後者返回文件的每一行。
RDD.saveAsPickleFile
and SparkContext.
pickleFile
支持以簡單python對象格式存儲一個RDD 。
SequenceFile and Hadoop Input/Output Formats
注意,這個功能目前在實驗階段。用於高級用戶。它或許被替代在未來。新的方式將採用基於 Saprk SQL 讀寫。那時候,Spark SQL 將是最優先的方式。
寫支持:
PySpark SequenceFile 支持加載一個java 鍵值組成的RDD ,根據java 類型轉換成可寫的。當保存一個鍵值對組成的RDD 到 SequenceFile中, PySpark 進行這個轉換。它把Python對象轉成 Java對象,而後把它們轉變成可寫的。下面這些類型自動轉換:
數組不被處理,用戶須要去指定自定義的 ArrayWritable 子類型 當讀寫的時候。在寫的時候,用戶還須要指定自定義的轉換把 數組轉換成 自動以的 ArrayWritalbe 子類型。在讀的時候,默認的轉換將自定義的ArrayWritable 子類型轉成Java 對象。而後轉成Python元組。
保存和加載 SequenceFiles
和文本文件相似。SequenceFiles 經過指定路徑被保存和加載。key 和 value 能夠被分開。可是對於標準寫來說,不須要如此。
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) >>> rdd.saveAsSequenceFile("path/to/file") >>> sorted(sc.sequenceFile("path/to/file").collect()) [(1, u'a'), (2, u'aa'), (3, u'aaa')]
Rdd 操做:
Rdd 支持兩種形式的操做,1轉換,根據現有的數據集建立一個新的。2動做,在數據集上運行一些計算後返回值。例如,map 是一個轉換,經過一個方法將數據集的每一個元素返回到一個rdd結果中,另外一邊,reduce 是一個動做,經過方法將RDDd的元素合併成一個最終結果(reduceByKey 返回一個分佈式數據集)。
Spark 中全部的轉換都是懶執行的。因此它們並不立刻計算它們的結果。代替的,它們記着這些用於基礎數據集的轉換,當一個動做要求一個結果被返回來,那麼才執行這些計算。這個設計可使Spark 運行更高效。例如。咱們能夠實現一個經過map建立的數據集被reduce 使用而後返回一個reduce結果,而不是一個大的map過的集合。
默認,每個被轉換的RDD或許被從新計算當你每次對它使用action時。可是,你能夠緩存一個RDD在內存裏,經過使用 persist或者cache,方法。此時,Spark將保存這個元素在集羣上,爲了你下次更快使用它。也支持緩存rdds 在磁盤上。或者複製在多節點上。
lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)
第一行根據一個外部文件定義了一個基礎rdd。數據集不被加載到內存,除非被動做執行。lines 僅僅是一個文件的指針,第二行定義一個 lineLength 做爲一個map 的轉換結果。一樣的,lineLength 不是當即被計算。由於懶執行。最後,我麼能夠reduce, 它是一個動做。 此時,Spark 分割計算成不少任務。在各個機器上運行,每一個機器運行它本身部分的map和一個reduce 的結果。僅僅返回這個結果到起始程序中。
若是咱們以後須要屢次使用lineLength,咱們能夠緩存它:
lineLengths.persist()
在reduce 以前,lineLengths 將再第一次被計算出來後緩存到內存。
傳遞方法給Spark:
Spark 的API 特別支持傳遞方法給起始程序,而後在集羣上運行。有三種推薦方式:
Lambda表達式, 對於簡單的方法能夠寫成表達式,Lambda不支持多行方法 或者不返回值的方法。
對於長一點的代碼。在方法內部定義。
做爲一個模塊。
例如,傳遞一個長一點的方法而不是使用lomba。看下面代碼:
"""MyScript.py""" if __name__ == "__main__": def myFunc(s): words = s.split(" ") return len(words) sc = SparkContext(...) sc.textFile("file.txt").map(myFunc)
注意,當一個對象實例方法 容許傳入引用時(和單例對象相反)。須要傳遞這個class和方法一塊兒:例如:
class MyClass(object): def func(self, s): return s def doStuff(self, rdd): return rdd.map(self.func)
建立了一個新的 MyClass, 而後調用 doStuff。內部的map 引用這個MyClass 實例的 func ,因此整個對象須要被傳遞到 集羣。
相似的,外部對象的可訪問領域,將引用真個對象。
class MyClass(object): def __init__(self): self.field = "Hello" def doStuff(self, rdd): return rdd.map(lambda s: self.field + x)
爲了防止出錯,最簡單的方式是複製 field 到一個本地變量代替訪問它:
def doStuff(self, rdd): field = self.field return rdd.map(lambda s: field + x)
待續!