PySpark啓動過程解密

原文做者:李海強,來自平安銀行零售大數據團隊
java



前言



做爲數據工程師,你可能會碰到過不少種啓動PySpark的方法,可能搞不懂這些方法有什麼共同點、有什麼區別,不一樣的方法對程序開發、部署有什麼影響,今天咱們一塊兒分析一下這些啓動PySpark的方法。python


如下代碼分析都是基於spark-2.4.4版本展開的,爲了不歧義,務必對照這個版本的Spark深刻理解web


啓動PySpark的方法





啓動PySpark代碼分析




下面咱們分別來分析一下三種方法的代碼實現過程。 sql

/path/to/spark-submit python_file.pyshell


1. spark-submit是一個shell腳本apache

2. spark-submit調用shell命令spark-class org.apache.spark.deploy.SparkSubmit python_file.pywindows

3. spark-class,line 71,執行jvm org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit python_file.py重寫SparkSubmit參數api

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")


4.深刻分析一下org.apache.spark.launcher.Main如何重寫SparkSubmit參數,能夠看到buildCommand分三種狀況,分別對應三種不一樣的場景,PySpark shell、Spark R shell、Spark submit,場景對用不一樣的class微信

 /**
   * This constructor is used when invoking spark-submit; it parses and validates arguments
   * provided by 
the user on the command line.
   */

  SparkSubmitCommandBuilder(List<String> args) {
    this.allowsMixedArguments = false;
    this.parsedArgs = new ArrayList<>();
    boolean isExample = false;
    List<String> submitArgs = args;
    this.userArgs = Collections.emptyList();

    if (args.size() > 0) {
      switch (args.get(0)) {
        case PYSPARK_SHELL:
          this.allowsMixedArguments = true;
          appResource = PYSPARK_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case SPARKR_SHELL:
          this.allowsMixedArguments = true;
          appResource = SPARKR_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case RUN_EXAMPLE:
          isExample = true;
          appResource = SparkLauncher.NO_RESOURCE;
          submitArgs = args.subList(1, args.size());
      }

      this.isExample = isExample;
      OptionParser parser = new OptionParser(true);
      parser.parse(submitArgs);
      this.isSpecialCommand = parser.isSpecialCommand;
    } else {
      this.isExample = isExample;
      this.isSpecialCommand = true;
    }
  }

  @Override
  public List<String> buildCommand(Map<StringString> env)
      throws IOException, IllegalArgumentException {
    if (PYSPARK_SHELL.equals(appResource) && !isSpecialCommand) {
      return buildPySparkShellCommand(env);
    } else if (SPARKR_SHELL.equals(appResource) && !isSpecialCommand) {
      return buildSparkRCommand(env);
    } else {
      return buildSparkSubmitCommand(env);
    }
  }



5. 這裏buildCommand返回的class是org.apache.spark.deploy.SparkSubmit,參數是python_file.pysession

6. 由於SparkSubmit的參數是.py文件,因此選擇class org.apache.spark.deploy.PythonRunner


最後看一下PythonRunner的實現,首先建立一個py4j.GatewayServer的線程,用於接收python發起的請求,而後起一個子進程執行用戶的python代碼python_file.py,python_file.py會經過py4j發起各類Spark操做,就如上篇文章[PySpark工做原理]提到的。

/**
 * A main class used to launch Python applications. It executes python as a
 * subprocess and then has it connect back to the JVM to access system properties, etc.
 */

object PythonRunner {
  def main(args: Array[String]) {
    val pythonFile = args(0)
    val pyFiles = args(1)
    val otherArgs = args.slice(2, args.length)
    val sparkConf = new SparkConf()
    val secret = Utils.createSecret(sparkConf)
    val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
      .orElse(sparkConf.get(PYSPARK_PYTHON))
      .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
      .orElse(sys.env.get("PYSPARK_PYTHON"))
      .getOrElse("python")

    // Format python file paths before adding them to the PYTHONPATH
    val formattedPythonFile = formatPath(pythonFile)
    val formattedPyFiles = resolvePyFiles(formatPaths(pyFiles))

    // Launch a Py4J gateway server for the process to connect to; this will let it see our
    // Java system properties and such
    val localhost = InetAddress.getLoopbackAddress()
    val gatewayServer = new py4j.GatewayServer.GatewayServerBuilder()
      .authToken(secret)
      .javaPort(0)
      .javaAddress(localhost)
      .callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
      .build()
    val thread = new Thread(new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions {
        gatewayServer.start()
      }
    })
    thread.setName("py4j-gateway-init")
    thread.setDaemon(true)
    thread.start()

    // Wait until the gateway server has started, so that we know which port is it bound to.
    // `gatewayServer.start()` will start a new thread and run the server code there, after
    // initializing the socket, so the thread started above will end as soon as the server is
    // ready to serve connections.
    thread.join()

