Spark譯文(一)

Spark Overview(Spark概述)

·Apache Spark是一種快速通用的集羣計算系統。
·它提供Java,Scala,Python和R中的高級API,以及支持通用執行圖的優化引擎。
·它還支持豐富的高級工具集,包括用於SQL和結構化數據處理的Spark SQL,用於機器學習的MLlib,用於圖形處理的GraphX和Spark Streaming

Security(安全性)

·Spark中的安全性默認爲OFF。
·這可能意味着您很容易受到默認攻擊。
·在下載和運行Spark以前,請參閱Spark Security

Downloading

·從項目網站的下載頁面獲取Spark。
·本文檔適用於Spark版本2.4.2。
·Spark使用Hadoop的客戶端庫來實現HDFS和YARN。
·下載是針對少數流行的Hadoop版本預先打包的。
·用戶還能夠經過增長Spark的類路徑下載「Hadoop免費」二進制文件並使用任何Hadoop版本運行Spark。
·Scala和Java用戶可使用Maven座標在他們的項目中包含Spark,而且未來Python用戶也能夠從PyPI安裝Spark。
·若是您想從源代碼構建Spark,請訪問Building Spark。
·Spark在Windows和類UNIX系統(例如Linux,Mac OS)上運行。
·在一臺機器上本地運行很容易 - 您只須要在系統PATH上安裝Java,或者指向Java安裝的JAVA_HOME環境變量。
·Spark運行在Java 8 +,Python 2.7 + / 3.4 +和R 3.1+上。
·對於Scala API,Spark 2.4.2使用Scala 2.12。
·您須要使用兼容的Scala版本(2.12.x)。
·請注意,自Spark 2.2.0起,對2.6.5以前的Java 7,Python 2.6和舊Hadoop版本的支持已被刪除。
·自2.3.0起,對Scala 2.10的支持被刪除。
·自Spark 2.4.1起,對Scala 2.11的支持已被棄用,將在Spark 3.0中刪除。

Running the Examples and Shell(運行示例和Shell)

·Spark附帶了幾個示例程序。
·Scala,Java,Python和R示例位於examples / src / main目錄中。
·要運行其中一個Java或Scala示例程序,請在頂級Spark目錄中使用bin / run-example [params]。
·(在幕後,這將調用更經常使用的spark-submit腳原本啓動應用程序)。
·例如
./bin/run-example SparkPi 10
·您還能夠經過Scala shell的修改版本以交互方式運行Spark。
·這是學習框架的好方法。
./bin/spark-shell --master local[2]
·--master選項指定分佈式集羣的主URL,或本地在一個線程上本地運行,或本地[N]在本地運行N個線程。
·您應該首先使用local進行測試。
·有關選項的完整列表,請使用--help選項運行Spark shell。
·Spark還提供了一個Python API。
·要在Python解釋器中以交互方式運行Spark,請使用bin / pyspark:
./bin/pyspark --master local[2]
·Python中也提供了示例應用程序。
·例如:
./bin/spark-submit examples/src/main/python/pi.py 10

Quick Start(快速開始)

·本教程簡要介紹瞭如何使用Spark。
·咱們將首先經過Spark的交互式shell(在Python或Scala中)介紹API,而後展現如何使用Java,Scala和Python編寫應用程序。
·要繼續本指南,首先,從Spark網站下載Spark的打包版本。
·因爲咱們不會使用HDFS,您能夠下載任何版本的Hadoop的軟件包。
·請注意,在Spark 2.0以前,Spark的主要編程接口是Resilient Distributed Dataset(RDD)。
·在Spark 2.0以後,RDD被數據集取代,數據集像RDD同樣強類型,但在底層有更豐富的優化。
·仍然支持RDD接口,您能夠在RDD編程指南中得到更詳細的參考。
·可是,咱們強烈建議您切換到使用數據集,它具備比RDD更好的性能。
·請參閱SQL編程指南以獲取有關數據集的更多信息

Interactive Analysis with the Spark Shell(使用Spark Shell進行交互式分析)

Basics(基本)

·Spark的shell提供了一種學習API的簡單方法,以及一種以交互方式分析數據的強大工具。
·它能夠在Scala(在Java VM上運行,所以是使用現有Java庫的好方法)或Python中使用。
·經過在Spark目錄中運行如下命令來啓動它:


或者若是在當前環境中使用pip安裝了PySpark:./bin/pyspark
pyspark
·Spark的主要抽象是一個名爲Dataset的分佈式項目集合。
·能夠從Hadoop InputFormats(例如HDFS文件)或經過轉換其餘數據集來建立數據集。
·因爲Python的動態特性,咱們不須要在Python中強類型數據集。
·所以,Python中的全部數據集都是Dataset [Row],咱們稱之爲DataFrame與Pandas和R中的數據框概念一致。讓咱們從Spark源目錄中的README文件的文本中建立一個新的DataFrame:
>>> textFile = spark.read.text("README.md")
·您能夠經過調用某些操做直接從DataFrame獲取值,也能夠轉換DataFrame以獲取新值。
·有關更多詳細信息,請閱讀API文檔。
>>> textFile.count() # Number of rows in this DataFrame 126 >>> textFile.first() # First row in this DataFrame Row(value=u'# Apache Spark')
·如今讓咱們將這個DataFrame轉換爲一個新的DataFrame。
·咱們調用filter來返回一個新的DataFrame,其中包含文件中的一行子集。
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
咱們能夠將轉換和行動聯繫在一塊兒:
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"? 15

More on Dataset Operations(有關數據集操做的更多信息)

·數據集操做和轉換可用於更復雜的計算。
·假設咱們想要找到含有最多單詞的行:
>>> from pyspark.sql.functions import * >>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect() [Row(max(numWords)=15)]
·這首先將一行映射爲整數值,並將其別名爲「numWords」,從而建立一個新的DataFrame。
·在該DataFrame上調用agg以查找最大字數。
·select和agg的參數都是Column,咱們可使用df.colName從DataFrame中獲取一列。
·咱們還能夠導入pyspark.sql.functions,它提供了許多方便的功能來從舊的列構建一個新的列。
·一個常見的數據流模式是MapReduce,由Hadoop推廣。
·Spark能夠輕鬆實現MapReduce流程:
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
·在這裏,咱們使用select中的explode函數,將行數據集轉換爲單詞數據集,而後將groupBy和count結合起來計算文件中的每一個單詞計數,做爲2列的DataFrame:「word」和「
·計數」。
·要在咱們的shell中收集單詞count,咱們能夠調用collect:
>>> wordCounts.collect() [Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]

Caching(高速緩存)

·Spark還支持將數據集提取到羣集範圍的內存緩存中。
·這在重複訪問數據時很是有用,例如查詢小的「熱」數據集或運行像PageRank這樣的迭代算法時。
·舉個簡單的例子,讓咱們標記要緩存的linesWithSpark數據集:
>>> linesWithSpark.cache() >>> linesWithSpark.count() 15 >>> linesWithSpark.count() 15
·使用Spark來探索和緩存100行文本文件彷佛很愚蠢。
·有趣的是,這些相同的功能可用於很是大的數據集,即便它們跨越數十個或數百個節點進行條帶化。
·您也能夠經過將bin / pyspark鏈接到羣集來交互式地執行此操做,如RDD編程指南中所述。

Self-Contained Applications(自包含的應用程序)

·假設咱們但願使用Spark API編寫一個自包含的應用程序。
·咱們將在Scala(使用sbt),Java(使用Maven)和Python(pip)中使用簡單的應用程序。
·如今咱們將展現如何使用Python API(PySpark)編寫應用程序。
·若是要構建打包的PySpark應用程序或庫,能夠將其添加到setup.py文件中:
install_requires=[ 'pyspark=={site.SPARK_VERSION}' ]
做爲示例,咱們將建立一個簡單的Spark應用程序SimpleApp.py:
"""SimpleApp.py""" from pyspark.sql import SparkSession logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system spark = SparkSession.builder.appName("SimpleApp").getOrCreate() logData = spark.read.text(logFile).cache() numAs = logData.filter(logData.value.contains('a')).count() numBs = logData.filter(logData.value.contains('b')).count() print("Lines with a: %i, lines with b: %i" % (numAs, numBs)) spark.stop()
·該程序只計算包含'a'的行數和包含文本文件中'b'的數字。
·請注意,您須要將YOUR_SPARK_HOME替換爲安裝Spark的位置。
·與Scala和Java示例同樣,咱們使用SparkSession來建立數據集。
·對於使用自定義類或第三方庫的應用程序,咱們還能夠經過將它們打包到.zip文件中來添加代碼依賴關係以經過其--py-files參數進行spark-submit(有關詳細信息,請參閱spark-submit --help)。
·SimpleApp很是簡單,咱們不須要指定任何代碼依賴項。
咱們可使用bin / spark-submit腳本運行此應用程序:


若是您的環境中安裝了PySpark pip(例如,pip install pyspark),您可使用常規Python解釋器運行您的應用程序,或者根據您的喜愛使用提供的「spark-submit」。# Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --master local[4] \ SimpleApp.py ... Lines with a: 46, Lines with b: 23
# Use the Python interpreter to run your application $ python SimpleApp.py ... Lines with a: 46, Lines with b: 23

RDD Programming Guide(RDD編程指南)

Overview(概觀)

·在較高的層次上,每一個Spark應用程序都包含一個驅動程序,該程序運行用戶的主要功能並在羣集上執行各類並行操做。
·Spark提供的主要抽象是彈性分佈式數據集(RDD),它是跨羣集節點分區的元素的集合,能夠並行操做。
·RDD是經過從Hadoop文件系統(或任何其餘Hadoop支持的文件系統)中的文件或驅動程序中的現有Scala集合開始並對其進行轉換而建立的。
·用戶還能夠要求Spark在內存中保留RDD,容許它在並行操做中有效地重用。
·最後,RDD會自動從節點故障中恢復。
·Spark中的第二個抽象是能夠在並行操做中使用的共享變量。
·默認狀況下,當Spark並行運行一個函數做爲不一樣節點上的一組任務時,它會將函數中使用的每一個變量的副本發送給每一個任務。
·有時,變量須要跨任務共享,或者在任務和驅動程序之間共享。
·Spark支持兩種類型的共享變量:廣播變量,可用於緩存全部節點的內存中的值;累加器,它們是僅「添加」到的變量,例如計數器和總和。
·本指南以Spark支持的每種語言顯示了這些功能。
·若是你啓動Spark的交互式shell,最簡單的方法就是 - 用於Scala shell的bin / spark-shell或用於Python的bin / pyspark。

Linking with Spark(與Spark連接)

·Spark 2.4.2適用於Python 2.7+或Python 3.4+。
·它可使用標準的CPython解釋器,所以可使用像NumPy這樣的C庫。
·它也適用於PyPy 2.3+。
·Spark 2.2.0中刪除了Python 2.6支持。
·Python中的Spark應用程序可使用bin / spark-submit腳本運行,該腳本在運行時包含Spark,也能夠將其包含在setup.py中:
install_requires=[ 'pyspark=={site.SPARK_VERSION}' ]
·要在不使用pip安裝PySpark的狀況下在Python中運行Spark應用程序,請使用位於Spark目錄中的bin / spark-submit腳本。
·此腳本將加載Spark的Java / Scala庫,並容許您將應用程序提交到羣集。
·您還可使用bin / pyspark來啓動交互式Python shell。
·若是您但願訪問HDFS數據,則須要使用PySpark構建連接到您的HDFS版本。
·Spark主頁上還提供了預構建的軟件包,可用於常見的HDFS版本。
·最後,您須要將一些Spark類導入到您的程序中。
·添加如下行:
from pyspark import SparkContext, SparkConf
·PySpark在驅動程序和工做程序中都須要相同的次要版本的Python。
·它使用PATH中的默認python版本,您能夠指定PYSPARK_PYTHON要使用的Python版本,例如:
$ PYSPARK_PYTHON=python3.4 bin/pyspark $ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py

Initializing Spark(初始化Spark)

