Java代碼使用Spark on Yarn 方式提交任務到帶Kerberos認證的Hadoop集羣

        項目中遇到Spark Yarn方式提交到Hadoop集羣,訪問集羣HDFS時發現使用的當前用戶,沒有訪問權限,通過排查後發現Hadoop集羣是帶Kerberos認證的集羣,須要像hadoop同樣使用Kerberos的認證用戶登錄,而後查相關資料傻臉了,沒有相關的內容,查了半天也只查到了在服務器上使用Spark-Submit命令提交時 加入參數 --keytab  /Kerberos/user.keytab \  --principal user ,可是我使用的java api的 SparkLauncer(),發現API裏面沒有setKeyTab與setPrincipal方法,而後就卡住了。後面想着去看看SparkLauncer的源碼java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.launcher;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.spark.launcher.CommandBuilderUtils.*;

/**
 * Launcher for Spark applications.
 * <p>
 * Use this class to start Spark applications programmatically. The class uses a builder pattern
 * to allow clients to configure the Spark application and launch it as a child process.
 * </p>
 */
public class SparkLauncher extends AbstractLauncher<SparkLauncher> {

  /** The Spark master. */
  public static final String SPARK_MASTER = "spark.master";

  /** The Spark deploy mode. */
  public static final String DEPLOY_MODE = "spark.submit.deployMode";

  /** Configuration key for the driver memory. */
  public static final String DRIVER_MEMORY = "spark.driver.memory";
  /** Configuration key for the driver class path. */
  public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
  /** Configuration key for the driver VM options. */
  public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
  /** Configuration key for the driver native library path. */
  public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath";

  /** Configuration key for the executor memory. */
  public static final String EXECUTOR_MEMORY = "spark.executor.memory";
  /** Configuration key for the executor class path. */
  public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
  /** Configuration key for the executor VM options. */
  public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
  /** Configuration key for the executor native library path. */
  public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath";
  /** Configuration key for the number of executor CPU cores. */
  public static final String EXECUTOR_CORES = "spark.executor.cores";

  static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python";

  static final String PYSPARK_PYTHON = "spark.pyspark.python";

  static final String SPARKR_R_SHELL = "spark.r.shell.command";

  /** Logger name to use when launching a child process. */
  public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";

  /**
   * A special value for the resource that tells Spark to not try to process the app resource as a
   * file. This is useful when the class being executed is added to the application using other
   * means - for example, by adding jars using the package download feature.
   */
  public static final String NO_RESOURCE = "spark-internal";

  /**
   * Maximum time (in ms) to wait for a child process to connect back to the launcher server
   * when using @link{#start()}.
   */
  public static final String CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout";

  /** Used internally to create unique logger names. */
  private static final AtomicInteger COUNTER = new AtomicInteger();

  /** Factory for creating OutputRedirector threads. **/
  static final ThreadFactory REDIRECTOR_FACTORY = new NamedThreadFactory("launcher-proc-%d");

  static final Map<String, String> launcherConfig = new HashMap<>();

  /**
   * Set a configuration value for the launcher library. These config values do not affect the
   * launched application, but rather the behavior of the launcher library itself when managing
   * applications.
   *
   * @since 1.6.0
   * @param name Config name.
   * @param value Config value.
   */
  public static void setConfig(String name, String value) {
    launcherConfig.put(name, value);
  }

  // Visible for testing.
  File workingDir;
  boolean redirectErrorStream;
  ProcessBuilder.Redirect errorStream;
  ProcessBuilder.Redirect outputStream;

  public SparkLauncher() {
    this(null);
  }

  /**
   * Creates a launcher that will set the given environment variables in the child.
   *
   * @param env Environment variables to set.
   */
  public SparkLauncher(Map<String, String> env) {
    if (env != null) {
      this.builder.childEnv.putAll(env);
    }
  }

  /**
   * Set a custom JAVA_HOME for launching the Spark application.
   *
   * @param javaHome Path to the JAVA_HOME to use.
   * @return This launcher.
   */
  public SparkLauncher setJavaHome(String javaHome) {
    checkNotNull(javaHome, "javaHome");
    builder.javaHome = javaHome;
    return this;
  }