    // Build up a PYTHONPATH that includes the Spark assembly (where this class is), the
    // python directories in SPARK_HOME (if set), and any files in the pyFiles argument
    val pathElements = new ArrayBuffer[String]
    pathElements ++= formattedPyFiles
    pathElements += PythonUtils.sparkPythonPath
    pathElements += sys.env.getOrElse("PYTHONPATH""")
    val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)

    // Launch Python process
    val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
    val env = builder.environment()
    env.put("PYTHONPATH", pythonPath)
    // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
    env.put("PYTHONUNBUFFERED""YES"// value is needed to be set to a non-empty string
    env.put("PYSPARK_GATEWAY_PORT""" + gatewayServer.getListeningPort)
    env.put("PYSPARK_GATEWAY_SECRET", secret)
    // pass conf spark.pyspark.python to python process, the only way to pass info to
    // python process is through environment variable.
    sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _))
    sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _))
    builder.redirectErrorStream(true// Ugly but needed for stdout and stderr to synchronize
    try {
      val process = builder.start()

      new RedirectThread(process.getInputStream, System.out"redirect output").start()

      val exitCode = process.waitFor()
      if (exitCode != 0) {
        throw new SparkUserAppException(exitCode)
      }
    } finally {
      gatewayServer.shutdown()
    }
  }



/path/to/python python_file

1. 直接執行python python_file.py

2. 調用SparkContext._ensure_initialized來初始化Spark Context(第2步),調用launch_gateway建立Spark py4j.GatewayServer實例,其實最終是起一個子進程執行spark-submit pyspark-shell(第3步)

    @classmethod
    def _ensure_initialized(cls, instance=None, gateway=None, conf=None):
        """
        Checks whether a SparkContext is initialized or not.
        Throws error if a SparkContext is already running.
        """

        with SparkContext._lock:
            if not SparkContext._gateway:
                SparkContext._gateway = gateway or launch_gateway(conf)
                SparkContext._jvm = SparkContext._gateway.jvm
