spark序列化
對於優化<網絡性能>極爲重要,將RDD以序列化格式來保存減小內存佔用. spark.serializer=org.apache.spark.serializer.JavaSerialization
Spark默認 使用Java自帶的ObjectOutputStream 框架來序列化對象,這樣任何實現了 java.io.Serializable 接口的對象,都能被序列化。同時,還能夠經過擴展 java.io.Externalizable 來控制序列化性能。Java序列化很靈活但性能差速度很慢,同時序列化後佔用的字節數也較多。
spark.serializer=org.apache.spark.serializer.KryoSerialization
KryoSerialization速度快,能夠配置爲任何org.apache.spark.serializer的子類。但Kryo也不支持全部實現了 java.io.Serializable 接口的類型,它須要你在程序中 register 須要序列化的類型,以獲得最佳性能。
LZO的支持要求先安裝 Hadoop-lzo包(每一個節點), 並放到 Spark本地庫中。若是是Debian包安裝,在調用spark-submit時加上 --driver-library-path /usr/lib/hadoop/lib/native/ --driver-class-path /usr/lib/hadoop/lib/ 就能夠。 下載lzo http://cn.jarfire.org/hadoop.lzo.html
在 SparkConf 初始化的時候調用 conf.set(「spark.serializer」, 「org.apache.spark.serializer.KryoSerializer」) 使用 Kryo。這個設置不只控制各個worker節點之間的混洗數據序列化格式,同時還控制RDD存到磁盤上的序列化格式。須要在使用時註冊須要序列化的類型,建議在對網絡敏感的應用場景下使用Kryo。 若是你的自定義類型須要使用Kryo序列化,能夠用 registerKryoClasses 方法先註冊:
val conf = new SparkConf.setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf) 最後,若是你不註冊須要序列化的自定義類型,Kryo也能工做,不過每個對象實例的序列化結果都會包含一份完整的類名,這有點浪費空間。
在Scala中使用New API (Twitter Elephant Bird 包) lzo JsonInputFormat讀取 LZO 算法壓縮的 JSON 文件:
val input = sc.newAPIHadoopFile(inputFile, classOf[lzoJsonInputFormat], classOf[LongWritable], classOf[MapWritable], conf)
inputFile: 輸入路徑
接收第一個類:「格式」類,輸入格式
接收第二個類:「鍵」
接收第二個類:「值」
conf:設置一些額外的壓縮選項
在Scala中使用老API直接讀取 KeyValueTextInputFormat()最簡單的Hadoop輸入格式 :
val input = sc.HadoopFile[Text, Text, KeyValueTextInputFormat](inputFile).map{ case (x, y) => (x.toString, y.toString) }
html
注:若是讀取單個壓縮過的輸入,作好不要考慮使用Spark的封裝(textFile/SequenceFile..),而是使用 newAPIHadoopFile 或者 HadoopFile,並指定正確的壓縮解碼器。 有些輸入格式(如SequenceFile)容許咱們只壓縮鍵值對數據中的值,這在查詢時頗有用。其它一些輸入格式也有本身的壓縮控制,如:Twitter Elephant Bird 包中的許多格式均可以使用LZO算法壓縮數據。java