Spark 編程指南 (一) [Spark Programming Guide]

Python Programming Guide - Spark(Python)html

Spark應用基本概念

每個運行在cluster上的spark應用程序,是由一個運行main函數的driver program和運行多種並行操做的executes組成python

其中spark的核心是彈性分佈式數據集(Resilient Distributed Dataset—RDD)apache

  • Resilient(彈性):易變化、易計算app

  • Distributed(分佈式):可橫跨多臺機器,集羣分佈maven

  • Dataset(數據集):大批量數據的集合分佈式

<!-- more -->ide

RDD基本概念

RDD是邏輯集中的實體,表明一個分區的只讀數據集,不可發生改變函數

【RDD的重要內部屬性】單元測試

  • 分區列表(partitions)
    對於一個RDD而言,分區的多少涉及對這個RDD並行計算的粒度,每個RDD分區的計算都會在一個單獨的任務中執行,每個分區對應一個Task,分區後的數據存放在內存當中測試

  • 計算每一個分區的函數(compute)
    對於Spark中每一個RDD都是以分區進行計算的,而且每一個分區的compute函數是在對迭代器進行復合操做,不須要每次計算,直到提交動做觸發纔會將以前全部的迭代操做進行計算,lineage在容錯中有重要做用

  • 對父級RDD的依賴(dependencies)
    因爲RDD存在轉換關係,因此新生成的RDD對上一個RDD有依賴關係,RDD之間經過lineage產生依賴關係

【窄依賴】
每個父RDD的分區最多隻被子RDD的一個分區所使用,能夠相似於流水線同樣,計算全部父RDD的分區;在節點計算失敗的恢復上也更有效,能夠直接計算其父RDD的分區,還能夠進行並行計算

子RDD的每一個分區依賴於常數個父分區(即與數據規模無關)
輸入輸出一對一的算子,且結果RDD的分區結構不變,主要是map、flatmap
輸入輸出一對一,但結果RDD的分區結構發生了變化,如union、coalesce
從輸入中選擇部分元素的算子,如filter、distinct、subtract、sample

【寬依賴】
多個子RDD的分區會依賴於同一個父RDD的分區,須要取得其父RDD的全部分區數據進行計算,而一個節點的計算失敗,將會致使其父RDD上多個分區從新計算

子RDD的每一個分區依賴於全部父RDD分區
對單個RDD基於key進行重組和reduce,如groupByKey、reduceByKey
對兩個RDD基於key進行jion和重組,如jion

  • 對key-value數據類型RDD的分區器,控制分區策略和分區數(partitioner)
    partitioner就是RDD的分區函數,即HashPartitioner(哈希分區)和RangePartitioner(區域分區),分區函數決定了每一個RDD的分區策略和分區數,而且這個函數只在(k-v)類型的RDD中存在,在非(k-v)結構的RDD中是None

  • 每一個數據分區的地址列表(preferredLocations)
    與Spark中的調度相關,返回的是此RDD的每一個partition所出儲存的位置,按照「移動數據不如移動計算」的理念,在spark進行任務調度的時候,儘量將任務分配到數據塊所存儲的位置

  • 控制操做(control operation)
    spark中對RDD的持久化操做是很重要的,能夠將RDD存放在不一樣的存儲介質中,方便後續的操做能夠重複使用。

主要有cache、persist、checkpoint,checkpoint接口是將RDD持久化到HDFS中,與persist的區別是checkpoint會切斷此RDD以前的依賴關係,而persist會保留依賴關係。checkpoint的兩大做用:一是spark程序長期駐留,過長的依賴會佔用不少的系統資源,按期checkpoint能夠有效的節省資源;二是維護過長的依賴關係可能會出現問題,一旦spark程序運行失敗,RDD的容錯成本會很高

Python鏈接Spark

Spark 1.6.0 支持 Python 2.6+ 或者 Python 3.4+,它使用標準的CPython解釋器, 因此像NumPy這樣的C語言類庫也可使用,一樣也支持PyPy 2.3+

能夠用spark目錄裏的bin/spark-submit腳本在python中運行spark應用程序,這個腳本能夠加載Java/Scala類庫,讓你提交應用程序到集羣當中。你也可使用bin/pyspark腳本去啓動python交互界面

若是你但願訪問HDFS上的數據集,你須要創建對應HDFS版本的PySpark鏈接。

最後,你的程序須要import一些spark類庫:

from pyspark import SparkContext, SparkConf

PySpark 要求driver和workers須要相同的python版本,它一般引用環境變量PATH默認的python版本;你也能夠本身指定PYSPARK_PYTHON所用的python版本,例如:

PYSPARK_PYTHON=python3.4 bin/pyspark
PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py

初始化Spark

一個Spark應用程序的第一件事就是去建立SparkContext對象,它的做用是告訴Spark如何創建一個集羣。建立SparkContext以前,先要建立SparkConf對象,SparkConf包含了應用程序的相關信息。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
  • appName:應用的名稱,用戶顯示在集羣UI上

  • master:Spark、Mesos或者YARN集羣的URL,若是是本地運行,則應該是特殊的'local'字符串

在實際運行時,你不會講master參數寫死在程序代碼裏,而是經過spark-submit來獲取這個參數;在本地測試和單元測試中,你仍然須要'local'去運行Spark應用程序

使用Shell

在PySpark Shell中,一個特殊SparkContext已經幫你建立好了,變量名是:sc,然而在Shell中建立你本身的SparkContext是不起做用的。

你能夠經過--master參數設置master所鏈接的上下文主機;你也能夠經過--py-files參數傳遞一個用逗號做爲分割的列表,將Python中的.zip、.egg、.py等文件添加到運行路徑當中;你一樣能夠經過--packages參數,傳遞一個用逗號分割的maven列表,來個這個Shell會話添加依賴(例如Spark的包)

任何額外的包含依賴的倉庫(如SonaType),均可以經過--repositories參數添加進來。
Spark中全部的Python依賴(requirements.txt的依賴包列表),在必要時都必須經過pip手動安裝

例如用4個核來運行bin/pyspark:

./bin/pyspark --master local[4]

或者,將code.py添加到搜索路徑中(爲了後面能夠import):

./bin/pyspark --master local[4] --py-files code.py

經過運行pyspark --help來查看完整的操做幫助信息,在這種狀況下,pyspark會調用一個通用的spark-submit腳本

在IPython這樣加強Python解釋器中,也能夠運行PySpark Shell;支持IPython 1.0.0+;在利用IPython運行bin/pyspark時,必須將PYSPARK_DRIVER_PYTHON變量設置成ipython:

PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

你能夠經過PYSPARK_DRIVER_PYTHON_OPTS參數來本身定製ipython命令,好比在IPython Notebook中開啓PyLab圖形支持:

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark

參考:Spark Programming Guide 官方文檔

原博連接,請註明出處。

相關文章
相關標籤/搜索