Spark submit依賴包管理!

Spark submit依賴包管理!
java


使用spark-submit時,應用程序的jar包以及經過—jars選項包含的任意jar文件都會被自動傳到集羣中。shell

spark-submit --class   --master  --jars 服務器

Spark使用了下面的URL格式容許不一樣的jar包分發策略。網絡

一、文件file方式:app

絕對路徑且file:/URIs是做爲driver的HTTP文件服務器,且每一個executor會從driver的HTTP服務器拉取文件;maven

二、hdfs方式:ide

http:,https:,ftp:,從這些給定的URI中拉取文件和JAR包;oop

三、本地local方式:fetch

以local:/開始的URI應該是每一個worker節點的本地文件,這意味着沒有網絡IO開銷,而且推送或經過NFS/GlusterFS等共享到每一個worker大文件/JAR文件或能很好的工做。url


注意每一個SparkContext的JAR包和文件都會被複製到executor節點的工做目錄下,這將用掉大量的空間,而後還須要清理乾淨。

在YARN下,清理是自動進行的。在Spark Standalone下,自動清理能夠經過配置spark.worker.cleanup.appDataTtl屬性作到,此配置屬性的默認值是7*24*3600。

用戶能夠用--packages選項提供一個以逗號分隔的maven清單來包含任意其餘依賴。

其它的庫(或SBT中的resolvers)能夠用--repositories選項添加(一樣用逗號分隔),這些命令均可以用在pyspark,spark-shell和spark-submit中來包含一些Spark包。

對Python而言,--py-files選項能夠用來向executors分發.egg,.zip和.py庫。


源碼走讀:


一、

object SparkSubmit


二、

appArgs.{
  SparkSubmitAction.=> (appArgs)
  SparkSubmitAction.=> (appArgs)
  SparkSubmitAction.=> (appArgs)
}

三、

(args: SparkSubmitArguments): = {
  (childArgschildClasspathsysPropschildMainClass) = (args)

  (): = {
    (args.!= ) {
      proxyUser = UserGroupInformation.createProxyUser(args.UserGroupInformation.getCurrentUser())
      {
        proxyUser.doAs(PrivilegedExceptionAction[]() {
          (): = {
            (childArgschildClasspathsysPropschildMainClassargs.)
          }
        })

四、

(jar <- childClasspath) {
  (jarloader)
}

五、

(localJar: loader: MutableURLClassLoader) {
  uri = Utils.(localJar)
  uri.getScheme {
    | =>
      file = File(uri.getPath)
      (file.exists()) {
        loader.addURL(file.toURI.toURL)
      } {
        (file)
      }
    _ =>
      (uri)
  }
}

以後線索就斷了,迴歸到java的class類調用jar包。

六、誰調用,executor。

(newFiles: HashMap[]newJars: HashMap[]) {
  hadoopConf = SparkHadoopUtil..newConfiguration()
  synchronized {
    ((nametimestamp) <- newFiles .getOrElse(name-) < timestamp) {
      logInfo(+ name + + timestamp)
      Utils.(nameFile(SparkFiles.())env.securityManagerhadoopConftimestampuseCache = !isLocal)
      (name) = timestamp
    }
    ((nametimestamp) <- newJars) {
      localName = name.split().last
      currentTimeStamp = .get(name)
        .orElse(.get(localName))
        .getOrElse(-)
      (currentTimeStamp < timestamp) {
        logInfo(+ name + + timestamp)
        Utils.(nameFile(SparkFiles.())env.securityManagerhadoopConftimestampuseCache = !isLocal)
        (name) = timestamp
        url = File(SparkFiles.()localName).toURI.toURL
        (!.getURLs().contains(url)) {
          logInfo(+ url + )
          .addURL(url)
        }
      }
    }
  }
}

Utils.fetchFile方法,進入

 /**
* Download a file or directory to target directory. Supports fetching the file in a variety of
* ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
* on the URL parameter. Fetching directories is only supported from Hadoop-compatible
* filesystems.
*
* If `useCache` is true, first attempts to fetch the file to a local cache that's shared
* across executors running the same application. `useCache` is used mainly for
* the executors, and not in local mode.
*
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
(!cachedFile.exists()) {
  (urllocalDircachedFileNameconfsecurityMgrhadoopConf)
}

可見,支持本地files,Hadoop的hdfs,還有http格式的文件。


其中目錄目前支持hdfs!


完畢!

相關文章
相關標籤/搜索