·Spark程序必須作的第一件事是建立一個SparkContext對象,它告訴Spark如何訪問集羣。
·要建立SparkContext,首先須要構建一個包含有關應用程序信息的SparkConf對象。
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
·appName參數是應用程序在集羣UI上顯示的名稱。
·master是Spark,Mesos或YARN羣集URL,或者是以本地模式運行的特殊「本地」字符串。
·實際上,在羣集上運行時,您不但願在程序中對master進行硬編碼,而是使用spark-submit啓動應用程序並在那裏接收它。
·可是,對於本地測試和單元測試,您能夠傳遞「local」以在進程中運行Spark。

Using the Shell(使用Shell)

·在PySpark shell中,已經爲你建立了一個特殊的解釋器感知SparkContext,名爲sc。
·製做本身的SparkContext將沒法正常工做。
·您可使用--master參數設置上下文鏈接到的主服務器,而且能夠經過將逗號分隔的列表傳遞給--py-files將Python .zip,.egg或.py文件添加到運行時路徑。
·您還能夠經過向--packages參數提供以逗號分隔的Maven座標列表,將依賴項(例如Spark包)添加到shell會話中。
·任何可能存在依賴關係的其餘存儲庫(例如Sonatype)均可以傳遞給--repositories參數。
·必要時,必須使用pip手動安裝Spark軟件包具備的任何Python依賴項(在該軟件包的requirements.txt中列出)。
·例如,要在四個核心上運行bin / pyspark,請使用:


或者,要將code.py添加到搜索路徑(以便之後可以導入代碼),請使用:$ ./bin/pyspark --master local[4]
$ ./bin/pyspark --master local[4] --py-files code.py
·有關選項的完整列表,請運行pyspark --help。
·在幕後,pyspark調用更通常的spark-submit腳本。
·也能夠在加強的Python解釋器IPython中啓動PySpark shell。
·PySpark適用於IPython 1.0.0及更高版本。
·要使用IPython,請在運行bin / pyspark時將PYSPARK_DRIVER_PYTHON變量設置爲ipython:


要使用Jupyter notebook(之前稱爲IPython notebook)$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
·您能夠經過設置PYSPARK_DRIVER_PYTHON_OPTS來自定義ipython或jupyter命令。
·啓動Jupyter Notebook服務器後,您能夠從「文件」選項卡建立一個新的「Python 2」筆記本。
·在筆記本內部,您能夠在開始嘗試使用Jupyter notebook中的Spark以前輸入命令%pylab inline做爲筆記本的一部分。

Resilient Distributed Datasets (彈性分佈式數據集)(RDDs)

·Spark圍繞彈性分佈式數據集(RDD)的概念展開,RDD是一個能夠並行操做的容錯的容錯集合。
·建立RDD有兩種方法:並行化驅動程序中的現有集合,或引用外部存儲系統中的數據集,例如共享文件系統,HDFS,HBase或提供Hadoop InputFormat的任何數據源。

Parallelized Collections(並行化集合)

·經過在驅動程序中的現有可迭代或集合上調用SparkContext的parallelize方法來建立並行化集合。
·複製集合的元素以造成能夠並行操做的分佈式數據集。
·例如,如下是如何建立包含數字1到5的並行化集合:
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
·一旦建立,分佈式數據集(distData)能夠並行操做。
·例如,咱們能夠調用distData.reduce(lambda a,b:a + b)來添加列表的元素。
·咱們稍後將描述對分佈式數據集的操做。
·並行集合的一個重要參數是將數據集切割爲的分區數。
·Spark將爲羣集的每一個分區運行一個任務。
·一般,您但願羣集中的每一個CPU有2-4個分區。
·一般,Spark會嘗試根據您的羣集自動設置分區數。
·可是,您也能夠經過將其做爲第二個參數傳遞給並行化來手動設置它(例如sc.parallelize(data,10))。
·注意:代碼中的某些位置使用術語切片(分區的同義詞)來保持向後兼容性。

External Datasets(外部數據集)

·PySpark能夠從Hadoop支持的任何存儲源建立分佈式數據集,包括本地文件系統,HDFS,Cassandra,HBase,Amazon S3等.Spark支持文本文件,SequenceFiles和任何其餘Hadoop InputFormat。
·可使用SparkContext的textFile方法建立文本文件RDD。
·此方法獲取文件的URI(計算機上的本地路徑,或hdfs://,s3a://等URI)並將其做爲行集合讀取。
·這是一個示例調用:
>>> distFile = sc.textFile("data.txt")
·建立後,distFile能夠由數據集操做執行。
·例如,咱們可使用map添加全部行的大小,並按以下方式減小操做:distFile.map(lambda s:len(s))。reduce(lambda a,b:a + b)。
·有關使用Spark讀取文件的一些注意事項
·若是在本地文件系統上使用路徑,則還必須能夠在工做節點上的相同路徑上訪問該文件。
·將文件複製到全部工做者或使用網絡安裝的共享文件系統。
·Spark的全部基於文件的輸入方法(包括textFile)都支持在目錄,壓縮文件和通配符上運行。
·例如,您可使用textFile(「/ my / directory」),textFile(「/ my / directory / * .txt」)和textFile(「/ my / directory / * .gz」)。
·textFile方法還採用可選的第二個參數來控制文件的分區數。
·默認狀況下,Spark爲文件的每一個塊建立一個分區(HDFS中默認爲128MB),但您也能夠經過傳遞更大的值來請求更多的分區。
·請注意,您不能擁有比塊少的分區。
·除文本文件外,Spark的Python API還支持其餘幾種數據格式:
·SparkContext.wholeTextFiles容許您讀取包含多個小文本文件的目錄,並將它們做爲(文件名,內容)對返回。
·這與textFile造成對比,textFile將在每一個文件中每行返回一條記錄。
·RDD.saveAsPickleFile和SparkContext.pickleFile支持以包含pickle Python對象的簡單格式保存RDD。
·批處理用於pickle序列化,默認批處理大小爲10。
·SequenceFile和Hadoop輸入/輸出格式
·請注意,此功能目前標記爲「實驗」,適用於高級用戶。
·未來可能會使用基於Spark SQL的讀/寫支持替換它,在這種狀況下,Spark SQL是首選方法。
·可寫支持
·PySpark SequenceFile支持在Java中加載鍵值對的RDD,將Writable轉換爲基本Java類型,並使用Pyrolite挖掘生成的Java對象。
·將鍵值對的RDD保存到SequenceFile時,PySpark會反過來。
·它將Python對象解開爲Java對象,而後將它們轉換爲Writable。
·如下Writable會自動轉換:
Writable Type(可寫類型) Python Type
Text unicode str
IntWritable int
FloatWritable float
DoubleWritable float
BooleanWritable bool
BytesWritable bytearray
NullWritable None
MapWritable dict
·數組不是開箱即用的。
·用戶在讀取或寫入時須要指定自定義ArrayWritable子類型。
·編寫時,用戶還須要指定將數組轉換爲自定義ArrayWritable子類型的自定義轉換器。
·在讀取時,默認轉換器將自定義ArrayWritable子類型轉換爲Java Object [],而後將其pickle到Python元組。
·要爲原始類型的數組獲取Python array.array,用戶須要指定自定義轉換器。
Saving and Loading SequenceFiles(保存和加載SequenceFiles)
·與文本文件相似,能夠經過指定路徑來保存和加載SequenceFiles。
·能夠指定鍵和值類,但對於標準Writable,這不是必需的。
$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) >>> rdd.first() # the result is a MapWritable that is converted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', u'field3': 12345})
·請注意,若是InputFormat僅依賴於Hadoop配置和/或輸入路徑,而且能夠根據上表輕鬆轉換鍵和值類,則此方法應適用於此類狀況。
·若是您有自定義序列化二進制數據(例如從Cassandra / HBase加載數據),那麼您首先須要將Scala / Java端的數據轉換爲可由Pyrolite的pickler處理的數據。
·爲此提供了轉換器特性。
·只需擴展此特徵並在convert方法中實現轉換代碼。
·請記住確保將此類以及訪問InputFormat所需的任何依賴項打包到Spark做業jar中幷包含在PySpark類路徑中。
·有關使用Cassandra / HBase InputFormat和OutputFormat以及自定義轉換器的示例,請參閱Python示例和Converter示例。

RDD Operations(RDD操做)

·RDD支持兩種類型的操做:轉換(從現有數據集建立新數據集)和操做(在數據集上運行計算後將值返回到驅動程序)。
·例如,map是一個轉換,它經過一個函數傳遞每一個數據集元素,並返回一個表示結果的新RDD。
·另外一方面,reduce是一個使用某個函數聚合RDD的全部元素的操做,並將最終結果返回給驅動程序(儘管還有一個返回分佈式數據集的並行reduceByKey)。
·Spark中的全部轉換都是惰性的,由於它們不會當即計算結果。
·相反,他們只記得應用於某些基礎數據集(例如文件)的轉換。
·僅當操做須要將結果返回到驅動程序時纔會計算轉換。
·這種設計使Spark可以更有效地運行。
·例如,咱們能夠意識到經過map建立的數據集將用於reduce,而且僅將reduce的結果返回給驅動程序,而不是更大的映射數據集。
·默認狀況下,每次對其執行操做時,均可以從新計算每一個轉換後的RDD。
·可是,您也可使用持久化(或緩存)方法在內存中保留RDD,在這種狀況下,Spark會在羣集上保留元素,以便在下次查詢時更快地訪問。
·還支持在磁盤上保留RDD或在多個節點上覆制。

Basics(基本)

爲了說明RDD基礎知識,請考慮如下簡單程序:html

lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)
·第一行定義來自外部文件的基本RDD。
·此數據集未加載到內存中或以其餘方式執行:行僅僅是指向文件的指針。
·第二行將lineLengths定義爲地圖轉換的結果。
·一樣,因爲懶惰,lineLengths不會當即計算。
·最後,咱們運行reduce,這是一個動做。
·此時,Spark將計算分解爲在不一樣機器上運行的任務,而且每臺機器都運行其部分映射和本地縮減,僅返回其對驅動程序的答案。
·若是咱們之後想再次使用lineLengths,咱們能夠添加:


在reduce以前,這將致使lineLengths在第一次計算以後保存在內存中。lineLengths.persist()

Passing Functions to Spark(將函數傳遞給Spark)

·Spark的API在很大程度上依賴於在驅動程序中傳遞函數以在集羣上運行。
·有三種建議的方法能夠作到這一點:
·Lambda表達式,用於能夠做爲表達式編寫的簡單函數。
·(Lambdas不支持多語句函數或不返回值的語句。)
·調用Spark的函數內部的本地defs,用於更長的代碼。
·模塊中的頂級函數。
·例如,要傳遞比使用lambda支持的更長的函數,請考慮如下代碼:
"""MyScript.py""" if __name__ == "__main__": def myFunc(s): words = s.split(" ") return len(words) sc = SparkContext(...) sc.textFile("file.txt").map(myFunc)
·請注意,雖然也能夠將引用傳遞給類實例中的方法(而不是單例對象),但這須要發送包含該類的對象以及方法。
·例如,考慮:
class MyClass(object): def func(self, s): return s def doStuff(self, rdd): return rdd.map(self.func)
·在這裏,若是咱們建立一個新的MyClass並在其上調用doStuff,那裏的map會引用該MyClass實例的func方法,所以須要將整個對象發送到集羣。
·以相似的方式,訪問外部對象的字段將引用整個對象:
class MyClass(object): def __init__(self): self.field = "Hello" def doStuff(self, rdd): return rdd.map(lambda s: self.field + s)
要避免此問題,最簡單的方法是將字段複製到局部變量中,而不是從外部訪問它:
def doStuff(self, rdd): field = self.field return rdd.map(lambda s: field + s)

Understanding closures(理解閉包)

·Spark的一個難點是在跨集羣執行代碼時理解變量和方法的範圍和生命週期。
·修改其範圍以外的變量的RDD操做可能常常引發混淆。
·在下面的示例中,咱們將查看使用foreach()遞增計數器的代碼,但一樣的問題也可能發生在其餘操做中。

Example

·考慮下面的天真RDD元素總和,根據執行是否在同一JVM中發生,它可能表現不一樣。
·一個常見的例子是在本地模式下運行Spark(--master = local [n])而不是將Spark應用程序部署到集羣(例如經過spark-submit to YARN):
counter = 0 rdd = sc.parallelize(data) # Wrong: Don't do this!! def increment_counter(x): global counter counter += x rdd.foreach(increment_counter) print("Counter value: ", counter)

