pyspark底層淺析

pyspark底層淺析

pyspark簡介

pyspark是Spark官方提供的API接口,同時pyspark也是Spark中的一個程序。html

在terminal中輸入pyspark指令,能夠打開python的shell,同時其中默認初始化了SparkConf和SparkContext.java

在編寫Spark應用的.py文件時,能夠經過import pyspark引入該模塊,並經過SparkConf對Spark的啓動參數進行設置。不過,若是你僅完成了Spark的安裝,直接用python指令運行py文件並不能檢索到pyspark模塊。你能夠經過pip等包管理工具安裝該模塊,也能夠直接使用pyspark(新版本已不支持)或spark-submit直接提交.py文件的做業。python

pyspark program

這裏指的是spark中的bin/pyspark,github地址git

實際上pyspark只不過解析了命令行中的參數,並進行了python方面的設置,而後調用spark-submitgithub

exec "${SPARK_HOME}"/bin/spark-submit pyspark-shell-main --name "PySparkShell" "$@"

在較新一些的版本如Spark2.2中,已經不支持用pyspark運行py腳本文件,一切spark做業都應該使用spark-submit提交。算法

pyspark module

Spark是用scala編寫的框架,不過考慮到主要是機器學習的應用場景,Spark官方提供了能夠用python的API。可是,一方面,python的API是不全的,即不是全部的scala的函數均可以用pyspark調用到,雖然新的API也在隨着版本迭代不斷開放;另外一方面,pyspark模塊,對於不少複雜算法,是經過反射機制調用的Spark中JVM里正在運行的scala編寫的類、方法。因此,若是你將頻繁應用spark於業務或研究,建議學習直接使用scala語言編寫程序,而不是python。shell

這篇博客並不會講述如何去使用pyspark來編寫python的spark應用。各種API以及模塊如何使用,你徹底能夠前往官方文檔查看。這裏的連接是最新版pyspark的文檔,若是你的機器上的spark不是最新版,請去找對應版本的pyspark文檔。由於正如我上面所說,不一樣版本的pyspark逐步開放了新的API並有對舊API進行改進,你在最新版本看到的類、函數,不必定能在舊版本使用。這裏一提,對於大部分機器學習算法,你都會看到ml模塊與mllib模塊都提供了接口,它們的區別在於ml模塊接受DataFrame格式的數據而mllib模塊接受RDD格式的數據。apache

關於pyspark底層,這裏主要探索兩個地方。一個是其初始化時的工做,一個是其對JVM中scala代碼的調用api

SparkContext

SparkContext類在pyspark/context.py中,在python代碼裏經過初試化該類的實例來完成Spark的啓動與初始化。這個類的__init__方法中執行了下面幾行代碼bash

self._callsite = first_spark_call() or CallSite(None, None, None)
        SparkContext._ensure_initialized(self, gateway=gateway)
        try:
            self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                          conf, jsc, profiler_cls)
        except:
            # If an error occurs, clean up in order to allow future SparkContext creation:
            self.stop()
            raise

first_spark_call和CallSite方法都是用來獲取JAVA虛擬機中的堆棧,它們在pyspark/traceback_util.py中。

以後調用了類函數_ensure_initialized函數,對Spark的Java的gate_way和jvm進行設置。
最後調用了類中的_do_init_函數,從函數就能夠看出是對內部類成員SparkConf的實例_conf函數進行設置,判斷各參數值是否爲None,非空的話就進行設置,並讀取一些本地的python環境參數,啓動Spark。

調用JVM類與方法

以mllib庫爲例,主要邏輯都在pyspark/mllib/common.py中。你去查看mllib模塊中機器學習算法的類與函數,你會發現基本都是使用self.call或者callMLlibFunc,將函數名與參數傳入。

各種模型的Model類都繼承自common.JavaModelWrapper,這個類代碼很短:

class JavaModelWrapper(object):
    """
    Wrapper for the model in JVM
    """
    def __init__(self, java_model):
        self._sc = SparkContext._active_spark_context
        self._java_model = java_model

    def __del__(self):
        self._sc._gateway.detach(self._java_model)

    def call(self, name, *a):
        """Call method of java_model"""
        return callJavaFunc(self._sc, getattr(self._java_model, name), *a)

_java_model是來自Java或Scala的類的實例,在調用對應的訓練算法時由對應的scala代碼在末尾將這些類初始化並返回,其關鍵的類方法call,同callMLLibFunc方法同樣,都是調用了callJavaFunc的方法。對於調用某一類的方法,是運用python的getattr函數,將類實例與方法名傳入,使用反射機制獲取函數;而對於調用一些不屬於類的方法,即便用callMLLibFunc時,是傳入的PythonMLLibAPI類的實例以及方法名,來獲取函數:

def callMLlibFunc(name, *args):
    """ Call API in PythonMLLibAPI """
    sc = SparkContext.getOrCreate()
    api = getattr(sc._jvm.PythonMLLibAPI(), name)
    return callJavaFunc(sc, api, *args)

最終callJavaFunc作的也很簡單,將python的參數*a,使用_py2java方法轉換爲java的數據類型,並執行函數,再將結果使用_java2py方法轉換爲python的數據類型返回:

def callJavaFunc(sc, func, *args):
    """ Call Java Function """
    args = [_py2java(sc, a) for a in args]
    return _java2py(sc, func(*args))

這裏的_java2py,對不少數據格式的支持不是很好,因此當你嘗試用底層的call方法調用一些pyspark還沒有支持但scala中已經有的函數時,可能在scala部分能夠執行,可是python的返回結果卻不盡如人意。

ml模塊的調用機制與mllib的機制有些許的不一樣,但本質上都仍是去調用在Spark的JVM中scala代碼的class。

總結

本篇博客其實說的很是簡單,pyspark即便是不涉及具體算法的部分,也還有不少內容還沒有討論。這裏僅是對pyspark產生一個初步的認識,同時簡單分析了一下底層對scala的調用過程。你興許會有這樣的疑問--「去看這些源代碼有什麼用呢?好像就算知道這些,實際使用時不仍是用一下API就行了嗎?」。實際上,看源代碼首先的就是知足一下好奇心,對Spark有一個更充分的瞭解;其次關於具體用途,我舉個例子,不少狀況你使用的集羣可能不是最新版本的,由於複雜的配置致使通常而言也不可能有一個新版本就更新一次,這時你想用新版本的API怎麼辦?看了這篇博客想必你也會有一些「大膽的想法」。後一篇博客會舉例說明我在實際工做中相關的一個問題,以及如何利用這些源碼去解決的。

相關文章
相關標籤/搜索