Spark 序列化問題

在Spark應用開發中,很容易出現以下報錯:php

org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2058) ... Caused by: java.io.NotSerializableException

 

該報錯意思是用戶代碼的transformation操做中包含不可序列化的對象引用。html

本文主要從如下三個方面解釋Spark 應用中序列化問題 。 
一、Java序列化含義? 
二、Spark代碼爲何須要序列化? 
三、如何解決Spark序列化問題?java

一、Java序列化含義?

Spark是基於JVM運行的進行,其序列化必然遵照Java的序列化規則。git

序列化就是指將一個對象轉化爲二進制的byte流(注意,不是bit流),而後以文件的方式進行保存或經過網絡傳輸,等待被反序列化讀取出來。序列化常被用於數據存取和通訊過程當中。github

對於java應用實現序列化通常方法:apache

  • class實現序列化操做是讓class 實現Serializable接口,但實現該接口不保證該class必定能夠序列化,由於序列化必須保證該class引用的全部屬性能夠序列化。網絡

  • 這裏須要明白,static和transient修飾的變量不會被序列化,這也是解決序列化問題的方法之一,讓不能序列化的引用用static和transient來修飾。(static修飾的是類的狀態,而不是對象狀態,因此不存在序列化問題。transient修飾的變量,是不會被序列化到文件中,在被反序列化後,transient變量的值被設爲初始值,如int是0,對象是null)閉包

  • 此外還能夠實現readObject()方法和writeObject()方法來自定義實現序列化。(具體用例見參考連接)分佈式

二、Spark的transformation操做爲何須要序列化?

Spark是分佈式執行引擎,其核心抽象是彈性分佈式數據集RDD,其表明了分佈在不一樣節點的數據。Spark的計算是在executor上分佈式執行的,故用戶開發的關於RDD的map,flatMap,reduceByKey等transformation 操做(閉包)有以下執行過程: 
1. 代碼中對象在driver本地序列化 
2. 對象序列化後傳輸到遠程executor節點 
3. 遠程executor節點反序列化對象 
4. 最終遠程節點執行 
故對象在執行中須要序列化經過網絡傳輸,則必須通過序列化過程。ide

三、如何解決Spark序列化問題?

若是出現NotSerializableException報錯,能夠在spark-default.xml文件中加入以下參數來開啓SerializationDebugger功能類,從而能夠在日誌中打印出序列化出問題的類和屬性信息。

spark.executor.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true spark.driver.extraJavaOption -Dsun.io.serialization.extendedDebugInfo=true

 

對於scala語言開發,解決序列化問題主要以下幾點:

  • 在Object中聲明對象 (每一個class對應有一個Object)
  • 若是在閉包中使用SparkContext或者SqlContext,建議使用SparkContext.get() and SQLContext.getActiveOrCreate()
  • 使用static或transient修飾不可序列化的屬性從而避免序列化。 
    注:scala語言中,class的Object

對於java語言開發,對於不可序列化對象,若是自己不須要存儲或傳輸,則可以使用static或trarnsient修飾;若是須要存儲傳輸,則實現writeObject()/readObject()使用自定義序列化方法。

此外注意

對於Spark Streaming做業,注意哪些操做在driver,哪些操做在executor。由於在driver端(foreachRDD)實例化的對象,極可能不能在foreach中運行,由於對象不能從driver序列化傳遞到executor端(有些對象有TCP連接,必定不能夠序列化)。因此這裏通常在foreachPartitions或foreach算子中來實例化對象,這樣對象在executor端實例化,沒有從driver傳輸到executor的過程。

dstream.foreachRDD { rdd =>
  val where1 = "on the driver" rdd.foreach { record => val where2 = "on different executors" } } }

 

參考資料: 
Avoid NotSerializable Error in Spark Job 
spark not serializable problem 
Spark Streaming / Tips on Running Streaming Apps inside Databricks 
Java 序列化的高級認識 
什麼是writeObject 和readObject?可定製的序列化過程

相關文章
相關標籤/搜索