一,必備知識apache
1.1 經典14問安全
1.2 問題前提網絡
二,序列化問題函數
2.1 Spark序列化出現狀況工具
2.2 Spark序列化問題解決spa
三,線程安全問題線程
3.1 Spark線程安全出現狀況code
3.2 Spark線程安全問題解決orm
1.SparkContext哪一端生成的? Driver端 2.DAG是在哪一端被構建的? Driver端 3.RDD是在哪一端生成的? Driver端 4.廣播變量是在哪一端調用的方法進行廣播的? Driver端 5.要廣播的數據應該在哪一端先建立好再廣播呢? Driver端 6.調用RDD的算子(Transformation和Action)是在哪一端調用的 Driver端 7.RDD在調用Transformation和Action時須要傳入一個函數,函數是在哪一端聲明和傳入的? Driver端 8.RDD在調用Transformation和Action時須要傳入函數,請問傳入的函數是在哪一端執行了函數的業務邏輯? Executor中的Task執行的 9.自定義的分區器這個類是在哪一端實例化的? Driver端 10.分區器中的getParitition方法在哪一端調用的呢? Executor中的Task中調用的 11.Task是在哪一端生成的呢? Driver端 12.DAG是在哪一端構建好的並被切分紅一到多個State的 Driver端 13.DAG是哪一個類完成的切分Stage的功能? DAGScheduler 14.DAGScheduler將切分好的Stage以什麼樣的形式給TaskScheduler TaskSet
在上面的12問的7-8問中,函數的申明和調用分別在Driver和Execute中進行,這其中就會牽扯到序列化問題和線程安全問題。接下來會對其進行解釋。blog
工具類:
package cn.edu360.spark05 // 隨意定義一工具類 class MyUtil { def get(msg: String): String ={ msg+"aaa" } }
Spark實現類:
package cn.edu360.spark05 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SequenceTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]") var sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/") val words = lines.flatMap(_.split(" ")) // 對類進行實例化 val util = new MyUtil // 調用實例的方法 val value: RDD[String] = words.map(word => util.get(word)) value.collect() sc.stop() } }
報錯信息以下:
上述報錯信息就說明是MyUtil實例的序列化問題。該實例是在Driver端建立,經過網絡發送到Worker的Executer端。可是這個實例併爲序列化,因此會報這些錯誤。
解決方案一:實現序列化接口
package cn.edu360.spark05 // 繼承Serializable class MyUtil extends Serializable { def get(msg: String): String ={ msg+"aaa" } }
弊端:須要本身實現序列化接口,相對麻煩
解決方案二:不實現序列化接口,在Executer進行MyUtil內進行實例化
package cn.edu360.spark05 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SequenceTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]") var sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/") val words = lines.flatMap(_.split(" ")) val value: RDD[String] = words.map(word => { // 在這裏進行實例化,這裏的操做是在Executer中 val util = new MyUtil util.get(word) }) val result: Array[String] = value.collect() print(result.toBuffer) sc.stop() } }
弊端:每一次調用都須要建立一個新的實例,浪費資源,浪費內存。
解決方案三:採用單例模式
MyUtil類:
package cn.edu360.spark05 // 將class 改成 object的單例模式 object MyUtil { def get(msg: String): String ={ msg+"aaa" } }
Spark實現類:
package cn.edu360.spark05 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SequenceTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]") var sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/") val words = lines.flatMap(_.split(" ")) val value: RDD[String] = words.map(word => { // 調用方法 MyUtil.get(word) }) val result: Array[String] = value.collect() print(result.toBuffer) sc.stop() } }
有共享成員變量:
1. 工具類使用object,說明工具類是單例的,有線程安全問題。在函數內部使用,是在Executer中被初始化,一個Executer中有一個實例,因此 就出現了線程安全問題。
2. 工具類使用Class,說明是多例的,沒有線程安全問題。每一個task都會持有一份工具類的實例。
沒有共享成員變量:
1. 工具類Object,沒有線程安全問題
2. 工具類使用class,實現序列化便可
工具類優先使用object,但儘量不使用成員變量,若實在有這方面的需求,能夠定義類的類型,或者把成員變量變成線程安全的成員變量,例如加鎖等。