Python Programming Guide - Spark(Python)html
每個運行在cluster上的spark應用程序,是由一個運行main函數的driver program和運行多種並行操做的executes組成python
其中spark的核心是彈性分佈式數據集(Resilient Distributed Dataset—RDD)apache
Resilient(彈性):易變化、易計算app
Distributed(分佈式):可橫跨多臺機器,集羣分佈maven
Dataset(數據集):大批量數據的集合分佈式
<!-- more -->ide
RDD是邏輯集中的實體,表明一個分區的只讀數據集,不可發生改變函數
【RDD的重要內部屬性】單元測試
分區列表(partitions)
對於一個RDD而言,分區的多少涉及對這個RDD並行計算的粒度,每個RDD分區的計算都會在一個單獨的任務中執行,每個分區對應一個Task,分區後的數據存放在內存當中測試
計算每一個分區的函數(compute)
對於Spark中每一個RDD都是以分區進行計算的,而且每一個分區的compute函數是在對迭代器進行復合操做,不須要每次計算,直到提交動做觸發纔會將以前全部的迭代操做進行計算,lineage在容錯中有重要做用
對父級RDD的依賴(dependencies)
因爲RDD存在轉換關係,因此新生成的RDD對上一個RDD有依賴關係,RDD之間經過lineage產生依賴關係
【窄依賴】
每個父RDD的分區最多隻被子RDD的一個分區所使用,能夠相似於流水線同樣,計算全部父RDD的分區;在節點計算失敗的恢復上也更有效,能夠直接計算其父RDD的分區,還能夠進行並行計算子RDD的每一個分區依賴於常數個父分區(即與數據規模無關)
輸入輸出一對一的算子,且結果RDD的分區結構不變,主要是map、flatmap
輸入輸出一對一,但結果RDD的分區結構發生了變化,如union、coalesce
從輸入中選擇部分元素的算子,如filter、distinct、subtract、sample【寬依賴】
多個子RDD的分區會依賴於同一個父RDD的分區,須要取得其父RDD的全部分區數據進行計算,而一個節點的計算失敗,將會致使其父RDD上多個分區從新計算子RDD的每一個分區依賴於全部父RDD分區
對單個RDD基於key進行重組和reduce,如groupByKey、reduceByKey
對兩個RDD基於key進行jion和重組,如jion
對key-value數據類型RDD的分區器,控制分區策略和分區數(partitioner)
partitioner就是RDD的分區函數,即HashPartitioner(哈希分區)和RangePartitioner(區域分區),分區函數決定了每一個RDD的分區策略和分區數,而且這個函數只在(k-v)類型的RDD中存在,在非(k-v)結構的RDD中是None
每一個數據分區的地址列表(preferredLocations)
與Spark中的調度相關,返回的是此RDD的每一個partition所出儲存的位置,按照「移動數據不如移動計算」的理念,在spark進行任務調度的時候,儘量將任務分配到數據塊所存儲的位置
控制操做(control operation)
spark中對RDD的持久化操做是很重要的,能夠將RDD存放在不一樣的存儲介質中,方便後續的操做能夠重複使用。
主要有cache、persist、checkpoint,checkpoint接口是將RDD持久化到HDFS中,與persist的區別是checkpoint會切斷此RDD以前的依賴關係,而persist會保留依賴關係。checkpoint的兩大做用:一是spark程序長期駐留,過長的依賴會佔用不少的系統資源,按期checkpoint能夠有效的節省資源;二是維護過長的依賴關係可能會出現問題,一旦spark程序運行失敗,RDD的容錯成本會很高
Spark 1.6.0 支持 Python 2.6+ 或者 Python 3.4+,它使用標準的CPython解釋器, 因此像NumPy這樣的C語言類庫也可使用,一樣也支持PyPy 2.3+
能夠用spark目錄裏的bin/spark-submit腳本在python中運行spark應用程序,這個腳本能夠加載Java/Scala類庫,讓你提交應用程序到集羣當中。你也可使用bin/pyspark腳本去啓動python交互界面
若是你但願訪問HDFS上的數據集,你須要創建對應HDFS版本的PySpark鏈接。
最後,你的程序須要import一些spark類庫:
from pyspark import SparkContext, SparkConf
PySpark 要求driver和workers須要相同的python版本,它一般引用環境變量PATH默認的python版本;你也能夠本身指定PYSPARK_PYTHON所用的python版本,例如:
PYSPARK_PYTHON=python3.4 bin/pyspark PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py
一個Spark應用程序的第一件事就是去建立SparkContext對象,它的做用是告訴Spark如何創建一個集羣。建立SparkContext以前,先要建立SparkConf對象,SparkConf包含了應用程序的相關信息。
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
appName:應用的名稱,用戶顯示在集羣UI上
master:Spark、Mesos或者YARN集羣的URL,若是是本地運行,則應該是特殊的'local'字符串
在實際運行時,你不會講master參數寫死在程序代碼裏,而是經過spark-submit來獲取這個參數;在本地測試和單元測試中,你仍然須要'local'去運行Spark應用程序
在PySpark Shell中,一個特殊SparkContext已經幫你建立好了,變量名是:sc,然而在Shell中建立你本身的SparkContext是不起做用的。
你能夠經過--master參數設置master所鏈接的上下文主機;你也能夠經過--py-files參數傳遞一個用逗號做爲分割的列表,將Python中的.zip、.egg、.py等文件添加到運行路徑當中;你一樣能夠經過--packages參數,傳遞一個用逗號分割的maven列表,來個這個Shell會話添加依賴(例如Spark的包)
任何額外的包含依賴的倉庫(如SonaType),均可以經過--repositories參數添加進來。
Spark中全部的Python依賴(requirements.txt的依賴包列表),在必要時都必須經過pip手動安裝
例如用4個核來運行bin/pyspark:
./bin/pyspark --master local[4]
或者,將code.py添加到搜索路徑中(爲了後面能夠import):
./bin/pyspark --master local[4] --py-files code.py
經過運行pyspark --help來查看完整的操做幫助信息,在這種狀況下,pyspark會調用一個通用的spark-submit腳本
在IPython這樣加強Python解釋器中,也能夠運行PySpark Shell;支持IPython 1.0.0+;在利用IPython運行bin/pyspark時,必須將PYSPARK_DRIVER_PYTHON變量設置成ipython:
PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
你能夠經過PYSPARK_DRIVER_PYTHON_OPTS參數來本身定製ipython命令,好比在IPython Notebook中開啓PyLab圖形支持:
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark
參考:Spark Programming Guide 官方文檔
原博連接,請註明出處。