Spark 學習(六) Spark 的線程安全和序列化問題

一,必備知識apache

  1.1 經典14問安全

  1.2 問題前提網絡

二,序列化問題函數

  2.1 Spark序列化出現狀況工具

  2.2 Spark序列化問題解決spa

三,線程安全問題線程

  3.1 Spark線程安全出現狀況code

  3.2 Spark線程安全問題解決orm

 

 

 

 

正文

一,必備知識

  1.1 經典14問

1.SparkContext哪一端生成的?
    Driver端

2.DAG是在哪一端被構建的?
    Driver端
    
3.RDD是在哪一端生成的?
    Driver端

4.廣播變量是在哪一端調用的方法進行廣播的?
    Driver端

5.要廣播的數據應該在哪一端先建立好再廣播呢? 
    Driver端

6.調用RDD的算子(Transformation和Action)是在哪一端調用的
    Driver端
    
7.RDD在調用Transformation和Action時須要傳入一個函數,函數是在哪一端聲明和傳入的?
    Driver端

8.RDD在調用Transformation和Action時須要傳入函數,請問傳入的函數是在哪一端執行了函數的業務邏輯?
    Executor中的Task執行的

9.自定義的分區器這個類是在哪一端實例化的?
    Driver端

10.分區器中的getParitition方法在哪一端調用的呢?
    Executor中的Task中調用的

11.Task是在哪一端生成的呢? 
    Driver端

12.DAG是在哪一端構建好的並被切分紅一到多個State的
    Driver端

13.DAG是哪一個類完成的切分Stage的功能?
    DAGScheduler
    
14.DAGScheduler將切分好的Stage以什麼樣的形式給TaskScheduler
    TaskSet

  1.2 需求前提

  在上面的12問的7-8問中,函數的申明和調用分別在Driver和Execute中進行,這其中就會牽扯到序列化問題和線程安全問題。接下來會對其進行解釋。blog

二,序列化問題

  2.1 Spark序列化出現狀況

  工具類:

package cn.edu360.spark05

// 隨意定義一工具類
class MyUtil {
    def get(msg: String): String ={
        msg+"aaa"
    }
}

  Spark實現類:

package cn.edu360.spark05
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SequenceTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]")
        var sc = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/")
        val words = lines.flatMap(_.split(" "))
        // 對類進行實例化
        val util = new MyUtil
        // 調用實例的方法
        val value: RDD[String] = words.map(word => util.get(word))
        value.collect()
        sc.stop()
    }
}

  報錯信息以下:

  

  

  上述報錯信息就說明是MyUtil實例的序列化問題。該實例是在Driver端建立,經過網絡發送到Worker的Executer端。可是這個實例併爲序列化,因此會報這些錯誤。

  2.2 Spark序列化問題解決

  解決方案一:實現序列化接口

package cn.edu360.spark05

// 繼承Serializable
class MyUtil extends Serializable {
    def get(msg: String): String ={
        msg+"aaa"
    }
}

  弊端:須要本身實現序列化接口,相對麻煩

  解決方案二:不實現序列化接口,在Executer進行MyUtil內進行實例化

package cn.edu360.spark05
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SequenceTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]")
        var sc = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/")
        val words = lines.flatMap(_.split(" "))
        val value: RDD[String] = words.map(word => {
            // 在這裏進行實例化,這裏的操做是在Executer中
            val util = new MyUtil
            util.get(word)
        })
        val result: Array[String] = value.collect()
        print(result.toBuffer)
        sc.stop()
    }
}

  弊端:每一次調用都須要建立一個新的實例,浪費資源,浪費內存。

  解決方案三:採用單例模式

  MyUtil類:

package cn.edu360.spark05

// 將class 改成 object的單例模式
object MyUtil {
    def get(msg: String): String ={
        msg+"aaa"
    }
}

  Spark實現類:

package cn.edu360.spark05
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SequenceTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[3]")
        var sc = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/wordcount/input/")
        val words = lines.flatMap(_.split(" "))
        val value: RDD[String] = words.map(word => {
            // 調用方法
            MyUtil.get(word)
        })
        val result: Array[String] = value.collect()
        print(result.toBuffer)
        sc.stop()
    }
}

三,線程安全問題

  3.1 Spark線程安全出現狀況、

  有共享成員變量:

    1. 工具類使用object,說明工具類是單例的,有線程安全問題。在函數內部使用,是在Executer中被初始化,一個Executer中有一個實例,因此 就出現了線程安全問題。

    2. 工具類使用Class,說明是多例的,沒有線程安全問題。每一個task都會持有一份工具類的實例。

  沒有共享成員變量:

    1. 工具類Object,沒有線程安全問題

    2. 工具類使用class,實現序列化便可

  3.2 Spark線程安全問題解決

    工具類優先使用object,但儘量不使用成員變量,若實在有這方面的需求,能夠定義類的類型,或者把成員變量變成線程安全的成員變量,例如加鎖等。

相關文章
相關標籤/搜索