Local vs. cluster modes(本地與羣集模式)

·上述代碼的行爲未定義,可能沒法按預期工做。
·爲了執行做業,Spark將RDD操做的處理分解爲任務,每一個任務都由執行程序執行。
·在執行以前,Spark計算任務的閉包。
·閉包是那些變量和方法,它們必須是可見的,以便執行程序在RDD上執行其計算(在本例中爲foreach())。
·該閉包被序列化併發送給每一個執行者。
·發送給每一個執行程序的閉包內的變量如今是副本,所以,當在foreach函數中引用計數器時,它再也不是驅動程序節點上的計數器。
·驅動程序節點的內存中仍然有一個計數器,但執行程序再也不可見!
·執行程序只能看到序列化閉包中的副本。
·所以,計數器的最終值仍然爲零,由於計數器上的全部操做都引用了序列化閉包內的值。
·在本地模式下,在某些狀況下,foreach函數實際上將在與驅動程序相同的JVM中執行,並將引用相同的原始計數器,而且可能實際更新它。
·爲了確保在這些場景中定義良好的行爲,應該使用累加器。
·Spark中的累加器專門用於提供一種機制,用於在跨集羣中的工做節點拆分執行時安全地更新變量。
·本指南的「累加器」部分更詳細地討論了這些內容。
·一般,閉包 - 相似循環或本地定義的方法的構造不該該用於改變某些全局狀態。
·Spark沒有定義或保證從閉包外部引用的對象的突變行爲。
·執行此操做的某些代碼可能在本地模式下工做,但這只是偶然的,而且此類代碼在分佈式模式下不會按預期運行。
·若是須要某些全局聚合,請使用累加器。

Printing elements of an RDD(打印RDD的元素)

·另外一個常見的習慣用法是嘗試使用rdd.foreach(println)或rdd.map(println)打印出RDD的元素。
·在一臺機器上,這將生成預期的輸出並打印全部RDD的元素。
·可是,在集羣模式下,執行程序調用的stdout輸出如今寫入執行程序的stdout,而不是驅動程序上的那個,所以驅動程序上的stdout不會顯示這些!
·要打印驅動程序上的全部元素,可使用collect()方法首先將RDD帶到驅動程序節點:rdd.collect()。foreach(println)。
·可是,這會致使驅動程序內存不足,由於collect()會將整個RDD提取到一臺機器上;
·若是你只須要打印RDD的一些元素,更安全的方法是使用take():rdd.take(100).foreach(println)。

Working with Key-Value Pairs(使用鍵值對)

·雖然大多數Spark操做都適用於包含任何類型對象的RDD,但一些特殊操做僅適用於鍵值對的RDD。
·最多見的是分佈式「隨機」操做,例如經過密鑰對元素進行分組或聚合。
·在Python中,這些操做適用於包含內置Python元組的RDD,如(1,2)。
·只需建立這樣的元組,而後調用您想要的操做。
·例如,如下代碼對鍵值對使用reduceByKey操做來計算文件中每行文本出現的次數:


例如,咱們也可使用counts.sortByKey()來按字母順序對這些對進行排序,最後使用counts.collect()將它們做爲對象列表返回到驅動程序。lines = sc.textFile("data.txt") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b)

Transformations(轉換)

·下表列出了Spark支持的一些常見轉換。
·有關詳細信息,請參閱RDD API文檔(Scala,Java,Python,R)並配對RDD函數doc(Scala,Java)。
·轉型意義
·map(func)返回經過函數func傳遞源的每一個元素造成的新分佈式數據集。
·filter(func)返回經過選擇func返回true的源元素造成的新數據集。
·flatMap(func)與map相似,但每一個輸入項能夠映射到0個或更多輸出項(所以func應返回Seq而不是單個項)。
·mapPartitions(func)與map相似,但在RDD的每一個分區(塊)上單獨運行,所以當在類型T的RDD上運行時,func必須是Iterator => Iterator  類型。
·mapPartitionsWithIndex(func)與mapPartitions相似,但也爲func提供了一個表示分區索引的整數值,所以當在RDD類型上運行時,func必須是類型(Int,Iterator )=> Iterator
·T.
·sample(withReplacement,fraction,seed)使用給定的隨機數生成器種子,使用或不使用替換對數據的一小部分進行採樣。
·union(otherDataset)返回一個新數據集,其中包含源數據集和參數中元素的並集。
·intersection(otherDataset)返回包含源數據集和參數中元素交集的新RDD。
·distinct([numPartitions]))返回包含源數據集的不一樣元素的新數據集。
·groupByKey([numPartitions])在(K,V)對的數據集上調用時,返回(K,Iterable )對的數據集。
·注意:若是要對每一個鍵執行聚合(例如總和或平均值)進行分組,則使用reduceByKey或aggregateByKey將產生更好的性能。
·注意:默認狀況下,輸出中的並行級別取決於父RDD的分區數。
·您能夠傳遞可選的numPartitions參數來設置不一樣數量的任務。
·reduceByKey(func,[numPartitions])當在(K,V)對的數據集上調用時,返回(K,V)對的數據集,其中使用給定的reduce函數func聚合每一個鍵的值,該函數必須是
·type(V,V)=> V.與groupByKey相似,reduce任務的數量可經過可選的第二個參數進行配置。
·aggregateByKey(zeroValue)(seqOp,combOp,[numPartitions])在(K,V)對的數據集上調用時,返回(K,U)對的數據集,其中使用給定的組合函數聚合每一個鍵的值,
·中性的「零」值。
·容許與輸入值類型不一樣的聚合值類型,同時避免沒必要要的分配。
·與groupByKey相似,reduce任務的數量可經過可選的第二個參數進行配置。
·sortByKey([ascending],[numPartitions])在K實現Ordered的(K,V)對的數據集上調用時,返回按鍵按升序或降序排序的(K,V)對數據集,如
·布爾升序參數。
·join(otherDataset,[numPartitions])當調用類型爲(K,V)和(K,W)的數據集時,返回(K,(V,W))對的數據集以及每一個鍵的全部元素對。
·經過leftOuterJoin,rightOuterJoin和fullOuterJoin支持外鏈接。
·cogroup(otherDataset,[numPartitions])當調用類型爲(K,V)和(K,W)的數據集時,返回(K,(Iterable ,Iterable ))元組的數據集。
·此操做也稱爲groupWith。
·cartesian(otherDataset)當調用類型爲T和U的數據集時,返回(T,U)對的數據集(全部元素對)。
·pipe(command,[envVars])經過shell命令管道RDD的每一個分區,例如:
·一個Perl或bash腳本。
·RDD元素被寫入進程的stdin,而且輸出到其stdout的行將做爲字符串的RDD返回。
·coalesce(numPartitions)將RDD中的分區數減小爲numPartitions。
·過濾大型數據集後,能夠更有效地運行操做。
·repartition(numPartitions)隨機從新調整RDD中的數據以建立更多或更少的分區並在它們之間進行平衡。
·這老是隨機播放網絡上的全部數據。
·repartitionAndSortWithinPartitions(partitioner)根據給定的分區程序從新分區RDD,並在每一個生成的分區中按鍵對記錄進行排序。
·這比調用從新分區而後在每一個分區內排序更有效,由於它能夠將排序推送到shuffle機器中。

Actions(動做)

·下表列出了Spark支持的一些常見操做。
·請參閱RDD API文檔(Scala,Java,Python,R)
·並配對RDD函數doc(Scala,Java)以獲取詳細信息。
·行動意義
·reduce(func)使用函數func(它接受兩個參數並返回一個)來聚合數據集的元素。
·該函數應該是可交換的和關聯的,以即可以並行正確計算。
·collect()在驅動程序中將數據集的全部元素做爲數組返回。
·在過濾器或其餘返回足夠小的數據子集的操做以後,這一般頗有用。
·count()返回數據集中的元素數。
·first()返回數據集的第一個元素(相似於take(1))。
·take(n)返回包含數據集的前n個元素的數組。
·takeSample(withReplacement,num,[seed])返回一個數組,其中包含數據集的num個元素的隨機樣本,有或沒有替換,可選地預先指定隨機數生成器種子。
·takeOrdered(n,[ordering])使用天然順序或自定義比較器返回RDD的前n個元素。
·saveAsTextFile(path)將數據集的元素寫爲本地文件系統,HDFS或任何其餘Hadoop支持的文件系統中給定目錄中的文本文件(或文本文件集)。
·Spark將在每一個元素上調用toString,將其轉換爲文件中的一行文本。
·saveAsSequenceFile(路徑)
·(Java和Scala)將數據集的元素做爲Hadoop SequenceFile寫入本地文件系統,HDFS或任何其餘Hadoop支持的文件系統中的給定路徑中。
·這能夠在實現Hadoop的Writable接口的鍵值對的RDD上使用。
·在Scala中,它也能夠在可隱式轉換爲Writable的類型上使用(Spark包括基本類型的轉換,如Int,Double,String等)。
·saveAsObjectFile(路徑)
·(Java和Scala)使用Java序列化以簡單格式編寫數據集的元素,而後可使用SparkContext.objectFile()加載它。
·countByKey()僅適用於類型爲(K,V)的RDD。
·返回(K,Int)對的散列映射,其中包含每一個鍵的計數。
·foreach(func)對數據集的每一個元素運行函數func。
·這一般用於反作用,例如更新累加器或與外部存儲系統交互。
·注意:在foreach()以外修改除累加器以外的變量可能會致使未定義的行爲。
·有關詳細信息,請參閱瞭解閉包。
·Spark RDD API還公開了某些操做的異步版本,例如foreach的foreachAsync,它會當即將一個FutureAction返回給調用者,而不是在完成操做時阻塞。
·這可用於管理或等待操做的異步執行。

Shuffle operations(隨機操做)

·Spark中的某些操做會觸發稱爲shuffle的事件。
·隨機播放是Spark的從新分配數據的機制,所以它能夠跨分區進行不一樣的分組。
·這一般涉及跨執行程序和機器複製數據,使得混洗成爲複雜且昂貴的操做。

Background(背景)

·爲了理解在shuffle期間發生的事情,咱們能夠考慮reduceByKey操做的示例。
·reduceByKey操做生成一個新的RDD,其中單個鍵的全部值都組合成一個元組 - 鍵和對與該鍵關聯的全部值執行reduce函數的結果。
·挑戰在於,並不是單個密鑰的全部值都必須位於同一個分區,甚至是同一個機器上,但它們必須位於同一位置才能計算結果。
·在Spark中,數據一般不跨分區分佈,以便在特定操做的必要位置。
·在計算過程當中,單個任務將在單個分區上運行 - 所以,要組織單個reduceByKey reduce任務執行的全部數據,Spark須要執行所有操做。
·它必須從全部分區讀取以查找全部鍵的全部值,而後將分區中的值彙總在一塊兒以計算每一個鍵的最終結果 - 這稱爲shuffle。
·儘管新洗牌數據的每一個分區中的元素集將是肯定性的,而且分區自己的排序也是如此,但這些元素的排序不是。
·若是在隨機播放後須要可預測的有序數據,則可使用:
·mapPartitions使用例如.sorted對每一個分區進行排序
·repartitionAndSortWithinPartitions在同時從新分區的同時有效地對分區進行排序
·sortBy來建立一個全局排序的RDD
·能夠致使混洗的操做包括從新分區操做,如從新分區和合並,「ByKey操做(計數除外),如groupByKey和reduceByKey,以及聯合操做,如cogroup和join。

Performance Impact(績效影響)

