轉載自:http://blog.csdn.net/sdujava2011/article/details/46878153?utm_source=tuicoolhtml
英文地址:https://spark.apache.org/docs/latest/programming-guide.htmljava
Spark編程指南V1.4.0python
· 簡介程序員
· 接入Sparkweb
· Spark初始化算法
· 使用Shellshell
· 在集羣上部署代碼apache
· 彈性分佈式數據集編程
· 並行集合(Parallelized Collections)api
· 其餘數據集
· RDD的操做
· 基礎操做
· 向Spark傳遞函數
· 處理鍵值對
· 轉換
· 動做
· RDD的持久化
· 存儲級別的選擇
· 移除數據
· 共享變量
· 廣播變量
· 累加器
· 部署到一個集羣上
· 單元測試
· 從1.0以前版本的Spark遷移
· 下一步該怎麼作
簡介
總的來講,每個Spark的應用,都是由一個驅動程序(driver program)構成,它運行用戶的main函數,在一個集羣上執行各類各樣的並行操做。Spark提出的最主要抽象概念是彈性分佈式數據集 (resilientdistributed dataset,RDD),它是一個元素集合,劃分到集羣的各個節點上,能夠被並行操做。RDDs的建立能夠從HDFS(或者任意其餘支持Hadoop文件系統)上的一個文件開始,或者經過轉換驅動程序(driver program)中已存在的Scala集合而來。用戶也可讓Spark保留一個RDD在內存中,使其能在並行操做中被有效的重複使用。最後,RDD能自動從節點故障中恢復。
Spark的第二個抽象概念是共享變量(shared variables),能夠在並行操做中使用。在默認狀況下,Spark經過不一樣節點上的一系列任務來運行一個函數,它將每個函數中用到的變量的拷貝傳遞到每個任務中。有時候,一個變量須要在任務之間,或任務與驅動程序之間被共享。Spark支持兩種類型的共享變量:廣播變量,能夠在內存的全部的結點上緩存變量;累加器:只能用於作加法的變量,例如計數或求和。
本指南將用每一種Spark支持的語言來展現這些特性。這都是很容易來跟着作的若是你啓動了Spark的交互式Shell或者Scala的bin/spark-shell或者Python的bin/pyspark。
接入Spark
Scala
Spark1.2.1須要和Scala2.10一塊兒使用。若是你要用Scala來編寫應用,你須要用一個相應版本的Scala(例如2.10.X)。
要寫一個Spark應用程序,你須要在添加Spark的Maven依賴,Spark能夠經過Maven中心庫來得到:
除此以外,若是你想訪問一個HDFS集羣,你須要根據你的HDFS版本,添加一個hadoop-client的依賴。一些通用的HDFS版本標籤在第三方發行版頁面列出。
最後,你須要將一些Spark的類和隱式轉換導入到你的程序中。經過以下語句:
Java
Spark1.2.1須要運行在Java6及更高版本上。若是你正在使用Java8,Spark支持使用Lambda表達式簡潔地編寫函數,或者你可使用在org.apache.spark.api.java.function包中的類。
要使用Java編寫Spark應用程序,你須要添加一個Spark的依賴。Spark能夠經過Maven中心庫得到:
此外,若是你想訪問一個HDFS集羣,你須要根據你的HDFS版本,添加一個hadoop-client的依賴。一些通用的HDFS版本標籤在第三方發行版頁面列出。
最後,你須要將Spark的類導入到你的程序中。添加以下行:
Python
Spark1.2.1須要和Python2.6或者更高的版本(但不是Python3)一塊兒使用。它使用標準的CPython解釋器,所以像NumPy這類的C語言庫能夠用。要用Python的方式運行Spark應用程序,可使用在Spark目錄下的bin/spark-submit腳本。這個腳本會裝載Spark的Java和Scala庫並容許你將程序提交到集羣。你也可使用bin/pyspark來啓動一個交互式Python Shell。
若是你想要訪問HDFS數據,你須要根據你的HDFS版本使用一個PySpark的構建。一些通用的HDFS版本標籤在第三方發行版頁面列出。針對通用的HDFS版本的預先構建的包在Spark主頁上也是可得到。
最後,你須要導入一些Spark相關的類到你的程序中。添加以下的行:
from pyspark import SparkContext, SparkConf
初始化Spark
Scala
Spark程序須要作的第一件事情,就是建立一個SparkContext對象,它將告訴Spark如何訪問一個集羣。要建立一個SparkContext你首先須要創建一個SparkConf對象,這個對象包含你的程序的信息。
每一個JVM只能有一個活動的SparkContext。在建立一個新的SparkContext以前你必須stop()活動的SparkContext。
appName是你的應用的名稱,將會在集羣的Web監控UI中顯示。master參數,是一個用於指定所鏈接的Spark,Mesos or Mesos 集羣URL的字符串,也能夠是一個以下面所描述的用於在local模式運行的特殊字符串「local」。在實踐中,當運行在一個集羣上時,你不會想把master硬編碼到程序中,而是啓動spark-submit來接收它。然而,對於本地測試和單元測試,你能夠經過「local」模式運行Spark。
Java
Spark程序須要作的第一件事情,就是建立一個JavaSparkContext對象,它將告訴Spark如何訪問一個集羣。要建立一個SparkContext你首先須要創建一個SparkConf對象,這個對象包含你的程序的信息。
appName是你的應用的名稱,將會在集羣的Web監控UI中顯示。master參數,是一個用於指定所鏈接的Spark,Mesos or Mesos 集羣URL的字符串,也能夠是一個以下面所描述的用於在local模式運行的特殊字符串「local」。在實踐中,當運行在一個集羣上時,你不會想把master硬編碼到程序中,而是啓動spark-submit來接收它。然而,對於本地測試和單元測試,你能夠經過「local」模式運行Spark。
Python
Spark程序須要作的第一件事情,就是建立一個JavaSparkContext對象,它將告訴Spark如何訪問一個集羣。要建立一個SparkContext你首先須要創建一個SparkConf對象,這個對象包含你的程序的信息。
appName是你的應用的名稱,將會在集羣的Web監控UI中顯示。master參數,是一個用於指定所鏈接的Spark,Mesos or Mesos 集羣URL的字符串,也能夠是一個以下面所描述的用於在local模式運行的特殊字符串「local」。在實踐中,當運行在一個集羣上時,你不會想把master硬編碼到程序中,而是啓動spark-submit來接收它。然而,對於本地測試和單元測試,你能夠經過「local」模式運行Spark。
使用Shell
Scala
在Spark shell中,一個特殊的解釋器感知的SparkContext已經爲你建立好了,變量名叫作sc。建立本身的SparkContext將不會生效。你可使用-master參數設置context鏈接到那個master,而且你可使用-jars參數把用逗號分隔的一個jar包列表添加到classpath中。例如,若是在四核CPU上運行spark-shell,使用:
或者,同時在classpath中加入code.jar,使用:
想要得到完整的選項列表,運行spark-shell –help。在背後,spark-shell調用更通常的spark-submit腳本。
Python
在PySpark shell中,一個特殊的解釋器感知的SparkContext已經爲你建立好了,變量名叫作sc。建立本身的SparkContext將不會生效。你可使用-master參數設置context鏈接到那個master,而且你可使用—py-files參數把用逗號分隔的一個Python .zip,.egg或者.py文件列表添加到classpath中。例如,若是在四核CPU上運行bin/pyspark,使用:
或者,同時將code.py添加到搜索路徑中(爲了之後使用import code),使用:
想要得到完整的選項列表,運行pyspark –help。在背後,pyspark調用更通常的spark-submit腳本。
也能夠在IPython中啓動Pyspark shell,一個加強的Python解釋器。PySpark要使用IPython1.0.0及其以後的版本。要使用IPython,當運行bin/pyspark時要設置PYSPARK_DRIVER_PYTHON變量爲ipython:
你能夠經過設置PYSPARK_DRIVER_PYTHON_OPTS參數來自定義ipython命令。例如,啓動有PyLab支持的IPython Notebook支持:
彈性分佈式數據集(RDDs)
Spark圍繞的概念是彈性分佈式數據集(RDD),是一個有容錯機制並能夠被並行操做的元素集合。目前有兩種建立RDDs的方法:並行化一個在你的驅動程序中已經存在的集合,或者引用在外部存儲系統上的數據集,例如共享文件系統,HDFS,HBase,或者任何以Hadoop輸入格式提供的數據源。
並行集合
Scala
並行集合是經過調用SparkContext的parallelize方法,在一個已經存在的集合上建立的(一個Scala Seq對象)。集合的對象將會被拷貝,建立出一個能夠被並行操做的分佈式數據集。例如,下面展現了怎樣建立一個含有數字1到5的並行集合:
一旦建立了分佈式數據集(distData),就能夠對其執行並行操做。例如,咱們能夠調用distData.reduce((a,b)=>a+b)來累加數組的元素。後續咱們會進一步地描述對分佈式數據集的操做。
並行集合的一個重要參數是分區數(the number of partitions),表示數據集切分的份數。Spark將在集羣上爲每一個分區數據起一個任務。典型狀況下,你但願集羣的每一個CPU分佈2-4個分區(partitions)。一般,Spark會嘗試基於集羣情況自動設置分區數。然而,你也能夠進行手動設置,經過將分區數做爲第二個參數傳遞給parallelize方法來實現。(例如:sc.parallelize(data,10))。注意:代碼中的一些地方使用屬於「分片(分區的近義詞)」來保持向後兼容。
Java
並行集合是經過對存在於驅動程序中的集合調用JavaSparkContext的parallelize方法來構建的。構建時會拷貝集合中的元素,建立一個能夠被並行操做的分佈式數據集。例如,這裏演示瞭如何建立一個包含數字1到5的並行集合:
一旦建立了分佈式數據集(distData),就能夠對其執行並行操做。例如,咱們能夠調用distData.reduce((a,b)=>a+b)來累加數組的元素。後續咱們會進一步地描述對分佈式數據集的操做。
注意:在本指南中,咱們會常用簡潔地Java8的lambda語法來指明Java函數,而在Java的舊版本中,你能夠實現org.apache.spark.api.java.function包中的接口。下面咱們將在把函數傳遞到Spark中描述更多的細節。
並行集合的一個重要參數是分區數(the number of partitions),表示數據集切分的份數。Spark將在集羣上爲每一個分區數據起一個任務。典型狀況下,你但願集羣的每一個CPU分佈2-4個分區(partitions)。一般,Spark會嘗試基於集羣情況自動設置分區數。然而,你也能夠進行手動設置,經過將分區數做爲第二個參數傳遞給parallelize方法來實現。(例如:sc.parallelize(data,10))。注意:代碼中的一些地方使用屬於「分片(分區的近義詞)」來保持向後兼容。
Python
並行集合是經過對存在於驅動程序中的迭代器(iterable)或集合(collection),調用SparkContext的parallelize方法來構建的。構建時會拷貝迭代器或集合中的元素,建立一個能夠被並行操做的分佈式數據集。例如,這裏演示瞭如何建立一個包含數字1到5的並行集合:
一旦建立了分佈式數據集(distData),就能夠對其執行並行操做。例如,咱們能夠調用distData.reduce(lambda a,b:a+b)來累加列表的元素。後續咱們會進一步地描述對分佈式數據集的操做。
並行集合的一個重要參數是分區數(the number of partitions),表示數據集切分的份數。Spark將在集羣上爲每一個分區數據起一個任務。典型狀況下,你但願集羣的每一個CPU分佈2-4個分區(partitions)。一般,Spark會嘗試基於集羣情況自動設置分區數。然而,你也能夠進行手動設置,經過將分區數做爲第二個參數傳遞給parallelize方法來實現。(例如:sc.parallelize(data,10))。注意:代碼中的一些地方使用屬於「分片(分區的近義詞)」來保持向後兼容。
外部數據集
Scala
Spark能夠從Hadoop支持的任何存儲源中構建出分佈式數據集,包括你的本地文件系統,HDFS,Cassandre,HBase,Amazon S3等。Spark支持text files,Sequence files,以及其餘任何一種Hadoop InputFormat。
Text file RDDs的建立可使用SparkContext的textFile方法。該方法接受一個文件的URI地址(或者是機器上的一個本地路徑,或者是一個hdfs://,s3n://等URI)做爲參數,並讀取文件的每一行數據,放入集合中,下面是一個調用例子:
一旦建立完成,就能夠在distFile上執行數據集操做。例如,要相對全部行的長度進行求和,咱們能夠經過以下的map和reduce操做來完成:
distFile.map(s => s.length).reduce((a, b)=> a + b)
Spark讀文件時的一些注意事項:
1. 若是文件使用本地文件系統上的路徑,那麼該文件必須在工做節點的相同路徑下也能夠訪問。能夠將文件拷貝到全部的worker節點上,或者使用network-mounted共享文件系統。
2. Spark的全部基於文件的輸入方法,包括textFile,支持在目錄上運行,壓縮文件和通配符。例如,你可使用textFile(」/my/directory」),textFile(「/my/directory/*.txt」),和textFile(「/my/directory/*.gz」)。
3. textFile方法也帶有可選的第二個參數,用於控制文件的分區數。默認狀況下,Spark會爲文件的每個block建立一個分區,可是你也能夠經過傳入更大的值,來設置更高的分區數。注意,你設置的分區數不能比文件的塊數小。
除了text文件,Spark的Scala API也支持其餘幾種數據格式:
1. SparkContext.wholeTextFiles可讓你讀取包含多個小text文件的目錄,而且每一個文件對應返回一個(filename,content)對。而對應的textFile方法,文件的每一行對應返回一條記錄(record)。
2. 對於Sequence文件,使用SparkContext的sequenceFile[K,V]方法,其中K和V分別對應文件中key和values的類型。這些類型必須是Hadoop的Writable接口的子類,如IntWritable和Text。另外,Spark容許你使用一些常見的Writables的原生類型;例如,sequenceFile[Int,String]會自動的轉換爲類型IntWritables和Texts。
3. 對於其餘的Hadoop InputFormats,你可使用SparkContext.hadoopRDD方法,它能夠接受一個任意類型的JobConf和輸入格式類,key類和value類。像Hadoop Job設置輸入源那樣去設置這些參數便可。對基於「新」的MapReduce API(org.apache.hadoop.mapreduce)的InputFormats,你也可使用SparkContex.newHadoopRDD。
4. RDD.saveAsObjectFile和SparkContext.objectFile支持由序列化的Java對象組成的簡單格式來保存RDD。雖然這不是一種像Avro那樣有效的序列化格式,可是她提供了一種能夠存儲任何RDD的簡單方式。
Java
Spark能夠從Hadoop支持的任何存儲源中構建出分佈式數據集,包括你的本地文件系統,HDFS,Cassandre,HBase,Amazon S3等。Spark支持text files,Sequence files,以及其餘任何一種Hadoop InputFormat。
Text file RDDs的建立可使用SparkContext的textFile方法。該方法接受一個文件的URI地址(或者是機器上的一個本地路徑,或者是一個hdfs://,s3n://等URI)做爲參數,並讀取文件的每一行數據,放入集合中,下面是一個調用例子:
JavaRDD<String> distFile =sc.textFile("data.txt");
一旦建立完成,就能夠在distFile上執行數據集操做。例如,要相對全部行的長度進行求和,咱們能夠經過以下的map和reduce操做來完成:
distFile.map(s -> s.length()).reduce((a, b)-> a + b)
Spark讀文件時的一些注意事項:
1. 若是文件使用本地文件系統上的路徑,那麼該文件必須在工做節點的相同路徑下也能夠訪問。能夠將文件拷貝到全部的worker節點上,或者使用network-mounted共享文件系統。
2. Spark的全部基於文件的輸入方法,包括textFile,支持在目錄上運行,壓縮文件和通配符。例如,你可使用textFile(」/my/directory」),textFile(「/my/directory/*.txt」),和textFile(「/my/directory/*.gz」)。
3. textFile方法也帶有可選的第二個參數,用於控制文件的分區數。默認狀況下,Spark會爲文件的每個block建立一個分區,可是你也能夠經過傳入更大的值,來設置更高的分區數。注意,你設置的分區數不能比文件的塊數小。
除了text文件,Spark的Java API也支持其餘幾種數據格式:
1. JavaSparkContext.wholeTextFiles可讓你讀取包含多個小text文件的目錄,而且每一個文件對應返回一個(filename,content)對。而對應的textFile方法,文件的每一行對應返回一條記錄(record)。
2. 對於Sequence文件,使用SparkContext的sequenceFile[K,V]方法,其中K和V分別對應文件中key和values的類型。這些類型必須是Hadoop的Writable接口的子類,如IntWritable和Text。另外,Spark容許你使用一些常見的Writables的原生類型;例如,sequenceFile[Int,String]會自動的轉換爲類型IntWritables和Texts。
3. 對於其餘的Hadoop InputFormats,你可使用JavaSparkContext.hadoopRDD方法,它能夠接受一個任意類型的JobConf和輸入格式類,key類和value類。像Hadoop Job設置輸入源那樣去設置這些參數便可。對基於「新」的MapReduce API(org.apache.hadoop.mapreduce)的InputFormats,你也可使用JavaSparkContex.newHadoopRDD。
4. JavaRDD.saveAsObjectFile和JavaSparkContext.objectFile支持由序列化的Java對象組成的簡單格式來保存RDD。雖然這不是一種像Avro那樣有效的序列化格式,可是她提供了一種能夠存儲任何RDD的簡單方式。
Python
PySpark能夠從Hadoop支持的任何存儲源中構建出分佈式數據集,包括你的本地文件系統,HDFS,Cassandre,HBase,Amazon S3等。Spark支持text files,Sequence files,以及其餘任何一種Hadoop InputFormat。
Text file RDDs的建立可使用SparkContext的textFile方法。該方法接受一個文件的URI地址(或者是機器上的一個本地路徑,或者是一個hdfs://,s3n://等URI)做爲參數,並讀取文件的每一行數據,放入集合中,下面是一個調用例子:
>>> distFile =sc.textFile("data.txt")
一旦建立完成,就能夠在distFile上執行數據集操做。例如,要相對全部行的長度進行求和,咱們能夠經過以下的map和reduce操做來完成:
distFile.map(lambda s: len(s)).reduce(lambda a,b: a + b)
Spark讀文件時的一些注意事項:
1. 若是文件使用本地文件系統上的路徑,那麼該文件必須在工做節點的相同路徑下也能夠訪問。能夠將文件拷貝到全部的worker節點上,或者使用network-mounted共享文件系統。
2. Spark的全部基於文件的輸入方法,包括textFile,支持在目錄上運行,壓縮文件和通配符。例如,你可使用textFile(」/my/directory」),textFile(「/my/directory/*.txt」),和textFile(「/my/directory/*.gz」)。
3. textFile方法也帶有可選的第二個參數,用於控制文件的分區數。默認狀況下,Spark會爲文件的每個block建立一個分區,可是你也能夠經過傳入更大的值,來設置更高的分區數。注意,你設置的分區數不能比文件的塊數小。
除了text文件,Spark的Python API也支持其餘幾種數據格式:
1. JavaSparkContext.wholeTextFiles可讓你讀取包含多個小text文件的目錄,而且每一個文件對應返回一個(filename,content)對。而對應的textFile方法,文件的每一行對應返回一條記錄(record)。
2. RDD.saveAsPickleFile和SparkContext.pickleFile支持由pickled Python對象組成的簡單格式保存RDD。使用批量的方式處理pickle模塊的對象序列化,默認批處理大小爲10.
3. SequenceFile和Hadoop輸入/輸出格式
注意,此功能當前標識爲試驗性的,是爲高級用戶而提供的。在未來的版本中,可能會由於支持基於SparkSQL的讀寫而被取代,在這種狀況下,SparkSQL是首選的方法。
Writable支持
PySpark的SequenceFile支持加載Java中的鍵值(key-value)對RDD,能夠將Writable轉換爲基本的Java類型,而且經過Pyrolite,在結果Java對象上執行pickles序列化操做。當將一個鍵值對的RDD保存爲SequenceFIle時,PySpark會對其進行反操做。它會unpickles Python的對象爲Java對象,而後再將它們轉換爲Writables。下表中的Writables會被自動地轉換:
Writable Type |
Python Type |
Text |
unicode str |
IntWritable |
int |
FloatWritable |
float |
DoubleWritable |
float |
BooleanWritable |
bool |
BytesWritable |
bytearray |
NullWritable |
None |
MapWritable |
dict |
數組不支持開箱(out-of-the-box)處理。當讀或寫數組時,用戶須要指定自定義的ArrayWritable子類。當寫數組時,用戶也須要指定自定義的轉換器(converters),將數組轉換爲自定義的ArrayWritable子類。當讀數組時,默認的轉換器會將自定義的ArrayWritable子類轉換爲Java的Object[],而後被pickled成Python的元組。若是要獲取包含基本數據類型的數組,Python的array.array的話,用戶須要爲該數組指定自定義的轉換器。
保存和加載SequenFiles
相似於text files,SequenceFiles能夠被保存和加載到指定的路徑下。能夠指定key和value的類型,但對標準的Writables類型則不須要指定。
保存和加載其餘的Hadoop輸入/輸出格式
PySpark也能夠讀任何Hadoop InputFormat或者寫任何Hadoop OutputFormat,包括「新」和「舊」兩個Hadoop MapReduce APIs。若是須要的話,能夠將傳遞進來的一個Hadoop配置當成一個Python字典。這裏有一個使用了Elasticsearch ESInputFormat的樣例:
注意,若是這個InputFormat只是簡單地依賴於Hadoop配置和/或輸入路徑,以及key和value的類型,它就能夠很容易地根據上面的表格進行轉換,那麼這種方法應該能夠很好地處理這些狀況。
若是你有一個定製序列化的二進制數據(好比加載自Cassandra/HBase的數據),那麼你首先要作的,是在Scala/Java側將數據轉換爲能夠用Pyrolite的pickler處理的東西。Converter特質提供了這一轉換功能。簡單地extend該特質,而後在convert方法中實現你本身的轉換代碼。記住要確保該類,以及訪問你的InputFormat所需的依賴,都須要被打包到你的Spark做業的jar包,而且包含在PySpark的類路徑中。
在Python樣例和Converter樣例上給出了帶自定義轉換器的Cassandra/HBase的InputFormat和OutputFormat的使用樣例。
RDD操做
RDDs支持兩種操做:轉換(transformations),能夠從已有的數據集建立一個新的數據集;而動做(actions),在數據集上運行計算後,會向驅動程序返回一個值。例如,map就是一種轉換,它將數據集每個元素都傳遞給函數,並返回一個新的分佈數據集來表示結果。另外一方面,reduce是一種動做,經過一些函數將全部的元素聚合起來,並將最終結果返回給驅動程序(不過還有一個並行的reduceByKey,能返回一個分佈式數據集)。
Spark中的全部轉換都是惰性的,也就是說,它們並不會立刻計算結果。相反的,它們只是記住應用到基礎數據集(例如一個文件)上的這些轉換動做。只有當發生一個要求返回結果給驅動程序的動做時,這些轉換纔會真正運行。這種設計讓Spark更加有效率地運行。例如,咱們對map操做建立的數據集進行reduce操做時,只會向驅動返回reduce操做的結果,而不是返回更大的map操做建立的數據集。
默認狀況下,每個轉換過的RDD都會在你對它執行一個動做時被從新計算。不過,你也可使用持久化或者緩存方法,把一個RDD持久化到內存中。在這種狀況下,Spark會在集羣中保存相關元素,以便你下次查詢這個RDD時,能更快速地訪問。對於把RDDs持久化到磁盤上,或在集羣中複製到多個節點也是支持的。
基礎操做
Scala
爲了描述RDD的基礎操做,能夠考慮下面的簡單程序:
第一行經過一個外部文件定義了一個基本的RDD。這個數據集未被加載到內存,也未在上面執行操做:lines僅僅指向這個文件。第二行定義了lineLengths做爲map轉換結果。此外,因爲惰性,不會當即計算lineLengths。最後,咱們運行reduce,這是一個動做。這時候,Spark纔會將這個計算拆分紅不一樣的task,並運行在獨立的機器上,而且每臺機器運行它本身的map部分和本地的reducatin,僅僅返回它的結果給驅動程序。
若是咱們但願之後能夠複用lineLengths,能夠添加:
lineLengths.persist()
在reduce以前,這將致使lineLengths在第一次被計算以後,被保存在內存中。
Java
爲了描述RDD的基礎操做,能夠考慮下面的簡單程序:
第一行經過一個外部文件定義了一個基本的RDD。這個數據集未被加載到內存,也未在上面執行操做:lines僅僅指向這個文件。第二行定義了lineLengths做爲map轉換結果。此外,因爲惰性,不會當即計算lineLengths。最後,咱們運行reduce,這是一個動做。這時候,Spark纔會將這個計算拆分紅不一樣的task,並運行在獨立的機器上,而且每臺機器運行它本身的map部分和本地的reducatin,僅僅返回它的結果給驅動程序。
若是咱們但願之後能夠複用lineLengths,能夠添加:
lineLengths.persist();
在reduce以前,這將致使lineLengths在第一次被計算以後,被保存在內存中。
Python
爲了描述RDD的基礎操做,能夠考慮下面的簡單程序:
第一行經過一個外部文件定義了一個基本的RDD。這個數據集未被加載到內存,也未在上面執行操做:lines僅僅指向這個文件。第二行定義了lineLengths做爲map轉換結果。此外,因爲惰性,不會當即計算lineLengths。最後,咱們運行reduce,這是一個動做。這時候,Spark纔會將這個計算拆分紅不一樣的task,並運行在獨立的機器上,而且每臺機器運行它本身的map部分和本地的reducatin,僅僅返回它的結果給驅動程序。
若是咱們但願之後能夠複用lineLengths,能夠添加:
lineLengths.persist()
在reduce以前,這將致使lineLengths在第一次被計算以後,被保存在內存中。
把函數傳遞到Spark
Scala
Spark的API,在很大程度上依賴於把驅動程序中的函數傳遞到集羣上運行。這有兩種推薦的實現方式:
●使用匿名函數的語法,這能夠用來替換簡短的代碼。
●使用全局單例對象的靜態方法。好比,你能夠定義函數對象objectMyFunctions,而後傳遞該對象的方法MyFunction.func1,以下所示:
注意:因爲可能傳遞的是一個類實例方法的引用(而不是一個單例對象),在傳遞方法的時候,應該同時傳遞包含該方法的對象。好比,考慮:
這裏,若是咱們建立了一個類實例new MyClass,而且調用了實例的doStuff方法,該方法中的map處調用了這個MyClass實例的func1方法,因此須要將整個對象傳遞到集羣中。相似於寫成:rdd.map(x=>this.func1(x))。
相似地,訪問外部對象的字段時將引用整個對象:
等同於寫成rdd.map(x=>this.field+x),引用了整個this。爲了不這種問題,最簡單的方式是把field拷貝到本地變量,而不是去外部訪問它:
Java
Spark的API,在很大程度上依賴於把驅動程序中的函數傳遞到集羣上運行。在Java中,函數由那些實現了org.apache.spark.api.java.function包中的接口的類表示。有兩種建立這樣的函數的方式:
●在你本身的類中實現Function接口,能夠是匿名內部類,或者命名類,而且傳遞類的一個實例到Spark。
●在Java8中,使用lambda表達式來簡明地定義函數的實現。
爲了保持簡潔性,本指南中大量使用了lambda語法,這在長格式中很容易使用全部相同的APIs。好比,咱們能夠把上面的代碼寫成:
或者,若是不方便編寫內聯函數的話,能夠寫成:
注意,Java中的匿名內部類也能夠訪問封閉域中的變量,只要這些變量標識爲final便可。Spark會像處理其餘語言同樣,將這些變量拷貝到每一個工做節點上。
Python
Spark的API,在很大程度上依賴於把驅動程序中的函數傳遞到集羣上運行。有三種推薦方法可使用:
●使用Lambda表達式來編寫能夠寫成一個表達式的簡單函數(Lambdas不支持沒有返回值的多語句函數或表達式)。
●Spark調用的函數中的Local defs,能夠用來代替更長的代碼。
●模塊中的頂級函數。
例如,若是想傳遞一個支持使用lambda表達式的更長的函數,能夠考慮如下代碼:
注意:因爲可能傳遞的是一個類實例方法的引用(而不是一個單例對象(singleton object)),在傳遞方法的時候,應該同時傳遞包含該方法的對象。好比,考慮:
這裏,若是咱們建立了一個類實例new MyClass,而且調用了實例的doStuff方法,該方法中的map處調用了這個MyClass實例的func1方法,因此須要將整個對象傳遞到集羣中。
相似地,訪問外部對象的字段時將引用整個對象:
爲了不這種問題,最簡單的方式是把field拷貝到本地變量,而不是去外部訪問它:
理解閉包
關於Spark的一個更困難的問題是理解當在一個集羣上執行代碼的時候,變量和方法的範圍以及生命週期。修改範圍以外變量的RDD操做常常是形成混亂的源頭。在下面的例子中咱們看一下使用foreach()來增長一個計數器的代碼,不過一樣的問題也可能有其餘的操做引發。
例子
考慮下面的單純的RDD元素求和,根據是否運行在一個虛擬機上,它們的行爲徹底不一樣。一個日常的例子是在local模式(--master=local[n])下運行Spark對比將Spark程序部署到一個集羣上(例如經過spark-submit提交到YARN)。
Scala
Java
Python
本地模式VS集羣模式
主要的挑戰是,上述代碼的行爲是未定義的。在使用單個JVM的本地模式中,上面的代碼會在RDD中計算值的總和並把它存儲到計數器中。這是由於RDD和計數器變量在驅動節點的同一個內存空間中。
然而,在集羣模式下,發生的事情更爲複雜,上面的代碼可能不會按照目的工做。要執行做業,Spark將RDD操做分紅任務——每一個任務由一個執行器操做。在執行前,Spark計算閉包。閉包是指執行器要在RDD上進行計算時必須對執行節點可見的那些變量和方法(在這裏是foreach())。這個閉包被序列化併發送到每個執行器。在local模式下,只有一個執行器所以全部東西都分享同一個閉包。然而在其餘的模式中,就不是這個狀況了,運行在不一樣工做節點上的執行器有它們本身的閉包的一份拷貝。
這裏發生的事情是閉包中的變量被髮送到每一個執行器都是被拷貝的,所以,當計數器在foreach函數中引用時,它再也不是驅動節點上的那個計數器了。在驅動節點的內存中仍然有一個計數器,但它對執行器來講再也不是可見的了!執行器只能看到序列化閉包中的拷貝。所以,計數器最終的值仍然是0,由於全部在計數器上的操做都是引用的序列化閉包中的值。
在這種狀況下要確保一個良好定義的行爲,應該使用累加器。Spark中的累加器是一個專門用來在執行被分散到一個集羣中的各個工做節點上的狀況下安全更新變量的機制。本指南中的累加器部分會作詳細討論。
通常來講,閉包-構造像循環或者本地定義的方法,不該該用來改變一些全局狀態。Spark沒有定義或者是保證改變在閉包以外引用的對象的行爲。一些這樣作的代碼可能會在local模式下起做用,但那僅僅是個偶然,這樣的代碼在分佈式模式下是不會按照指望工做的。若是須要一些全局的參數,可使用累加器。
打印RDD中的元素
另外一個常見的用法是使用rdd.foreach(println)方法或者rdd.map(println)方法試圖打印出RDD中的元素。在一臺單一的機器上,這樣會產生指望的輸出並打印出RDD中的元素。然而,在集羣模式中,被執行器調用輸出到stdout的輸出如今被寫到了執行器的stdout,並非在驅動上的這一個,所以驅動上的stdout不會顯示這些信息!要在驅動上打印全部的元素,可使用collect()方法首先把RDD取回到驅動節點如:rdd.collect().foreach(println)。然而,這可能致使驅動內存溢出,由於collect()將整個RDD拿到了單臺機器上;若是你只須要打印不多幾個RDD的元素,一個更安全的方法是使用take()方法:rdd.take(100).foreach(println)。
鍵值對的使用
Scala
雖然,在包含任意類型的對象的RDDs中,可使用大部分的Spark操做,但也有一些特殊的操做只能在鍵值對的RDDs上使用。最多見的一個就是分佈式的洗牌(shuffle)操做,諸如基於key值對元素進行分組或聚合的操做。
在Scala中,包含二元組(Tuple2)對象(能夠經過簡單地(a,b)代碼,來構建內置於語言中的元組的RDDs支持這些操做),只要你在程序中導入了org.apache.spark.SparkContext._,就能進行隱式轉換。PairRDDFunction類支持鍵值對的操做,若是你導入了隱式轉換,該類型就能自動地對元組RDD的元素進行轉換。
好比,下列代碼在鍵值對上使用了reduceByKey操做,來計算在一個文件中每行文本出現的總次數:
咱們也可使用counts.sortByKey(),好比,將鍵值對以字典序進行排序。最後使用counts.collect()轉換成對象的數組形式,返回給驅動程序。
注意:在鍵值對操做中,若是使用了自定義對象做爲建,你必須確保該對象實現了自定義的equals()和對應的hashCode()方法。更多詳情請查看Object.hashCode()文檔大綱中列出的規定。
Java
雖然,在包含任意類型的對象的RDDs中,可使用大部分的Spark操做,但也有一些特殊的操做只能在鍵值對的RDDs上使用。最多見的一個就是分佈式的洗牌(shuffle)操做,諸如基於key值對元素進行分組或聚合的操做。
在java中,可使用Scala標準庫中的scala.Tuple2類來表示鍵值對,你能夠簡單地調用new Tuple2(a,b)來建立一個元組,而後使用tuple._1()和tuple._2()方法來訪問元組的字段。
使用JavaPairRDD來表示鍵值對RDDs。你可使用指定版本的map操做,從JavaRDDs構建JavaPairRDDs,好比mapToPair和flatMapToPair。JavaPairRDD支持標準的RDD函數,也支持特殊的鍵值函數。
例如,下面的代碼在鍵值(key-value)對上使用 reduceByKey操做來計算在一個文件中每行文本出現的總次數:
咱們也可使用 counts.sortByKey(),例如,將鍵值對以字典序(alphabetically)進行排序。後調用 counts.collect() 轉換成對象的數組形式,返回給驅動程序(driverprogram)。
注意:在鍵值(key-value)對操做中,若是使用了自定義對象做爲鍵,你必須確保該對象實現了自定義的 equals()和對應的 hashCode()方法。更多詳情請查看 Object.hashCode() documentation文檔大綱中列出的規定。
Python
雖然在包含任意類型的對象的 RDDs中,可使用大部分的 Spark操做,但也有一
些特殊的操做只能在鍵值(key-value)對的 RDDs上使用。最多見的一個就是分佈式的洗牌("shuffle")操做,諸如基於 key值對元素進行分組或聚合的操做。
在 Python中, RDDs支持的操做包含 Python內置的元組(tuples)操做,好比 (1, 2)。你能夠簡單地建立這樣的元組,而後調用指望的操做。
例如,下面的代碼在鍵值(key-value)對上使用 reduceByKey操做來計算在一個文件中每行文本出現的總次數:
咱們也可使用 counts.sortByKey(),例如,按照字典序(alphabetically)排序鍵值對。最後調用 counts.collect() 轉換成對象的數組形式,返回給驅動程序(driver program)。
轉換
下表中列出了 Spark支持的一些常見的轉換 (Transformations)。詳情請參考 RDDAPI文檔 (Scala, Java, Python)和 pair RDD函數文檔 (Scala, Java)。
Transformation |
Meaning |
map(func) |
返回一個新分佈式數據集,由每個輸入元素通過 func函數轉換後組成。 |
filter(func) |
返回一個新數據集,由通過 func函數計算後返回值爲 true的輸入元素組成。 |
flatMap(func) |
相似於 map,可是每個輸入元素能夠被映射爲 0或多個輸出元素(所以 func應該返回一個序列(Seq),而不是單一元素)。 |
mapPartitions(func) |
相似於 map,但獨立地在 RDD的每個分區(partition,對應塊(block))上運行,當在類型爲 T 的 RDD上運行時, func的函數類型必須是Iterator<T> => Iterator<U>。 |
mapPartitionsWithIndex(func) |
相似於 mapPartitions,但 func帶有一個整數參數表示分區(partition)的索引值。當在類型爲 T的 RDD上運行時, func的函數類型必須是(Int, Iterator<T>) => Iterator<U>。 |
sample(withReplacement, fraction, seed) |
根據 fraction指定的比例,對數據進行採樣,能夠選擇是否用隨機數進行替換, seed用於指定隨機數生成器種子。 |
union(otherDataset) |
返回一個新的數據集,新數據集由源數據集和參數數據集的元素聯合(union)而成。 |
intersection(otherDataset) |
返回一個新的數據集,新數據集由源數據集和參數數據集的元素的交集(intersection)組成。 |
distinct([numTasks])) |
返回一個新的數據集,新數據集由源數據集過濾掉多餘的重複元素只保留一個而成。 |
groupByKey([numTasks]) |
在一個 (K, V)對的數據集上調用,返回一個 (K, Iterable<V>)對的數據集。 注意:若是你想在每一個key上分組執行聚合(如總和或平均值)操做,使用 reduceByKey或combineByKey會產生更好的性能。 注意:默認狀況下,輸出的並行數依賴於父 RDD(parent RDD)的分區數(number of partitions)。你能夠經過傳遞可選的第二個參數 numTasks來設置不一樣的任務數。 |
reduceByKey(func, [numTasks]) |
在一個 (K, V)對的數據集上調用時,返回一個 (K, V)對的數據集,使用指定的 reduce函數func將相同 key的值聚合到一塊兒,該函數的類型必須是 (V,V) => V。相似 groupByKey,reduce的任務個數是能夠經過第二個可選參數來配置的。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) |
在一個 (K, V)對的數據集上調用時,返回一個 (K, U)對的數據集,對每一個鍵的值使用給定的組合函數(combine functions)和一箇中性的「零」值進行聚合。容許聚合後的值類型不一樣於輸入的值類型,從而避免了沒必要要的內存分配。如同 groupByKey,能夠經過設置第二個可選參數來配置 reduce任務的個數。 |
sortByKey([ascending], [numTasks]) |
在一個 (K, V)對的數據集上調用,其中, K必須實現Ordered,返回一個 按照 Key進行排序的 (K, V)對數據集,升序或降序由布爾參數 ascending決定。 |
join(otherDataset, [numTasks]) |
在類型爲 (K, V)和 (K, W)類型的數據集上調用時,返回一個相同 key 對應的全部元素對在一塊兒的 (K, (V, W))對的數據集。也支持外聯(Outer joins),經過使用 leftOuterJoin和 rightOuterJoin. |
cogroup(otherDataset, [numTasks]) |
在類型爲 (K, V)和 (K, W)的數據集上調用,返回一個 (K, Iterable<V>, Iterable<W>)元組(tuples)的數據集。這個操做也能夠稱之爲 groupWith。 |
cartesian(otherDataset) |
笛卡爾積,在類型爲 T和 U類型的數據集上調用時,返回一個 (T, U)對的數據集(全部元素交互進行笛卡爾積)。 |
pipe(command, [envVars]) |
以管道(Pipe)方式將 RDD的各個分區(partition)傳遞到 shell命令,好比一個 Perl或 bash腳本中。 RDD的元素會被寫入進程的標準輸入(stdin),而且將做爲字符串的 RDD(RDD of strings),在進程的標準輸出(stdout)上輸出一行行數據。 |
coalesce(numPartitions) |
把 RDD的分區數下降到指定的 numPartitions。過濾掉一個大數據集 以後再執行操做會更加有效。 |
repartition(numPartitions) |
隨機地對 RDD的數據從新洗牌(Reshuffle),以便建立更多或更少的分區,對它們進行平衡。老是對網絡上的全部數據進行洗牌(shuffles)。 |
repartitionAndSortWithinPartitions(partitioner) |
根據給定的分區器對RDD進行從新分區,在每一個結果分區中,將記錄按照key值進行排序。這在每一個分區中比先調用repartition再排序效率更高,由於它能夠推進排序到分牌機器上。 |
動做
下表中列出了 Spark支持的一些常見的動做 (actions)。詳情請參考 RDD API文檔
(Scala,Java, Python) 和pair RDD函數文檔(Scala, Java)。
Action |
Meaning |
reduce(func) |
經過函數 func (接受兩個參數,返回一個參數),彙集數據集中的全部元素。該函數應該是可交換和可結合的,以便它能夠正確地並行計算。 |
collect() |
在驅動程序中,以數組的形式,返回數據集的全部元素。這一般會在使用filter或者其它操做,並返回一個足夠小的數據子集後再使用會比較有用 |
count() |
返回數據集的元素的個數。 |
first() |
返回數據集的第一個元素。 (相似於 take(1)). |
take(n) |
返回一個由數據集的前 n個元素組成的數組。注意,這個操做目前不能並行執行,而是由驅動程序(driver program)計算全部的元素。 |
takeSample(withReplacement,num, [seed]) |
返回一個數組,由數據集中隨機採樣的 num個元素組成,能夠選擇是否用隨機數替換不足的部分,能夠指定可選參數seed,預先指定一個隨機數生成器的種子。 |
takeOrdered(n, [ordering]) |
返回一個由數據集的前 n個元素,並使用天然順序或定製順序對這些元素進行排序。 |
saveAsTextFile(path) |
將數據集的元素,以 text file (或 text file的集合)的形式,保存到本地文件系統的指定目錄, Spark會對每一個元素調用 toString方法,而後轉換爲文件中的文本行。 |
saveAsSequenceFile(path) |
將數據集的元素,以 Hadoop sequencefile的格式,保存到各類文件系統的指定路徑下,包括本地系統, HDFS或者任何其它 hadoop支持的文件系統。該方法只能用於鍵值(key-value)對的 RDDs,或者實現了 Hadoop的Writable接口的狀況下。在 Scala中,也能夠用於支持隱式轉換爲 Writable的類型。(Spark包括了基本類型的轉換,例如 Int, Double, String,等等)。 |
saveAsObjectFile(path) |
以簡單地 Java序列化方式將數據集的元素寫入指定的路徑,對應的能夠用 SparkContext.objectFile()加載該文件。 |
countByKey() |
只對 (K,V)類型的 RDD有效。返回一個 (K, Int)對的 hashmap,其中 (K,Int)對錶示每個 key對應的元素個數。 |
foreach(func) |
在數據集的每個元素上,運行 func函數。這一般用於反作用(sideeffects),例如更新一個累加器變量(accumulator variable)(參見下文),或者和外部存儲系統進行交互. |
洗牌操做
Spark觸發一個事件後進行的一些操做成爲洗牌。洗牌是Spark從新分配數據的機制,這樣它就能夠跨分區分組。這一般涉及在執行器和機器之間複製數據,這就使得洗牌是一個複雜和高代價的操做。
背景
爲了理解在洗牌的時候發生了什麼,咱們能夠考慮reduceByKey操做的例子。reduceByKey操做產生了一個新的RDD,在這個RDD中,全部的單個的值被組合成了一個元組,key和執行一個reduce函數後的結果中與這個key有關的全部值。面臨的挑戰是一個key的全部的值並不都是在同一個分區上的,甚至不是一臺機器上的,可是他們必須是可鏈接的以計算結果。
在Spark中,數據通常是不會跨分區分佈的,除非是在一個特殊的地方爲了某種特定的目的。在計算過程當中,單個任務將在單個分區上操做——所以,爲了組織全部數據執行單個reduceByKey中的reduce任務,Spark須要執行一個all-to-all操做。它必須讀取全部分區,找到全部key的值,並跨分區把這些值放到一塊兒來計算每一個key的最終結果——這就叫作洗牌。
儘管在每一個分區中新洗牌的元素集合是肯定性的,分區自己的順序也一樣如此,這些元素的順序就不必定是了。若是指望在洗牌後得到可預測的有序的數據,可使用:
mapPartitions 來排序每一個分區,例如使用.sorted
repartitionAndSortWithinPartitions 在從新分區的同時有效地將分區排序
sortBy來建立一個全局排序的RDD
能夠引發洗牌的操做有重分區例如repartition和coalesce,‘ByKey操做(除了計數)像groupByKey和reduceByKey,還有join操做例如cogroup和join。
性能影響
Shuffle是一個代價高昂的操做,由於它調用磁盤I/O,數據序列化和網絡I/O。要組織shuffle的數據,Spark生成一個任務集合——map任務來組織數據,並使用一組reduce任務集合來聚合它。它的命名來自與MapReduce,但並不直接和Spark的map和reduce操做相關。
在內部,單個的map任務的結果被保存在內存中,直到他們在內存中存不下爲止。而後,他們基於目標分區進行排序,並寫入到一個單個的文件中。在reduce這邊,任務讀取相關的已經排序的塊。
某些shuffle操做會消耗大量的堆內存,由於他們用在內存中的數據結構在轉換操做以前和以後都要對數據進行組織。特別的,reduceByKey和aggregateByKey在map側建立這些結構,‘ByKey操做在reduce側生成這些結構。當數據在內存中存不下時,Spark會將他們存儲到磁盤,形成額外的磁盤開銷和增長垃圾收集。
Shuffle也會在磁盤上產生大量的中間文件。在Spark1.3中,這些文件直到Spark中止運行時纔會從Spark的臨時存儲中清理掉,這意味着長時間運行Spark做業會消耗可觀的磁盤空間。這些作了以後若是lineage從新計算了,那shuffle不須要從新計算了。在配置Spark上下文時,臨時存儲目錄由spark.local.dir配置參數指定。
Shuffle的行爲能夠經過調整各類配置參數來調整。請看Spark配置指南中的Shuffle Behavior部分。
RDD持久化
Spark最重要的一個功能,就是在不一樣操做間,將一個數據集持久化(persisting) (或緩存(caching))到內存中。當你持久化(persist)一個 RDD,每個節點都會把它計算的全部分區(partitions)存儲在內存中,並在對數據集 (或者衍生出的數據集)執行其餘動做(actioins)時重用。這將使得後續動做(actions)的執行變得更加迅速(一般快 10 倍)。緩存(Caching)是用 Spark 構建迭代算法和快速地交互使用的關鍵。
你可使用 persist()或 cache()方法來持久化一個 RDD。在首次被一個動做(action)觸發計算後,它將會被保存到節點的內存中。 Spark 的緩存是帶有容錯機制的,若是 RDD丟失任何一個分區的話,會自動地用原先構建它的轉換(transformations)操做來從新進行計算。
此外,每個被持久化的 RDD均可以用不一樣的存儲級別(storage level)進行存儲,好比,容許你持久化數據集到硬盤,以序列化的 Java對象(節省空間)存儲到內存,跨節點複製,或者以off-heap的方式存儲在 Tachyon。這些級別的選擇,是經過將一個 StorageLevel對象 (Scala Java, Python)傳遞到 persist()方法中進行設置的。 cache()方法是使用默認存儲級別的快捷方法,也就是 StorageLevel.MEMORY_ONLY (將反序列化 (deserialized)的對象存入內存)。完整的可選存儲級別以下:
Storage Level |
Meaning |
MEMORY_ONLY |
將 RDD以反序列化(deserialized)的Java對象存儲到 JVM。若是 RDD不能被內存裝下,一些分區將不會被緩存,而且在須要的時候被從新計算。這是默認的級別。 |
MEMORY_AND_DISK |
將 RDD以反序列化(deserialized)的 Java對象存儲到 JVM。若是 RDD不能被內存裝下,超出的分區將被保存在硬盤上,而且在須要時被讀取。 |
MEMORY_ONLY_SER |
將 RDD以序列化(serialized)的 Java對象進行存儲(每一分區佔用一個字節數組)。一般來講,這比將對象反序列化(deserialized)的空間利用率更高,尤爲當使用快速序列化器(fast serializer),但在讀取時會比較耗 CPU。 |
MEMORY_AND_DISK_SER |
相似於 MEMORY_ONLY_SER,可是把超出內存的分區將存儲在硬盤上而不是在每次須要的時候從新計算。 |
DISK_ONLY |
只將 RDD分區存儲在硬盤上。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. |
與上述的存儲級別同樣,可是將每個分區都複製到兩個集羣節點上。 |
OFF_HEAP (experimental) |
以序列化的格式 (serialized format) 將 RDD存儲到 Tachyon。相比於MEMORY_ONLY_SER, OFF_HEAP 下降了垃圾收集(garbage collection)的開銷,並使 executors變得更小並且共享內存池,這在大堆(heaps)和多應用並行的環境下是很是吸引人的。並且,因爲 RDDs駐留於 Tachyon中, executor的崩潰不會致使內存中的緩存丟失。在這種模式下, Tachyon中的內存是可丟棄的。所以, Tachyon不會嘗試重建一個在內存中被清除的分塊。 |
注意:在 Python中,存儲對象時老是使用 Pickle庫來序列化(serialized),而無論你是否選擇了一個序列化的級別。Spark也會自動地持久化一些洗牌(shuffle)操做(好比,reduceByKey )的中間數據,即便用戶沒有調用 persist。這麼作是爲了不在一個節點上的洗牌(shuffle)過程失敗時,從新計算整個輸入。咱們仍然建議用戶在結果 RDD 上調用 persist,若是但願重用它的話。
如何選擇存儲級別?
Spark 的存儲級別旨在知足內存使用和CPU效率權衡上的不一樣需求。咱們建議經過如下方法進行選擇:
●若是你的 RDDs能夠很好的與默認的存儲級別(MEMORY_ONLY)契合,就不須要作任何修改了。這已是 CPU使用效率最高的選項,它使得RDDs的操做盡量的快。
●若是不行,試着使用 MEMORY_ONLY_SER,而且選擇一個快速序列化庫使對象在有比較高的空間使用率(space-efficient)的狀況下,依然能夠較快被訪問。
●儘量不要存儲到硬盤上,除非計算數據集的函數的計算量特別大,或者它們過濾了大量的數據。不然,從新計算一個分區的速度,可能和從硬盤中讀取差很少快。
●若是你想有快速的故障恢復能力,使用複製存儲級別(例如:用 Spark來響應 web應用的請求)。全部的存儲級別都有經過從新計算丟失的數據來恢復錯誤的容錯機制,可是複製的存儲級別可讓你在 RDD 上持續地運行任務,而不須要等待丟失的分區被從新計算。
●在大量的內存或多個應用程序的環境下,試驗性的 OFF_HEAP模式具備如下幾個優勢:
o 容許多個 executors共享 Tachyon中相同的內存池。
o 極大地下降了垃圾收集器(garbage collection)的開銷。
o 即便個別的 executors崩潰了,緩存的數據也不會丟失。
移除數據
Spark 會自動監控各個節點上的緩存使用狀況,並使用最近最少使用算法(least-recently-used (LRU))刪除老的數據分區。若是你想手動移除一個 RDD,而不是等它自動從緩存中清除,可使用 RDD.unpersist()方法。
共享變量
通常來講,當一個函數被傳遞給一個在遠程集羣節點上運行的 Spark操做(例如 map或 reduce) 時,它操做的是這個函數用到的全部變量的獨立拷貝。這些變量會被拷貝到每一臺機器,並且在遠程機器上對變量的全部更新都不會被傳播回驅動程序。一般看來,讀-寫任務間的共享變量顯然不夠高效。然而,Spark仍是爲兩種常見的使用模式,提供了兩種有限的共享變量:廣播變量(broadcast variables)和累加器(accumulators)。
廣播變量
廣播變量容許程序員保留一個只讀的變量,緩存在每一臺機器上,而不是每一個任務保存一份拷貝。它們能夠這樣被使用,例如,以一種高效的方式給每一個節點一個大的輸入數據集。Spark會嘗試使用一種高效的廣播算法來傳播廣播變量,從而減小通訊的代價。
Spark動做的執行是經過一個階段的集合,經過分佈式的Shuffle操做分離。Spark自動廣播在每一個階段裏任務須要的共同數據。以這種方式廣播的數據以序列化的形式緩存並在運行每一個任務以前進行反序列化。這意味着顯式地建立廣播變量只在當多個階段之間須要相同的數據或者是當用反序列化的形式緩存數據特別重要的時候。
廣播變量是經過調用 SparkContext.broadcast(v)方法從變量 v建立的。廣播變量是一個 v的封裝器,它的值能夠經過調用 value方法得到。以下代碼展現了這個:
Scala
Java
Python
在廣播變量被建立後,它應該在集羣運行的任何函數中,代替 v值被調用,從而 v值不須要被再次傳遞到這些節點上。另外,對象 v不能在廣播後修改,這樣能夠保證全部節點具備相同的廣播變量的值(好比,後續若是變量被傳遞到一個新的節點)。
累加器
累加器是一種只能經過具備結合性的操做(associative operation)進行「加(added)」的變量,所以能夠高效地支持並行。它們能夠用來實現計數器(如 MapReduce 中)和求和器。 Spark原生就支持數值類型的累加器,開發者能夠本身添加新的支持類型。若是建立了一個命名的累加器(accumulators),這些累加器將會顯示在 Spark UI 界面上。這對於瞭解當前運行階段(stages)的進展狀況是很是有用的(注意:這在 Python中還沒有支持)。
一個累加器能夠經過調用 SparkContext.accumulator(v)方法從一個初始值 v中建立。運行在集羣上的任務,能夠經過使用 add方法或 +=操做符(在 Scala和 Python)來給它加值。然而,它們不能讀取這個值。只有驅動程序可使用 value方法來讀取累加器的值。
如下代碼展現瞭如何利用一個累加器,將一個數組裏面的全部元素相加:
Scala
Java
Python
雖然代碼可使用內置支持的 Int類型的累加器,但程序員也能夠經過子類化(subclassing) AccumulatorParam來建立本身的類型。AccumulatorParam接口有兩個方法: zero,爲你的數據類型提供了一個「零值(zero value)」,以及 addInPlace提供了兩個值相加的方法。好比,假設咱們有一個表示數學上向量的 Vector類,咱們能夠這麼寫:
Scala
在 Scala中, Spark也支持更通用的 Accumulable接口去累加數據,其結果類型和累加的元素不一樣(好比,構建一個包含全部元素的列表),而且SparkContext.accumulableCollection方法能夠累加普通的 Scala集合(collection)類型。
Java
在 Java中, Spark也支持更通用的 Accumulable接口去累加數據,其結果類型和累加的元素不一樣(好比,構建一個包含全部元素的列表)。
Python
由於累加器的更新只在action中執行,Spark確保每一個任務對累加器的更新都只會被應用一次,例如,重啓任務將不會更新這個值。在轉換中,用戶應該清楚若是任務或者做業階段是重複運行的,每一個任務的更新可能會應用不止一次。
累加器不會改變Spark的懶惰評價模型。若是它們在一個RDD的操做中正在被更新,他們的值只會被更新一次,RDD做爲動做的一部分被計算。所以,累加器更新當在執行一個懶惰轉換,例如map()時,並不保證被執行。下面的代碼段演示了這個屬性:
Scala
Java
Python
把代碼部署到集羣上
應用程序提交指南(application submission guide)描述瞭如何將應用程序提交到一個集羣,簡單地說,一旦你將你的應用程序打包成一個JAR(對於 Java/Scala)或者一組的 .py或 .zip文件 (對於 Python), bin/spark-submit 腳本可讓你將它提交到支持的任何集羣管理器中。
從Java/Scala中啓動Spark做業
Org.apache.spark.launcher包中提供了相關類來啓動Spark做業做爲子線程的簡單的Java API。
單元測試
Spark 對單元測試很是友好,可使用任何流行的單元測試框架。在你的測試中簡單地建立一個 SparkContext,並將 master URL設置成local,運行你的各類操做,而後調用 SparkContext.stop()結束測試。確保在 finally塊或測試框架的 tearDown方法中調用 context的 stop方法,由於 Spark不支持在一個程序中同時運行兩個contexts。
Spark1.0以前版本的遷移
Scala
Spark 1.0 凍結了 1.X系列的 Spark核心(Core) API,如今,其中的 API,除了標識爲「試驗性(experimental)」或「開發者的(developer) API」的,在未來的版本中都會被支持。對 Scala用戶而言,惟一的改變在於組操做(grouping operations),好比, groupByKey, cogroup和 join,其返回值已經從 (Key, Seq[Value])對修改成 (Key,Iterable[Value])。
遷移指南也能夠從 Spark Streaming, MLlib和 GraphX獲取。
Java
Spark 1.0 凍結了 1.X系列的 Spark核心(Core) API,如今,其中的 API,只要不是標識爲「試驗性(experimental)」或「開發者的(developer) API」的,在未來的版本中都會被支持。其中對 Java API作了一些修改:
•對於 org.apache.spark.api.java.function中的類函數(Function classes),在 1.0版本中變成了接口,這意味着舊的代碼中 extends Function應該須要爲 implement Function。
•增長了 map轉換(transformations)的新變體,如 mapToPair和 mapToDouble,用於建立指定數據類型的 RDDs。
•組操做(grouping operations),如 groupByKey, cogroup 和 join的返回值也被修改了,從原先返回 (Key, List<Value>)對改成(Key,Iterable<Value>)。
遷移指南也能夠從 Spark Streaming, MLlib和 GraphX獲取。
Python
Spark 1.0 凍結了 1.X系列的 Spark核心(Core) API,如今,其中的 API,只要不是
標識爲「試驗性(experimental)」或「開發者的(developer) API」的,在未來的版本中
都會被支持。對 Python用戶而言,惟一的修改在於分組操做(grouping operations),比
如groupByKey,cogroup和join,其返回值從 (key, list of values)對修改成 (key,
iterableof values)。
遷移指南也能夠從 Spark Streaming, MLlib和 GraphX獲取。
下一步
你能夠在 Spark的網站上看到 spark程序的樣例。另外,Spark在 examples目錄 (Scala, Java, Python,R)中也包含了一些樣例。你能夠經過將類名傳遞給 spark的 bin/run-example腳原本運行 Java和 Scala的樣例,例如:
對於 Python樣例,要使用 spark-submit:
對於R樣例,使用spark-submit:
爲了幫助優化你的程序,在配置(configuration)和調優(tuning)的指南上提供了最佳實踐信息。它們在確保將你的數據用一個有效的格式存儲在內存上,是很是重要的。對於部署的幫助信息,能夠查看集羣模式概述(cluster mode overview),描述了分佈式操做以及支持集羣管理器所涉及的組件。
最後,完整的 API文檔能夠查看 Scala, Java,Python和R。