def _launch_gateway(conf=None, insecure=False):
    """
    launch jvm gateway
    :param conf: spark configuration passed to spark-submit
    :param insecure: True to create an insecure gateway; only for testing
    :return: a JVM gateway
    """

    if insecure and os.environ.get("SPARK_TESTING""0") != "1":
        raise ValueError("creating insecure gateways is only for testing")
    if "PYSPARK_GATEWAY_PORT" in os.environ:
        gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
        gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
    else:
        SPARK_HOME = _find_spark_home()
        # Launch the Py4j gateway using Spark's run command so that we pick up the
        # proper classpath and settings from spark-env.sh
        on_windows = platform.system() == "Windows"
        script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
        command = [os.path.join(SPARK_HOME, script)]
        if conf:
            for k, v in conf.getAll():
                command += ['--conf''%s=%s' % (k, v)]
        submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS""pyspark-shell")
        if os.environ.get("SPARK_TESTING"):
            submit_args = ' '.join([
                "--conf spark.ui.enabled=false",
                submit_args
            ])
        command = command + shlex.split(submit_args)

        # Create a temporary directory where the gateway server should write the connection
        # information.
        conn_info_dir = tempfile.mkdtemp()
        try:
            fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
            os.close(fd)
            os.unlink(conn_info_file)

            env = dict(os.environ)
            env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file
            if insecure:
                env["_PYSPARK_CREATE_INSECURE_GATEWAY"] = "1"

            # Launch the Java gateway.
            # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
            if not on_windows:
                # Don't send ctrl-c / SIGINT to the Java gateway:
                def preexec_func():
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
            else:
                # preexec_fn not supported on Windows
                proc = Popen(command, stdin=PIPE, env=env)

            # Wait for the file to appear, or for the process to exit, whichever happens first.
            while not proc.poll() and not os.path.isfile(conn_info_file):
                time.sleep(0.1)

            if not os.path.isfile(conn_info_file):
                raise Exception("Java gateway process exited before sending its port number")

            with open(conn_info_file, "rb"as info:
                gateway_port = read_int(info)
                gateway_secret = UTF8Deserializer().loads(info)
        finally:
            shutil.rmtree(conn_info_dir)

        # In Windows, ensure the Java child processes do not linger after Python has exited.
        # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
        # the parent process' stdin sends an EOF). In Windows, however, this is not possible
        # because java.lang.Process reads directly from the parent process' stdin, contending
        # with any opportunity to read an EOF from the parent. Note that this is only best
        # effort and will not take effect if the python process is violently terminated.
        if on_windows:
            # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
            # (because the UNIX "exec" command is not available). This means we cannot simply
            # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
            # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
            # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
            def killChild():
                Popen(["cmd""/c""taskkill""/f""/t""/pid", str(proc.pid)])
            atexit.register(killChild)

    # Connect to the gateway
    gateway_params = GatewayParameters(port=gateway_port, auto_convert=True)
    if not insecure:
        gateway_params.auth_token = gateway_secret
    gateway = JavaGateway(gateway_parameters=gateway_params)

    # Import the classes used by PySpark
    java_import(gateway.jvm, "org.apache.spark.SparkConf")
    java_import(gateway.jvm, "org.apache.spark.api.java.*")
    java_import(gateway.jvm, "org.apache.spark.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.ml.python.*")
    java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
    # TODO(davies): move into sql
    java_import(gateway.jvm, "org.apache.spark.sql.*")
    java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
    java_import(gateway.jvm, "scala.Tuple2")

    return gateway


接下來的過程和第一種方法相似,這回選擇的class是org.apache.spark.api.python.PythonGatewayServer,咱們來看一下代碼,就是起一個py4j.GatewayServer,處理python端發起的請求

/**
 * Process that starts a Py4J GatewayServer on an ephemeral port.
 *
 * This process is launched (via SparkSubmit) by the PySpark driver (see java_gateway.py).
 */

private[spark] object PythonGatewayServer extends Logging {
  initializeLogIfNecessary(true)

  def main(args: Array[String]): Unit = {
    val secret = Utils.createSecret(new SparkConf())

    // Start a GatewayServer on an ephemeral port. Make sure the callback client is configured
    // with the same secret, in case the app needs callbacks from the JVM to the underlying
    // python processes.
    val localhost = InetAddress.getLoopbackAddress()
    val builder = new GatewayServer.GatewayServerBuilder()
      .javaPort(0)
      .javaAddress(localhost)
      .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
    if (sys.env.getOrElse("_PYSPARK_CREATE_INSECURE_GATEWAY""0") != "1") {
      builder.authToken(secret)
    } else {
      assert(sys.env.getOrElse("SPARK_TESTING""0") == "1",
        "Creating insecure Java gateways only allowed for testing")
    }
    val gatewayServer: GatewayServer = builder.build()

    gatewayServer.start()
    val boundPort: Int = gatewayServer.getListeningPort
    if (boundPort == -1) {
      logError("GatewayServer failed to bind; exiting")
      System.exit(1)
    } else {
      logDebug(s"Started PythonGatewayServer on port $boundPort")
    }

    // Communicate the connection information back to the python process by writing the
    // information in the requested file. This needs to match the read side in java_gateway.py.
    val connectionInfoPath = new File(sys.env("_PYSPARK_DRIVER_CONN_INFO_PATH"))
    val tmpPath = Files.createTempFile(connectionInfoPath.getParentFile().toPath(),
      "connection"".info").toFile()

    val dos = new DataOutputStream(new FileOutputStream(tmpPath))
    dos.writeInt(boundPort)

    val secretBytes = secret.getBytes(UTF_8)
    dos.writeInt(secretBytes.length)
    dos.write(secretBytes, 0, secretBytes.length)
    dos.close()

    if (!tmpPath.renameTo(connectionInfoPath)) {
      logError(s"Unable to write connection information to $connectionInfoPath.")
      System.exit(1)
    }

    // Exit on EOF or broken pipe to ensure that this process dies when the Python driver dies:
    while (System.in.read() != -1) {
      // Do nothing
    }
    logDebug("Exiting due to broken pipe from Python driver")
    System.exit(0)
  }
}


/path/to/pyspark

1. pyspark是個shell腳本

2. 1會調用另一個shell命令spark-submit pyspark-shell-main

3. 2又會調用另一個shell命令spark-class

4. 3裏面會執行一個java class,org.apache.spark.launcher.Main重寫SparkSubmit參數

5. 3而後會啓動一個python進程,這個進程就是最終和用戶交互的pyspark

6. 這個python進程啓動的時候會先執行環境變量$PYTHONSTARTUP指定的python代碼,這個代碼就是pyspark/python/pyspark/shell.py,這個環境變量是在1這個shell腳本里設置的,而後咱們來看一下shell.py的代碼

from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext

if os.environ.get("SPARK_EXECUTOR_URI"):
    SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])

SparkContext._ensure_initialized()

try:
    spark = SparkSession._create_shell_session()
except Exception:
    import sys
    import traceback
    warnings.warn("Failed to initialize Spark session.")
    traceback.print_exc(file=sys.stderr)
    sys.exit(1)

sc = spark.sparkContext
sql = spark.sql
atexit.register(lambda: sc.stop())


7. shell.py調用SparkContext._ensure_initialized,接下來的過程和第二種方法同樣,選擇的class也是org.apache.spark.api.python.PythonGatewayServer,就是起一個py4j.GatewayServer,處理python端發起的請求


總結



文章結合代碼分析了三種啓動PySpark的方法,各有特點,原理是差很少。可是,不一樣的方法,均可以挖掘一些技巧,實現一些定製的功能、跟自家的產品集成。


本文分享自微信公衆號 - Hadoop實操(gh_c4c535955d0f)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索