·Shuffle是一項昂貴的操做,由於它涉及磁盤I / O,數據序列化和網絡I / O.
·爲了組織shuffle的數據,Spark生成了一系列任務 - 映射任務以組織數據,以及一組reduce任務來聚合它。
·這個術語來自MapReduce,並不直接與Spark的地圖和減小操做相關。
·在內部,各個地圖任務的結果會保留在內存中,直到它們沒法適應。
·而後,這些基於目標分區進行排序並寫入單個文件。
·在reduce方面,任務讀取相關的排序塊。
·某些shuffle操做會消耗大量的堆內存,由於它們使用內存中的數據結構來在傳輸記錄以前或以後組織記錄。
·具體來講,reduceByKey和aggregateByKey在地圖側建立這些結構,而且'ByKey操做在reduce側生成這些結構。
·當數據不適合內存時,Spark會將這些表溢出到磁盤,從而致使磁盤I / O的額外開銷和垃圾收集增長。
·Shuffle還會在磁盤上生成大量中間文件。
·從Spark 1.3開始,這些文件將被保留,直到再也不使用相應的RDD並進行垃圾回收。
·這樣作是爲了在從新計算譜系時不須要從新建立shuffle文件。
·若是應用程序保留對這些RDD的引用或GC不常常啓動,則垃圾收集可能僅在很長一段時間後纔會發生。
·這意味着長時間運行的Spark做業可能會佔用大量磁盤空間。
·配置Spark上下文時,spark.local.dir配置參數指定臨時存儲目錄。
·能夠經過調整各類配置參數來調整隨機行爲。
·請參閱「Spark配置指南」中的「隨機行爲」部分。

RDD Persistence(RDD持久性)

·Spark中最重要的功能之一是跨操做在內存中持久化(或緩存)數據集。
·當您持久保存RDD時,每一個節點都會存儲它在內存中計算的任何分區,並在該數據集(或從中派生的數據集)的其餘操做中重用它們。
·這使得將來的行動更快(一般超過10倍)。
·緩存是迭代算法和快速交互式使用的關鍵工具。
·您可使用persist()或cache()方法標記要保留的RDD。
·第一次在動做中計算它,它將保留在節點的內存中。
·Spark的緩存是容錯的 - 若是丟失了RDD的任何分區,它將使用最初建立它的轉換自動從新計算。
·此外,每一個持久化RDD可使用不一樣的存儲級別進行存儲,例如,容許您將數據集保留在磁盤上,將其保留在內存中,但做爲序列化Java對象(以節省空間),跨節點複製它。
·經過將StorageLevel對象(Scala,Java,Python)傳遞給persist()來設置這些級別。
·cache()方法是使用默認存儲級別的簡寫,即StorageLevel.MEMORY_ONLY(在內存中存儲反序列化的對象)。
·完整的存儲級別是:
·存儲級別含義
·MEMORY_ONLY將RDD存儲爲JVM中的反序列化Java對象。
·若是RDD不適合內存,則某些分區將不會被緩存,而且每次須要時都會從新計算。
·這是默認級別。
·MEMORY_AND_DISK將RDD存儲爲JVM中的反序列化Java對象。
·若是RDD不適合內存,請存儲不適合磁盤的分區,並在須要時從那裏讀取它們。
·MEMORY_ONLY_SER
·(Java和Scala)將RDD存儲爲序列化Java對象(每一個分區一個字節數組)。
·這一般比反序列化對象更節省空間,特別是在使用快速序列化器時,但讀取CPU密集程度更高。
·MEMORY_AND_DISK_SER
·(Java和Scala)與MEMORY_ONLY_SER相似,可是將不適合內存的分區溢出到磁盤,而不是每次須要時動態從新計算它們。
·DISK_ONLY僅將RDD分區存儲在磁盤上。
·MEMORY_ONLY_2,MEMORY_AND_DISK_2等。與上面的級別相同,但複製兩個羣集節點上的每一個分區。
·OFF_HEAP(實驗)與MEMORY_ONLY_SER相似,但將數據存儲在堆外內存中。
·這須要啓用堆外內存。
·注意:在Python中,存儲的對象將始終使用Pickle庫進行序列化,所以您是否選擇序列化級別並不重要。
·Python中的可用存儲級別包括MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY和DISK_ONLY_2。
·即便沒有用戶調用持久性,Spark也會在隨機操做(例如reduceByKey)中自動保留一些中間數據。
·這樣作是爲了不在shuffle期間節點發生故障時從新計算整個輸入。
·咱們仍然建議用戶在生成的RDD上調用persist,若是他們計劃重用它。

Which Storage Level to Choose?(選擇哪一種存儲級別?)

·Spark的存儲級別旨在提供內存使用和CPU效率之間的不一樣折衷。
·咱們建議您經過如下流程選擇一個:
·若是您的RDD與默認存儲級別(MEMORY_ONLY)很溫馨,請保持這種狀態。
·這是CPU效率最高的選項,容許RDD上的操做盡量快地運行。
·若是沒有,請嘗試使用MEMORY_ONLY_SER並選擇快速序列化庫,以使對象更節省空間,但仍然能夠快速訪問。
·(Java和Scala)
·除非計算數據集的函數很昂貴,不然它們不會溢出到磁盤,或者它們會過濾大量數據。
·不然,從新計算分區可能與從磁盤讀取分區同樣快。
·若是要快速故障恢復,請使用複製的存儲級別(例如,若是使用Spark來處理來自Web應用程序的請求)。
·全部存儲級別經過從新計算丟失的數據提供徹底容錯,但複製的存儲級別容許您繼續在RDD上運行任務,而無需等待從新計算丟失的分區。

Removing Data(刪除數據)

·Spark會自動監視每一個節點上的緩存使用狀況,並以最近最少使用(LRU)的方式刪除舊數據分區。
·若是您想手動刪除RDD而不是等待它退出緩存,請使用RDD.unpersist()方法。

Shared Variables(共享變量)

·一般,當在遠程集羣節點上執行傳遞給Spark操做(例如map或reduce)的函數時,它將在函數中使用的全部變量的單獨副本上工做。
·這些變量將複製到每臺計算機,而且遠程計算機上的變量的更新不會傳播回驅動程序。
·支持跨任務的通用,讀寫共享變量效率低下。
·可是,Spark確實爲兩種常見的使用模式提供了兩種有限類型的共享變量:廣播變量和累加器。

Broadcast Variables(廣播變量)

·廣播變量容許程序員在每臺機器上保留一個只讀變量,而不是隨副本一塊兒發送它的副本。
·例如,它們可用於以有效的方式爲每一個節點提供大輸入數據集的副本。
·Spark還嘗試使用有效的廣播算法來分發廣播變量,以下降通訊成本。
·Spark動做經過一組階段執行,由分佈式「shuffle」操做分隔。
·Spark自動廣播每一個階段中任務所需的公共數據。
·以這種方式廣播的數據以序列化形式緩存並在運行每一個任務以前反序列化。
·這意味着顯式建立廣播變量僅在跨多個階段的任務須要相同數據或以反序列化形式緩存數據很重要時纔有用。
·經過調用SparkContext.broadcast(v)從變量v建立廣播變量。
·廣播變量是v的包裝器,能夠經過調用value方法訪問其值。
·下面的代碼顯示了這個:
>>> broadcastVar = sc.broadcast([1, 2, 3]) <pyspark.broadcast.Broadcast object at 0x102789f10> >>> broadcastVar.value [1, 2, 3]
·建立廣播變量後,應該在羣集上運行的任何函數中使用它而不是值v,這樣v不會屢次傳送到節點。
·另外,在廣播以後不該修改對象v,以便確保全部節點得到廣播變量的相同值(例如,若是稍後將變量發送到新節點)。

Accumulators(累加器)

·累加器是僅經過關聯和交換操做「添加」的變量,所以能夠並行有效地支持。
·它們可用於實現計數器(如MapReduce)或總和。
·Spark自己支持數值類型的累加器,程序員能夠添加對新類型的支持。
·做爲用戶,您能夠建立命名或未命名的累加器。
·以下圖所示,命名累加器(在此實例計數器中)將顯示在Web UI中,用於修改該累加器的階段。
·Spark顯示「任務」表中任務修改的每一個累加器的值。
跟蹤UI中的累加器對於理解運行階段的進度很是有用(注意:Python中尚不支持)。
·經過調用SparkContext.accumulator(v)從初始值v建立累加器。
·而後,可使用add方法或+ =運算符將在羣集上運行的任務添加到其中。
·可是,他們沒法讀懂它的價值。
·只有驅動程序可使用其value方法讀取累加器的值。
·下面的代碼顯示了一個累加器用於添加數組的元素:
>>> accum = sc.accumulator(0) >>> accum Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s >>> accum.value 10
·雖然此代碼使用Int類型的累加器的內置支持,但程序員也能夠經過繼承AccumulatorParam來建立本身的類型。
·AccumulatorParam接口有兩種方法:零用於爲數據類型提供「零值」,addInPlace用於將兩個值一塊兒添加。
·例如,假設咱們有一個表示數學向量的Vector類,咱們能夠寫:
class VectorAccumulatorParam(AccumulatorParam): def zero(self, initialValue): return Vector.zeros(initialValue.size) def addInPlace(self, v1, v2): v1 += v2 return v1 # Then, create an Accumulator of this type: vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
·對於僅在操做內執行的累加器更新,Spark保證每一個任務對累加器的更新僅應用一次,即從新啓動的任務不會更新該值。
·在轉換中,用戶應該知道,若是從新執行任務或做業階段,則能夠屢次應用每一個任務的更新。
·累加器不會改變Spark的惰性評估模型。
·若是在RDD上的操做中更新它們,則只有在RDD做爲操做的一部分計算時才更新它們的值。
·所以,在像map()這樣的惰性轉換中進行累積器更新時,不能保證執行累加器更新。
·如下代碼片斷演示了此屬性:
accum = sc.accumulator(0) def g(x): accum.add(x) return f(x) data.map(g) # Here, accum is still 0 because no actions have caused the `map` to be computed.

Deploying to a Cluster(部署到羣集)

·應用程序提交指南介紹瞭如何將應用程序提交到羣集。
·簡而言之,一旦將應用程序打包到JAR(用於Java / Scala)或一組.py或.zip文件(用於Python),bin / spark-submit腳本容許您將其提交給任何支持的集羣管理器。

Launching Spark jobs from Java / Scala(從Java / Scala啓動Spark做業)

org.apache.spark.launcher包提供了使用簡單Java API將Spark做業做爲子進程啓動的類。java

Unit Testing(單元測試)

·Spark對任何流行的單元測試框架進行單元測試都很友好。
·只需在測試中建立一個SparkContext,主URL設置爲local,運行您的操做,而後調用SparkContext.stop()將其拆除。
·確保在finally塊或測試框架的tearDown方法中中止上下文,由於Spark不支持在同一程序中同時運行的兩個上下文。

Where to Go from Here(從這往哪兒走)

You can see some example Spark programs on the Spark website. In addition, Spark includes several samples in the examples directory (Scala,JavaPythonR). You can run Java and Scala examples by passing the class name to Spark’s bin/run-example script; for instance:python

./bin/run-example SparkPi

對於Python示例,請使用spark-submit代替:mysql

./bin/spark-submit examples/src/main/python/pi.py

Spark SQL, DataFrames and Datasets Guide

·Spark SQL是用於結構化數據處理的Spark模塊。
·與基本的Spark RDD API不一樣,Spark SQL提供的接口爲Spark提供了有關數據結構和正在執行的計算的更多信息。
·在內部,Spark SQL使用此額外信息來執行額外的優化。
·有幾種與Spark SQL交互的方法,包括SQL和Dataset API。
·在計算結果時,使用相同的執行引擎,與您用於表達計算的API /語言無關。
·這種統一意味着開發人員能夠輕鬆地在不一樣的API之間來回切換,從而提供表達給定轉換的最天然的方式。
·此頁面上的全部示例都使用Spark分發中包含的示例數據,而且能夠在spark-shell,pyspark shell或sparkR shell中運行。

SQL

·Spark SQL的一個用途是執行SQL查詢。
·Spark SQL還可用於從現有Hive安裝中讀取數據。
·有關如何配置此功能的更多信息,請參閱Hive Tables部分。
·從其餘編程語言中運行SQL時,結果將做爲數據集/數據框返回。
·您還可使用命令行或JDBC / ODBC與SQL接口進行交互。

Datasets and DataFrames

