PySpark 的背後原理--在Driver端,經過Py4j實如今Python中調用Java的方法.pyspark.executor 端一個Executor上同時運行多少個Task,就會有多少個...

PySpark 的背後原理

Spark主要是由Scala語言開發,爲了方便和其餘系統集成而不引入scala相關依賴,部分實現使用Java語言開發,例如External Shuffle Service等。整體來講,Spark是由JVM語言實現,會運行在JVM中。然而,Spark除了提供Scala/Java開發接口外,還提供了Python、R等語言的開發接口,爲了保證Spark核心實現的獨立性,Spark僅在外圍作包裝,實現對不一樣語言的開發支持,本文主要介紹Python Spark的實現原理,剖析pyspark應用程序是如何運行起來的。html

Spark運行時架構

首先咱們先回顧下Spark的基本運行時架構,以下圖所示,其中橙色部分表示爲JVM,Spark應用程序運行時主要分爲Driver和Executor,Driver負載整體調度及UI展現,Executor負責Task運行,Spark能夠部署在多種資源管理系統中,例如Yarn、Mesos等,同時Spark自身也實現了一種簡單的Standalone(獨立部署)資源管理系統,能夠不用藉助其餘資源管理系統便可運行。更多細節請參考Spark Scheduler內部原理剖析python

spark-structure

用戶的Spark應用程序運行在Driver上(某種程度上說,用戶的程序就是Spark Driver程序),通過Spark調度封裝成一個個Task,再將這些Task信息發給Executor執行,Task信息包括代碼邏輯以及數據信息,Executor不直接運行用戶的代碼。apache

PySpark運行時架構

爲了避免破壞Spark已有的運行時架構,Spark在外圍包裝一層Python API,藉助Py4j實現Python和Java的交互,進而實現經過Python編寫Spark應用程序,其運行時架構以下圖所示。服務器

pyspark-structure

 

 

Application Properties----python 2.4開始支持限制pyspark executor內存

Property Name Default Meaning
spark.app.name (none) The name of your application. This will appear in the UI and in log data.
spark.driver.cores 1 Number of cores to use for the driver process, only in cluster mode.
spark.driver.maxResultSize 1g Limit of total size of serialized results of all partitions for each Spark action (e.g. collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size is above this limit. Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory and memory overhead of objects in JVM). Setting a proper limit can protect the driver from out-of-memory errors.
spark.driver.memory 1g Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m2g).
Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file.
spark.driver.memoryOverhead driverMemory * 0.10, with minimum of 384 The amount of off-heap memory to be allocated per driver in cluster mode, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%). This option is currently supported on YARN and Kubernetes.
spark.executor.memory 1g Amount of memory to use per executor process, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m2g).
spark.executor.pyspark.memory Not set The amount of memory to be allocated to PySpark in each executor, in MiB unless otherwise specified. If set, PySpark memory for an executor will be limited to this amount. If not set, Spark will not limit Python's memory use and it is up to the application to avoid exceeding the overhead memory space shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory is added to executor resource requests. NOTE: Python memory usage may not be limited on platforms that do not support resource limiting, such as Windows.

 

其中白色部分是新增的Python進程,在Driver端,經過Py4j實如今Python中調用Java的方法,即將用戶寫的PySpark程序」映射」到JVM中,例如,用戶在PySpark中實例化一個Python的SparkContext對象,最終會在JVM中實例化Scala的SparkContext對象;在Executor端,則不須要藉助Py4j,由於Executor端運行的Task邏輯是由Driver發過來的,那是序列化後的字節碼,雖然裏面可能包含有用戶定義的Python函數或Lambda表達式,Py4j並不能實如今Java裏調用Python的方法,爲了能在Executor端運行用戶定義的Python函數或Lambda表達式,則須要爲每一個Task單獨啓一個Python進程,經過socket通訊方式將Python函數或Lambda表達式發給Python進程執行。語言層面的交互整體流程以下圖所示,實線表示方法調用,虛線表示結果返回。架構

pyspark-call

下面分別詳細剖析PySpark的Driver是如何運行起來的以及Executor是如何運行Task的。app

Driver端運行原理

當咱們經過spark-submmit提交pyspark程序,首先會上傳python腳本及依賴,並申請Driver資源,當申請到Driver資源後,會經過PythonRunner(其中有main方法)拉起JVM,以下圖所示。less

