浪尖 浪尖聊大數據 java
spark的類加載及參數傳遞過程仍是很複雜的,主要是由於他運行環境太複雜了,不一樣的集羣管理器徹底不同,即便是同一集羣管理器cluster和client也不同,再加上這塊探究仍是須要必定的java功底和耐心的,會使得不少人望而卻步。
下圖是yarn-cluster模式參數傳遞過程:
下圖是yarn-client模式參數傳遞過程:shell
可是java代碼,尤爲是整合框架,公司大了假如沒有統一的java依賴引用規範,解決衝突是必然的。並且有些時候,因爲歷史及發展緣由,也會共存不一樣的依賴版本,那這個時候就須要咱們去了解一下類加載機制了。網絡
關於底層運行環境,歡迎關注浪尖b站,點擊閱讀原文便可跳轉。
https://space.bilibili.com/33162030app
對於spark的依賴管理,你們比較熟悉的參數,拿spark 2.4 來講首先:框架
spark.driver.extraLibraryPath spark.executor.extraClassPath
上面者兩個參數,企業中,通常用來配置spark的lib或者jars目錄及一些通用的依賴,好比hbase,hadoop等。對於on yarn模式,經常使用的配置參數還有yarn.application.classpath,這個對於運行與yarn上的任務都比較適用。好比對於spark on yarn能夠配置以下:ide
<property> <description>Classpath for typical applications.</description> <name>yarn.application.classpath</name> <value>/opt/modules/spark-2.1.2/jars/*,$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*, $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$HADOOP_YARN_HOME/share/hadoop/yarn/*,$HADOOP_YARN_HOME/share/hadoop/yarn/lib/* </value> </property>
其實,上面的配置參數指定的jar包最終都是放到了系統類加載器的classpath裏,由系統類加載器完成加載。函數
有時候用戶本身也會引入一些依賴,這些依賴可能和spark的依賴相互衝突的,這個時候最簡單的辦法是想讓程序先加載用戶的依賴,然後加載spark的依賴。或者用戶本身針對性的改了底層源碼這個其實很常見,又不想幹擾其餘用戶。因此,spark引入了兩個參數:oop
spark.driver.userClassPathFirst spark.executor.userClassPathFirst
在spark中分用戶的jar和spark本身的依賴,翻看源碼你會發現,指定用戶jar的方式,有如下三種:
a. --jars參數。
b. spark.jars配置。
c. sparkContext.addjar。性能
這幾個參數配置的jar,最終會會存儲到SparkContext的一個hashmap裏:學習
private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala
而後在Task調度過程當中,TaskSetManager類裏,獲取:
// SPARK-21563 make a copy of the jars/files so they are consistent across the TaskSet private val addedJars = HashMap[String, Long](sched.sc.addedJars.toSeq: _*) private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*) 對於Task調度的過程當中其實是經過TaskDescription類來進行描述傳播的。具體在TaskSetManager的resourceOffer函數,封裝了TaskDescription。 new TaskDescription( taskId, attemptNum, execId, taskName, index, task.partitionId, addedFiles, addedJars, task.localProperties, serializedTask) }
task確定是在executor內部的線程池裏執行咯,可是executor首先要下載jar包到本地,而後放到executor線程的類加載器裏,才能正常運行jar。具體位置就是Executor的TaskRunner的run方法裏:
updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
其實,在星球裏看過浪尖源碼視頻的球友應該都瞭解,實際上task調度以後,被CoarseGrainedExecutorBackend接收LaunchTask消息,而後反序列化獲得TaskDescription。
用戶jar添加到executor的類加載器的過程以下:
/** * Download any missing dependencies if we receive a new set of files and JARs from the * SparkContext. Also adds any new JARs we fetched to the class loader. */ private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]) { lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) synchronized { // Fetch missing dependencies for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) { logInfo("Fetching " + name + " with timestamp " + timestamp) // Fetch file with useCache mode, close cache for local mode. Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, hadoopConf, timestamp, useCache = !isLocal) currentFiles(name) = timestamp } for ((name, timestamp) <- newJars) { val localName = new URI(name).getPath.split("/").last val currentTimeStamp = currentJars.get(name) .orElse(currentJars.get(localName)) .getOrElse(-1L) if (currentTimeStamp < timestamp) { logInfo("Fetching " + name + " with timestamp " + timestamp) // Fetch file with useCache mode, close cache for local mode. Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, hadoopConf, timestamp, useCache = !isLocal) currentJars(name) = timestamp // Add it to our class loader val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL if (!urlClassLoader.getURLs().contains(url)) { logInfo("Adding " + url + " to class loader") urlClassLoader.addURL(url) } } } } }
也就是說task須要的jar,假如是新增的jar包每次都會被executor獲取。
假設用戶打的jar包比較大或者指定的jar包比較大,可是task又很少,或者不少任務會公用的jar包,就應該放到spark的依賴環境裏,避免頻繁隨着task調度而傳輸,很浪費性能的。
爲啥會有新增jar包?
能夠想如下Spark JobServer或者你本身的spark任務服務。
關於類加載器 細節能夠閱讀:java類加載器學習必備
主要要看懂下面這張圖,瞭解類加載器的雙親委託機制。
雙親委託機制是:
ClassLoader使用的是雙親委託模型來搜索類的,每一個ClassLoader實例都有一個父類加載器的引用(不是繼承的關係,是一個包含的關係),虛擬機內置的類加載器(Bootstrap ClassLoader)自己沒有父類加載器,但能夠用做其它ClassLoader實例的的父類加載器。當一個ClassLoader實例須要加載某個類時,它會試圖親自搜索某個類以前,先把這個任務委託給它的父類加載器,這個過程是由上至下依次檢查的,首先由最頂層的類加載器Bootstrap ClassLoader試圖加載,若是沒加載到,則把任務轉交給Extension ClassLoader試圖加載,若是也沒加載到,則轉交給App ClassLoader 進行加載,若是它也沒有加載獲得的話,則返回給委託的發起者,由它到指定的文件系統或網絡等URL中加載該類。若是它們都沒有加載到這個類時,則拋出ClassNotFoundException異常。不然將這個找到的類生成一個類的定義,並將它加載到內存當中,最後返回這個類在內存中的Class實例對象。
executor端建立的類加載器,主要有兩個:
// Create our ClassLoader // do this after SparkEnv creation so can access the SecurityManager private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
第一個就是咱們本文會說到的類加載器,而第二個是spark-shell命令或者livy裏會出現的交互式查詢的情境下的類加載器。
/** * Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes * created by the interpreter to the search path */ private def createClassLoader(): MutableURLClassLoader = { // Bootstrap the list of jars with the user class path. val now = System.currentTimeMillis() userClassPath.foreach { url => currentJars(url.getPath().split("/").last) = now } val currentLoader = Utils.getContextOrSparkClassLoader // For each of the jars in the jarSet, add them to the class loader. // We assume each of the files has already been fetched. val urls = userClassPath.toArray ++ currentJars.keySet.map { uri => new File(uri.split("/").last).toURI.toURL } if (userClassPathFirst) { new ChildFirstURLClassLoader(urls, currentLoader) } else { new MutableURLClassLoader(urls, currentLoader) } }
能夠看到,假設配置了優先從用戶的classpath里加載類,會使用ChildFirstURLClassloader類加載器。該類具體以下:
/** * A mutable class loader that gives preference to its own URLs over the parent class loader * when loading classes and resources. */ public class ChildFirstURLClassLoader extends MutableURLClassLoader { static { ClassLoader.registerAsParallelCapable(); } private ParentClassLoader parent; public ChildFirstURLClassLoader(URL[] urls, ClassLoader parent) { super(urls, null); this.parent = new ParentClassLoader(parent); } @Override public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { try { return super.loadClass(name, resolve); } catch (ClassNotFoundException cnf) { return parent.loadClass(name, resolve); } } @Override public Enumeration<URL> getResources(String name) throws IOException { ArrayList<URL> urls = Collections.list(super.getResources(name)); urls.addAll(Collections.list(parent.getResources(name))); return Collections.enumeration(urls); } @Override public URL getResource(String name) { URL url = super.getResource(name); if (url != null) { return url; } else { return parent.getResource(name); } } }
看一下源碼你就知道了,這個類內部又專門new了一個ParentClassLoader。而後用來做爲ChildFirstURLClassLoader的父類加載器,加載Spark的相關依賴,而用戶的依賴加載是經過ChildFirstURLClassLoader本身加載的。
具體過程就在loadClass方法中。
@Override public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { try { return super.loadClass(name, resolve); } catch (ClassNotFoundException cnf) { return parent.loadClass(name, resolve); } }
就此實現了,能夠擺脫雙親委託機制,優先從用戶jar里加載類。