·數據集是分佈式數據集合。
·數據集是Spark 1.6中添加的一個新接口,它提供了RDD的優點(強類型,使用強大的lambda函數的能力)和Spark SQL優化執行引擎的優勢。
·數據集能夠從JVM對象構造,而後使用功能轉換(map,flatMap,filter等)進行操做。
·數據集API在Scala和Java中可用。
·Python沒有對Dataset API的支持。
·但因爲Python的動態特性,數據集API的許多好處已經可用(即您能夠經過名稱天然地訪問行的字段row.columnName)。
·R的狀況相似。
·DataFrame是一個組織成命名列的數據集。
·它在概念上等同於關係數據庫中的表或R / Python中的數據框,但在底層具備更豐富的優化。
·DataFrame能夠從多種來源構建,例如:結構化數據文件,Hive中的表,外部數據庫或現有RDD。
·DataFrame API在Scala,Java,Python和R中可用。在Scala和Java中,DataFrame由行數據集表示。
·在Scala API中,DataFrame只是Dataset [Row]的類型別名。
·而在Java API中,用戶須要使用Dataset 來表示DataFrame。
·在本文檔中,咱們常常將行的Scala / Java數據集稱爲DataFrame。

Getting Started(入門)

Starting Point: SparkSession(起點:SparkSession)

·Spark中全部功能的入口點是SparkSession類。
·要建立基本的SparkSession,只需使用SparkSession.builder:
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() 
·在Spark repo中的「examples / src / main / python / sql / basic.py」中找到完整的示例代碼。
·Spark 2.0中的SparkSession爲Hive功能提供內置支持,包括使用HiveQL編寫查詢,訪問Hive UDF以及從Hive表讀取數據的功能。
·要使用這些功能,您無需擁有現有的Hive設置。

Creating DataFrames(建立DataFrame)

·使用SparkSession,應用程序能夠從現有RDD,Hive表或Spark數據源建立DataFrame。
·做爲示例,如下內容基於JSON文件的內容建立DataFrame:
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame to stdout df.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ 
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

Untyped Dataset Operations (aka DataFrame Operations)無類型數據集操做(又名DataFrame操做)

·DataFrames爲Scala,Java,Python和R中的結構化數據操做提供特定於域的語言。
·如上所述,在Spark 2.0中,DataFrames只是Scala和Java API中Rows的數據集。
·與「類型轉換」相比,這些操做也稱爲「無類型轉換」,帶有強類型Scala / Java數據集。
·這裏咱們包括使用數據集進行結構化數據處理的一些基本示例:
·在Python中,能夠經過屬性(df.age)或索引(df ['age'])訪問DataFrame的列。
·雖然前者便於交互式數據探索,但強烈建議用戶使用後一種形式,這是將來的證實,不會破壞也是DataFrame類屬性的列名。
# spark, df are from the previous example
# Print the schema in a tree format df.printSchema() # root # |-- age: long (nullable = true) # |-- name: string (nullable = true) # Select only the "name" column df.select("name").show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+ # Select everybody, but increment the age by 1 df.select(df['name'], df['age'] + 1).show() # +-------+---------+ # | name|(age + 1)| # +-------+---------+ # |Michael| null| # | Andy| 31| # | Justin| 20| # +-------+---------+ # Select people older than 21 df.filter(df['age'] > 21).show() # +---+----+ # |age|name| # +---+----+ # | 30|Andy| # +---+----+ # Count people by age df.groupBy("age").count().show() # +----+-----+ # | age|count| # +----+-----+ # | 19| 1| # |null| 1| # | 30| 1| # +----+-----+ 
·在Spark repo中的「examples / src / main / python / sql / basic.py」中找到完整的示例代碼。
·有關可在DataFrame上執行的操做類型的完整列表,請參閱API文檔。
·除了簡單的列引用和表達式以外,DataFrame還具備豐富的函數庫,包括字符串操做,日期算術,常見的數學運算等。
·完整列表可在DataFrame函數參考中找到。

Running SQL Queries Programmatically(以編程方式運行SQL查詢)

SparkSession上的sql函數使應用程序可以以編程方式運行SQL查詢並將結果做爲DataFrame返回。git

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people") sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ 
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

Global Temporary View(全球臨時觀點)

·Spark SQL中的臨時視圖是會話範圍的,若是建立它的會話終止,它將消失。
·若是您但願擁有一個在全部會話之間共享的臨時視圖並保持活動狀態,直到Spark應用程序終止,您能夠建立一個全局臨時視圖。
·全局臨時視圖與系統保留的數據庫global_temp綁定,咱們必須使用限定名稱來引用它,例如
·SELECT * FROM global_temp.view1。
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people") # Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ # Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ 
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

Creating Datasets(建立數據集)

·數據集與RDD相似,可是,它們不使用Java序列化或Kryo,而是使用專用的編碼器來序列化對象以便經過網絡進行處理或傳輸。
·雖然編碼器和標準序列化都負責將對象轉換爲字節,但編碼器是動態生成的代碼,並使用一種格式,容許Spark執行許多操做,如過濾,排序和散列,而無需將字節反序列化爲對象。
case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

Interoperating with RDDs(與RDD互操做)

·Spark SQL支持兩種不一樣的方法將現有RDD轉換爲數據集。
·第一種方法使用反射來推斷包含特定類型對象的RDD的模式。
·這種基於反射的方法能夠提供更簡潔的代碼,而且在您編寫Spark應用程序時已經瞭解模式時能夠很好地工做。
·建立數據集的第二種方法是經過編程接口,容許您構建模式,而後將其應用於現有RDD。
·雖然此方法更詳細,但它容許您在直到運行時才知道列及其類型時構造數據集。

Inferring the Schema Using Reflection(使用反射推斷模式)

·Spark SQL能夠將Row對象的RDD轉換爲DataFrame,從而推斷出數據類型。
·經過將鍵/值對列表做爲kwargs傳遞給Row類來構造行。
·此列表的鍵定義表的列名稱,並經過對整個數據集進行採樣來推斷類型,相似於對JSON文件執行的推斷
from pyspark.sql import Row sc = spark.sparkContext # Load a text file and convert each line to a Row. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # Infer the schema, and register the DataFrame as a table. schemaPeople = spark.createDataFrame(people) schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # The results of SQL queries are Dataframe objects. # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`. teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect() for name in teenNames: print(name) # Name: Justin 
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

Programmatically Specifying the Schema(以編程方式指定架構)

·當沒法提早定義kwargs字典時(例如,記錄結構以字符串形式編碼,或者文本數據集將被解析,字段將以不一樣方式爲不一樣用戶進行投影),可使用編程方式建立DataFrame
·三個步驟。
·從原始RDD建立元組或列表的RDD;
·建立由StructType表示的模式,該模式與步驟1中建立的RDD中的元組或列表的結構相匹配。
·經過SparkSession提供的createDataFrame方法將模式應用於RDD。

例如:程序員

# Import data types
from pyspark.sql.types import * sc = spark.sparkContext # Load a text file and convert each line to a Row. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) # Each line is converted to a tuple. people = parts.map(lambda p: (p[0], p[1].strip())) # The schema is encoded in a string. schemaString = "name age" fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] schema = StructType(fields) # Apply the schema to the RDD. schemaPeople = spark.createDataFrame(people, schema) # Creates a temporary view using the DataFrame schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. results = spark.sql("SELECT name FROM people") results.show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+ 
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

Aggregations(聚合)

·內置的DataFrames函數提供常見的聚合,如count(),countDistinct(),avg(),max(),min()等。雖然這些函數是爲DataFrames設計的,但Spark SQL也有類型安全的版本
·其中一些在Scala和Java中使用強類型數據集。
·此外,用戶不限於預約義的聚合函數,而且能夠建立本身的聚合函數。

Untyped User-Defined Aggregate Functions(無用戶定義的聚合函數)