pyspark-driver-runtime

PythonRunner入口main函數裏主要作兩件事:機器學習

  • 開啓Py4j GatewayServer
  • 經過Java Process方式運行用戶上傳的Python腳本

用戶Python腳本起來後,首先會實例化Python版的SparkContext對象,在實例化過程當中會作兩件事:socket

  • 實例化Py4j GatewayClient,鏈接JVM中的Py4j GatewayServer,後續在Python中調用Java的方法都是藉助這個Py4j Gateway
  • 經過Py4j Gateway在JVM中實例化SparkContext對象

通過上面兩步後,SparkContext對象初始化完畢,Driver已經起來了,開始申請Executor資源,同時開始調度任務。用戶Python腳本中定義的一系列處理邏輯最終遇到action方法後會觸發Job的提交,提交Job時是直接經過Py4j調用Java的PythonRDD.runJob方法完成,映射到JVM中,會轉給sparkContext.runJob方法,Job運行完成後,JVM中會開啓一個本地Socket等待Python進程拉取,對應地,Python進程在調用PythonRDD.runJob後就會經過Socket去拉取結果。函數

把前面運行時架構圖中Driver部分單獨拉出來,以下圖所示,經過PythonRunner入口main函數拉起JVM和Python進程,JVM進程對應下圖橙色部分,Python進程對應下圖白色部分。Python進程經過Py4j調用Java方法提交Job,Job運行結果經過本地Socket被拉取到Python進程。還有一點是,對於大數據量,例如廣播變量等,Python進程和JVM進程是經過本地文件系統來交互,以減小進程間的數據傳輸。

pyspark-driver

Executor端運行原理

爲了方便闡述,以Spark On Yarn爲例,當Driver申請到Executor資源時,會經過CoarseGrainedExecutorBackend(其中有main方法)拉起JVM,啓動一些必要的服務後等待Driver的Task下發,在尚未Task下發過來時,Executor端是沒有Python進程的。當收到Driver下發過來的Task後,Executor的內部運行過程以下圖所示。

pyspark-executor-runtime

Executor端收到Task後,會經過launchTask運行Task,最後會調用到PythonRDD的compute方法,來處理一個分區的數據,PythonRDD的compute方法的計算流程大體分三步走:

  • 若是不存在pyspark.deamon後臺Python進程,那麼經過Java Process的方式啓動pyspark.deamon後臺進程,注意每一個Executor上只會有一個pyspark.deamon後臺進程,不然,直接經過Socket鏈接pyspark.deamon,請求開啓一個pyspark.worker進程運行用戶定義的Python函數或Lambda表達式。pyspark.deamon是一個典型的多進程服務器,來一個Socket請求,fork一個pyspark.worker進程處理,一個Executor上同時運行多少個Task,就會有多少個對應的pyspark.worker進程。
  • 緊接着會單獨開一個線程,給pyspark.worker進程喂數據,pyspark.worker則會調用用戶定義的Python函數或Lambda表達式處理計算。
  • 在一邊喂數據的過程當中,另外一邊則經過Socket去拉取pyspark.worker的計算結果。

把前面運行時架構圖中Executor部分單獨拉出來,以下圖所示,橙色部分爲JVM進程,白色部分爲Python進程,每一個Executor上有一個公共的pyspark.deamon進程,負責接收Task請求,並fork pyspark.worker進程單獨處理每一個Task,實際數據處理過程當中,pyspark.worker進程和JVM Task會較頻繁地進行本地Socket數據通訊。

pyspark-executor.png

總結

整體上來講,PySpark是藉助Py4j實現Python調用Java,來驅動Spark應用程序,本質上主要仍是JVM runtime,Java到Python的結果返回是經過本地Socket完成。雖然這種架構保證了Spark核心代碼的獨立性,可是在大數據場景下,JVM和Python進程間頻繁的數據通訊致使其性能損耗較多,惡劣時還可能會直接卡死,因此建議對於大規模機器學習或者Streaming應用場景仍是慎用PySpark,儘可能使用原生的Scala/Java編寫應用程序,對於中小規模數據量下的簡單離線任務,可使用PySpark快速部署提交。

轉載請註明出處,本文永久連接:http://sharkdtu.com/posts/pyspark-internal.html

相關文章
相關標籤/搜索