  /**
   * Set a custom Spark installation location for the application.
   *
   * @param sparkHome Path to the Spark installation to use.
   * @return This launcher.
   */
  public SparkLauncher setSparkHome(String sparkHome) {
    checkNotNull(sparkHome, "sparkHome");
    builder.childEnv.put(ENV_SPARK_HOME, sparkHome);
    return this;
  }

  /**
   * Sets the working directory of spark-submit.
   *
   * @param dir The directory to set as spark-submit's working directory.
   * @return This launcher.
   */
  public SparkLauncher directory(File dir) {
    workingDir = dir;
    return this;
  }

  /**
   * Specifies that stderr in spark-submit should be redirected to stdout.
   *
   * @return This launcher.
   */
  public SparkLauncher redirectError() {
    redirectErrorStream = true;
    return this;
  }

  /**
   * Redirects error output to the specified Redirect.
   *
   * @param to The method of redirection.
   * @return This launcher.
   */
  public SparkLauncher redirectError(ProcessBuilder.Redirect to) {
    errorStream = to;
    return this;
  }

  /**
   * Redirects standard output to the specified Redirect.
   *
   * @param to The method of redirection.
   * @return This launcher.
   */
  public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) {
    outputStream = to;
    return this;
  }

  /**
   * Redirects error output to the specified File.
   *
   * @param errFile The file to which stderr is written.
   * @return This launcher.
   */
  public SparkLauncher redirectError(File errFile) {
    errorStream = ProcessBuilder.Redirect.to(errFile);
    return this;
  }

  /**
   * Redirects error output to the specified File.
   *
   * @param outFile The file to which stdout is written.
   * @return This launcher.
   */
  public SparkLauncher redirectOutput(File outFile) {
    outputStream = ProcessBuilder.Redirect.to(outFile);
    return this;
  }

  /**
   * Sets all output to be logged and redirected to a logger with the specified name.
   *
   * @param loggerName The name of the logger to log stdout and stderr.
   * @return This launcher.
   */
  public SparkLauncher redirectToLog(String loggerName) {
    setConf(CHILD_PROCESS_LOGGER_NAME, loggerName);
    return this;
  }

  // The following methods just delegate to the parent class, but they are needed to keep
  // binary compatibility with previous versions of this class.

  @Override
  public SparkLauncher setPropertiesFile(String path) {
    return super.setPropertiesFile(path);
  }

  @Override
  public SparkLauncher setConf(String key, String value) {
    return super.setConf(key, value);
  }

  @Override
  public SparkLauncher setAppName(String appName) {
    return super.setAppName(appName);
  }

  @Override
  public SparkLauncher setMaster(String master) {
    return super.setMaster(master);
  }

  @Override
  public SparkLauncher setDeployMode(String mode) {
    return super.setDeployMode(mode);
  }

  @Override
  public SparkLauncher setAppResource(String resource) {
    return super.setAppResource(resource);
  }

  @Override
  public SparkLauncher setMainClass(String mainClass) {
    return super.setMainClass(mainClass);
  }

  @Override
  public SparkLauncher addSparkArg(String arg) {
    return super.addSparkArg(arg);
  }

  @Override
  public SparkLauncher addSparkArg(String name, String value) {
    return super.addSparkArg(name, value);
  }

  @Override
  public SparkLauncher addAppArgs(String... args) {
    return super.addAppArgs(args);
  }

  @Override
  public SparkLauncher addJar(String jar) {
    return super.addJar(jar);
  }

  @Override
  public SparkLauncher addFile(String file) {
    return super.addFile(file);
  }

  @Override
  public SparkLauncher addPyFile(String file) {
    return super.addPyFile(file);
  }

  @Override
  public SparkLauncher setVerbose(boolean verbose) {
    return super.setVerbose(verbose);
  }

  /**
   * Launches a sub-process that will start the configured Spark application.
   * <p>
   * The {@link #startApplication(SparkAppHandle.Listener...)} method is preferred when launching
   * Spark, since it provides better control of the child application.
   *
   * @return A process handle for the Spark app.
   */
  public Process launch() throws IOException {
    ProcessBuilder pb = createBuilder();

    boolean outputToLog = outputStream == null;
    boolean errorToLog = !redirectErrorStream && errorStream == null;

    String loggerName = getLoggerName();
    if (loggerName != null && outputToLog && errorToLog) {
      pb.redirectErrorStream(true);
    }

    Process childProc = pb.start();
    if (loggerName != null) {
      InputStream logStream = outputToLog ? childProc.getInputStream() : childProc.getErrorStream();
      new OutputRedirector(logStream, loggerName, REDIRECTOR_FACTORY);
    }

    return childProc;
  }

  /**
   * Starts a Spark application.
   *
   * <p>
   * Applications launched by this launcher run as child processes. The child's stdout and stderr
   * are merged and written to a logger (see <code>java.util.logging</code>) only if redirection
   * has not otherwise been configured on this <code>SparkLauncher</code>. The logger's name can be
   * defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that
   * option is not set, the code will try to derive a name from the application's name or main
   * class / script file. If those cannot be determined, an internal, unique name will be used.
   * In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit more
   * easily into the configuration of commonly-used logging systems.
   *
   * @since 1.6.0
   * @see AbstractLauncher#startApplication(SparkAppHandle.Listener...)
   * @param listeners Listeners to add to the handle before the app is launched.
   * @return A handle for the launched application.
   */
  @Override
  public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
    LauncherServer server = LauncherServer.getOrCreateServer();
    ChildProcAppHandle handle = new ChildProcAppHandle(server);
    for (SparkAppHandle.Listener l : listeners) {
      handle.addListener(l);
    }

    String secret = server.registerHandle(handle);

    String loggerName = getLoggerName();
    ProcessBuilder pb = createBuilder();

    boolean outputToLog = outputStream == null;
    boolean errorToLog = !redirectErrorStream && errorStream == null;

    // Only setup stderr + stdout to logger redirection if user has not otherwise configured output
    // redirection.
    if (loggerName == null && (outputToLog || errorToLog)) {
      String appName;
      if (builder.appName != null) {
        appName = builder.appName;
      } else if (builder.mainClass != null) {
        int dot = builder.mainClass.lastIndexOf(".");
        if (dot >= 0 && dot < builder.mainClass.length() - 1) {
          appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
        } else {
          appName = builder.mainClass;
        }
      } else if (builder.appResource != null) {
        appName = new File(builder.appResource).getName();
      } else {
        appName = String.valueOf(COUNTER.incrementAndGet());
      }
      String loggerPrefix = getClass().getPackage().getName();
      loggerName = String.format("%s.app.%s", loggerPrefix, appName);
    }

    if (outputToLog && errorToLog) {
      pb.redirectErrorStream(true);
    }

    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT, String.valueOf(server.getPort()));
    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, secret);
    try {
      Process child = pb.start();
      InputStream logStream = null;
      if (loggerName != null) {
        logStream = outputToLog ? child.getInputStream() : child.getErrorStream();
      }
      handle.setChildProc(child, loggerName, logStream);
    } catch (IOException ioe) {
      handle.kill();
      throw ioe;
    }

    return handle;
  }

  private ProcessBuilder createBuilder() throws IOException {
    List<String> cmd = new ArrayList<>();
    cmd.add(findSparkSubmit());
    cmd.addAll(builder.buildSparkSubmitArgs());

    // Since the child process is a batch script, let's quote things so that special characters are
    // preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are
    // weird.
    if (isWindows()) {
      List<String> winCmd = new ArrayList<>();
      for (String arg : cmd) {
        winCmd.add(quoteForBatchScript(arg));
      }
      cmd = winCmd;
    }

    ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()]));
    for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
      pb.environment().put(e.getKey(), e.getValue());
    }

    if (workingDir != null) {
      pb.directory(workingDir);
    }

    // Only one of redirectError and redirectError(...) can be specified.
    // Similarly, if redirectToLog is specified, no other redirections should be specified.
    checkState(!redirectErrorStream || errorStream == null,
      "Cannot specify both redirectError() and redirectError(...) ");
    checkState(getLoggerName() == null ||
      ((!redirectErrorStream && errorStream == null) || outputStream == null),
      "Cannot used redirectToLog() in conjunction with other redirection methods.");

    if (redirectErrorStream) {
      pb.redirectErrorStream(true);
    }
    if (errorStream != null) {
      pb.redirectError(errorStream);
    }
    if (outputStream != null) {
      pb.redirectOutput(outputStream);
    }

    return pb;
  }

  @Override
  SparkLauncher self() {
    return this;
  }

  // Visible for testing.
  String findSparkSubmit() {
    String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
    return join(File.separator, builder.getSparkHome(), "bin", script);
  }

  private String getLoggerName() throws IOException {
    return builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
  }

}