·用戶必須擴展UserDefinedAggregateFunction抽象類以實現自定義無類型聚合函數。
·例如,用戶定義的平均值可能以下所示:
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ object MyAverage extends UserDefinedAggregateFunction { // Data types of input arguments of this aggregate function def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // Data types of values in the aggregation buffer def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // The data type of the returned value def dataType: DataType = DoubleType // Whether this function always returns the same output on the identical input def deterministic: Boolean = true // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } // Updates the given aggregation buffer `buffer` with new input data from `input` def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // Calculates the final result def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1) } // Register the function to access it spark.udf.register("myAverage", MyAverage) val df = spark.read.json("examples/src/main/resources/employees.json") df.createOrReplaceTempView("employees") df.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.

Type-Safe User-Defined Aggregate Functions(類型安全的用戶定義聚合函數)

·強類型數據集的用戶定義聚合圍繞Aggregator抽象類。
·例如,類型安全的用戶定義平均值可能以下所示:
import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.apache.spark.sql.expressions.Aggregator case class Employee(name: String, salary: Long) case class Average(var sum: Long, var count: Long) object MyAverage extends Aggregator[Employee, Average, Double] { // A zero value for this aggregation. Should satisfy the property that any b + zero = b def zero: Average = Average(0L, 0L) // Combine two values to produce a new value. For performance, the function may modify `buffer` // and return it instead of constructing a new object def reduce(buffer: Average, employee: Employee): Average = { buffer.sum += employee.salary buffer.count += 1 buffer } // Merge two intermediate values def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum b1.count += b2.count b1 } // Transform the output of the reduction def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count // Specifies the Encoder for the intermediate value type def bufferEncoder: Encoder[Average] = Encoders.product // Specifies the Encoder for the final output value type def outputEncoder: Encoder[Double] = Encoders.scalaDouble } val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee] ds.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ // Convert the function to a `TypedColumn` and give it a name val averageSalary = MyAverage.toColumn.name("average_salary") val result = ds.select(averageSalary) result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.

Data Sources(數據源)

·Spark SQL支持經過DataFrame接口對各類數據源進行操做。
·DataFrame可使用關係轉換進行操做,也能夠用於建立臨時視圖。
·將DataFrame註冊爲臨時視圖容許您對其數據運行SQL查詢。
·本節介紹使用Spark數據源加載和保存數據的通常方法,而後介紹可用於內置數據源的特定選項。

Generic Load/Save Functions(通用加載/保存功能)

在最簡單的形式中,默認數據源(parquet除非另外由spark.sql.sources.default配置)將用於全部操做。github

df = spark.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Manually Specifying Options(手動指定選項)

·您還能夠手動指定將要使用的數據源以及要傳遞給數據源的任何其餘選項。
·數據源由其徹底限定名稱(即org.apache.spark.sql.parquet)指定,但對於內置源,您還可使用其短名稱(json,parquet,jdbc,orc,libsvm,csv,text
·)。
·從任何數據源類型加載的DataFrame均可以使用此語法轉換爲其餘類型。

要加載JSON文件,您可使用:web

df = spark.read.load("examples/src/main/resources/people.json", format="json") df.select("name", "age").write.save("namesAndAges.parquet", format="parquet") 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

要加載CSV文件,您可使用:算法

df = spark.read.load("examples/src/main/resources/people.csv", format="csv", sep=":", inferSchema="true", header="true") 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
·在寫操做期間也使用額外選項。
·例如,您能夠控制ORC數據源的bloom過濾器和字典編碼。
·如下ORC示例將在favorite_color上建立bloom過濾器,並對name和favorite_color使用字典編碼。
·對於Parquet,也存在parquet.enable.dictionary。
·要查找有關額外ORC / Parquet選項的更多詳細信息,請訪問官方Apache ORC / Parquet網站。
df = spark.read.orc("examples/src/main/resources/users.orc") (df.write.format("orc") .option("orc.bloom.filter.columns", "favorite_color") .option("orc.dictionary.key.threshold", "1.0") .save("users_with_options.orc")) 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Run SQL on files directly(直接在文件上運行SQL)

能夠直接使用SQL查詢該文件,而不是使用讀取API將文件加載到DataFrame並進行查詢sql

df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Save Modes(保存模式)

·保存操做能夠選擇使用SaveMode,它指定如何處理現有數據(若是存在)。
·重要的是要意識到這些保存模式不使用任何鎖定而且不是原子的。
·此外,執行覆蓋時,將在寫出新數據以前刪除數據。
Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) "error" or "errorifexists"(default) 將DataFrame保存到數據源時,若是數據已存在,則會引起異常。
SaveMode.Append "append" 將DataFrame保存到數據源時,若是數據/表已存在,則DataFrame的內容應附加到現有數據。
SaveMode.Overwrite "overwrite" 覆蓋模式意味着在將DataFrame保存到數據源時,若是數據/表已經存在,則預期現有數據將被DataFrame的內容覆蓋。
SaveMode.Ignore "ignore" Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected not to save the contents of the DataFrame and not to change the existing data. This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

Saving to Persistent Tables(保存到持久表)

·也可使用saveAsTable命令將DataFrames做爲持久表保存到Hive Metastore中。
·請注意,使用此功能不須要現有的Hive部署。
·Spark將爲您建立默認的本地Hive Metastore(使用Derby)。
·與createOrReplaceTempView命令不一樣,saveAsTable將實現DataFrame的內容並建立指向Hive Metastore中數據的指針。
·只要您保持與同一Metastore的鏈接,即便您的Spark程序從新啓動後,持久表仍然存在。
·能夠經過使用表的名稱調用SparkSession上的table方法來建立持久表的DataFrame。
·對於基於文件的數據源,例如
·text,parquet,json等您能夠經過路徑選項指定自定義表路徑,例如
·df.write.option(「path」,「/ some / path」).saveAsTable(「t」)。
·刪除表時,將不會刪除自定義表路徑,而且表數據仍然存在。
·若是未指定自定義表路徑,則Spark會將數據寫入倉庫目錄下的默認表路徑。
·刪除表時,也將刪除默認表路徑。
·從Spark 2.1開始,持久數據源表將每一個分區元數據存儲在Hive Metastore中。
·這帶來了幾個好處:
·因爲Metastore只能返回查詢所需的分區,所以再也不須要在表的第一個查詢中發現全部分區。
·如今,對於使用Datasource API建立的表,可使用ALTER TABLE PARTITION ... SET LOCATION等Hive DDL。
·請注意,在建立外部數據源表(具備路徑選項的表)時,默認狀況下不會收集分區信息。
·要同步Metastore中的分區信息,能夠調用MSCK REPAIR TABLE。

Bucketing, Sorting and Partitioning

·對於基於文件的數據源,還能夠對輸出進行存儲和排序或分區。
·分段和排序僅適用於持久表:
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed") 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

雖然分區能夠在使用數據集API時與save和saveAsTable一塊兒使用。

df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet") 
·在Spark repo中的「examples / src / main / python / sql / datasource.py」中找到完整的示例代碼。
·雖然分區能夠在使用數據集API時與save和saveAsTable一塊兒使用。
df = spark.read.parquet("examples/src/main/resources/users.parquet") (df .write .partitionBy("favorite_color") .bucketBy(42, "name") .saveAsTable("people_partitioned_bucketed")) 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
·partitionBy建立一個目錄結構,如分區發現部分所述。
·所以,它對具備高基數的列的適用性有限。
·相比之下,bucketBy能夠在固定數量的存儲桶中分配數據,而且能夠在許多惟一值無限制時使用。

Parquet Files(Parquet文件)

·Parquet是一種柱狀格式,許多其餘數據處理系統都支持它。
·Spark SQL支持讀取和寫入Parquet文件,這些文件自動保留原始數據的模式。
·在編寫Parquet文件時,出於兼容性緣由,全部列都會自動轉換爲可爲空。

Loading Data Programmatically(以編程方式加載數據)

使用上面示例中的數據:

peopleDF = spark.read.json("examples/src/main/resources/people.json") # DataFrames can be saved as Parquet files, maintaining the schema information. peopleDF.write.parquet("people.parquet") # Read in the Parquet file created above. # Parquet files are self-describing so the schema is preserved. # The result of loading a parquet file is also a DataFrame. parquetFile = spark.read.parquet("people.parquet") # Parquet files can also be used to create a temporary view and then used in SQL statements. parquetFile.createOrReplaceTempView("parquetFile") teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.show() # +------+ # | name| # +------+ # |Justin| # +------+ 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Partition Discovery(分區發現)

·表分區是Hive等系統中經常使用的優化方法。
·在分區表中,數據一般存儲在不一樣的目錄中,分區列值在每一個分區目錄的路徑中編碼。
·全部內置文件源(包括Text / CSV / JSON / ORC / Parquet)都可以自動發現和推斷分區信息。
·例如,咱們可使用如下目錄結構將全部之前使用的填充數據存儲到分區表中,並將兩個額外的列(性別和國家/地區)做爲分區列:
path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
·經過將path / to / table傳遞給SparkSession.read.parquet或SparkSession.read.load,Spark SQL將自動從路徑中提取分區信息。
·如今返回的DataFrame的架構變爲:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
·請注意,分區列的數據類型是自動推斷的。
·目前,支持數字數據類型,日期,時間戳和字符串類型。
·有時,用戶可能不但願自動推斷分區列的數據類型。
·對於這些用例,能夠經過spark.sql.sources.partitionColumnTypeInference.enabled配置自動類型推斷,默認爲true。
·禁用類型推斷時,字符串類型將用於分區列。
·從Spark 1.6.0開始,分區發現默認只查找給定路徑下的分區。
·對於上面的示例,若是用戶將path / to / table / gender = male傳遞給SparkSession.read.parquet或SparkSession.read.load,則不會將性別視爲分區列。
·若是用戶須要指定分區發現應該開始的基本路徑,則能夠在數據源選項中設置basePath。
·例如,當path / to / table / gender = male是數據的路徑而且用戶將basePath設置爲path / to / table /時,gender將是分區列。

Schema Merging(架構合併)

·與Protocol Buffer,Avro和Thrift同樣,Parquet也支持模式演變。
·用戶能夠從簡單模式開始,並根據須要逐漸向模式添加更多列。
·經過這種方式,用戶可能最終獲得具備不一樣但相互兼容的模式的多個Parquet文件。
·Parquet數據源如今可以自動檢測這種狀況併合並全部這些文件的模式。
·因爲模式合併是一項相對昂貴的操做,而且在大多數狀況下不是必需的,所以咱們默認從1.5.0開始關閉它。
·您能夠啓用它
·在讀取Parquet文件時將數據源選項mergeSchema設置爲true(以下面的示例所示),或
·將全局SQL選項spark.sql.parquet.mergeSchema設置爲true。
from pyspark.sql import Row # spark is from the previous example. # Create a simple DataFrame, stored into a partition directory sc = spark.sparkContext squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6)) .map(lambda i: Row(single=i, double=i ** 2))) squaresDF.write.parquet("data/test_table/key=1") # Create another DataFrame in a new partition directory, # adding a new column and dropping an existing column cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11)) .map(lambda i: Row(single=i, triple=i ** 3))) cubesDF.write.parquet("data/test_table/key=2") # Read the partitioned table mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() # The final schema consists of all 3 columns in the Parquet files together # with the partitioning column appeared in the partition directory paths. # root # |-- double: long (nullable = true) # |-- single: long (nullable = true) # |-- triple: long (nullable = true) # |-- key: integer (nullable = true) 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Hive metastore Parquet table conversion(Hive Metastore Parquet錶轉換)

·在讀取和寫入Hive Metastore Parquet表時,Spark SQL將嘗試使用本身的Parquet支持而不是Hive SerDe來得到更好的性能。
·此行爲由spark.sql.hive.convertMetastoreParquet配置控制,默認狀況下處於打開狀態。

Hive/Parquet Schema Reconciliation

·從表模式處理的角度來看,Hive和Parquet之間存在兩個主要區別。
·Hive不區分大小寫,而Parquet則不區分大小寫
·Hive認爲全部列均可覺得空,而P​​arquet中的可空性很重要
·因爲這個緣由,在將Hive Metastore Parquet錶轉換爲Spark SQL Parquet表時,咱們必須將Hive Metastore模式與Parquet模式進行協調。
·對賬規則是:
·兩個模式中具備相同名稱的字段必須具備相同的數據類型,而無論是否爲空。
·協調字段應具備Parquet端的數據類型,以便遵循可爲空性。
·協調的模式剛好包含Hive Metastore模式中定義的那些字段。
·僅出如今Parquet模式中的任何字段都將放入已協調的模式中。
·僅出如今Hive Metastore模式中的任何字段都將在協調模式中添加爲可空字段。

Metadata Refreshing(元數據刷新)

·Spark SQL緩存Parquet元數據以得到更好的性能。
·啓用Hive Metastore Parquet錶轉換後,還會緩存這些轉換表的元數據。
·若是這些表由Hive或其餘外部工具更新,則須要手動刷新它們以確保元數據一致。
# spark is an existing SparkSession spark.catalog.refreshTable("my_table")

Configuration(構造)

可使用SparkSession上的setConf方法或使用SQL運行SET key = value命令來完成Parquet的配置。

Property Name Default Meaning
spark.sql.parquet.binaryAsString false
·其餘一些Parquet生成系統,特別是Impala,Hive和舊版本的Spark SQL,在寫出Parquet模式時不區分二進制數據和字符串。
·此標誌告訴Spark SQL將二進制數據解釋爲字符串,以提供與這些系統的兼容性。
spark.sql.parquet.int96AsTimestamp true
·一些Parquet生產系統,特別是Impala和Hive,將時間戳存儲到INT96中。
·此標誌告訴Spark SQL將INT96數據解釋爲時間戳,以提供與這些系統的兼容性。
spark.sql.parquet.compression.codec snappy
·設置編寫Parquet文件時使用的壓縮編解碼器。
·若是在特定於表的選項/屬性中指定了「compression」或「parquet.compression」,則優先級爲「compression」,「parquet.compression」,「spark.sql.parquet.compression.codec」。
·可接受的值包括:none,uncompressed,snappy,gzip,lzo,brotli,lz4,zstd。
·請注意,`zstd`須要在Hadoop 2.9.0以前安裝`ZStandardCodec`,`brotli`須要安裝`BrotliCodec`。
spark.sql.parquet.filterPushdown true 設置爲true時啓用Parquet過濾器下推優化。
spark.sql.hive.convertMetastoreParquet true 設置爲false時,Spark SQL將使用Hive SerDe做爲鑲木桌而不是內置支持。
spark.sql.parquet.mergeSchema false

若是爲true,則Parquet數據源合併從全部數據文件收集的模式,不然,若是沒有可用的摘要文件,則從摘要文件或隨機數據文件中選取模式。

spark.sql.parquet.writeLegacyFormat false
·若是爲true,則數據將以Spark 1.4及更早版本的方式寫入。
·例如,十進制值將以Apache Parquet的固定長度字節數組格式寫入,其餘系統(如Apache Hive和Apache Impala)也使用該格式。
·若是爲false,將使用Parquet中的較新格式。
·例如,小數將以基於int的格式寫入。
·若是Parquet輸出旨在用於不支持此較新格式的系統,請設置爲true。

JSON Files(JSON文件)

·Spark SQL能夠自動推斷JSON數據集的架構並將其做爲DataFrame加載。
·可使用JSON文件上的SparkSession.read.json完成此轉換。
·請注意,做爲json文件提供的文件不是典型的JSON文件。
·每行必須包含一個單獨的,自包含的有效JSON對象。
·有關更多信息,請參閱JSON Lines文本格式,也稱爲換行符分隔的JSON。
·對於常規多行JSON文件,請將multiLine參數設置爲True。
# spark is from the previous example.
sc = spark.sparkContext # A JSON dataset is pointed to by path. # The path can be either a single text file or a directory storing text files path = "examples/src/main/resources/people.json" peopleDF = spark.read.json(path) # The inferred schema can be visualized using the printSchema() method peopleDF.printSchema() # root # |-- age: long (nullable = true) # |-- name: string (nullable = true) # Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") # SQL statements can be run by using the sql methods provided by spark teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show() # +------+ # | name| # +------+ # |Justin| # +------+ # Alternatively, a DataFrame can be created for a JSON dataset represented by # an RDD[String] storing one JSON object per string jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'] otherPeopleRDD = sc.parallelize(jsonStrings) otherPeople = spark.read.json(otherPeopleRDD) otherPeople.show() # +---------------+----+ # | address|name| # +---------------+----+ # |[Columbus,Ohio]| Yin| # +---------------+----+ 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Hive Tables(Hive表)

·Spark SQL還支持讀取和寫入存儲在Apache Hive中的數據。
·可是,因爲Hive具備大量依賴項,所以這些依賴項不包含在默認的Spark分發中。
·若是能夠在類路徑上找到Hive依賴項,Spark將自動加載它們。
·請注意,這些Hive依賴項也必須存在於全部工做節點上,由於它們須要訪問Hive序列化和反序列化庫(SerDes)才能訪問存儲在Hive中的數據。
·經過在conf /中放置hive-site.xml,core-site.xml(用於安全性配置)和hdfs-site.xml(用於HDFS配置)文件來完成Hive的配置。
·使用Hive時,必須使用Hive支持實例化SparkSession,包括鏈接到持久性Hive Metastore,支持Hive serdes和Hive用戶定義函數。
·沒有現有Hive部署的用戶仍能夠啓用Hive支持。
·當未由hive-site.xml配置時,上下文會自動在當前目錄中建立metastore_db,並建立一個由spark.sql.warehouse.dir配置的目錄,該目錄默認爲Spark應用程序當前目錄中的目錄spark-warehouse
·開始了。
·請注意,自Spark 2.0.0起,不推薦使用hive-site.xml中的hive.metastore.warehouse.dir屬性。
·而是使用spark.sql.warehouse.dir指定倉庫中數據庫的默認位置。
·您可能須要向啓動Spark應用程序的用戶授予寫入權限。
from os.path import expanduser, join, abspath from pyspark.sql import SparkSession from pyspark.sql import Row # warehouse_location points to the default location for managed databases and tables warehouse_location = abspath('spark-warehouse') spark = SparkSession \ .builder \ .appName("Python Spark SQL Hive integration example") \ .config("spark.sql.warehouse.dir", warehouse_location) \ .enableHiveSupport() \ .getOrCreate() # spark is an existing SparkSession spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries are expressed in HiveQL spark.sql("SELECT * FROM src").show() # +---+-------+ # |key| value| # +---+-------+ # |238|val_238| # | 86| val_86| # |311|val_311| # ... # Aggregation queries are also supported. spark.sql("SELECT COUNT(*) FROM src").show() # +--------+ # |count(1)| # +--------+ # | 500 | # +--------+ # The results of SQL queries are themselves DataFrames and support all normal functions. sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") # The items in DataFrames are of type Row, which allows you to access each column by ordinal. stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value)) for record in stringsDS.collect(): print(record) # Key: 0, Value: val_0 # Key: 0, Value: val_0 # Key: 0, Value: val_0 # ... # You can also use DataFrames to create temporary views within a SparkSession. Record = Row("key", "value") recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)]) recordsDF.createOrReplaceTempView("records") # Queries can then join DataFrame data with data stored in Hive. spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() # +---+------+---+------+ # |key| value|key| value| # +---+------+---+------+ # | 2| val_2| 2| val_2| # | 4| val_4| 4| val_4| # | 5| val_5| 5| val_5| # ... 
Find full example code at "examples/src/main/python/sql/hive.py" in the Spark repo.

Specifying storage format for Hive tables(指定Hive表的存儲格式)

·建立Hive表時,須要定義此表應如何從/向文件系統讀取/寫入數據,即「輸入格式」和「輸出格式」。
·您還須要定義此表如何將數據反序列化爲行,或將行序列化爲數據,即「serde」。
·如下選項可用於指定存儲格式(「serde」,「輸入格式」,「輸出格式」),例如,
·CREATE TABLE src(id int)使用配置單元選項(fileFormat'planra')。
·默認狀況下,咱們將表文件做爲純文本讀取。
·請注意,建立表時尚不支持Hive存儲處理程序,您可使用Hive端的存儲處理程序建立表,並使用Spark SQL讀取它
Property Name Meaning
fileFormat
·fileFormat是一種存儲格式規範包,包括「serde」,「input format」和「output format」。
·目前咱們支持6種fileFormats:'sequencefile','rcfile','orc','parquet','textfile'和'avro'。
inputFormat, outputFormat
·這兩個選項將相應的`InputFormat`和`OutputFormat`類的名稱指定爲字符串文字,例如
·`org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。
·這兩個選項必須出如今pair中,若是已經指定了`fileFormat`選項,則沒法指定它們。
serde
·此選項指定serde類的名稱。
·當指定`fileFormat`選項時,若是給定的`fileFormat`已經包含serde的信息,則不要指定此選項。
·目前「sequencefile」,「textfile」和「rcfile」不包含serde信息,您能夠將此選項與這3個fileFormats一塊兒使用。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim
·這些選項只能與「textfile」fileFormat一塊兒使用。
·它們定義瞭如何將分隔文件讀入行。

使用OPTIONS定義的全部其餘屬性將被視爲Hive serde屬性。

Interacting with Different Versions of Hive Metastore(與不一樣版本的Hive Metastore交互)

·Spark SQL的Hive支持最重要的部分之一是與Hive Metastore的交互,這使得Spark SQL可以訪問Hive表的元數據。
·從Spark 1.4.0開始,可使用單個二進制構建的Spark SQL來查詢不一樣版本的Hive Metastores,使用下面描述的配置。
·請注意,獨立於用於與Metastore通訊的Hive版本,內部Spark SQL將針對Hive 1.2.1進行編譯,並使用這些類進行內部執行(serdes,UDF,UDAF等)。
·如下選項可用於配置用於檢索元數據的Hive版本:
Property Name Default Meaning
spark.sql.hive.metastore.version 1.2.1 Version of the Hive metastore. Available options are 0.12.0 through 2.3.3.
spark.sql.hive.metastore.jars builtin Location of the jars that should be used to instantiate the HiveMetastoreClient. This property can be one of three options:
    1. builtin
    2. Use Hive 1.2.1, which is bundled with the Spark assembly when 
-Phive
       is enabled. When this option is chosen, 
spark.sql.hive.metastore.version
      must be either 
1.2.1
     or not defined.
  1. maven
  2. Use Hive jars of specified version downloaded from Maven repositories. This configuration is not generally recommended for production deployments.
  3. A classpath in the standard format for the JVM. This classpath must include all of Hive and its dependencies, including the correct version of Hadoop. These jars only need to be present on the driver, but if you are running in yarn cluster mode then you must ensure they are packaged with your application.
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

A comma-separated list of class prefixes that should be loaded using the classloader that is shared between Spark SQL and a specific version of Hive. An example of classes that should be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need to be shared are those that interact with classes that are already shared. For example, custom appenders that are used by log4j.

spark.sql.hive.metastore.barrierPrefixes (empty)

A comma separated list of class prefixes that should explicitly be reloaded for each version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a prefix that typically would be shared (i.e. org.apache.spark.*).

JDBC To Other Databases(JDBC到其餘數據庫)

·Spark SQL還包括一個可使用JDBC從其餘數據庫讀取數據的數據源。
·與使用JdbcRDD相比,此功能應該更受歡迎。
·這是由於結果做爲DataFrame返回,能夠在Spark SQL中輕鬆處理,也能夠與其餘數據源鏈接。
·JDBC數據源也更易於使用Java或Python,由於它不須要用戶提供ClassTag。
·(請注意,這與Spark SQL JDBC服務器不一樣,後者容許其餘應用程序使用Spark SQL運行查詢)。
·首先,您須要在spark類路徑中包含特定數據庫的JDBC驅動程序。
·例如,要從Spark Shell鏈接到postgres,您將運行如下命令:
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
·可使用Data Sources API將遠程數據庫中的表加載爲DataFrame或Spark SQL臨時視圖。
·用戶能夠在數據源選項中指定JDBC鏈接屬性。
·用戶和密碼一般做爲登陸數據源的鏈接屬性提供。
·除鏈接屬性外,Spark還支持如下不區分大小寫的選項:
Property Name Meaning
url
·要鏈接的JDBC URL。
·能夠在URL中指定特定於源的鏈接屬性。
·例如,jdbc:postgresql:// localhost / test?user = fred&password = secret
dbtable
·應該讀取或寫入的JDBC表。
·請注意,在讀取路徑中使用它時,可使用在SQL查詢的FROM子句中有效的任何內容。
·例如,您也能夠在括號中使用子查詢,而不是完整的表。
·不容許同時指定`dbtable`和`query`選項。
query
·將用於將數據讀入Spark的查詢。指定的查詢將括起來並用做FROM子句中的子查詢。Spark還會爲子查詢子句分配別名。
·例如,spark將向JDBC Source發出如下形式的查詢

SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

·使用此選項時,如下是一些限制。
·不容許同時指定`dbtable`和`query`選項。
·不容許同時指定`query`和`partitionColumn`選項。
·當須要指定`partitionColumn`選項時,可使用`dbtable`選項指定子查詢,而且可使用做爲`dbtable`的一部分提供的子查詢別名來限定分區列。

範例:
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("query", "select c1, c2 from t1")
.load()

driver 用於鏈接到此URL的JDBC驅動程序的類名
partitionColumn, lowerBound, upperBound
·若是指定了任何選項,則必須所有指定這些選項。此外,必須指定numPartitions。它們描述了在從多個工做者並行讀取時如何對錶進行分區。partitionColumn必須是相關表中的數字,日期或時間戳列。
·請注意,lowerBound和upperBound僅用於決定分區步幅,而不是用於過濾表中的行。所以,表中的全部行都將被分區並返回。此選項僅適用於閱讀。
numPartitions The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
queryTimeout The number of seconds the driver will wait for a Statement object to execute to the given number of seconds. Zero means there is no limit. In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout, e.g., the h2 JDBC driver checks the timeout of each query instead of an entire JDBC batch. It defaults to 0.
fetchsize The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). This option applies only to reading.
batchsize The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. This option applies only to writing. It defaults to 1000.
isolationLevel The transaction isolation level, which applies to current connection. It can be one of NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ, or SERIALIZABLE, corresponding to standard transaction isolation levels defined by JDBC's Connection object, with default of READ_UNCOMMITTED. This option applies only to writing. Please refer the documentation in java.sql.Connection.
sessionInitStatement After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block). Use this to implement session initialization code. Example: option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
truncate This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to false. This option applies only to writing.
cascadeTruncate This is a JDBC writer related option. If enabled and supported by the JDBC database (PostgreSQL and Oracle at the moment), this options allows execution of a TRUNCATE TABLE t CASCADE (in the case of PostgreSQL a TRUNCATE TABLE ONLY t CASCADE is executed to prevent inadvertently truncating descendant tables). This will affect other tables, and thus should be used with care. This option applies only to writing. It defaults to the default cascading truncate behaviour of the JDBC database in question, specified in the isCascadeTruncate in each JDBCDialect.
createTableOptions This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option applies only to writing.
createTableColumnTypes The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid spark sql data types. This option applies only to writing.
customSchema
·用於從JDBC鏈接器讀取數據的自定義架構。例如,「id DECIMAL(38,0),名稱爲STRING」。您還能夠指定部分字段,其餘字段使用默認類型映射。例如,「id DECIMAL(38,0)」。列名應與JDBC表的相應列名相同。用戶能夠指定Spark SQL的相應數據類型,而不是使用默認值。
·此選項僅適用於閱讀。
pushDownPredicate
·用於啓用或禁用謂詞下推到JDBC數據源的選項。默認值爲true,在這種狀況下,Spark會盡量地將過濾器下推到JDBC數據源。不然,若是設置爲false,則不會將過濾器下推到JDBC數據源,所以全部過濾器都將由Spark處理。當Spark經過比JDBC數據源更快地執行謂詞過濾時,謂詞下推一般會被關閉。
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source jdbcDF = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .load() jdbcDF2 = spark.read \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Specifying dataframe column data types on read jdbcDF3 = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .option("customSchema", "id DECIMAL(38, 0), name STRING") \ .load() # Saving data to a JDBC source jdbcDF.write \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .save() jdbcDF2.write \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Specifying create table column data types on write jdbcDF.write \ .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Apache Avro Data Source Guide(Apache Avro數據源指南)

自Spark 2.4發佈以來,Spark SQL爲讀取和編寫Apache Avro數據提供了內置支持。

Deploying(配置)

·spark-avro模塊是外置的,默認狀況下不包含在spark-submit或spark-shell中。
·與任何Spark應用程序同樣,spark-submit用於啓動您的應用程序。
·spark-avro_2.12及其依賴項能夠直接添加到spark-submit使用--packages,例如
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:2.4.2 ...

對於在spark-shell上進行試驗,您還可使用--packages直接添加org.apache.spark:spark-avro_2.12及其依賴項

./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:2.4.2 ...

有關提交具備外部依賴性的應用程序的詳細信息,請參閱「應用程序提交指南。

Load and Save Functions(加載和保存功能)

·因爲spark-avro模塊是外部的,所以DataFrameReader或DataFrameWriter中沒有.avro API。
·要以Avro格式加載/保存數據,您須要將數據源選項格式指定爲avro(或org.apache.spark.sql.avro)。
df = spark.read.format("avro").load("examples/src/main/resources/users.avro") df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

to_avro() and from_avro()

·Avro軟件包提供了to_avro函數,能夠將列編碼爲Avro格式的二進制文件,from_avro()將Avro二進制數據解碼爲列。
·兩個函數都將一列轉換爲另外一列,輸入/輸出SQL數據類型能夠是複雜類型或基本類型。
·在讀取或寫入像Kafka這樣的流媒體源時,將Avro記錄用做列很是有用。
·每一個Kafka鍵值記錄都會增長一些元數據,例如Kafka的攝取時間戳,Kafka的偏移量等。
·若是包含數據的「value」字段位於Avro中,則可使用from_avro()提取數據,豐富數據,清理數據,而後再將其下游推送到Kafka或將其寫入文件。
·to_avro()可用於將結構轉換爲Avro記錄。
·在將數據寫入Kafka時,若是要將多個列從新編碼爲單個列,此方法特別有用。
·這兩個函數目前僅在Scala和Java中可用。
import org.apache.spark.sql.avro._ // `from_avro` requires Avro schema in JSON string format. val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))) val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribe", "topic1") .load() // 1. Decode the Avro data into a struct; // 2. Filter by column `favorite_color`; // 3. Encode the column `name` in Avro format. val output = df .select(from_avro('value, jsonFormatSchema) as 'user) .where("user.favorite_color == \"red\"") .select(to_avro($"user.name") as 'value) val query = output .writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "topic2") .start()

Data Source Option(數據源選項)

可使用DataFrameReader或DataFrameWriter上的.option方法設置Avro的數據源選項。

Property Name Default Meaning Scope
avroSchema None Optional Avro schema provided by an user in JSON format. The date type and naming of record fields should match the input Avro data or Catalyst data, otherwise the read/write action will fail. read and write
recordName topLevelRecord Top level record name in write result, which is required in Avro spec. write
recordNamespace "" Record namespace in write result. write
ignoreExtension true The option controls ignoring of files without .avro extensions in read.
If the option is enabled, all files (with and without .avro extension) are loaded.
read
compression snappy The compression option allows to specify a compression codec used in write.
Currently supported codecs are uncompressedsnappydeflatebzip2 and xz.
If the option is not set, the configuration spark.sql.avro.compression.codec config is taken into account.
write

Configuration(構造)

可使用SparkSession上的setConf方法或使用SQL運行SET key = value命令來完成Avro的配置。

Property Name Default Meaning
spark.sql.legacy.replaceDatabricksSparkAvro.enabled true If it is set to true, the data source provider com.databricks.spark.avro is mapped to the built-in but external Avro data source module for backward compatibility.
spark.sql.avro.compression.codec snappy Compression codec used in writing of AVRO files. Supported codecs: uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.
spark.sql.avro.deflate.level -1 Compression level for the deflate codec used in writing of AVRO files. Valid value must be in the range of from 1 to 9 inclusive or -1. The default value is -1 which corresponds to 6 level in the current implementation.

Compatibility with Databricks spark-avro(與Databricks spark-avro的兼容性)

·此Avro數據源模塊最初來自Databricks的開源存儲庫spark-avro並與之兼容。
·默認狀況下,啓用SQL配置spark.sql.legacy.replaceDatabricksSparkAvro.enabled,數據源提供程序com.databricks.spark.avro將映射到此內置Avro模塊。
·對於在目錄元庫中使用Provider屬性建立的Spark表做爲com.databricks.spark.avro,若是您使用此內置Avro模塊,則映射對於加載這些表相當重要。
·請注意,在Databricks的spark-avro中,爲快捷函數.avro()建立了隱式類AvroDataFrameWriter和AvroDataFrameReader。
·在這個內置但外部的模塊中,兩個隱式類都被刪除了。
·請改用DataFrameWriter或DataFrameReader中的.format(「avro」),它應該乾淨且足夠好。
·若是您更喜歡使用本身構建的spark-avro jar文件,則只需禁用配置spark.sql.legacy.replaceDatabricksSparkAvro.enabled,並在部署應用程序時使用選項--jars。
·有關詳細信息,請閱讀「應用程序提交指南」中的「高級依賴關係管理」部分

Supported types for Avro -> Spark SQL conversion

目前,Spark支持在Avro記錄下讀取全部原始類型和複雜類型。

Avro type Spark SQL type
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
fixed BinaryType
bytes BinaryType
record StructType
array ArrayType
map MapType
union See below
·除了上面列出的類型,它還支持讀取聯合類型。
·如下三種類型被視爲基本聯合類型:
·union(int,long)將映射到LongType。
·union(float,double)將映射到DoubleType。
·union(something,null),其中某些東西是任何支持的Avro類型。
·這將被映射到與某事物相同的Spark SQL類型,並將nullable設置爲true。
·全部其餘聯合類型都被認爲是複雜的
·根據union的成員,它們將映射到StructType,其中字段名稱是member0,member1等。
·這與Avro和Parquet之間的轉換行爲一致。
·它還支持讀取如下Avro邏輯類型:
Avro logical type Avro type Spark SQL type
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal fixed DecimalType
decimal bytes DecimalType

目前,忽略了Avro文件中存在的文檔,別名和其餘屬性。

Supported types for Spark SQL -> Avro conversion(支持的Spark SQL類型 - > Avro轉換)

·Spark支持將全部Spark SQL類型寫入Avro。
·對於大多數類型,從Spark類型到Avro類型的映射很簡單(例如,IntegerType轉換爲int);
·可是,下面列出了一些特殊狀況:
Spark SQL type Avro type Avro logical type
ByteType int  
ShortType int  
BinaryType bytes  
DateType int date
TimestampType long timestamp-micros
DecimalType fixed decimal
您還可使用選項avroSchema指定整個輸出Avro架構,以即可以將Spark SQL類型轉換爲其餘Avro類型。
·默認狀況下不該用如下轉換,而且須要用戶指定的Avro架構:
Spark SQL type Avro type Avro logical type
BinaryType fixed  
StringType enum  
TimestampType long timestamp-millis
DecimalType bytes decimal

Performance Tuning(性能調優)

對於某些工做負載,能夠經過在內存中緩存數據或打開一些實驗選項來提升性能。

Caching Data In Memory(在內存中緩存數據)

·Spark SQL能夠經過調用spark.catalog.cacheTable(「tableName」)或dataFrame.cache()使用內存中的列式格式來緩存表。
·而後,Spark SQL將僅掃描所需的列,並自動調整壓縮以最小化內存使用和GC壓力。
·您能夠調用spark.catalog.uncacheTable(「tableName」)從內存中刪除該表。
·可使用SparkSession上的setConf方法或使用SQL運行SET key = value命令來完成內存中緩存的配置。
Property Name Default Meaning
spark.sql.inMemoryColumnarStorage.compressed true 設置爲true時,Spark SQL將根據數據統計信息自動爲每列選擇壓縮編解碼器
spark.sql.inMemoryColumnarStorage.batchSize 10000
·控制柱狀緩存的批次大小。
·較大的批處理大小能夠提升內存利用率和壓縮率,但在緩存數據時存在OOM風險。

Other Configuration Options(其餘配置選項)

·如下選項也可用於調整查詢執行的性能。
·因爲更多優化會自動執行,所以在未來的版本中可能會棄用這些選項。
Property Name Default Meaning
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 讀取文件時打包到單個分區的最大字節數。
spark.sql.files.openCostInBytes 4194304 (4 MB)
·能夠在同一時間掃描經過字節數測量的打開文件的估計成本。
·將多個文件放入分區時使用。
·最好過分估計,而後使用較小文件的分區將比具備較大文件的分區(首先安排的分區)更快
spark.sql.broadcastTimeout 300

廣播鏈接中廣播等待時間的超時(以秒爲單位)

spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB)
·配置在執行鏈接時將廣播到全部工做節點的表的最大大小(以字節爲單位)。
·經過將此值設置爲-1,能夠禁用廣播。
·請注意,目前僅支持運行命令ANALYZE TABLE COMPUTE STATISTICS noscan的Hive Metastore表的統計信息。
spark.sql.shuffle.partitions 200 配置在爲鏈接或聚合洗牌數據時要使用的分區數。

Broadcast Hint for SQL Queries(SQL查詢的廣播提示)

·BROADCAST提示指導Spark在將其與另外一個表或視圖鏈接時廣播每一個指定的表。
·當Spark決定鏈接方法時,廣播散列鏈接(即BHJ)是首選,即便統計信息高於配置spark.sql.autoBroadcastJoinThreshold。
·指定鏈接的兩端時,Spark會廣播具備較低統計信息的那一方。
·注意Spark並不保證始終選擇BHJ,由於並不是全部狀況(例如全外鏈接)都支持BHJ。
·當選擇廣播嵌套循環鏈接時,咱們仍然尊重提示。
from pyspark.sql.functions import broadcast broadcast(spark.table("src")).join(spark.table("records"), "key").show()

Distributed SQL Engine(分佈式SQL引擎)

·Spark SQL還可使用其JDBC / ODBC或命令行界面充當分佈式查詢引擎。
·在此模式下,最終用戶或應用程序能夠直接與Spark SQL交互以運行SQL查詢,而無需編寫任何代碼

Running the Thrift JDBC/ODBC server(運行Thrift JDBC / ODBC服務器)

·此處實現的Thrift JDBC / ODBC服務器對應於Hive 1.2.1中的HiveServer2。
·您可使用Spark或Hive 1.2.1附帶的beeline腳本測試JDBC服務器。
·要啓動JDBC / ODBC服務器,請在Spark目錄中運行如下命令:
./sbin/start-thriftserver.sh
·此腳本接受全部bin / spark-submit命令行選項,以及--hiveconf選項以指定Hive屬性。
·您能夠運行./sbin/start-thriftserver.sh --help以獲取全部可用選項的完整列表。
·默認狀況下,服務器偵聽localhost:10000。
·您能夠經過任一環境變量覆蓋此行爲,即:
export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...

或系統屬性:

./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...

如今您可使用beeline來測試Thrift JDBC / ODBC服務器:

./bin/beeline

使用如下方式直接鏈接到JDBC / ODBC服務器:

beeline> !connect jdbc:hive2://localhost:10000
·Beeline會詢問您的用戶名和密碼。
·在非安全模式下,只需在您的計算機上輸入用戶名和空白密碼便可。
·對於安全模式,請按照直線文檔中的說明進行操做。
·經過將hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf /中來完成Hive的配置。
·您也可使用Hive附帶的beeline腳本。
·Thrift JDBC服務器還支持經過HTTP傳輸發送thrift RPC消息。
·使用如下設置將HTTP模式做爲系統屬性或在conf /中的hive-site.xml文件中啓用:
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要進行測試,請使用beeline以http模式鏈接到JDBC / ODBC服務器:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

Running the Spark SQL CLI(運行Spark SQL CLI)

·Spark SQL CLI是一種方便的工具,能夠在本地模式下運行Hive Metastore服務,並執行從命令行輸入的查詢。
·請注意,Spark SQL CLI沒法與Thrift JDBC服務器通訊。
·要啓動Spark SQL CLI,請在Spark目錄中運行如下命令:
./bin/spark-sql
·經過將hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf /中來完成Hive的配置。
·您能夠運行./bin/spark-sql --help以獲取全部可用選項的完整列表。
相關文章
相關標籤/搜索