pyspark是Spark官方提供的API接口,同時pyspark也是Spark中的一個程序。html
在terminal中輸入pyspark指令,能夠打開python的shell,同時其中默認初始化了SparkConf和SparkContext.java
在編寫Spark應用的.py文件時,能夠經過import pyspark引入該模塊,並經過SparkConf對Spark的啓動參數進行設置。不過,若是你僅完成了Spark的安裝,直接用python指令運行py文件並不能檢索到pyspark模塊。你能夠經過pip等包管理工具安裝該模塊,也能夠直接使用pyspark(新版本已不支持)或spark-submit直接提交.py文件的做業。python
這裏指的是spark中的bin/pyspark,github地址 。git
實際上pyspark只不過解析了命令行中的參數,並進行了python方面的設置,而後調用spark-submitgithub
exec "${SPARK_HOME}"/bin/spark-submit pyspark-shell-main --name "PySparkShell" "$@"
在較新一些的版本如Spark2.2中,已經不支持用pyspark運行py腳本文件,一切spark做業都應該使用spark-submit提交。算法
Spark是用scala編寫的框架,不過考慮到主要是機器學習的應用場景,Spark官方提供了能夠用python的API。可是,一方面,python的API是不全的,即不是全部的scala的函數均可以用pyspark調用到,雖然新的API也在隨着版本迭代不斷開放;另外一方面,pyspark模塊,對於不少複雜算法,是經過反射機制調用的Spark中JVM里正在運行的scala編寫的類、方法。因此,若是你將頻繁應用spark於業務或研究,建議學習直接使用scala語言編寫程序,而不是python。shell
這篇博客並不會講述如何去使用pyspark來編寫python的spark應用。各種API以及模塊如何使用,你徹底能夠前往官方文檔查看。這裏的連接是最新版pyspark的文檔,若是你的機器上的spark不是最新版,請去找對應版本的pyspark文檔。由於正如我上面所說,不一樣版本的pyspark逐步開放了新的API並有對舊API進行改進,你在最新版本看到的類、函數,不必定能在舊版本使用。這裏一提,對於大部分機器學習算法,你都會看到ml模塊與mllib模塊都提供了接口,它們的區別在於ml模塊接受DataFrame格式的數據而mllib模塊接受RDD格式的數據。apache
關於pyspark底層,這裏主要探索兩個地方。一個是其初始化時的工做,一個是其對JVM中scala代碼的調用api
SparkContext類在pyspark/context.py中,在python代碼裏經過初試化該類的實例來完成Spark的啓動與初始化。這個類的__init__方法中執行了下面幾行代碼bash
self._callsite = first_spark_call() or CallSite(None, None, None) SparkContext._ensure_initialized(self, gateway=gateway) try: self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls) except: # If an error occurs, clean up in order to allow future SparkContext creation: self.stop() raise
first_spark_call和CallSite方法都是用來獲取JAVA虛擬機中的堆棧,它們在pyspark/traceback_util.py中。
以後調用了類函數_ensure_initialized函數,對Spark的Java的gate_way和jvm進行設置。
最後調用了類中的_do_init_函數,從函數就能夠看出是對內部類成員SparkConf的實例_conf函數進行設置,判斷各參數值是否爲None,非空的話就進行設置,並讀取一些本地的python環境參數,啓動Spark。
以mllib庫爲例,主要邏輯都在pyspark/mllib/common.py中。你去查看mllib模塊中機器學習算法的類與函數,你會發現基本都是使用self.call或者callMLlibFunc,將函數名與參數傳入。
各種模型的Model類都繼承自common.JavaModelWrapper,這個類代碼很短:
class JavaModelWrapper(object): """ Wrapper for the model in JVM """ def __init__(self, java_model): self._sc = SparkContext._active_spark_context self._java_model = java_model def __del__(self): self._sc._gateway.detach(self._java_model) def call(self, name, *a): """Call method of java_model""" return callJavaFunc(self._sc, getattr(self._java_model, name), *a)
_java_model是來自Java或Scala的類的實例,在調用對應的訓練算法時由對應的scala代碼在末尾將這些類初始化並返回,其關鍵的類方法call,同callMLLibFunc方法同樣,都是調用了callJavaFunc的方法。對於調用某一類的方法,是運用python的getattr函數,將類實例與方法名傳入,使用反射機制獲取函數;而對於調用一些不屬於類的方法,即便用callMLLibFunc時,是傳入的PythonMLLibAPI類的實例以及方法名,來獲取函數:
def callMLlibFunc(name, *args): """ Call API in PythonMLLibAPI """ sc = SparkContext.getOrCreate() api = getattr(sc._jvm.PythonMLLibAPI(), name) return callJavaFunc(sc, api, *args)
最終callJavaFunc作的也很簡單,將python的參數*a,使用_py2java方法轉換爲java的數據類型,並執行函數,再將結果使用_java2py方法轉換爲python的數據類型返回:
def callJavaFunc(sc, func, *args): """ Call Java Function """ args = [_py2java(sc, a) for a in args] return _java2py(sc, func(*args))
這裏的_java2py,對不少數據格式的支持不是很好,因此當你嘗試用底層的call方法調用一些pyspark還沒有支持但scala中已經有的函數時,可能在scala部分能夠執行,可是python的返回結果卻不盡如人意。
ml模塊的調用機制與mllib的機制有些許的不一樣,但本質上都仍是去調用在Spark的JVM中scala代碼的class。
本篇博客其實說的很是簡單,pyspark即便是不涉及具體算法的部分,也還有不少內容還沒有討論。這裏僅是對pyspark產生一個初步的認識,同時簡單分析了一下底層對scala的調用過程。你興許會有這樣的疑問--「去看這些源代碼有什麼用呢?好像就算知道這些,實際使用時不仍是用一下API就行了嗎?」。實際上,看源代碼首先的就是知足一下好奇心,對Spark有一個更充分的瞭解;其次關於具體用途,我舉個例子,不少狀況你使用的集羣可能不是最新版本的,由於複雜的配置致使通常而言也不可能有一個新版本就更新一次,這時你想用新版本的API怎麼辦?看了這篇博客想必你也會有一些「大膽的想法」。後一篇博客會舉例說明我在實際工做中相關的一個問題,以及如何利用這些源碼去解決的。