讓代碼分佈式運行是全部分佈式計算框架須要解決的最基本的問題。java
Spark是大數據領域中至關火熱的計算框架,在大數據分析領域有一統江湖的趨勢,網上對於Spark源碼分析的文章有不少,可是介紹Spark如何處理代碼分佈式執行問題的資料少之又少,這也是我撰寫文本的目的。node
Spark運行在JVM之上,任務的執行依賴序列化及類加載機制,所以本文會重點圍繞這兩個主題介紹Spark對代碼分佈式執行的處理。本文假設讀者對Spark、Java、Scala有必定的瞭解,代碼示例基於Scala,Spark源碼基於2.1.0版本。閱讀本文你能夠了解到:git
根據以上內容,讀者能夠基於JVM相關的語言構建一個本身的分佈式計算服務框架。github
序列化(Serialization)是將對象的狀態信息轉換爲能夠存儲或傳輸的形式的過程。所謂的狀態信息指的是對象在內存中的數據,Java中通常指對象的字段數據。咱們開發Java應用的時候或多或少都處理過對象序列化,對象常見的序列化形式有JSON、XML等。shell
JDK中內置一個ObjectOutputStream
類能夠將對象序列化爲二進制數據,使用ObjectOutputStream
序列化對象時,要求對象所屬的類必須實現java.io.Serializable
接口,不然會報java.io.NotSerializableException
的異常。apache
基本的概念先介紹到這。接下來咱們一塊兒探討一個問題:Java的方法可否被序列化?服務器
假設咱們有以下的SimpleTask
類(Java類):網絡
import java.io.Serializable; public abstract class Task implements Serializable { public void run() { System.out.println("run task!"); } } public class SimpleTask extends Task { @Override public void run() { System.out.println("run simple task!"); } }
還有一個用於將對象序列化到文件的工具類FileSerializer
:閉包
import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream} object FileSerializer { def writeObjectToFile(obj: Object, file: String) = { val fileStream = new FileOutputStream(file) val oos = new ObjectOutputStream(fileStream) oos.writeObject(obj) oos.close() } def readObjectFromFile(file: String): Object = { val fileStream = new FileInputStream(file) val ois = new ObjectInputStream(fileStream) val obj = ois.readObject() ois.close() obj } }
簡單起見,咱們採用將對象序列化到文件,而後經過反序列化執行的方式來模擬代碼的分佈式執行。SimpleTask就是咱們須要模擬分佈式執行的代碼。咱們先將SimpleTask
序列化到文件中:app
val task = new SimpleTask() FileSerializer.writeObjectToFile(task, "task.ser")
而後將SimpleTask
類從咱們的代碼中刪除,此時只有task.ser
文件中含有task對象的序列化數據。接下來咱們執行下面的代碼:
val task = FileSerializer.readObjectFromFile("task.ser").asInstanceOf[Task] task.run()
請各位讀者思考,上面的代碼執行後會出現什麼樣的結果?
run simple task!
?run task!
?實際執行會出現形以下面的異常:
Exception in thread "main" java.lang.ClassNotFoundException: site.stanzhai.serialization.SimpleTask at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:628) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at site.stanzhai.serialization.FileSerializer$.readObjectFromFile(FileSerializer.scala:20)
從異常信息來看,反序列過程當中找不到SimpleTask
類。由此能夠推斷序列化後的數據是不包含類的定義信息的。那麼,ObjectOutputStream
到底序列化了哪些信息呢?
對ObjectOutputStream
實現機制感興趣的同窗能夠去看下JDK中這個類的實現,ObjectOutputStream
序列化對象時,從父類的數據開始序列化到子類,若是override了writeObject方法,會反射調用writeObject來序列化數據。序列化的數據會按照如下的順序以二進制的形式輸出到OutputStream中:
回到咱們的問題上:Java的方法可否被序列化?經過咱們代碼示例及分析,想必你們對這個問題應該清楚了。經過ObjectOutputStream
序列化對象,僅包含類的描述(而非定義),對象的狀態數據,因爲缺乏類的定義,也就是缺乏SimpleTask
的字節碼,反序列化過程當中就會出現ClassNotFound的異常。
如何讓咱們反序列化的對象能正常使用呢?咱們還須要瞭解類加載器。
ClassLoader在Java中是一個抽象類,ClassLoader的做用是加載類,給定一個類名,ClassLoader會嘗試查找或生成類的定義,一種典型的加載策略是將類名對應到文件名上,而後從文件系統中加載class file。
在咱們的示例中,反序列化SimpleTask
失敗,是由於JVM找不到類的定義,所以要確保正常反序列化,咱們必須將SimpleTask
的class文件保存下來,反序列化的時候可以讓ClassLoader加載到SimpleTask
的class。
接下來,咱們對代碼作一些改造,添加一個ClassManipulator
類,用於將對象的class文件導出到當前目錄的文件中,默認的文件名就是對象的類名(不含包名):
object ClassManipulator { def saveClassFile(obj: AnyRef): Unit = { val classLoader = obj.getClass.getClassLoader val className = obj.getClass.getName val classFile = className.replace('.', '/') + ".class" val stream = classLoader.getResourceAsStream(classFile) // just use the class simple name as the file name val outputFile = className.split('.').last + ".class" val fileStream = new FileOutputStream(outputFile) var data = stream.read() while (data != -1) { fileStream.write(data) data = stream.read() } fileStream.flush() fileStream.close() } }
按照JVM的規範,假設對package.Simple
這樣的一個類編譯,編譯後的class文件爲package/Simple.class
,所以咱們能夠根據路徑規則,從當前JVM進程的Resource中獲得指定類的class數據。
在刪除SimpleTask
前,咱們除了將task序列化到文件外,還須要將task的class文件保存起來,執行完下面的代碼,SimpleTask
類就能夠從代碼中剔除了:
val task = new SimpleTask() FileSerializer.writeObjectToFile(task, "task.ser") ClassManipulator.saveClassFile(task)
因爲咱們保存class文件的方式比較特殊,既不在jar包中,也不是按package/ClassName.class這種標準的保存方式,所以還須要實現一個自定義的FileClassLoader
按照咱們保存class文件的方式來加載所需的類:
class FileClassLoader() extends ClassLoader { override def findClass(fullClassName: String): Class[_] = { val file = fullClassName.split('.').last + ".class" val in = new FileInputStream(file) val bos = new ByteArrayOutputStream val bytes = new Array[Byte](4096) var done = false while (!done) { val num = in.read(bytes) if (num >= 0) { bos.write(bytes, 0, num) } else { done = true } } val data = bos.toByteArray defineClass(fullClassName, data, 0, data.length) } }
ObjectInputStream
類用於對象的反序列化,在反序列化過程當中,它根據序列化數據中類的descriptor信息,調用resolveClass
方法加載對應的類,可是經過Class.forName
加載class使用的並非咱們自定義的FileClassLoader
,因此若是直接使用ObjectInputStream
進行反序列,依然會由於找不到類而報錯,下面是resolveClass
的源碼:
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { String name = desc.getName(); try { return Class.forName(name, false, latestUserDefinedLoader()); } catch (ClassNotFoundException ex) { Class<?> cl = primClasses.get(name); if (cl != null) { return cl; } else { throw ex; } } }
爲了能讓ObjectInputStream
在序列化的過程當中使用咱們自定義的ClassLoader,咱們還須要對FileSerializer
中的readObjectFromFile
方法作些改造,修改的代碼以下:
def readObjectFromFile(file: String, classLoader: ClassLoader): Object = { val fileStream = new FileInputStream(file) val ois = new ObjectInputStream(fileStream) { override def resolveClass(desc: ObjectStreamClass): Class[_] = Class.forName(desc.getName, false, classLoader) } val obj = ois.readObject() ois.close() obj }
最後,咱們將反序列化的代碼調整爲:
val fileClassLoader = new FileClassLoader() val task = FileSerializer.readObjectFromFile("task.ser", fileClassLoader).asInstanceOf[Task] task.run()
反序列化的過程當中可以經過fileClassLoader加載到所需的類,這樣咱們在執行就不會出錯了,最終的執行結果爲:run simple task!
。到此爲止,咱們已經完整地模擬了代碼分佈式執行的過程。完整的示例代碼,請參閱:https://github.com/stanzhai/jvm-exercise/tree/master/src/main/scala/site/stanzhai/exercise/serialization
咱們依然經過一個示例,快速瞭解下Scala對閉包的處理,下面是從Scala的REPL中執行的代碼:
scala> val n = 2 n: Int = 2 scala> val f = (x: Int) => x * n f: Int => Int = <function1> scala> Seq.range(0, 5).map(f) res0: Seq[Int] = List(0, 2, 4, 6, 8)
f
是採用Scala的=>
語法糖定義的一個閉包,爲了弄清楚Scala是如何處理閉包的,咱們繼續執行下面的代碼:
scala> f.getClass res0: Class[_ <: Int => Int] = class $anonfun$1 scala> f.isInstanceOf[Function1[Int, Int]] res1: Boolean = true scala> f.isInstanceOf[Serializable] res2: Boolean = true
能夠看出f
對應的類爲$anonfun$1
是Function1[Int, Int]
的子類,並且實現了Serializable
接口,這說明f
是能夠被序列化的。
Spark對於數據的處理基本都是基於閉包,下面是一個簡單的Spark分佈式處理數據的代碼片斷:
val spark = SparkSession.builder().appName("demo").master("local").getOrCreate() val sc = spark.sparkContext val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data) val sum = distData.map(x => x * 2).sum() println(sum) // 30.0
對於distData.map(x => x * 2)
,map中傳的一個匿名函數,也是一個很是簡單的閉包,對distData
中的每一個元素*2,咱們知道對於這種形式的閉包,Scala編譯後是能夠序列化的,因此咱們的代碼能正常執行也合情合理。將入咱們將處理函數的閉包定義到一個類中,而後將代碼改造爲以下形式:
class Operation { val n = 2 def multiply = (x: Int) => x * n } ... val sum = distData.map(new Operation().multiply).sum() ...
咱們在去執行,會出現什麼樣的結果呢?實際執行會出現這樣的異常:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) ... Caused by: java.io.NotSerializableException: Operation
Scala在構造閉包的時候會肯定他所依賴的外部變量,並將它們的引用存到閉包對象中,這樣能保證在不一樣的做用域中調用閉包不出現問題。
出現Task not serializable
的異常,是因爲咱們的multiply
函數依賴Operation
類的變量n
,雖然multiply是支持序列化的,可是Operation
不支持序列化,這致使multiply
函數在序列化的過程當中出現了NotSerializable
的異常,最終致使咱們的Task序列化失敗。爲了確保multiply
能被正常序列化,咱們須要想辦法去除對Operation
的依賴,咱們將代碼作以下修改,在去執行就能夠了:
class Operation { def multiply = (x: Int) => x * 2 } ... val sum = distData.map(new Operation().multiply).sum() ...
Spark對閉包序列化前,會經過工具類org.apache.spark.util.ClosureCleaner
嘗試clean掉閉包中無關的外部對象引用,ClosureCleaner
對閉包的處理是在運行期間,相比Scala編譯器,能更精準的去除閉包中無關的引用。這樣作,一方面能夠儘量保證閉包可被序列化,另外一方面能夠減小閉包序列化後的大小,便於網絡傳輸。
咱們在開發Spark應用的時候,若是遇到Task not serializable
的異常,就須要考慮下,閉包中是否或引用了沒法序列化的對象,有的話,嘗試去除依賴就能夠了。
Spark中實現的序列化工具備多個:
從SparkEnv
類的實現來看,用於閉包序列化的是JavaSerializer
:
JavaSerializer
內部使用的是ObjectOutputStream
將閉包序列化:
private[spark] class JavaSerializationStream( out: OutputStream, counterReset: Int, extraDebugInfo: Boolean) extends SerializationStream { private val objOut = new ObjectOutputStream(out) ... }
將閉包反序列化的核心代碼爲:
private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader) extends DeserializationStream { private val objIn = new ObjectInputStream(in) { override def resolveClass(desc: ObjectStreamClass): Class[_] = try { Class.forName(desc.getName, false, loader) } catch { case e: ClassNotFoundException => JavaDeserializationStream.primitiveMappings.getOrElse(desc.getName, throw e) } } ... }
關於ObjectInputStream
咱們前面已有介紹,JavaDeserializationStream
有個關鍵的成員變量loader
,它是個ClassLoader,可讓Spark使用非默認的ClassLoader按照自定義的加載策略去加載class,這樣才能保證反序列化過程在其餘節點正常進行。
經過前面的介紹,想要代碼在另外一端執行,只有序列化還不行,還須要保證執行端可以加載到閉包對應的類。接下來咱們探討Spark加載class的機制。
一般狀況下咱們會將開發的Spark Application打包爲jar包,而後經過spark-submit
命令提交到集羣運行,下面是一個官網的示例:
./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ ... \ --jars /path/to/dep-libs.jar \ /path/to/examples.jar \
此時,咱們編寫的代碼中所包含的閉包,對應的類已經被編譯到jar包中了,因此Executor端只要能加載到這個jar包,從jar包中定位閉包的class文件,就能夠將閉包反序列化了。事實上Spark也是這麼作的。
Spark Application的Driver端在運行的時候會基於netty創建一個文件服務,咱們運行的jar包,及--jars
中指定的依賴jar包,會被添加到文件服務器中。這個過程在SparkContext
的addJar方法中完成:
/** * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. */ def addJar(path: String) { if (path == null) { logWarning("null specified as parameter to addJar") } else { var key = "" if (path.contains("\\")) { // For local paths with backslashes on Windows, URI throws an exception key = env.rpcEnv.fileServer.addJar(new File(path)) } else { val uri = new URI(path) // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies Utils.validateURL(uri) key = uri.getScheme match { // A JAR file which exists only on the driver node case null | "file" => try { env.rpcEnv.fileServer.addJar(new File(uri.getPath)) } catch { case exc: FileNotFoundException => logError(s"Jar not found at $path") null } // A JAR file which exists locally on every worker node case "local" => "file:" + uri.getPath case _ => path } } if (key != null) { val timestamp = System.currentTimeMillis if (addedJars.putIfAbsent(key, timestamp).isEmpty) { logInfo(s"Added JAR $path at $key with timestamp $timestamp") postEnvironmentUpdate() } } } }
Executor端在執行任務的時候,會從任務信息中獲得依賴的jar包,而後updateDependencies
從Driver端的文件服務器下載缺失的jar包,並將jar包添加到URLClassLoader中,最後再將task反序列化,反序列化前所需的jar都已準備好,所以可以將task中的閉包正常反序列化,核心代碼以下:
override def run(): Unit = { ... try { val (taskFiles, taskJars, taskProps, taskBytes) = Task.deserializeWithDependencies(serializedTask) // Must be set before updateDependencies() is called, in case fetching dependencies // requires access to properties contained within (e.g. for access control). Executor.taskDeserializationProps.set(taskProps) updateDependencies(taskFiles, taskJars) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) ... } finally { runningTasks.remove(taskId) } }
這麼來看,整個Spark Application分佈式加載class的機制就比較清晰了。Executor端可以正常加載class,反序列化閉包,分佈式執行代碼天然就不存在什麼問題了。
spark-shell
是Spark爲咱們提供的一個REPL的工具,可讓咱們很是方便的寫一些簡單的數據處理腳本。下面是一個運行在spark-shell
的代碼:
scala> val f = (x: Int) => x + 1 f: Int => Int = <function1> scala> val data = Array(1, 2, 3, 4, 5) data: Array[Int] = Array(1, 2, 3, 4, 5) scala> val distData = sc.parallelize(data) distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26 scala> distData.map(f).sum() res0: Double = 20.0
咱們已知,閉包f
會被Scala編譯爲匿名類,若是要將f
序列化到Executor端執行,必需要加載f
對應的匿名類的class數據,才能正常反序列化。
Spark是如何獲得f
的class數據的?Executor又是如何加載到的?
源碼面前,了無祕密。咱們看一下Spark的repl項目的代碼入口,核心代碼以下:
object Main extends Logging { ... val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf)) val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl") def main(args: Array[String]) { doMain(args, new SparkILoop) } // Visible for testing private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = { interp = _interp val jars = Utils.getUserJars(conf, isShell = true).mkString(File.pathSeparator) val interpArguments = List( "-Yrepl-class-based", "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-classpath", jars ) ++ args.toList val settings = new GenericRunnerSettings(scalaOptionError) settings.processArguments(interpArguments, true) if (!hasErrors) { interp.process(settings) // Repl starts and goes in loop of R.E.P.L Option(sparkContext).map(_.stop) } } ... }
Spark2.1.0的REPL基於Scala-2.11的scala.tools.nsc
編譯工具實現,代碼已經至關簡潔,Spark給interp
設置了2個關鍵的配置-Yrepl-class-based
和-Yrepl-outdir
,經過這兩個配置,咱們在shell中輸入的代碼會被編譯爲class文件輸出到執行的文件夾中。若是指定了spark.repl.classdir
配置,會用這個配置的路徑做爲class文件的輸出路徑,不然使用SPARK_LOCAL_DIRS
對應的路徑。下面是我測試過程當中輸出到文件夾中的class文件:
咱們已經清楚Spark如何將shell中的代碼編譯爲class了,那麼Executor端,如何加載到這些class文件呢?在org/apache/spark/executor/Executor.scala
中有段和REPL相關的代碼:
private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) /** * If the REPL is in use, add another ClassLoader that will read * new classes defined by the REPL as the user types code */ private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = { val classUri = conf.get("spark.repl.class.uri", null) if (classUri != null) { logInfo("Using REPL class URI: " + classUri) try { val _userClassPathFirst: java.lang.Boolean = userClassPathFirst val klass = Utils.classForName("org.apache.spark.repl.ExecutorClassLoader") .asInstanceOf[Class[_ <: ClassLoader]] val constructor = klass.getConstructor(classOf[SparkConf], classOf[SparkEnv], classOf[String], classOf[ClassLoader], classOf[Boolean]) constructor.newInstance(conf, env, classUri, parent, _userClassPathFirst) } catch { case _: ClassNotFoundException => logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!") System.exit(1) null } } else { parent } } override def run(): Unit = { ... Thread.currentThread.setContextClassLoader(replClassLoader) val ser = env.closureSerializer.newInstance() ... }
Executor啓動時會判斷是否爲REPL模式,若是是的話會使用ExecutorClassLoader
作爲反序列閉包時所使用的ClassLoader,ExecutorClassLoader
會經過網絡從Driver端(也就是執行spark-shell
的節點)加載所需的class文件。這樣咱們在spark-shell
中寫的代碼就能夠分佈式執行了。
Spark實現代碼的分佈式執行有2個關鍵點:
知足以上2個條件,咱們的代碼就能夠分佈式運行了。
固然,構建一個完整的分佈式計算框架,還須要有網絡通訊框架、RPC、文件傳輸服務等做爲支撐,在瞭解Spark代碼分佈式執行原理的基礎上,相信讀者已有思路基於JVM相關的語言構建分佈式計算服務。
類比其餘非JVM相關的語言,實現一個分佈式計算框架,依然是須要解決序列化,動態加載執行代碼的問題。