本文翻譯自Spark(http://spark.apache.org)的官方文檔。因爲Spark更新較快,部分API已通過時,本文僅供參考,請以相應版本的官方文檔和運行時的提示爲準。java
從高層次上來看,每個Spark應用都包含一個驅動程序,用於執行用戶的main函數以及在集羣上運行各類並行操做。Spark提供的主要抽象是彈性分佈式數據集(RDD),這是一個包含諸多元素、被劃分到不一樣節點上進行並行處理的數據集合。RDD經過打開HDFS(或其餘hadoop支持的文件系統)上的一個文件、在驅動程序中打開一個已有的Scala集合或由其餘RDD轉換操做獲得。用戶能夠要求Spark將RDD持久化到內存中,這樣就能夠有效地在並行操做中複用。另外,在節點發生錯誤時RDD能夠自動恢復。python
Spark提供的另外一個抽象是能夠在並行操做中使用的共享變量。在默認狀況下,當Spark將一個函數轉化成許多任務在不一樣的節點上運行的時候,對於全部在函數中使用的變量,每個任務都會獲得一個副本。有時,某一個變量須要在任務之間或任務與驅動程序之間共享。Spark支持兩種共享變量:廣播變量,用來將一個值緩存到全部節點的內存中;累加器,只能用於累加,好比計數器和求和。程序員
這篇指南將展現這些特性在Spark支持的語言中是如何使用的(本文只翻譯了Python部分)。若是你打開了Spark的交互命令行——bin/spark-shell的Scala命令行或bin/pyspark的Python命令行均可以——那麼這篇文章你學習起來將是很容易的。web
Spark1.3.0只支持Python2.6或更高的版本(但不支持Python3)。它使用了標準的CPython解釋器,因此諸如NumPy一類的C庫也是可使用的。算法
經過Spark目錄下的bin/spark-submit腳本你能夠在Python中運行Spark應用。這個腳本會載入Spark的Java/Scala庫而後讓你將應用提交到集羣中。你能夠執行bin/pyspark來打開Python的交互命令行。shell
若是你但願訪問HDFS上的數據,你須要爲你使用的HDFS版本創建一個PySpark鏈接。常見的HDFS版本標籤都已經列在了這個第三方發行版頁面。apache
最後,你須要將一些Spark的類import到你的程序中。加入以下這行:數組
from pyspark import SparkContext, SparkConf
在一個Spark程序中要作的第一件事就是建立一個SparkContext對象來告訴Spark如何鏈接一個集羣。爲了建立SparkContext,你首先須要建立一個SparkConf對象,這個對象會包含你的應用的一些相關信息。緩存
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
appName參數是在集羣UI上顯示的你的應用的名稱。master是一個Spark、Mesos或YARN集羣的URL,若是你在本地運行那麼這個參數應該是特殊的」local」字符串。在實際使用中,當你在集羣中運行你的程序,你通常不會把master參數寫死在代碼中,而是經過用spark-submit運行程序來得到這個參數。可是,在本地測試以及單元測試時,你仍須要自行傳入」local」來運行Spark程序。網絡
在PySpark命令行中,一個特殊的集成在解釋器裏的SparkContext變量已經創建好了,變量名叫作sc。建立你本身的SparkContext不會起做用。你能夠經過使用—master命令行參數來設置這個上下文鏈接的master主機,你也能夠經過—py-files參數傳遞一個用逗號隔開的列表來將Python的.zip、.egg或.py文件添加到運行時路徑中。你還能夠經過—package參數傳遞一個用逗號隔開的maven列表來給這個命令行會話添加依賴(好比Spark的包)。任何額外的包含依賴包的倉庫(好比SonaType)均可以經過傳給—repositorys參數來添加進去。Spark包的全部Python依賴(列在這個包的requirements.txt文件中)在必要時都必須經過pip手動安裝。
好比,使用四核來運行bin/pyspark應當輸入這個命令:
$ ./bin/pyspark --master local[4]
又好比,把code.py文件添加到搜索路徑中(爲了可以import在程序中),應當使用這條命令:
$ ./bin/pyspark --master local[4] --py-files code.py
想要了解命令行選項的完整信息請執行pyspark --help命令。在這些場景下,pyspark會觸發一個更通用的spark-submit腳本
在IPython這個增強的Python解釋器中運行PySpark也是可行的。PySpark能夠在1.0.0或更高版本的IPython上運行。爲了使用IPython,必須在運行bin/pyspark時將PYSPARK_DRIVER_PYTHON變量設置爲ipython,就像這樣:
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
你還能夠經過設置PYSPARK_DRIVER_PYTHON_OPTS來自省定製ipython。好比,在運行IPython Notebook時開啓PyLab圖形支持應該使用這條命令:
$ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline" ./bin/pyspark
Spark是以RDD概念爲中心運行的。RDD是一個容錯的、能夠被並行操做的元素集合。建立一個RDD有兩個方法:在你的驅動程序中並行化一個已經存在的集合;從外部存儲系統中引用一個數據集,這個存儲系統能夠是一個共享文件系統,好比HDFS、HBase或任意提供了Hadoop輸入格式的數據來源。
並行化集合是經過在驅動程序中一個現有的迭代器或集合上調用SparkContext的parallelize方法創建的。爲了建立一個可以並行操做的分佈數據集,集合中的元素都會被拷貝。好比,如下語句建立了一個包含1到5的並行化集合:
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
分佈數據集(distData)被創建起來以後,就能夠進行並行操做了。好比,咱們能夠調用disData.reduce(lambda a, b: a+b)來對元素進行疊加。在後文中咱們會描述分佈數據集上支持的操做。
並行集合的一個重要參數是將數據集劃分紅分片的數量。對每個分片,Spark會在集羣中運行一個對應的任務。 典型狀況下,集羣中的每個CPU將對應運行2-4個分片。通常狀況下,Spark會根據當前集羣的狀況自行設定分片數量。可是,你也能夠經過將第二個參 數傳遞給parallelize方法(好比sc.parallelize(data, 10))來手動肯定分片數量。注意:有些代碼中會使用切片(slice,分片的同義詞)這個術語來保持向下兼容性。
PySpark能夠經過Hadoop支持的外部數據源(包括本地文件系統、HDFS、 Cassandra、HBase、 亞馬遜S3等等)創建分佈數據集。Spark支持文本文件、 序列文件以及其餘任何 Hadoop輸入格式文件。
經過文本文件建立RDD要使用SparkContext的textFile方法。這個方法會使用一個文件的URI(或本地文件路徑,hdfs://、s3n://這樣的URI等等)而後讀入這個文件創建一個文本行的集合。如下是一個例子:
>>> distFile = sc.textFile("data.txt")
創建完成後distFile上就能夠調用數據集操做了。好比,咱們能夠調用map和reduce操做來疊加全部文本行的長度,代碼以下:
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
在Spark中讀入文件時有幾點要注意:
若是使用了本地文件路徑時,要保證在worker節點上這個文件也可以經過這個路徑訪問。這點能夠經過將這個文件拷貝到全部worker上或者使用網絡掛載的共享文件系統來解決。 包括textFile在內的全部基於文件的Spark讀入方法,都支持將文件夾、壓縮文件、包含通配符的路徑做爲參數。好比,如下代碼都是合法的: textFile("/my/directory") textFile("/my/directory/*.txt") textFile("/my/directory/*.gz") textFile方法也能夠傳入第二個可選參數來控制文件的分片數量。默認狀況下,Spark會爲文件的每個塊(在HDFS中塊的大小默認是64MB) 建立一個分片。可是你也能夠經過傳入一個更大的值來要求Spark創建更多的分片。注意,分片的數量毫不能小於文件塊的數量。
除了文本文件以外,Spark的Python API還支持多種其餘數據格式:
SparkContext.wholeTextFiles可以讀入包含多個小文本文件的目錄,而後爲每個文件返回一個(文件名,內容)對。這是與textFile方法爲每個文本行返回一條記錄相對應的。 RDD.saveAsPickleFile和SparkContext.pickleFile支持將RDD以串行化的Python對象格式存儲起來。串行化的過程當中會以默認10個一批的數量批量處理。 序列文件和其餘Hadoop輸入輸出格式。
注意 這個特性目前仍處於試驗階段,被標記爲Experimental,目前只適用於高級用戶。這個特性在將來可能會被基於Spark SQL的讀寫支持所取代,由於Spark SQL是更好的方式。
PySpark序列文件支持利用Java做爲中介載入一個鍵值對RDD,將可寫類型轉化成Java的基本類型,而後使用 Pyrolite將java結果對象串行化。當將一個鍵值對RDD儲存到一個序列文件中時PySpark將會運行上述過程的相反過程。首先將Python對象反串行化成Java對象,而後轉化成可寫類型。如下可寫類型會自動轉換:
| 可寫類型 | Python類型 | | ———————- | ————- | | Text | unicode str| | IntWritable | int | | FloatWritable | float | | DoubleWritable | float | | BooleanWritable | bool | | BytesWritable | bytearray | | NullWritable | None | | MapWritable | dict |
數組是不能自動轉換的。用戶須要在讀寫時指定ArrayWritable的子類型.在讀入的時候,默認的轉換器會把自定義的ArrayWritable子 類型轉化成Java的Object[],以後串行化成Python的元組。爲了得到Python的array.array類型來使用主要類型的數組,用戶 須要自行指定轉換器。
和文本文件相似,序列文件能夠經過指定路徑來保存與讀取。鍵值類型均可以自行指定,可是對於標準可寫類型能夠不指定。
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) >>> rdd.saveAsSequenceFile("path/to/file") >>> sorted(sc.sequenceFile("path/to/file").collect()) [(1, u'a'), (2, u'aa'), (3, u'aaa')]
PySpark一樣支持寫入和讀出其餘Hadoop輸入輸出格式,包括’新’和’舊’兩種Hadoop MapReduce API。若是有必要,一個Hadoop配置能夠以Python字典的形式傳入。如下是一個例子,使用了Elasticsearch ESInputFormat:
$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\ "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) >>> rdd.first() # the result is a MapWritable that is converted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', u'field3': 12345})
注意,若是這個讀入格式僅僅依賴於一個Hadoop配置和/或輸入路徑,並且鍵值類型均可以根據前面的表格直接轉換,那麼剛纔提到的這種方法很是合適。
若是你有一些自定義的序列化二進制數據(好比從Cassandra/HBase中讀取數據),那麼你須要首先在Scala/Java端將這些數據轉化成能夠被Pyrolite的串行化器處理的數據類型。一個轉換器特質已經提供好了。簡單地拓展這個特質同時在convert方法中實現你本身的轉換代碼便可。記住,要確保這個類以及訪問你的輸入格式所需的依賴都被打到了Spark做業包中,而且確保這個包已經包含到了PySpark的classpath中。
這裏有一些經過自定義轉換器來使用Cassandra/HBase輸入輸出格式的Python樣例和轉換器樣例。
RDD支持兩類操做:轉化操做,用於從已有的數據集轉化產生新的數據集;啓動操做,用於在計算結束後向驅動程序返回結果。舉個例子,map是一個轉化操做,能夠將數據集中每個元素傳給一個函數,同時將計算結果做爲一個新的RDD返回。另外一方面,reduce操做是一個啓動操做,可以使用某些函數來彙集計算RDD中全部的元素,而且向驅動程序返回最終結果(同時還有一個並行的reduceByKey操做能夠返回一個分佈數據集)。
在Spark全部的轉化操做都是惰性求值的,就是說它們並不會馬上真的計算出結果。相反,它們僅僅是記錄下了轉換操做的操做對象(好比:一個文件)。只有當一個啓動操做被執行,要向驅動程序返回結果時,轉化操做纔會真的開始計算。這樣的設計使得Spark運行更加高效——好比,咱們會發覺由map操做產生的數據集將會在reduce操做中用到,以後僅僅是返回了reduce的最終的結果而不是map產生的龐大數據集。
在默認狀況下,每個由轉化操做獲得的RDD都會在每次執行啓動操做時從新計算生成。可是,你也能夠經過調用persist(或cache)方法來將RDD持久化到內存中,這樣Spark就能夠在下次使用這個數據集時快速得到。Spark一樣提供了對將RDD持久化到硬盤上或在多個節點間複製的支持。
爲了演示RDD的基本操做,請看如下的簡單程序:
lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)
第一行定義了一個由外部文件產生的基本RDD。這個數據集不是從內存中載入的也不是由其餘操做產生的;lines僅僅是一個指向文件的指針。第二行將lineLengths定義爲map操做的結果。再強調一次,因爲惰性求值的緣故,lineLengths並不會被當即計算獲得。最後,咱們運行了reduce操做,這是一個啓動操做。從這個操做開始,Spark將計算過程劃分紅許多任務並在多機上運行,每臺機器運行本身部分的map操做和reduce操做,最終將本身部分的運算結果返回給驅動程序。
若是咱們但願之後重複使用lineLengths,只需在reduce前加入下面這行代碼:
lineLengths.persist()
這條代碼將使得lineLengths在第一次計算生成以後保存在內存中。
Spark的API嚴重依賴於向驅動程序傳遞函數做爲參數。有三種推薦的方法來傳遞函數做爲參數。
Lambda表達式,簡單的函數能夠直接寫成一個lambda表達式(lambda表達式不支持多語句函數和無返回值的語句)。 對於代碼很長的函數,在Spark的函數調用中在本地用def定義。 模塊中的頂級函數。
好比,傳遞一個沒法轉化爲lambda表達式長函數,能夠像如下代碼這樣:
"MyScript.py""" if __name__ == "__main__": def myFunc(s): words = s.split(" ") return len(words) sc = SparkContext(...) sc.textFile("file.txt").map(myFunc)
值得指出的是,也能夠傳遞類實例中方法的引用(與單例對象相反),這種傳遞方法會將整個對象傳遞過去。好比,考慮如下代碼:
class MyClass(object): def func(self, s): return s def doStuff(self, rdd): return rdd.map(self.func)
在這裏,若是咱們建立了一個新的MyClass對象,而後對它調用doStuff方法,map會用到這個對象中func方法的引用,因此整個對象都須要傳遞到集羣中。
還有另外一種類似的寫法,訪問外層對象的數據域會傳遞整個對象的引用:
class MyClass(object): def __init__(self): self.field = "Hello" def doStuff(self, rdd): return rdd.map(lambda s: self.field + x)
此類問題最簡單的避免方法就是,使用一個本地變量緩存一份這個數據域的拷貝,直接訪問這個數據域:
def doStuff(self, rdd): field = self.field return rdd.map(lambda s: field + x)
雖然大部分Spark的RDD操做都支持全部種類的對象,可是有少部分特殊的操做只能做用於鍵值對類型的RDD。這類操做中最多見的就是分佈的shuffle操做,好比將元素經過鍵來分組或彙集計算。
在Python中,這類操做通常都會使用Python內建的元組類型,好比(1, 2)。它們會先簡單地建立相似這樣的元組,而後調用你想要的操做。
好比,一下代碼對鍵值對調用了reduceByKey操做,來統計每一文本行在文本文件中出現的次數:
lines = sc.textFile("data.txt") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b)
咱們還可使用counts.sortByKey(),好比,當咱們想將這些鍵值對按照字母表順序排序,而後調用counts.collect()方法來將結果以對象列表的形式返回。
下面的表格列出了Spark支持的經常使用轉化操做。欲知細節,請查閱RDD API文檔(Scala, Java, Python)和鍵值對RDD函數文檔(Scala, Java)。
轉化操做 | 做用 ————| —— map(func) | 返回一個新的分佈數據集,由原數據集元素經func處理後的結果組成 filter(func) | 返回一個新的數據集,由傳給func返回True的原數據集元素組成 flatMap(func) | 與map相似,可是每一個傳入元素可能有0或多個返回值,func能夠返回一個序列而不是一個值 mapParitions(func) | 相似map,可是RDD的每一個分片都會分開獨立運行,因此func的參數和返回值必須都是迭代器 mapParitionsWithIndex(func) | 相似mapParitions,可是func有兩個參數,第一個是分片的序號,第二個是迭代器。返回值仍是迭代器 sample(withReplacement, fraction, seed) | 使用提供的隨機數種子取樣,而後替換或不替換 union(otherDataset) | 返回新的數據集,包括原數據集和參數數據集的全部元素 intersection(otherDataset) | 返回新數據集,是兩個集的交集 distinct([numTasks]) | 返回新的集,包括原集中的不重複元素 groupByKey([numTasks]) | 當用於鍵值對RDD時返回(鍵,值迭代器)對的數據集 aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 用於鍵值對RDD時返回(K,U)對集,對每個Key的value進行彙集計算 sortByKey([ascending], [numTasks])用於鍵值對RDD時會返回RDD按鍵的順序排序,升降序由第一個參數決定 join(otherDataset, [numTasks]) | 用於鍵值對(K, V)和(K, W)RDD時返回(K, (V, W))對RDD cogroup(otherDataset, [numTasks]) | 用於兩個鍵值對RDD時返回 (K, (V迭代器, W迭代器))RDD cartesian(otherDataset) | 用於T和U類型RDD時返回(T, U)對類型鍵值對RDD pipe(command, [envVars]) | 經過shell命令管道處理每一個RDD分片 coalesce(numPartitions) | 把RDD的分片數量下降到參數大小 repartition(numPartitions) | 從新打亂RDD中元素順序並從新分片,數量由參數決定 repartitionAndSortWithinPartitions(partitioner) | 按照參數給定的分片器從新分片,同時每一個分片內部按照鍵排序
下面的表格列出了Spark支持的部分經常使用啓動操做。欲知細節,請查閱RDD API文檔(Scala, Java, Python)和鍵值對RDD函數文檔(Scala, Java)。 (譯者注:這部分翻譯比較簡略,僅供簡單參考,具體細節請看文檔) 啓動操做 | 做用 ————| —— reduce(func) | 使用func進行彙集計算,func的參數是兩個,返回值一個,兩次func運行應當是徹底解耦的,這樣才能正確地並行運算 collect() | 向驅動程序返回數據集的元素組成的數組 count() | 返回數據集元素的數量 first() | 返回數據集的第一個元素 take(n) | 返回前n個元素組成的數組 takeSample(withReplacement, num, [seed]) | 返回一個由原數據集中任意num個元素的suzuki,而且替換之 takeOrder(n, [ordering]) | 返回排序後的前n個元素 saveAsTextFile(path) | 將數據集的元素寫成文本文件 saveAsSequenceFile(path) | 將數據集的元素寫成序列文件,這個API只能用於Java和Scala程序 saveAsObjectFile(path) | 將數據集的元素使用Java的序列化特性寫到文件中,這個API只能用於Java和Scala程序 countByCount() | 只能用於鍵值對RDD,返回一個(K, int) hashmap,返回每一個key的出現次數 foreach(func) | 對數據集的每一個元素執行func, 一般用於完成一些帶有反作用的函數,好比更新累加器(見下文)或與外部存儲交互等
Spark的一個重要功能就是在將數據集持久化(或緩存)到內存中以便在多個操做 中重複使用。當咱們持久化一個RDD是,每個節點將這個RDD的每個分片計算並保存到內存中以便在下次對這個數據集(或者這個數據集衍生的數據集)的 計算中能夠複用。這使得接下來的計算過程速度可以加快(常常能加快超過十倍的速度)。緩存是加快迭代算法和快速交互過程速度的關鍵工具。
你能夠經過調用persist或cache方法來標記一個想要持久化的RDD。在第一次被計算產生以後,它就會始終停留在節點的內存中。Spark的緩存是具備容錯性的——若是RDD的任意一個分片丟失了,Spark就會依照這個RDD產生的轉化過程自動重算一遍。
另外,每個持久化的RDD都有一個可變的存儲級別,這個級別使得用戶能夠改變RDD持久化的儲存位置。好比,你能夠將數據集持久化到硬盤上,也能夠將它以序列化的Java對象形式(節省空間)持久化到內存中,還能夠將這個數據集在節點之間複製,或者使用Tachyon將它儲存到堆外。這些存儲級別都是經過向persist()傳遞一個StorageLevel對象(Scala, Java, Python)來設置的。存儲級別的全部種類請見下表:
注意:在Python中,儲存的對象永遠是經過Pickle庫序列化過的,因此設不設置序列化級別不會產生影響。
Spark還會在shuffle操做(好比reduceByKey)中自動儲存中間數據,即便用戶沒有調用persist。這是爲了防止在shuffle過程當中某個節點出錯而致使的全盤重算。不過若是用戶打算複用某些結果RDD,咱們仍然建議用戶對結果RDD手動調用persist,而不是依賴自動持久化機制。
應該選擇哪一個存儲級別? Spark的存儲級別是爲了提供內存使用與CPU效率之間的不一樣取捨平衡程度。咱們建議用戶經過考慮如下流程來選擇合適的存儲級別:
若是你的RDD很適合默認的級別(MEMORY_ONLY),那麼久使用默認級別吧。這是CPU最高效運行的選擇,可以讓RDD上的操做以最快速度運行。 不然,試試MEMORY_ONLY_SER選項而且選擇一個快的序列化庫來使對象的空間利用率更高,同時儘可能保證訪問速度足夠快。 不要往硬盤上持久化,除非重算數據集的過程代價確實很昂貴,或者這個過程過濾了巨量的數據。不然,從新計算分片有可能跟讀硬盤速度同樣快。 若是你但願快速的錯誤恢復(好比用Spark來處理web應用的請求),使用複製級別。全部的存儲級別都提供了重算丟失數據的完整容錯機制,可是複製一份副本能省去等待重算的時間。 在大內存或多應用的環境中,處於實驗中的OFF_HEAP模式有諸多優勢: 這個模式容許多個執行者共享Tachyon中的同一個內存池 這個模式顯著下降了垃圾回收的花銷。 在某一個執行者個體崩潰以後緩存的數據不會丟失。
Spark會自動監視每一個節點的緩存使用同時使用LRU算法丟棄舊數據分片。若是你想手動刪除某個RDD而不是等待它被自動刪除,調用RDD.unpersist()方法。
一般狀況下,當一個函數傳遞給一個在遠程集羣節點上運行的Spark操做(好比map和reduce)時,Spark會對涉及到的變量的全部副本執行這個函數。這些變量會被複制到每一個機器上,並且這個過程不會被反饋給驅動程序。一般狀況下,在任務之間讀寫共享變量是很低效的。可是,Spark仍然提供了有限的兩種共享變量類型用於常見的使用場景:廣播變量和累加器。
廣播變量容許程序員在每臺機器上保持一個只讀變量的緩存而不是將一個變量的拷貝傳遞給各個任務。它們能夠被使用,好比,給每個節點傳遞一份大輸入數據集的拷貝是很低效的。Spark試圖使用高效的廣播算法來分佈廣播變量,以此來下降通訊花銷。
能夠經過SparkContext.broadcast(v)來從變量v建立一個廣播變量。這個廣播變量是v的一個包裝,同時它的值能夠功過調用value方法來得到。如下的代碼展現了這一點:
>>> broadcastVar = sc.broadcast([1, 2, 3]) <pyspark.broadcast.Broadcast object at 0x102789f10> >>> broadcastVar.value [1, 2, 3]
在廣播變量被建立以後,在全部函數中都應當使用它來代替原來的變量v,這樣就能夠保證v在節點之間只被傳遞一次。另外,v變量在被廣播以後不該該再被修改了,這樣能夠確保每個節點上儲存的廣播變量的一致性(若是這個變量後來又被傳輸給一個新的節點)。
累加器是在一個相關過程當中只能被」累加」的變量,對這個變量的操做能夠有效地被並行化。它們能夠被用於實現計數器(就像在MapReduce過程當中)或求 和運算。Spark原生支持對數字類型的累加器,程序員也能夠爲其餘新的類型添加支持。累加器被以一個名字建立以後,會在Spark的UI中顯示出來。這 有助於瞭解計算的累進過程(注意:目前Python中不支持這個特性)。
能夠經過SparkContext.accumulator(v)來從變量v建立一個累加器。在集羣中運行的任務隨後可使用add方法或+=操做符(在Scala和Python中)來向這個累加器中累加值。可是,他們不能讀取累加器中的值。只有驅動程序能夠讀取累加器中的值,經過累加器的value方法。
如下的代碼展現了向一個累加器中累加數組元素的過程:
>>> accum = sc.accumulator(0) Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value 10
這段代碼利用了累加器對int類型的內建支持,程序員能夠經過繼承AccumulatorParam類來建立本身想要的類型支持。AccumulatorParam的接口提供了兩個方法:zero'用於爲你的數據類型提供零值;'addInPlace'用於計算兩個值得和。好比,假設咱們有一個Vector`類表示數學中的向量,咱們能夠這樣寫:
class VectorAccumulatorParam(AccumulatorParam): def zero(self, initialValue): return Vector.zeros(initialValue.size) def addInPlace(self, v1, v2): v1 += v2 return v1 # Then, create an Accumulator of this type: vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
累加器的更新操做只會被運行一次,Spark提供了保證,每一個任務中對累加器的更新操做都只會被運行一次。好比,重啓一個任務不會再次更新累加器。在轉化過程當中,用戶應該留意每一個任務的更新操做在任務或做業從新運算時是否被執行了超過一次。
累加器不會該別Spark的惰性求值模型。若是累加器在對RDD的操做中被更新了,它們的值只會在啓動操做中做爲RDD計算過程當中的一部分被更新。因此,在一個懶惰的轉化操做中調用累加器的更新,並無法保證會被及時運行。下面的代碼段展現了這一點:
accum = sc.accumulator(0) data.map(lambda x => acc.add(x); f(x)) # Here, acc is still 0 because no actions have cause the `map` to be computed.
這個應用提交指南描述了一個應用被提交到集羣上的過程。簡而言之,只要你把你的應用打成了JAR包(Java/Scala應用)或.py文件的集合或.zip壓縮包(Python應用),bin/spark-submit腳本會將應用提交到任意支持的集羣管理器上。
Spark對單元測試是友好的,能夠與任何流行的單元測試框架相容。你只須要在測試中建立一個SparkContext,並如前文所述將master的URL設爲local,執行你的程序,最後調用SparkContext.stop()來終止運行。請確保你在finally塊或測試框架的tearDown方法中終止了上下文,由於Spark不支持兩個上下文在一個程序中同時運行。
Spark1.0凍結了1.X系列Spark的核心API。如今版本中沒有標註」experimental」或是」developer API」的API在將來的版本中仍會被支持。對Python用戶來講惟一的變化就是組管理操做,好比groupByKey, cogroup, join, 它們的返回值都從(鍵,值列表)對變成了(鍵, 值迭代器)對。
你還能夠閱讀Spark Streaming, MLlib和GraphX的遷移指南。
你能夠在Spark的網站上看到更多的Spark樣例程序。另外,在examples目錄下還有許多樣例代碼(Scala, Java, Python)。你能夠經過將類名稱傳給Spark的bin/run-example 腳原本運行Java和Scala語言樣例,舉例說明:
./bin/run-example SparkPi
對於Python例子,使用spark-submit腳本代替:
./bin/spark-submit examples/src/main/python/pi.py
爲了給你優化代碼提供幫助,配置指南和調優指南提供了關於最佳實踐的一些信息。確保你的數據儲存在以高效的格式儲存在內存中,這很重要。爲了給你部署應用提供幫助,集羣模式概覽描述了許多內容,包括分佈式操做和支持的集羣管理器。
最後,完整的API文檔在這裏。Scala版本 Java版本 Python版本