SparkR Internals

In Spark 2.0, SparkR's RDD backend code lives in org.apache.spark.rdd, and the R-specific code lives in org.apache.spark.api.r.

Start from the entry point: ./bin/sparkR contains just four lines, which boil down to this call:

exec "$SPARK_HOME"/bin/spark-submit sparkr-shell-main "$@"

spark-submit itself is a one-line shell script:

exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

 

So the real entry point is the org.apache.spark.deploy.SparkSubmit class, whose main method dispatches to the concrete handler:

case SparkSubmitAction.SUBMIT => submit(appArgs)

/**
 * Submit the application using the provided parameters.
 *
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 */
private def submit(args: SparkSubmitArguments): Unit = {

 

The submit method prepares the classpath, system properties, and run arguments, and then uses them to call:

runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)

 

The method has two main steps. First, it calls the following to prepare the launch environment:

val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

Second, it calls

runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)

to carry out the execution.
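The two-step shape of submit (prepare a launch environment, then reflectively invoke the child main class by name) can be sketched in Python. The field names and the dict-based arguments here are illustrative stand-ins, not Spark's actual types:

```python
import importlib

def run_main(child_main_class, child_args):
    """Mimic SparkSubmit.runMain: resolve a 'main class' by its dotted
    name and invoke its entry point with the prepared arguments."""
    module_name, func_name = child_main_class.rsplit(".", 1)
    module = importlib.import_module(module_name)
    main = getattr(module, func_name)
    return main(child_args)

def submit(args):
    # Step 1: prepare the launch environment (child args, main class, ...).
    child_args = [args["primary_resource"]] + args.get("app_args", [])
    child_main_class = args["main_class"]
    # Step 2: invoke the main method of the child main class.
    return run_main(child_main_class, child_args)
```

Using `json.dumps` as a stand-in "main class", `submit({"primary_resource": "app.R", "main_class": "json.dumps"})` resolves the name and calls it with `["app.R"]`.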

 

 

In the first step, the R side of SparkR is packed into a zip archive, and the main class to run is selected:

If the primary resource is SPARKR_SHELL, the main class is org.apache.spark.api.r.RBackend.

If a plain R file is submitted in client mode, the main class is org.apache.spark.deploy.RRunner, invoked like this, for example:

Usage: RRunner <main R file> [app arguments]
sun.java.command=com.aliyun.odps.cupid.runtime.Main --class org.apache.spark.deploy.RRunner --primary-r-file testOdpsRdd.R --arg testOdpsRdd.R
  

RBackend, built on Netty, handles the communication between the R process and the JVM.

RRunner starts the RBackend and then launches a ProcessBuilder to execute the R script, namely this line:

new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)
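That ProcessBuilder step can be pictured as follows in Python. EXISTING_SPARKR_BACKEND_PORT is the environment variable RRunner exports for the R side; the function signature and the rest of the setup are a simplified sketch:

```python
import os
import subprocess

def launch_r_script(r_command, r_file, other_args, backend_port):
    """Sketch of RRunner's ProcessBuilder call: run the user's R script
    with the RBackend port exposed through the environment."""
    env = dict(os.environ)
    # The R side (SparkR) reads this variable to connect back to the JVM.
    env["EXISTING_SPARKR_BACKEND_PORT"] = str(backend_port)
    cmd = [r_command, r_file] + list(other_args)
    return subprocess.run(cmd, env=env, capture_output=True, text=True)
```

In the real RRunner, r_command is Rscript and the exit code of the child process becomes the exit code of the job.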

How do Spark workers recognize the SparkR code? R consults the environment variable R_PROFILE_USER when initializing the R runtime. After the packaged SparkR code is shipped to the compute cluster, each compute node first points this variable at the bootstrap script ${SPARK_HOME}/sparkr/SparkR/profile/general.R, which resolves the package path and installs the unpacked SparkR code into the current R library paths. Its code is:

.First <- function() {
  packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
  .libPaths(c(packageDir, .libPaths()))
  Sys.setenv(NOAWT=1)
}

The following code comes from prepareSubmitEnvironment:

// In YARN mode for an R app, add the SparkR package archive to archives
// that can be distributed with the job
if (args.isR && clusterManager == YARN) {
  val rPackagePath = RUtils.localSparkRPackagePath
  if (rPackagePath.isEmpty) {
    printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
  }
  val rPackageFile =
    RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE)
  if (!rPackageFile.exists()) {
    printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
  }
  val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)

  // Assigns a symbol link name "sparkr" to the shipped package.
  args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")
}

// If we're running a R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
  if (args.primaryResource == SPARKR_SHELL) {
    args.mainClass = "org.apache.spark.api.r.RBackend"
  } else {
    // If a R file is provided, add it to the child arguments and list of files to deploy.
    // Usage: RRunner <main R file> [app arguments]
    args.mainClass = "org.apache.spark.deploy.RRunner"
    args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs
    args.files = mergeFileLists(args.files, args.primaryResource)
  }
}

if (isYarnCluster && args.isR) {
  // In yarn-cluster mode for a R app, add primary resource to files
  // that can be distributed with the job
  args.files = mergeFileLists(args.files, args.primaryResource)
}
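The zip-and-ship step above can be sketched in Python. zip_r_libraries and the file layout are stand-ins for RPackageUtils.zipRLibraries; only the comma-joined file list and the "#sparkr" link-name convention come from the Spark code:

```python
import os
import tempfile
import zipfile

def zip_r_libraries(package_dir, archive_name):
    """Stand-in for RPackageUtils.zipRLibraries: bundle the local R
    package directory into a single zip archive."""
    archive_path = os.path.join(tempfile.mkdtemp(), archive_name)
    with zipfile.ZipFile(archive_path, "w") as zf:
        for root, _dirs, files in os.walk(package_dir):
            for name in files:
                full = os.path.join(root, name)
                # Store paths relative to the package root.
                zf.write(full, os.path.relpath(full, package_dir))
    return archive_path

def merge_file_lists(existing, new_entry):
    """Mimic mergeFileLists: the distributed files form a comma-joined list."""
    return new_entry if not existing else existing + "," + new_entry

def add_sparkr_archive(archives, archive_path):
    """The '#sparkr' fragment asks YARN to expose the shipped archive
    under the link name 'sparkr' in each container's working directory."""
    local_uri = "file:" + os.path.abspath(archive_path)
    return merge_file_lists(archives, local_uri + "#sparkr")
```

This is why the R bootstrap script can find the package under a stable "sparkr" directory regardless of where the archive was built.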

 

For an ordinary Scala/Java job in legacy standalone cluster mode, the following class is used directly:

// In legacy standalone cluster mode, use Client as a wrapper around the user class
childMainClass = "org.apache.spark.deploy.Client"

 

In client mode, the user application's main class is launched directly; if the primary resource is SPARKR_SHELL, that class is org.apache.spark.api.r.RBackend,

while submitting an R file directly uses org.apache.spark.deploy.RRunner:

 

// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
  childMainClass = args.mainClass
  if (isUserJar(args.primaryResource)) {
    childClasspath += args.primaryResource
  }
  if (args.jars != null) { childClasspath ++= args.jars.split(",") }
  if (args.childArgs != null) { childArgs ++= args.childArgs }
}
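The client-mode branch is just list accumulation. A minimal Python rendering, with simplified field names standing in for SparkSubmitArguments:

```python
def prepare_client_mode(args, is_user_jar):
    """Sketch of the client-mode branch: the user's main class runs
    in-process, with the primary jar and any --jars on the classpath."""
    child_classpath = []
    child_args = []
    child_main_class = args["main_class"]
    if is_user_jar(args["primary_resource"]):
        child_classpath.append(args["primary_resource"])
    if args.get("jars"):
        # --jars is a comma-separated list, like Spark's args.jars.
        child_classpath.extend(args["jars"].split(","))
    if args.get("child_args"):
        child_args.extend(args["child_args"])
    return child_args, child_classpath, child_main_class
```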

 

In yarn-cluster mode, org.apache.spark.deploy.yarn.Client is used to wrap the user class for submission:

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
  childMainClass = "org.apache.spark.deploy.yarn.Client"
  if (args.isPython) {
    childArgs += ("--primary-py-file", args.primaryResource)
    if (args.pyFiles != null) {
      childArgs += ("--py-files", args.pyFiles)
    }
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  } else if (args.isR) {
    val mainFile = new Path(args.primaryResource).getName
    childArgs += ("--primary-r-file", mainFile)
    childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  } else {
    if (args.primaryResource != SPARK_INTERNAL) {
      childArgs += ("--jar", args.primaryResource)
    }
    childArgs += ("--class", args.mainClass)
  }
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}
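In this branch everything is re-encoded as command-line flags for yarn.Client. A Python sketch of the R path (dict fields are simplified stand-ins; the flag names match the Scala code above):

```python
import os

def yarn_cluster_child_args(args):
    """Sketch of the yarn-cluster branch: the job is re-expressed as
    flags for org.apache.spark.deploy.yarn.Client."""
    child_args = []
    if args.get("is_r"):
        # Only the file name survives; YARN distributes the file itself
        # (it was added to args.files earlier).
        main_file = os.path.basename(args["primary_resource"])
        child_args += ["--primary-r-file", main_file]
        child_args += ["--class", "org.apache.spark.deploy.RRunner"]
    else:
        child_args += ["--jar", args["primary_resource"]]
        child_args += ["--class", args["main_class"]]
    for arg in args.get("child_args", []):
        child_args += ["--arg", arg]
    return child_args
```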


 

PySpark's bridge into Spark works analogously. In python/pyspark/context.py there is

class SparkContext(object)

whose _jvm member is the Py4J handle into the JVM. It is initialized like this:

if not SparkContext._gateway:
    SparkContext._gateway = gateway or launch_gateway()
    SparkContext._jvm = SparkContext._gateway.jvm

Backend methods are then invoked through it:

# Create a temporary directory inside spark.local.dir:
local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
self._temp_dir = \
    self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir, "pyspark") \
        .getAbsolutePath()
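The reason `self._jvm.org.apache.spark.util.Utils` works is Py4J's dynamic attribute chaining: each attribute access extends a dotted JVM name until it names a class whose methods are forwarded over the gateway. A toy re-implementation of just that lookup pattern (not Py4J itself):

```python
class JVMView:
    """Toy analogue of Py4J's gateway.jvm: attribute access builds up a
    fully qualified JVM class name instead of resolving anything locally."""

    def __init__(self, prefix=""):
        self._prefix = prefix

    def __getattr__(self, name):
        # Each attribute access returns a new view with a longer prefix.
        full = name if not self._prefix else self._prefix + "." + name
        return JVMView(full)

    def __repr__(self):
        return "<jvm:%s>" % self._prefix

jvm = JVMView()
utils = jvm.org.apache.spark.util.Utils
```

In real Py4J, the final object additionally proxies method calls (like getLocalDir above) as messages to the JVM over a socket.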