忽然發現有個addSparkArg(String name, String value)的方法,眼前一亮感受有但願,繼續跟蹤代碼發現父類AbstractLauncher實現python

/**
   * Adds an argument with a value to the Spark invocation. If the argument name corresponds to
   * a known argument, the code validates that the argument actually expects a value, and throws
   * an exception otherwise.
   * <p>
   * It is safe to add arguments modified by other methods in this class (such as
   * {@link #setMaster(String)} - the last invocation will be the one to take effect.
   * <p>
   * Use this method with caution. It is possible to create an invalid Spark command by passing
   * unknown arguments to this method, since those are allowed for forward compatibility.
   *
   * @since 1.5.0
   * @param name Name of argument to add.
   * @param value Value of the argument.
   * @return This launcher.
   */
  public T addSparkArg(String name, String value) {
    SparkSubmitOptionParser validator = new ArgumentValidator(true);
    if (validator.MASTER.equals(name)) {
      setMaster(value);
    } else if (validator.PROPERTIES_FILE.equals(name)) {
      setPropertiesFile(value);
    } else if (validator.CONF.equals(name)) {
      String[] vals = value.split("=", 2);
      setConf(vals[0], vals[1]);
    } else if (validator.CLASS.equals(name)) {
      setMainClass(value);
    } else if (validator.JARS.equals(name)) {
      builder.jars.clear();
      for (String jar : value.split(",")) {
        addJar(jar);
      }
    } else if (validator.FILES.equals(name)) {
      builder.files.clear();
      for (String file : value.split(",")) {
        addFile(file);
      }
    } else if (validator.PY_FILES.equals(name)) {
      builder.pyFiles.clear();
      for (String file : value.split(",")) {
        addPyFile(file);
      }
    } else {
      validator.parse(Arrays.asList(name, value));
      builder.sparkArgs.add(name);
      builder.sparkArgs.add(value);
    }
    return self();
  }

