Spark主要是由Scala語言開發,爲了方便和其餘系統集成而不引入scala相關依賴,部分實現使用Java語言開發,例如External Shuffle Service等。整體來講,Spark是由JVM語言實現,會運行在JVM中。然而,Spark除了提供Scala/Java開發接口外,還提供了Python、R等語言的開發接口,爲了保證Spark核心實現的獨立性,Spark僅在外圍作包裝,實現對不一樣語言的開發支持,本文主要介紹Python Spark的實現原理,剖析pyspark應用程序是如何運行起來的。html
首先咱們先回顧下Spark的基本運行時架構,以下圖所示,其中橙色部分表示爲JVM,Spark應用程序運行時主要分爲Driver和Executor,Driver負載整體調度及UI展現,Executor負責Task運行,Spark能夠部署在多種資源管理系統中,例如Yarn、Mesos等,同時Spark自身也實現了一種簡單的Standalone(獨立部署)資源管理系統,能夠不用藉助其餘資源管理系統便可運行。更多細節請參考Spark Scheduler內部原理剖析。python
用戶的Spark應用程序運行在Driver上(某種程度上說,用戶的程序就是Spark Driver程序),通過Spark調度封裝成一個個Task,再將這些Task信息發給Executor執行,Task信息包括代碼邏輯以及數據信息,Executor不直接運行用戶的代碼。服務器
爲了避免破壞Spark已有的運行時架構,Spark在外圍包裝一層Python API,藉助Py4j實現Python和Java的交互,進而實現經過Python編寫Spark應用程序,其運行時架構以下圖所示。架構
其中白色部分是新增的Python進程,在Driver端,經過Py4j實如今Python中調用Java的方法,即將用戶寫的PySpark程序」映射」到JVM中,例如,用戶在PySpark中實例化一個Python的SparkContext對象,最終會在JVM中實例化Scala的SparkContext對象;在Executor端,則不須要藉助Py4j,由於Executor端運行的Task邏輯是由Driver發過來的,那是序列化後的字節碼,雖然裏面可能包含有用戶定義的Python函數或Lambda表達式,Py4j並不能實如今Java裏調用Python的方法,爲了能在Executor端運行用戶定義的Python函數或Lambda表達式,則須要爲每一個Task單獨啓一個Python進程,經過socket通訊方式將Python函數或Lambda表達式發給Python進程執行。語言層面的交互整體流程以下圖所示,實線表示方法調用,虛線表示結果返回。機器學習
下面分別詳細剖析PySpark的Driver是如何運行起來的以及Executor是如何運行Task的。socket
當咱們經過spark-submmit提交pyspark程序,首先會上傳python腳本及依賴,並申請Driver資源,當申請到Driver資源後,會經過PythonRunner(其中有main方法)拉起JVM,以下圖所示。函數
PythonRunner入口main函數裏主要作兩件事:post
用戶Python腳本起來後,首先會實例化Python版的SparkContext對象,在實例化過程當中會作兩件事:性能
通過上面兩步後,SparkContext對象初始化完畢,Driver已經起來了,開始申請Executor資源,同時開始調度任務。用戶Python腳本中定義的一系列處理邏輯最終遇到action方法後會觸發Job的提交,提交Job時是直接經過Py4j調用Java的PythonRDD.runJob方法完成,映射到JVM中,會轉給sparkContext.runJob方法,Job運行完成後,JVM中會開啓一個本地Socket等待Python進程拉取,對應地,Python進程在調用PythonRDD.runJob後就會經過Socket去拉取結果。學習
把前面運行時架構圖中Driver部分單獨拉出來,以下圖所示,經過PythonRunner入口main函數拉起JVM和Python進程,JVM進程對應下圖橙色部分,Python進程對應下圖白色部分。Python進程經過Py4j調用Java方法提交Job,Job運行結果經過本地Socket被拉取到Python進程。還有一點是,對於大數據量,例如廣播變量等,Python進程和JVM進程是經過本地文件系統來交互,以減小進程間的數據傳輸。
爲了方便闡述,以Spark On Yarn爲例,當Driver申請到Executor資源時,會經過CoarseGrainedExecutorBackend(其中有main方法)拉起JVM,啓動一些必要的服務後等待Driver的Task下發,在尚未Task下發過來時,Executor端是沒有Python進程的。當收到Driver下發過來的Task後,Executor的內部運行過程以下圖所示。
Executor端收到Task後,會經過launchTask運行Task,最後會調用到PythonRDD的compute方法,來處理一個分區的數據,PythonRDD的compute方法的計算流程大體分三步走:
把前面運行時架構圖中Executor部分單獨拉出來,以下圖所示,橙色部分爲JVM進程,白色部分爲Python進程,每一個Executor上有一個公共的pyspark.deamon進程,負責接收Task請求,並fork pyspark.worker進程單獨處理每一個Task,實際數據處理過程當中,pyspark.worker進程和JVM Task會較頻繁地進行本地Socket數據通訊。
整體上來講,PySpark是藉助Py4j實現Python調用Java,來驅動Spark應用程序,本質上主要仍是JVM runtime,Java到Python的結果返回是經過本地Socket完成。雖然這種架構保證了Spark核心代碼的獨立性,可是在大數據場景下,JVM和Python進程間頻繁的數據通訊致使其性能損耗較多,惡劣時還可能會直接卡死,因此建議對於大規模機器學習或者Streaming應用場景仍是慎用PySpark,儘可能使用原生的Scala/Java編寫應用程序,對於中小規模數據量下的簡單離線任務,可使用PySpark快速部署提交。
http://sharkdtu.com/posts/pyspark-internal.html