install_requires=[ 'pyspark=={site.SPARK_VERSION}' ]
做爲示例,咱們將建立一個簡單的Spark應用程序SimpleApp.py:
"""SimpleApp.py""" from pyspark.sql import SparkSession logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system spark = SparkSession.builder.appName("SimpleApp").getOrCreate() logData = spark.read.text(logFile).cache() numAs = logData.filter(logData.value.contains('a')).count() numBs = logData.filter(logData.value.contains('b')).count() print("Lines with a: %i, lines with b: %i" % (numAs, numBs)) spark.stop()
·該程序只計算包含'a'的行數和包含文本文件中'b'的數字。
·請注意,您須要將YOUR_SPARK_HOME替換爲安裝Spark的位置。
·與Scala和Java示例同樣,咱們使用SparkSession來建立數據集。
·對於使用自定義類或第三方庫的應用程序,咱們還能夠經過將它們打包到.zip文件中來添加代碼依賴關係以經過其--py-files參數進行spark-submit(有關詳細信息,請參閱spark-submit --help)。
·SimpleApp很是簡單,咱們不須要指定任何代碼依賴項。
咱們可使用bin / spark-submit腳本運行此應用程序:
若是您的環境中安裝了PySpark pip(例如,pip install pyspark),您可使用常規Python解釋器運行您的應用程序,或者根據您的喜愛使用提供的「spark-submit」。# Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --master local[4] \ SimpleApp.py ... Lines with a: 46, Lines with b: 23
# Use the Python interpreter to run your application $ python SimpleApp.py ... Lines with a: 46, Lines with b: 23
RDD Programming Guide(RDD編程指南)
Overview(概觀)
·在較高的層次上,每一個Spark應用程序都包含一個驅動程序,該程序運行用戶的主要功能並在羣集上執行各類並行操做。
·Spark提供的主要抽象是彈性分佈式數據集(RDD),它是跨羣集節點分區的元素的集合,能夠並行操做。
·RDD是經過從Hadoop文件系統(或任何其餘Hadoop支持的文件系統)中的文件或驅動程序中的現有Scala集合開始並對其進行轉換而建立的。
·用戶還能夠要求Spark在內存中保留RDD,容許它在並行操做中有效地重用。
·最後,RDD會自動從節點故障中恢復。
·Spark中的第二個抽象是能夠在並行操做中使用的共享變量。
·默認狀況下,當Spark並行運行一個函數做爲不一樣節點上的一組任務時,它會將函數中使用的每一個變量的副本發送給每一個任務。
·有時,變量須要跨任務共享,或者在任務和驅動程序之間共享。
·Spark支持兩種類型的共享變量:廣播變量,可用於緩存全部節點的內存中的值;累加器,它們是僅「添加」到的變量,例如計數器和總和。
·本指南以Spark支持的每種語言顯示了這些功能。
·若是你啓動Spark的交互式shell,最簡單的方法就是 - 用於Scala shell的bin / spark-shell或用於Python的bin / pyspark。
Linking with Spark(與Spark連接)
·Spark 2.4.2適用於Python 2.7+或Python 3.4+。
·它可使用標準的CPython解釋器,所以可使用像NumPy這樣的C庫。
·它也適用於PyPy 2.3+。
·Spark 2.2.0中刪除了Python 2.6支持。
·Python中的Spark應用程序可使用bin / spark-submit腳本運行,該腳本在運行時包含Spark,也能夠將其包含在setup.py中:
install_requires=[ 'pyspark=={site.SPARK_VERSION}' ]
·要在不使用pip安裝PySpark的狀況下在Python中運行Spark應用程序,請使用位於Spark目錄中的bin / spark-submit腳本。
·此腳本將加載Spark的Java / Scala庫,並容許您將應用程序提交到羣集。
·您還可使用bin / pyspark來啓動交互式Python shell。
·若是您但願訪問HDFS數據,則須要使用PySpark構建連接到您的HDFS版本。
·Spark主頁上還提供了預構建的軟件包,可用於常見的HDFS版本。
·最後,您須要將一些Spark類導入到您的程序中。
·添加如下行:
from pyspark import SparkContext, SparkConf
·PySpark在驅動程序和工做程序中都須要相同的次要版本的Python。
·它使用PATH中的默認python版本,您能夠指定PYSPARK_PYTHON要使用的Python版本,例如:
$ PYSPARK_PYTHON=python3.4 bin/pyspark $ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py
Initializing Spark(初始化Spark)
·Spark程序必須作的第一件事是建立一個SparkContext對象,它告訴Spark如何訪問集羣。
·要建立SparkContext,首先須要構建一個包含有關應用程序信息的SparkConf對象。
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
·appName參數是應用程序在集羣UI上顯示的名稱。
·master是Spark,Mesos或YARN羣集URL,或者是以本地模式運行的特殊「本地」字符串。
·實際上,在羣集上運行時,您不但願在程序中對master進行硬編碼,而是使用spark-submit啓動應用程序並在那裏接收它。
·可是,對於本地測試和單元測試,您能夠傳遞「local」以在進程中運行Spark。
Using the Shell(使用Shell)
·在PySpark shell中,已經爲你建立了一個特殊的解釋器感知SparkContext,名爲sc。
·製做本身的SparkContext將沒法正常工做。
·您可使用--master參數設置上下文鏈接到的主服務器,而且能夠經過將逗號分隔的列表傳遞給--py-files將Python .zip,.egg或.py文件添加到運行時路徑。
·您還能夠經過向--packages參數提供以逗號分隔的Maven座標列表,將依賴項(例如Spark包)添加到shell會話中。
·任何可能存在依賴關係的其餘存儲庫(例如Sonatype)均可以傳遞給--repositories參數。
·必要時,必須使用pip手動安裝Spark軟件包具備的任何Python依賴項(在該軟件包的requirements.txt中列出)。
·例如,要在四個核心上運行bin / pyspark,請使用:
或者,要將code.py添加到搜索路徑(以便之後可以導入代碼),請使用:$ ./bin/pyspark --master local[4]
$ ./bin/pyspark --master local[4] --py-files code.py
·有關選項的完整列表,請運行pyspark --help。
·在幕後,pyspark調用更通常的spark-submit腳本。
·也能夠在加強的Python解釋器IPython中啓動PySpark shell。
·PySpark適用於IPython 1.0.0及更高版本。
·要使用IPython,請在運行bin / pyspark時將PYSPARK_DRIVER_PYTHON變量設置爲ipython:
要使用Jupyter notebook(之前稱爲IPython notebook)$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
·您能夠經過設置PYSPARK_DRIVER_PYTHON_OPTS來自定義ipython或jupyter命令。
·啓動Jupyter Notebook服務器後,您能夠從「文件」選項卡建立一個新的「Python 2」筆記本。
·在筆記本內部,您能夠在開始嘗試使用Jupyter notebook中的Spark以前輸入命令%pylab inline做爲筆記本的一部分。
Resilient Distributed Datasets (彈性分佈式數據集)(RDDs)
·Spark圍繞彈性分佈式數據集(RDD)的概念展開,RDD是一個能夠並行操做的容錯的容錯集合。
·建立RDD有兩種方法:並行化驅動程序中的現有集合,或引用外部存儲系統中的數據集,例如共享文件系統,HDFS,HBase或提供Hadoop InputFormat的任何數據源。
Parallelized Collections(並行化集合)
·經過在驅動程序中的現有可迭代或集合上調用SparkContext的parallelize方法來建立並行化集合。
·複製集合的元素以造成能夠並行操做的分佈式數據集。
·例如,如下是如何建立包含數字1到5的並行化集合:
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
·一旦建立,分佈式數據集(distData)能夠並行操做。
·例如,咱們能夠調用distData.reduce(lambda a,b:a + b)來添加列表的元素。
·咱們稍後將描述對分佈式數據集的操做。
·並行集合的一個重要參數是將數據集切割爲的分區數。
·Spark將爲羣集的每一個分區運行一個任務。
·一般,您但願羣集中的每一個CPU有2-4個分區。
·一般,Spark會嘗試根據您的羣集自動設置分區數。
·可是,您也能夠經過將其做爲第二個參數傳遞給並行化來手動設置它(例如sc.parallelize(data,10))。
·注意:代碼中的某些位置使用術語切片(分區的同義詞)來保持向後兼容性。
External Datasets(外部數據集)
·PySpark能夠從Hadoop支持的任何存儲源建立分佈式數據集,包括本地文件系統,HDFS,Cassandra,HBase,Amazon S3等.Spark支持文本文件,SequenceFiles和任何其餘Hadoop InputFormat。
·可使用SparkContext的textFile方法建立文本文件RDD。
·此方法獲取文件的URI(計算機上的本地路徑,或hdfs://,s3a://等URI)並將其做爲行集合讀取。
·這是一個示例調用:
>>> distFile = sc.textFile("data.txt")
·建立後,distFile能夠由數據集操做執行。
·例如,咱們可使用map添加全部行的大小,並按以下方式減小操做:distFile.map(lambda s:len(s))。reduce(lambda a,b:a + b)。
·有關使用Spark讀取文件的一些注意事項
·若是在本地文件系統上使用路徑,則還必須能夠在工做節點上的相同路徑上訪問該文件。
·將文件複製到全部工做者或使用網絡安裝的共享文件系統。
·Spark的全部基於文件的輸入方法(包括textFile)都支持在目錄,壓縮文件和通配符上運行。
·例如,您可使用textFile(「/ my / directory」),textFile(「/ my / directory / * .txt」)和textFile(「/ my / directory / * .gz」)。
·textFile方法還採用可選的第二個參數來控制文件的分區數。
·默認狀況下,Spark爲文件的每一個塊建立一個分區(HDFS中默認爲128MB),但您也能夠經過傳遞更大的值來請求更多的分區。
·請注意,您不能擁有比塊少的分區。
·除文本文件外,Spark的Python API還支持其餘幾種數據格式:
·SparkContext.wholeTextFiles容許您讀取包含多個小文本文件的目錄,並將它們做爲(文件名,內容)對返回。
·這與textFile造成對比,textFile將在每一個文件中每行返回一條記錄。
·RDD.saveAsPickleFile和SparkContext.pickleFile支持以包含pickle Python對象的簡單格式保存RDD。
·批處理用於pickle序列化,默認批處理大小爲10。
·SequenceFile和Hadoop輸入/輸出格式
·請注意,此功能目前標記爲「實驗」,適用於高級用戶。
·未來可能會使用基於Spark SQL的讀/寫支持替換它,在這種狀況下,Spark SQL是首選方法。
·可寫支持
·PySpark SequenceFile支持在Java中加載鍵值對的RDD,將Writable轉換爲基本Java類型,並使用Pyrolite挖掘生成的Java對象。
·將鍵值對的RDD保存到SequenceFile時,PySpark會反過來。
·它將Python對象解開爲Java對象,而後將它們轉換爲Writable。
·如下Writable會自動轉換:
Writable Type(可寫類型) |
Python Type |
Text |
unicode str |
IntWritable |
int |
FloatWritable |
float |
DoubleWritable |
float |
BooleanWritable |
bool |
BytesWritable |
bytearray |
NullWritable |
None |
MapWritable |
dict |
·數組不是開箱即用的。
·用戶在讀取或寫入時須要指定自定義ArrayWritable子類型。
·編寫時,用戶還須要指定將數組轉換爲自定義ArrayWritable子類型的自定義轉換器。
·在讀取時,默認轉換器將自定義ArrayWritable子類型轉換爲Java Object [],而後將其pickle到Python元組。
·要爲原始類型的數組獲取Python array.array,用戶須要指定自定義轉換器。
Saving and Loading SequenceFiles(保存和加載SequenceFiles)
·與文本文件相似,能夠經過指定路徑來保存和加載SequenceFiles。
·能夠指定鍵和值類,但對於標準Writable,這不是必需的。
$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) >>> rdd.first() # the result is a MapWritable that is converted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', u'field3': 12345})
·請注意,若是InputFormat僅依賴於Hadoop配置和/或輸入路徑,而且能夠根據上表輕鬆轉換鍵和值類,則此方法應適用於此類狀況。
·若是您有自定義序列化二進制數據(例如從Cassandra / HBase加載數據),那麼您首先須要將Scala / Java端的數據轉換爲可由Pyrolite的pickler處理的數據。
·爲此提供了轉換器特性。
·只需擴展此特徵並在convert方法中實現轉換代碼。
·請記住確保將此類以及訪問InputFormat所需的任何依賴項打包到Spark做業jar中幷包含在PySpark類路徑中。
·有關使用Cassandra / HBase InputFormat和OutputFormat以及自定義轉換器的示例,請參閱Python示例和Converter示例。
RDD Operations(RDD操做)
·RDD支持兩種類型的操做:轉換(從現有數據集建立新數據集)和操做(在數據集上運行計算後將值返回到驅動程序)。
·例如,map是一個轉換,它經過一個函數傳遞每一個數據集元素,並返回一個表示結果的新RDD。
·另外一方面,reduce是一個使用某個函數聚合RDD的全部元素的操做,並將最終結果返回給驅動程序(儘管還有一個返回分佈式數據集的並行reduceByKey)。
·Spark中的全部轉換都是惰性的,由於它們不會當即計算結果。
·相反,他們只記得應用於某些基礎數據集(例如文件)的轉換。
·僅當操做須要將結果返回到驅動程序時纔會計算轉換。
·這種設計使Spark可以更有效地運行。
·例如,咱們能夠意識到經過map建立的數據集將用於reduce,而且僅將reduce的結果返回給驅動程序,而不是更大的映射數據集。
·默認狀況下,每次對其執行操做時,均可以從新計算每一個轉換後的RDD。
·可是,您也可使用持久化(或緩存)方法在內存中保留RDD,在這種狀況下,Spark會在羣集上保留元素,以便在下次查詢時更快地訪問。
·還支持在磁盤上保留RDD或在多個節點上覆制。
Basics(基本)
爲了說明RDD基礎知識,請考慮如下簡單程序:html
lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)
·第一行定義來自外部文件的基本RDD。
·此數據集未加載到內存中或以其餘方式執行:行僅僅是指向文件的指針。
·第二行將lineLengths定義爲地圖轉換的結果。
·一樣,因爲懶惰,lineLengths不會當即計算。
·最後,咱們運行reduce,這是一個動做。
·此時,Spark將計算分解爲在不一樣機器上運行的任務,而且每臺機器都運行其部分映射和本地縮減,僅返回其對驅動程序的答案。
·若是咱們之後想再次使用lineLengths,咱們能夠添加:
在reduce以前,這將致使lineLengths在第一次計算以後保存在內存中。lineLengths.persist()
Passing Functions to Spark(將函數傳遞給Spark)
·Spark的API在很大程度上依賴於在驅動程序中傳遞函數以在集羣上運行。
·有三種建議的方法能夠作到這一點:
·Lambda表達式,用於能夠做爲表達式編寫的簡單函數。
·(Lambdas不支持多語句函數或不返回值的語句。)
·調用Spark的函數內部的本地defs,用於更長的代碼。
·模塊中的頂級函數。
·例如,要傳遞比使用lambda支持的更長的函數,請考慮如下代碼:
"""MyScript.py""" if __name__ == "__main__": def myFunc(s): words = s.split(" ") return len(words) sc = SparkContext(...) sc.textFile("file.txt").map(myFunc)
·請注意,雖然也能夠將引用傳遞給類實例中的方法(而不是單例對象),但這須要發送包含該類的對象以及方法。
·例如,考慮:
class MyClass(object): def func(self, s): return s def doStuff(self, rdd): return rdd.map(self.func)
·在這裏,若是咱們建立一個新的MyClass並在其上調用doStuff,那裏的map會引用該MyClass實例的func方法,所以須要將整個對象發送到集羣。
·以相似的方式,訪問外部對象的字段將引用整個對象:
class MyClass(object): def __init__(self): self.field = "Hello" def doStuff(self, rdd): return rdd.map(lambda s: self.field + s)
要避免此問題,最簡單的方法是將字段複製到局部變量中,而不是從外部訪問它:
def doStuff(self, rdd): field = self.field return rdd.map(lambda s: field + s)
Understanding closures(理解閉包)
·Spark的一個難點是在跨集羣執行代碼時理解變量和方法的範圍和生命週期。
·修改其範圍以外的變量的RDD操做可能常常引發混淆。
·在下面的示例中,咱們將查看使用foreach()遞增計數器的代碼,但一樣的問題也可能發生在其餘操做中。
Example
·考慮下面的天真RDD元素總和,根據執行是否在同一JVM中發生,它可能表現不一樣。
·一個常見的例子是在本地模式下運行Spark(--master = local [n])而不是將Spark應用程序部署到集羣(例如經過spark-submit to YARN):
counter = 0 rdd = sc.parallelize(data) # Wrong: Don't do this!! def increment_counter(x): global counter counter += x rdd.foreach(increment_counter) print("Counter value: ", counter)
Local vs. cluster modes(本地與羣集模式)
·上述代碼的行爲未定義,可能沒法按預期工做。
·爲了執行做業,Spark將RDD操做的處理分解爲任務,每一個任務都由執行程序執行。
·在執行以前,Spark計算任務的閉包。
·閉包是那些變量和方法,它們必須是可見的,以便執行程序在RDD上執行其計算(在本例中爲foreach())。
·該閉包被序列化併發送給每一個執行者。
·發送給每一個執行程序的閉包內的變量如今是副本,所以,當在foreach函數中引用計數器時,它再也不是驅動程序節點上的計數器。
·驅動程序節點的內存中仍然有一個計數器,但執行程序再也不可見!
·執行程序只能看到序列化閉包中的副本。
·所以,計數器的最終值仍然爲零,由於計數器上的全部操做都引用了序列化閉包內的值。
·在本地模式下,在某些狀況下,foreach函數實際上將在與驅動程序相同的JVM中執行,並將引用相同的原始計數器,而且可能實際更新它。
·爲了確保在這些場景中定義良好的行爲,應該使用累加器。
·Spark中的累加器專門用於提供一種機制,用於在跨集羣中的工做節點拆分執行時安全地更新變量。
·本指南的「累加器」部分更詳細地討論了這些內容。
·一般,閉包 - 相似循環或本地定義的方法的構造不該該用於改變某些全局狀態。
·Spark沒有定義或保證從閉包外部引用的對象的突變行爲。
·執行此操做的某些代碼可能在本地模式下工做,但這只是偶然的,而且此類代碼在分佈式模式下不會按預期運行。
·若是須要某些全局聚合,請使用累加器。
Printing elements of an RDD(打印RDD的元素)
·另外一個常見的習慣用法是嘗試使用rdd.foreach(println)或rdd.map(println)打印出RDD的元素。
·在一臺機器上,這將生成預期的輸出並打印全部RDD的元素。
·可是,在集羣模式下,執行程序調用的stdout輸出如今寫入執行程序的stdout,而不是驅動程序上的那個,所以驅動程序上的stdout不會顯示這些!
·要打印驅動程序上的全部元素,可使用collect()方法首先將RDD帶到驅動程序節點:rdd.collect()。foreach(println)。
·可是,這會致使驅動程序內存不足,由於collect()會將整個RDD提取到一臺機器上;
·若是你只須要打印RDD的一些元素,更安全的方法是使用take():rdd.take(100).foreach(println)。
Working with Key-Value Pairs(使用鍵值對)
·雖然大多數Spark操做都適用於包含任何類型對象的RDD,但一些特殊操做僅適用於鍵值對的RDD。
·最多見的是分佈式「隨機」操做,例如經過密鑰對元素進行分組或聚合。
·在Python中,這些操做適用於包含內置Python元組的RDD,如(1,2)。
·只需建立這樣的元組,而後調用您想要的操做。
·例如,如下代碼對鍵值對使用reduceByKey操做來計算文件中每行文本出現的次數:
例如,咱們也可使用counts.sortByKey()來按字母順序對這些對進行排序,最後使用counts.collect()將它們做爲對象列表返回到驅動程序。lines = sc.textFile("data.txt") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b)
·下表列出了Spark支持的一些常見轉換。
·有關詳細信息,請參閱RDD API文檔(Scala,Java,Python,R)並配對RDD函數doc(Scala,Java)。
·轉型意義
·map(func)返回經過函數func傳遞源的每一個元素造成的新分佈式數據集。
·filter(func)返回經過選擇func返回true的源元素造成的新數據集。
·flatMap(func)與map相似,但每一個輸入項能夠映射到0個或更多輸出項(所以func應返回Seq而不是單個項)。
·mapPartitions(func)與map相似,但在RDD的每一個分區(塊)上單獨運行,所以當在類型T的RDD上運行時,func必須是Iterator => Iterator
類型。
·mapPartitionsWithIndex(func)與mapPartitions相似,但也爲func提供了一個表示分區索引的整數值,所以當在RDD類型上運行時,func必須是類型(Int,Iterator )=> Iterator
·T.
·sample(withReplacement,fraction,seed)使用給定的隨機數生成器種子,使用或不使用替換對數據的一小部分進行採樣。
·union(otherDataset)返回一個新數據集,其中包含源數據集和參數中元素的並集。
·intersection(otherDataset)返回包含源數據集和參數中元素交集的新RDD。
·distinct([numPartitions]))返回包含源數據集的不一樣元素的新數據集。
·groupByKey([numPartitions])在(K,V)對的數據集上調用時,返回(K,Iterable )對的數據集。
·注意:若是要對每一個鍵執行聚合(例如總和或平均值)進行分組,則使用reduceByKey或aggregateByKey將產生更好的性能。
·注意:默認狀況下,輸出中的並行級別取決於父RDD的分區數。
·您能夠傳遞可選的numPartitions參數來設置不一樣數量的任務。
·reduceByKey(func,[numPartitions])當在(K,V)對的數據集上調用時,返回(K,V)對的數據集,其中使用給定的reduce函數func聚合每一個鍵的值,該函數必須是
·type(V,V)=> V.與groupByKey相似,reduce任務的數量可經過可選的第二個參數進行配置。
·aggregateByKey(zeroValue)(seqOp,combOp,[numPartitions])在(K,V)對的數據集上調用時,返回(K,U)對的數據集,其中使用給定的組合函數聚合每一個鍵的值,
·中性的「零」值。
·容許與輸入值類型不一樣的聚合值類型,同時避免沒必要要的分配。
·與groupByKey相似,reduce任務的數量可經過可選的第二個參數進行配置。
·sortByKey([ascending],[numPartitions])在K實現Ordered的(K,V)對的數據集上調用時,返回按鍵按升序或降序排序的(K,V)對數據集,如
·布爾升序參數。
·join(otherDataset,[numPartitions])當調用類型爲(K,V)和(K,W)的數據集時,返回(K,(V,W))對的數據集以及每一個鍵的全部元素對。
·經過leftOuterJoin,rightOuterJoin和fullOuterJoin支持外鏈接。
·cogroup(otherDataset,[numPartitions])當調用類型爲(K,V)和(K,W)的數據集時,返回(K,(Iterable ,Iterable ))元組的數據集。
·此操做也稱爲groupWith。
·cartesian(otherDataset)當調用類型爲T和U的數據集時,返回(T,U)對的數據集(全部元素對)。
·pipe(command,[envVars])經過shell命令管道RDD的每一個分區,例如:
·一個Perl或bash腳本。
·RDD元素被寫入進程的stdin,而且輸出到其stdout的行將做爲字符串的RDD返回。
·coalesce(numPartitions)將RDD中的分區數減小爲numPartitions。
·過濾大型數據集後,能夠更有效地運行操做。
·repartition(numPartitions)隨機從新調整RDD中的數據以建立更多或更少的分區並在它們之間進行平衡。
·這老是隨機播放網絡上的全部數據。
·repartitionAndSortWithinPartitions(partitioner)根據給定的分區程序從新分區RDD,並在每一個生成的分區中按鍵對記錄進行排序。
·這比調用從新分區而後在每一個分區內排序更有效,由於它能夠將排序推送到shuffle機器中。
Actions(動做)
·下表列出了Spark支持的一些常見操做。
·請參閱RDD API文檔(Scala,Java,Python,R)
·並配對RDD函數doc(Scala,Java)以獲取詳細信息。
·行動意義
·reduce(func)使用函數func(它接受兩個參數並返回一個)來聚合數據集的元素。
·該函數應該是可交換的和關聯的,以即可以並行正確計算。
·collect()在驅動程序中將數據集的全部元素做爲數組返回。
·在過濾器或其餘返回足夠小的數據子集的操做以後,這一般頗有用。
·count()返回數據集中的元素數。
·first()返回數據集的第一個元素(相似於take(1))。
·take(n)返回包含數據集的前n個元素的數組。
·takeSample(withReplacement,num,[seed])返回一個數組,其中包含數據集的num個元素的隨機樣本,有或沒有替換,可選地預先指定隨機數生成器種子。
·takeOrdered(n,[ordering])使用天然順序或自定義比較器返回RDD的前n個元素。
·saveAsTextFile(path)將數據集的元素寫爲本地文件系統,HDFS或任何其餘Hadoop支持的文件系統中給定目錄中的文本文件(或文本文件集)。
·Spark將在每一個元素上調用toString,將其轉換爲文件中的一行文本。
·saveAsSequenceFile(路徑)
·(Java和Scala)將數據集的元素做爲Hadoop SequenceFile寫入本地文件系統,HDFS或任何其餘Hadoop支持的文件系統中的給定路徑中。
·這能夠在實現Hadoop的Writable接口的鍵值對的RDD上使用。
·在Scala中,它也能夠在可隱式轉換爲Writable的類型上使用(Spark包括基本類型的轉換,如Int,Double,String等)。
·saveAsObjectFile(路徑)
·(Java和Scala)使用Java序列化以簡單格式編寫數據集的元素,而後可使用SparkContext.objectFile()加載它。
·countByKey()僅適用於類型爲(K,V)的RDD。
·返回(K,Int)對的散列映射,其中包含每一個鍵的計數。
·foreach(func)對數據集的每一個元素運行函數func。
·這一般用於反作用,例如更新累加器或與外部存儲系統交互。
·注意:在foreach()以外修改除累加器以外的變量可能會致使未定義的行爲。
·有關詳細信息,請參閱瞭解閉包。
·Spark RDD API還公開了某些操做的異步版本,例如foreach的foreachAsync,它會當即將一個FutureAction返回給調用者,而不是在完成操做時阻塞。
·這可用於管理或等待操做的異步執行。
Shuffle operations(隨機操做)
·Spark中的某些操做會觸發稱爲shuffle的事件。
·隨機播放是Spark的從新分配數據的機制,所以它能夠跨分區進行不一樣的分組。
·這一般涉及跨執行程序和機器複製數據,使得混洗成爲複雜且昂貴的操做。
Background(背景)
·爲了理解在shuffle期間發生的事情,咱們能夠考慮reduceByKey操做的示例。
·reduceByKey操做生成一個新的RDD,其中單個鍵的全部值都組合成一個元組 - 鍵和對與該鍵關聯的全部值執行reduce函數的結果。
·挑戰在於,並不是單個密鑰的全部值都必須位於同一個分區,甚至是同一個機器上,但它們必須位於同一位置才能計算結果。
·在Spark中,數據一般不跨分區分佈,以便在特定操做的必要位置。
·在計算過程當中,單個任務將在單個分區上運行 - 所以,要組織單個reduceByKey reduce任務執行的全部數據,Spark須要執行所有操做。
·它必須從全部分區讀取以查找全部鍵的全部值,而後將分區中的值彙總在一塊兒以計算每一個鍵的最終結果 - 這稱爲shuffle。
·儘管新洗牌數據的每一個分區中的元素集將是肯定性的,而且分區自己的排序也是如此,但這些元素的排序不是。
·若是在隨機播放後須要可預測的有序數據,則可使用:
·mapPartitions使用例如.sorted對每一個分區進行排序
·repartitionAndSortWithinPartitions在同時從新分區的同時有效地對分區進行排序
·sortBy來建立一個全局排序的RDD
·能夠致使混洗的操做包括從新分區操做,如從新分區和合並,「ByKey操做(計數除外),如groupByKey和reduceByKey,以及聯合操做,如cogroup和join。
·Shuffle是一項昂貴的操做,由於它涉及磁盤I / O,數據序列化和網絡I / O.
·爲了組織shuffle的數據,Spark生成了一系列任務 - 映射任務以組織數據,以及一組reduce任務來聚合它。
·這個術語來自MapReduce,並不直接與Spark的地圖和減小操做相關。
·在內部,各個地圖任務的結果會保留在內存中,直到它們沒法適應。
·而後,這些基於目標分區進行排序並寫入單個文件。
·在reduce方面,任務讀取相關的排序塊。
·某些shuffle操做會消耗大量的堆內存,由於它們使用內存中的數據結構來在傳輸記錄以前或以後組織記錄。
·具體來講,reduceByKey和aggregateByKey在地圖側建立這些結構,而且'ByKey操做在reduce側生成這些結構。
·當數據不適合內存時,Spark會將這些表溢出到磁盤,從而致使磁盤I / O的額外開銷和垃圾收集增長。
·Shuffle還會在磁盤上生成大量中間文件。
·從Spark 1.3開始,這些文件將被保留,直到再也不使用相應的RDD並進行垃圾回收。
·這樣作是爲了在從新計算譜系時不須要從新建立shuffle文件。
·若是應用程序保留對這些RDD的引用或GC不常常啓動,則垃圾收集可能僅在很長一段時間後纔會發生。
·這意味着長時間運行的Spark做業可能會佔用大量磁盤空間。
·配置Spark上下文時,spark.local.dir配置參數指定臨時存儲目錄。
·能夠經過調整各類配置參數來調整隨機行爲。
·請參閱「Spark配置指南」中的「隨機行爲」部分。
RDD Persistence(RDD持久性)
·Spark中最重要的功能之一是跨操做在內存中持久化(或緩存)數據集。
·當您持久保存RDD時,每一個節點都會存儲它在內存中計算的任何分區,並在該數據集(或從中派生的數據集)的其餘操做中重用它們。
·這使得將來的行動更快(一般超過10倍)。
·緩存是迭代算法和快速交互式使用的關鍵工具。
·您可使用persist()或cache()方法標記要保留的RDD。
·第一次在動做中計算它,它將保留在節點的內存中。
·Spark的緩存是容錯的 - 若是丟失了RDD的任何分區,它將使用最初建立它的轉換自動從新計算。
·此外,每一個持久化RDD可使用不一樣的存儲級別進行存儲,例如,容許您將數據集保留在磁盤上,將其保留在內存中,但做爲序列化Java對象(以節省空間),跨節點複製它。
·經過將StorageLevel對象(Scala,Java,Python)傳遞給persist()來設置這些級別。
·cache()方法是使用默認存儲級別的簡寫,即StorageLevel.MEMORY_ONLY(在內存中存儲反序列化的對象)。
·完整的存儲級別是:
·存儲級別含義
·MEMORY_ONLY將RDD存儲爲JVM中的反序列化Java對象。
·若是RDD不適合內存,則某些分區將不會被緩存,而且每次須要時都會從新計算。
·這是默認級別。
·MEMORY_AND_DISK將RDD存儲爲JVM中的反序列化Java對象。
·若是RDD不適合內存,請存儲不適合磁盤的分區,並在須要時從那裏讀取它們。
·MEMORY_ONLY_SER
·(Java和Scala)將RDD存儲爲序列化Java對象(每一個分區一個字節數組)。
·這一般比反序列化對象更節省空間,特別是在使用快速序列化器時,但讀取CPU密集程度更高。
·MEMORY_AND_DISK_SER
·(Java和Scala)與MEMORY_ONLY_SER相似,可是將不適合內存的分區溢出到磁盤,而不是每次須要時動態從新計算它們。
·DISK_ONLY僅將RDD分區存儲在磁盤上。
·MEMORY_ONLY_2,MEMORY_AND_DISK_2等。與上面的級別相同,但複製兩個羣集節點上的每一個分區。
·OFF_HEAP(實驗)與MEMORY_ONLY_SER相似,但將數據存儲在堆外內存中。
·這須要啓用堆外內存。
·注意:在Python中,存儲的對象將始終使用Pickle庫進行序列化,所以您是否選擇序列化級別並不重要。
·Python中的可用存儲級別包括MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY和DISK_ONLY_2。
·即便沒有用戶調用持久性,Spark也會在隨機操做(例如reduceByKey)中自動保留一些中間數據。
·這樣作是爲了不在shuffle期間節點發生故障時從新計算整個輸入。
·咱們仍然建議用戶在生成的RDD上調用persist,若是他們計劃重用它。
Which Storage Level to Choose?(選擇哪一種存儲級別?)
·Spark的存儲級別旨在提供內存使用和CPU效率之間的不一樣折衷。
·咱們建議您經過如下流程選擇一個:
·若是您的RDD與默認存儲級別(MEMORY_ONLY)很溫馨,請保持這種狀態。
·這是CPU效率最高的選項,容許RDD上的操做盡量快地運行。
·若是沒有,請嘗試使用MEMORY_ONLY_SER並選擇快速序列化庫,以使對象更節省空間,但仍然能夠快速訪問。
·(Java和Scala)
·除非計算數據集的函數很昂貴,不然它們不會溢出到磁盤,或者它們會過濾大量數據。
·不然,從新計算分區可能與從磁盤讀取分區同樣快。
·若是要快速故障恢復,請使用複製的存儲級別(例如,若是使用Spark來處理來自Web應用程序的請求)。
·全部存儲級別經過從新計算丟失的數據提供徹底容錯,但複製的存儲級別容許您繼續在RDD上運行任務,而無需等待從新計算丟失的分區。
Removing Data(刪除數據)
·Spark會自動監視每一個節點上的緩存使用狀況,並以最近最少使用(LRU)的方式刪除舊數據分區。
·若是您想手動刪除RDD而不是等待它退出緩存,請使用RDD.unpersist()方法。
Shared Variables(共享變量)
·一般,當在遠程集羣節點上執行傳遞給Spark操做(例如map或reduce)的函數時,它將在函數中使用的全部變量的單獨副本上工做。
·這些變量將複製到每臺計算機,而且遠程計算機上的變量的更新不會傳播回驅動程序。
·支持跨任務的通用,讀寫共享變量效率低下。
·可是,Spark確實爲兩種常見的使用模式提供了兩種有限類型的共享變量:廣播變量和累加器。
Broadcast Variables(廣播變量)
·廣播變量容許程序員在每臺機器上保留一個只讀變量,而不是隨副本一塊兒發送它的副本。
·例如,它們可用於以有效的方式爲每一個節點提供大輸入數據集的副本。
·Spark還嘗試使用有效的廣播算法來分發廣播變量,以下降通訊成本。
·Spark動做經過一組階段執行,由分佈式「shuffle」操做分隔。
·Spark自動廣播每一個階段中任務所需的公共數據。
·以這種方式廣播的數據以序列化形式緩存並在運行每一個任務以前反序列化。
·這意味着顯式建立廣播變量僅在跨多個階段的任務須要相同數據或以反序列化形式緩存數據很重要時纔有用。
·經過調用SparkContext.broadcast(v)從變量v建立廣播變量。
·廣播變量是v的包裝器,能夠經過調用value方法訪問其值。
·下面的代碼顯示了這個:
>>> broadcastVar = sc.broadcast([1, 2, 3]) <pyspark.broadcast.Broadcast object at 0x102789f10> >>> broadcastVar.value [1, 2, 3]
·建立廣播變量後,應該在羣集上運行的任何函數中使用它而不是值v,這樣v不會屢次傳送到節點。
·另外,在廣播以後不該修改對象v,以便確保全部節點得到廣播變量的相同值(例如,若是稍後將變量發送到新節點)。
Accumulators(累加器)
·累加器是僅經過關聯和交換操做「添加」的變量,所以能夠並行有效地支持。
·它們可用於實現計數器(如MapReduce)或總和。
·Spark自己支持數值類型的累加器,程序員能夠添加對新類型的支持。
·做爲用戶,您能夠建立命名或未命名的累加器。
·以下圖所示,命名累加器(在此實例計數器中)將顯示在Web UI中,用於修改該累加器的階段。
·Spark顯示「任務」表中任務修改的每一個累加器的值。
跟蹤UI中的累加器對於理解運行階段的進度很是有用(注意:Python中尚不支持)。
·經過調用SparkContext.accumulator(v)從初始值v建立累加器。
·而後,可使用add方法或+ =運算符將在羣集上運行的任務添加到其中。
·可是,他們沒法讀懂它的價值。
·只有驅動程序可使用其value方法讀取累加器的值。
·下面的代碼顯示了一個累加器用於添加數組的元素:
>>> accum = sc.accumulator(0) >>> accum Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s >>> accum.value 10
·雖然此代碼使用Int類型的累加器的內置支持,但程序員也能夠經過繼承AccumulatorParam來建立本身的類型。
·AccumulatorParam接口有兩種方法:零用於爲數據類型提供「零值」,addInPlace用於將兩個值一塊兒添加。
·例如,假設咱們有一個表示數學向量的Vector類,咱們能夠寫:
class VectorAccumulatorParam(AccumulatorParam): def zero(self, initialValue): return Vector.zeros(initialValue.size) def addInPlace(self, v1, v2): v1 += v2 return v1 # Then, create an Accumulator of this type: vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
·對於僅在操做內執行的累加器更新,Spark保證每一個任務對累加器的更新僅應用一次,即從新啓動的任務不會更新該值。
·在轉換中,用戶應該知道,若是從新執行任務或做業階段,則能夠屢次應用每一個任務的更新。
·累加器不會改變Spark的惰性評估模型。
·若是在RDD上的操做中更新它們,則只有在RDD做爲操做的一部分計算時才更新它們的值。
·所以,在像map()這樣的惰性轉換中進行累積器更新時,不能保證執行累加器更新。
·如下代碼片斷演示了此屬性:
accum = sc.accumulator(0) def g(x): accum.add(x) return f(x) data.map(g) # Here, accum is still 0 because no actions have caused the `map` to be computed.
Deploying to a Cluster(部署到羣集)
·應用程序提交指南介紹瞭如何將應用程序提交到羣集。
·簡而言之,一旦將應用程序打包到JAR(用於Java / Scala)或一組.py或.zip文件(用於Python),bin / spark-submit腳本容許您將其提交給任何支持的集羣管理器。
Launching Spark jobs from Java / Scala(從Java / Scala啓動Spark做業)
org.apache.spark.launcher包提供了使用簡單Java API將Spark做業做爲子進程啓動的類。java
Unit Testing(單元測試)
·Spark對任何流行的單元測試框架進行單元測試都很友好。
·只需在測試中建立一個SparkContext,主URL設置爲local,運行您的操做,而後調用SparkContext.stop()將其拆除。
·確保在finally塊或測試框架的tearDown方法中中止上下文,由於Spark不支持在同一程序中同時運行的兩個上下文。
Where to Go from Here(從這往哪兒走)
You can see some example Spark programs on the Spark website. In addition, Spark includes several samples in the examples
directory (Scala,Java, Python, R). You can run Java and Scala examples by passing the class name to Spark’s bin/run-example
script; for instance:python
./bin/run-example SparkPi
對於Python示例,請使用spark-submit代替:mysql
./bin/spark-submit examples/src/main/python/pi.py
Spark SQL, DataFrames and Datasets Guide
·Spark SQL是用於結構化數據處理的Spark模塊。
·與基本的Spark RDD API不一樣,Spark SQL提供的接口爲Spark提供了有關數據結構和正在執行的計算的更多信息。
·在內部,Spark SQL使用此額外信息來執行額外的優化。
·有幾種與Spark SQL交互的方法,包括SQL和Dataset API。
·在計算結果時,使用相同的執行引擎,與您用於表達計算的API /語言無關。
·這種統一意味着開發人員能夠輕鬆地在不一樣的API之間來回切換,從而提供表達給定轉換的最天然的方式。
·此頁面上的全部示例都使用Spark分發中包含的示例數據,而且能夠在spark-shell,pyspark shell或sparkR shell中運行。
SQL
·Spark SQL的一個用途是執行SQL查詢。
·Spark SQL還可用於從現有Hive安裝中讀取數據。
·有關如何配置此功能的更多信息,請參閱Hive Tables部分。
·從其餘編程語言中運行SQL時,結果將做爲數據集/數據框返回。
·您還可使用命令行或JDBC / ODBC與SQL接口進行交互。
Datasets and DataFrames
·數據集是分佈式數據集合。
·數據集是Spark 1.6中添加的一個新接口,它提供了RDD的優點(強類型,使用強大的lambda函數的能力)和Spark SQL優化執行引擎的優勢。
·數據集能夠從JVM對象構造,而後使用功能轉換(map,flatMap,filter等)進行操做。
·數據集API在Scala和Java中可用。
·Python沒有對Dataset API的支持。
·但因爲Python的動態特性,數據集API的許多好處已經可用(即您能夠經過名稱天然地訪問行的字段row.columnName)。
·R的狀況相似。
·DataFrame是一個組織成命名列的數據集。
·它在概念上等同於關係數據庫中的表或R / Python中的數據框,但在底層具備更豐富的優化。
·DataFrame能夠從多種來源構建,例如:結構化數據文件,Hive中的表,外部數據庫或現有RDD。
·DataFrame API在Scala,Java,Python和R中可用。在Scala和Java中,DataFrame由行數據集表示。
·在Scala API中,DataFrame只是Dataset [Row]的類型別名。
·而在Java API中,用戶須要使用Dataset 來表示DataFrame。
·在本文檔中,咱們常常將行的Scala / Java數據集稱爲DataFrame。
Getting Started(入門)
Starting Point: SparkSession(起點:SparkSession)
·Spark中全部功能的入口點是SparkSession類。
·要建立基本的SparkSession,只需使用SparkSession.builder:
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate()
·在Spark repo中的「examples / src / main / python / sql / basic.py」中找到完整的示例代碼。
·Spark 2.0中的SparkSession爲Hive功能提供內置支持,包括使用HiveQL編寫查詢,訪問Hive UDF以及從Hive表讀取數據的功能。
·要使用這些功能,您無需擁有現有的Hive設置。
Creating DataFrames(建立DataFrame)
·使用SparkSession,應用程序能夠從現有RDD,Hive表或Spark數據源建立DataFrame。
·做爲示例,如下內容基於JSON文件的內容建立DataFrame:
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame to stdout df.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.
Untyped Dataset Operations (aka DataFrame Operations)無類型數據集操做(又名DataFrame操做)
·DataFrames爲Scala,Java,Python和R中的結構化數據操做提供特定於域的語言。
·如上所述,在Spark 2.0中,DataFrames只是Scala和Java API中Rows的數據集。
·與「類型轉換」相比,這些操做也稱爲「無類型轉換」,帶有強類型Scala / Java數據集。
·這裏咱們包括使用數據集進行結構化數據處理的一些基本示例:
·在Python中,能夠經過屬性(df.age)或索引(df ['age'])訪問DataFrame的列。
·雖然前者便於交互式數據探索,但強烈建議用戶使用後一種形式,這是將來的證實,不會破壞也是DataFrame類屬性的列名。
# spark, df are from the previous example
# Print the schema in a tree format df.printSchema() # root # |-- age: long (nullable = true) # |-- name: string (nullable = true) # Select only the "name" column df.select("name").show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+ # Select everybody, but increment the age by 1 df.select(df['name'], df['age'] + 1).show() # +-------+---------+ # | name|(age + 1)| # +-------+---------+ # |Michael| null| # | Andy| 31| # | Justin| 20| # +-------+---------+ # Select people older than 21 df.filter(df['age'] > 21).show() # +---+----+ # |age|name| # +---+----+ # | 30|Andy| # +---+----+ # Count people by age df.groupBy("age").count().show() # +----+-----+ # | age|count| # +----+-----+ # | 19| 1| # |null| 1| # | 30| 1| # +----+-----+
·在Spark repo中的「examples / src / main / python / sql / basic.py」中找到完整的示例代碼。
·有關可在DataFrame上執行的操做類型的完整列表,請參閱API文檔。
·除了簡單的列引用和表達式以外,DataFrame還具備豐富的函數庫,包括字符串操做,日期算術,常見的數學運算等。
·完整列表可在DataFrame函數參考中找到。
Running SQL Queries Programmatically(以編程方式運行SQL查詢)
SparkSession上的sql函數使應用程序可以以編程方式運行SQL查詢並將結果做爲DataFrame返回。git
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people") sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.
Global Temporary View(全球臨時觀點)
·Spark SQL中的臨時視圖是會話範圍的,若是建立它的會話終止,它將消失。
·若是您但願擁有一個在全部會話之間共享的臨時視圖並保持活動狀態,直到Spark應用程序終止,您能夠建立一個全局臨時視圖。
·全局臨時視圖與系統保留的數據庫global_temp綁定,咱們必須使用限定名稱來引用它,例如
·SELECT * FROM global_temp.view1。
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people") # Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ # Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.
Creating Datasets(建立數據集)
·數據集與RDD相似,可是,它們不使用Java序列化或Kryo,而是使用專用的編碼器來序列化對象以便經過網絡進行處理或傳輸。
·雖然編碼器和標準序列化都負責將對象轉換爲字節,但編碼器是動態生成的代碼,並使用一種格式,容許Spark執行許多操做,如過濾,排序和散列,而無需將字節反序列化爲對象。
case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Interoperating with RDDs(與RDD互操做)
·Spark SQL支持兩種不一樣的方法將現有RDD轉換爲數據集。
·第一種方法使用反射來推斷包含特定類型對象的RDD的模式。
·這種基於反射的方法能夠提供更簡潔的代碼,而且在您編寫Spark應用程序時已經瞭解模式時能夠很好地工做。
·建立數據集的第二種方法是經過編程接口,容許您構建模式,而後將其應用於現有RDD。
·雖然此方法更詳細,但它容許您在直到運行時才知道列及其類型時構造數據集。
Inferring the Schema Using Reflection(使用反射推斷模式)
·Spark SQL能夠將Row對象的RDD轉換爲DataFrame,從而推斷出數據類型。
·經過將鍵/值對列表做爲kwargs傳遞給Row類來構造行。
·此列表的鍵定義表的列名稱,並經過對整個數據集進行採樣來推斷類型,相似於對JSON文件執行的推斷
from pyspark.sql import Row sc = spark.sparkContext # Load a text file and convert each line to a Row. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # Infer the schema, and register the DataFrame as a table. schemaPeople = spark.createDataFrame(people) schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # The results of SQL queries are Dataframe objects. # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`. teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect() for name in teenNames: print(name) # Name: Justin
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.
Programmatically Specifying the Schema(以編程方式指定架構)
·當沒法提早定義kwargs字典時(例如,記錄結構以字符串形式編碼,或者文本數據集將被解析,字段將以不一樣方式爲不一樣用戶進行投影),可使用編程方式建立DataFrame
·三個步驟。
·從原始RDD建立元組或列表的RDD;
·建立由StructType表示的模式,該模式與步驟1中建立的RDD中的元組或列表的結構相匹配。
·經過SparkSession提供的createDataFrame方法將模式應用於RDD。
例如:程序員
# Import data types
from pyspark.sql.types import * sc = spark.sparkContext # Load a text file and convert each line to a Row. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) # Each line is converted to a tuple. people = parts.map(lambda p: (p[0], p[1].strip())) # The schema is encoded in a string. schemaString = "name age" fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] schema = StructType(fields) # Apply the schema to the RDD. schemaPeople = spark.createDataFrame(people, schema) # Creates a temporary view using the DataFrame schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. results = spark.sql("SELECT name FROM people") results.show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.
Aggregations(聚合)
·內置的DataFrames函數提供常見的聚合,如count(),countDistinct(),avg(),max(),min()等。雖然這些函數是爲DataFrames設計的,但Spark SQL也有類型安全的版本
·其中一些在Scala和Java中使用強類型數據集。
·此外,用戶不限於預約義的聚合函數,而且能夠建立本身的聚合函數。
Untyped User-Defined Aggregate Functions(無用戶定義的聚合函數)
·用戶必須擴展UserDefinedAggregateFunction抽象類以實現自定義無類型聚合函數。
·例如,用戶定義的平均值可能以下所示:
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ object MyAverage extends UserDefinedAggregateFunction { // Data types of input arguments of this aggregate function def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // Data types of values in the aggregation buffer def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // The data type of the returned value def dataType: DataType = DoubleType // Whether this function always returns the same output on the identical input def deterministic: Boolean = true // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } // Updates the given aggregation buffer `buffer` with new input data from `input` def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // Calculates the final result def evaluate(buffer: Row): Double = buffer.getLong