參數放入了 final SparkSubmitCommandBuilder builder, 到這一步基本能夠肯定這個方法是能夠使用的,shell

而後代碼中加入 express

SparkAppHandle handle = new SparkLauncher().setSparkHome("/**/spark-2.2.0")
				.setAppResource("/**/spark-2.2.0/lib/spark.jar")
				.setMainClass("***.SimpleApp")
				.setMaster("yarn").setDeployMode("cluster")
				.addSparkArg("keytab", "/**/user.keytab") //此value爲kerberos根據用戶生成的公鑰
				.addSparkArg("principal ", "user")//此value爲生成公鑰時 使用的用戶名
				.....................

修改後,發現有問題 並且全亂套了,後面通過一系列跟源碼,才發現又是一個比較二的問題,漏了前面的--符號,想固然的認爲能夠省略,後面發現API徹底沒給補上,修改後apache

SparkAppHandle handle = new SparkLauncher().setSparkHome("/**/spark-2.2.0")
				.setAppResource("/**/spark-2.2.0/lib/spark.jar")
				.setMainClass("***.SimpleApp")
				.setMaster("yarn").setDeployMode("cluster")
				.addSparkArg("--keytab", "/**/user.keytab") //此value爲kerberos根據用戶生成的公鑰
				.addSparkArg("--principal ", "user")//此value爲生成公鑰時 使用的用戶名
				.....................

完美經過kerberos認證,執行計劃的用戶名同樣變成了使用的kerberos認證的用戶名api

相關文章
相關標籤/搜索