在《Spark2.1.0——運行環境準備》一文介紹瞭如何準備基本的Spark運行環境,並在《Spark2.1.0——Spark初體驗》一文經過在spark-shell中執行word count的過程,讓讀者瞭解到可使用spark-shell提交Spark做業。如今讀者應該很想知道spark-shell究竟作了什麼呢?html
在Spark安裝目錄的bin文件夾下能夠找到spark-shell,其中有代碼清單1-1所示的一段腳本。java
代碼清單1-1 spark-shell腳本web
function main() { if $cygwin; then stty -icanon min 1 -echo > /dev/null 2>&1 export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix" "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" stty icanon echo > /dev/null 2>&1 else export SPARK_SUBMIT_OPTS "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" fi }
咱們看到腳本spark-shell裏執行了spark-submit腳本,那麼打開spark-submit腳本,發現代碼清單1-2中所示的腳本。sql
代碼清單1-2 spark-submit腳本shell
if [ -z "${SPARK_HOME}" ]; then source "$(dirname "$0")"/find-spark-home fi # disable randomized hash for string in Python 3.3+ export PYTHONHASHSEED=0 exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
能夠看到spark-submit中又執行了腳本spark-class。打開腳本spark-class,首先發現如下一段腳本:apache
# Find the java binary if [ -n "${JAVA_HOME}" ]; then RUNNER="${JAVA_HOME}/bin/java" else if [ "$(command -v java)" ]; then RUNNER="java" else echo "JAVA_HOME is not set" >&2 exit 1 fi fi
上面的腳本是爲了找到Java命令。在spark-class腳本中還會找到如下內容:bash
build_command() { "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@" printf "%d\0" $? } CMD=() while IFS= read -d '' -r ARG; do CMD+=("$ARG") done < <(build_command "$@")
根據代碼清單1-2,腳本spark-submit在執行spark-class腳本時,給它增長了參數SparkSubmit 。因此讀到這,應該知道Spark啓動了以SparkSubmit爲主類的JVM進程。session
爲便於在本地對Spark進程進行遠程監控,在spark-shell腳本中找到如下配置:架構
SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"
並追加如下jmx配置:app
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10207 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
若是Spark安裝在其餘機器,那麼在本地打開jvisualvm後須要添加遠程主機,如圖1所示:
右鍵單擊已添加的遠程主機,添加JMX鏈接,如圖2:
若是Spark安裝在本地,那麼打開jvisualvm後就會在應用程序窗口看到org.apache.spark.deploy.SparkSubmit進程,只需雙擊便可。
選擇右側的「線程」選項卡,選擇main線程,而後點擊「線程Dump」按鈕,如圖3。
圖3 查看Spark線程
從線程Dump的內容中找到線程main的信息如代碼清單1-3所示。
代碼清單1-3 main線程的Dump信息
"main" #1 prio=5 os_prio=31 tid=0x00007fa012802000 nid=0x1303 runnable [0x000000010d11c000] java.lang.Thread.State: RUNNABLE at java.io.FileInputStream.read0(Native Method) at java.io.FileInputStream.read(FileInputStream.java:207) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:169) - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:137) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:246) at jline.internal.InputStreamReader.read(InputStreamReader.java:261) - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream) at jline.internal.InputStreamReader.read(InputStreamReader.java:198) - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream) at jline.console.ConsoleReader.readCharacter(ConsoleReader.java:2145) at jline.console.ConsoleReader.readLine(ConsoleReader.java:2349) at jline.console.ConsoleReader.readLine(ConsoleReader.java:2269) at scala.tools.nsc.interpreter.jline.InteractiveReader.readOneLine(JLineReader.scala:57) at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2.apply(InteractiveReader.scala:37) at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2.apply(InteractiveReader.scala:37) at scala.tools.nsc.interpreter.InteractiveReader$.restartSysCalls(InteractiveReader.scala:44) at scala.tools.nsc.interpreter.InteractiveReader$class.readLine(InteractiveReader.scala:37) at scala.tools.nsc.interpreter.jline.InteractiveReader.readLine(JLineReader.scala:28) at scala.tools.nsc.interpreter.ILoop.readOneLine(ILoop.scala:404) at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:413) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97) at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909) at org.apache.spark.repl.Main$.doMain(Main.scala:68) at org.apache.spark.repl.Main$.main(Main.scala:51) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
從main線程的棧信息中看出程序的調用順序:SparkSubmit.main→repl.Main→Iloop.process。
咱們根據上面的線索,直接閱讀Iloop的process方法的源碼(Iloop是Scala語言自身的類庫中的用於實現交互式shell的實現類,提供對REPL(Read-eval-print-loop)的實現),見代碼清單1-4。
代碼清單1-4 process的實現
def process(settings: Settings): Boolean = savingContextLoader { this.settings = settings createInterpreter() // sets in to some kind of reader depending on environmental cues in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true)) globalFuture = future { intp.initializeSynchronous() loopPostInit() !intp.reporter.hasErrors } loadFiles(settings) printWelcome() try loop() match { case LineResults.EOF => out print Properties.shellInterruptedString case _ => } catch AbstractOrMissingHandler() finally closeInterpreter() true }
根據代碼清單1-4,Iloop的process方法調用了loadFiles方法。Spark中的SparkILoop繼承了Iloop並重寫了loadFiles方法,其實現以下:
override def loadFiles(settings: Settings): Unit = { initializeSpark() super.loadFiles(settings) }
根據上面展現的代碼,loadFiles方法調用了SparkILoop的initializeSpark方法,initializeSpark的實現見代碼清單1-5。
代碼清單1-5 initializeSpark的實現
def initializeSpark() { intp.beQuietDuring { processLine(""" @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { org.apache.spark.repl.Main.sparkSession } else { org.apache.spark.repl.Main.createSparkSession() } @transient val sc = { val _sc = spark.sparkContext if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) if (proxyUrl != null) { println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}") } else { println(s"Spark Context Web UI is available at Spark Master Public URL") } } else { _sc.uiWebUrl.foreach { webUrl => println(s"Spark context Web UI available at ${webUrl}") } } println("Spark context available as 'sc' " + s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") println("Spark session available as 'spark'.") _sc } """) processLine("import org.apache.spark.SparkContext._") processLine("import spark.implicits._") processLine("import spark.sql") processLine("import org.apache.spark.sql.functions._") replayCommandStack = Nil // remove above commands from session history. } }
咱們看到initializeSpark向交互式shell發送了一大串代碼,Scala的交互式shell將調用org.apache.spark.repl.Main的createSparkSession方法(見代碼清單1-6)建立SparkSession。咱們看到常量spark將持有SparkSession的引用,而且sc持有SparkSession內部初始化好的SparkContext。因此咱們纔可以在spark-shell的交互式shell中使用sc和spark。
代碼清單1-6 createSparkSession的實現
def createSparkSession(): SparkSession = { val execUri = System.getenv("SPARK_EXECUTOR_URI") conf.setIfMissing("spark.app.name", "Spark shell") conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath()) if (execUri != null) { conf.set("spark.executor.uri", execUri) } if (System.getenv("SPARK_HOME") != null) { conf.setSparkHome(System.getenv("SPARK_HOME")) } val builder = SparkSession.builder.config(conf) if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase == "hive") { if (SparkSession.hiveClassesArePresent) { sparkSession = builder.enableHiveSupport().getOrCreate() logInfo("Created Spark session with Hive support") } else { builder.config(CATALOG_IMPLEMENTATION.key, "in-memory") sparkSession = builder.getOrCreate() logInfo("Created Spark session") } } else { sparkSession = builder.getOrCreate() logInfo("Created Spark session") } sparkContext = sparkSession.sparkContext sparkSession }
根據代碼清單1-6,createSparkSession方法經過SparkSession的API建立SparkSession實例。本書將有關SparkSession等API的內容在《Spark內核設計的藝術》一書的第10章講解,初次接觸Spark的讀者如今只須要了解便可。