怎樣給Spark傳遞函數—怎樣讓你的Spark應用更高效更健壯

 相信不少人在開始用Spark的時候必定會遇到 Task not serializable的問題,這種問題大多數都是在RDD的算子中調用了不能序列化的對象引發的。爲何傳入算子中的對象必定要可以序列化呢?這就要從Spark自己提及,Spark是一個分佈式的計算框架,RDD(Resilient Distributed Datasets,彈性分佈式數據集)是對分佈式數據集的抽象,數據其實是分佈在集羣的各個節點的,經過RDD進行抽象,讓用戶感受好像是在本地交互同樣。可是實際的運算中,算子中的操做都要發送到計算節點(Executor)端來執行,這就要求傳入算子中的對象能夠進行序列化。 編程

       Spark的算子很大程度上是上經過向集羣上的驅動程序傳遞函數來實現的,編寫Spark應用的關鍵就是使用算子(或者稱爲轉換),給Spark傳遞函數來實現。經常使用的向Spark傳遞函數的方式有兩種(來自於Spark官方文檔,Spark編程指南): 緩存

        第一種:匿名函數,處理的代碼比較少的時候,能夠採用匿名函數,直接寫在算子裏面: 安全

        

myrdd.map(x => x+ 1)



        第二種:全局單例對象中的靜態方法:先定義object對象MyFunctions,以及靜態方法:funcOne,而後傳遞MyFunctions.funcOne給RDD算子。 框架

        

       

object MyFunctions {

      def funcOne(s: String): String = { ... }

 }


 myRdd.map(MyFunctions.funcOne)



       在業務員開發中,須要把RDD的引用傳遞給某一個類的實例的某個方法,傳遞給RDD的函數,爲類實例的實例方法: 分佈式

      

class MyClass {

     def funcOne(s: String): String = { ... }

     def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(funcOne }

 }



        在這個例子中,咱們定義了一個類MyClass,類的實例方法doStuff中傳入了一個RDD,RDD 算子中調用了類的另一個實例方法funcOne,在我麼New 一個MyClass 的實例並調用doStuff的方法的時候,須要講整個實例對象發給集羣,因此類MyClass必須能夠序列化,須要extends Serializable。 函數式編程

       類似的,訪問方法外部的對象變量也會引用整個對象,須要把整個對象發送到集羣: 函數

       

class MyClass {

    val field = "Hello"

    def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field   
      + x) }
}



       爲了不整個對象都發送給集羣,能夠定義一個局部變量來保存外部對象field的引用,這種狀況尤爲在一些大對象裏,能夠避免整個對象發送到集羣,提升效率。 this

      

def doStuff(rdd: RDD[String]): RDD[String] = {

    val field_ = this.field

    rdd.map(x => field_ + x)

}



    

    Spark應用最終是要在集羣中運行的,許多問題在單一的本地環境中沒法暴露出來,有時候常常會遇到本地運行結果和集羣運行結果不一致的問題,這就要求開發的時候多使用函數式編程風格,儘可能使的寫的函數都爲純函數。純函數的好處是:無狀態,線程安全,不須要線程同步,應用程序或者運行環境(Runtime)能夠對純函數的運算結果進行緩存,運算加快速度。 spa

    那麼什麼是純函數了? 線程

    純函數(Pure Function)是這樣一種函數——輸入輸出數據流全是顯式(Explicit)的。顯式(Explicit)的意思是,函數與外界交換數據只有一個惟一渠道——參數和返回值;函數從函數外部接受的全部輸入信息都經過參數傳遞到該函數內部;函數輸出到函數外部的全部信息都經過返回值傳遞到該函數外部。若是一個函數經過隱式(Implicit)方式,從外界獲取數據,或者向外部輸出數據,那麼,該函數就不是純函數,叫做非純函數(Impure Function)。隱式(Implicit)的意思是,函數經過參數和返回值之外的渠道,和外界進行數據交換。好比,讀取全局變量,修改全局變量,都叫做以隱式的方式和外界進行數據交換;好比,利用I/O API(輸入輸出系統函數庫)讀取配置文件,或者輸出到文件,打印到屏幕,都叫作隱式的方式和外界進行數據交換。

      在計算過程當中涉及到對象的交互時,儘可能選用無狀態的對象,好比對於一個bean,成員變量都爲val的,在須要數據交互的地方new 一個新的。

       關於(commutative and associative)交換律和結合律。在傳遞給reudce,reduceByKey,以及其餘的一些merge,聚合的操做中的函數必需要知足交換律和結合律,交換律和結合律就是咱們數學上學過的:

      a + b = b + a,a + b + c =  a + (b + c)

定義的函數func(a,b)和f(b,a)應該獲得相同的結果,f(f(a,b),c)和f(a,f(b,c))應該獲得相同的結果。

     最後說一下廣播變量和累加器的使用。在程序中不要定義一個全局的變量,若是須要在多個節點共享一個數據,能夠採用廣播變量的方法。若是須要一些全局的聚合計算,可使用累加器。

相關文章
相關標籤/搜索