Spark PySpark數據類型的轉換原理—Writable Converter

Spark目前支持三種開發語言:Scala、Java、Python,目前咱們大量使用Python來開發Spark App(Spark 1.2開始支持使用Python開發Spark Streaming App,咱們也準備嘗試使用Python開發Spark Streaming App),在這期間關於數據類型的問題曾經困擾咱們很長時間,故在此記錄一下心路歷程。
 
Spark是使用Scala語言開發的,Hadoop是使用Java語言開發的,Spark兼容Hadoop Writable,而咱們使用Python語言開發Spark (Streaming) App,Spark Programming Guides(Spark 1.5.1)其中有一段文字說明了它們相互之間數據類型轉換的關係:
 
 
也說是說,咱們須要處理兩個方向的轉換:
 
(1)Writable => Java Type => Python Type;
(2)Python Type => Java Type => Writable;
 
其中Java Type與Python Type之間數據類型的轉換依賴開源組件Pyrolite,相應的數據類型轉換以下:
 
(1)Python Type => Java Type;
 
 
(2)Java Type => Python Type;
 
 
也就是說,Pyrolite已經爲Java Type與Python Type之間的數據類型轉換創建了「標準」,咱們僅僅須要處理Writable與Java Type之間的數據轉換就能夠了。
 
從上圖「Writable Support」中能夠看出PySpark已經爲咱們解決了經常使用的數據類型轉換問題,但能夠理解爲「基本」數據類型,遇到複雜的狀況,仍是須要咱們特殊處理,PySpark已經爲咱們考慮到了這種業務場景,爲咱們提供接口Converter(org.apache.spark.api.python.Converter),使得咱們能夠根據本身的須要擴展數據類型轉換機制:
 
 
接口Converter僅僅只有一個方法convert,其中T表示源數據類型,U表示目標數據類型,參數obj表示源數據值,返回值表示目標數據值。
 
Spark Programming Guides(Spark 1.5.1)也爲咱們舉例說明了一個須要自定義Converter的場景:
 
 
ArrayWritable是Hadoop Writable的一種,由於Array涉及到元素數據類型的問題,所以使用時須要實現相應的子類,如元素數據類型爲整型:
 
 
從上面的描述可知,PySpark使用ArrayWritable時涉及到以下兩個方向的數據類型轉換:
 
(1)Tuple => Object[] => ArrayWritable;
(2)ArrayWritable => Object[] => Tuple;
 
咱們以IntArrayWritable爲例說明如何自定義擴展Converter,同理也須要處理兩個方向的數據類型轉換:Tuple => Object[] => ArrayWritable、ArrayWritable => Object[] => Tuple。
 
(1)Tuple => Object[] => IntArrayWritable;
 
假設咱們有一個list,list的元素類型爲tuple,而tuple的元素類型爲int,咱們須要將這個list中的全部數據以SequenceFile的形式保存至HDFS。對於list中的每個元素tuple,Pyrolite能夠幫助咱們完成Tuple => Object[]的轉換,而Object[] => IntArrayWritable則須要咱們自定義Converter實現。
 
 
PySpark中使用這個Converter寫入數據:
 
 
注意:SequenceFile的數據結構爲<key, value>,爲了簡單起見,key指定爲com.sina.dip.spark.converter.IntArrayWritable,value指定爲org.apache.hadoop.io.NullWritable(即空值)。
 
運行上述程序時,由於有使用到咱們自定義的類,所以須要將com.sina.dip.spark.converter.IntArrayWritable、com.sina.dip.spark.converter.ObjectArrayToIntArrayWritableConverter編譯打包爲獨立的Jar:converter.jar,並經過參數指定,以下:
 
/usr/lib/spark-1.5.1-bin-2.5.0-cdh5.3.2/bin/spark-submit --jars converter.jar 1.5.1/examples/app/spark_app_save_data_to_seqfile.py
 
(2)IntArrayWritable => Object[] => Tuple;
 
咱們須要將(1)中寫入SequenceFile的Key(IntArrayWritable)還原爲list,其中list的元素類型爲tuple,tuple的元素類型爲int,IntArrayWritable => Object[]也須要用到咱們自定義的Converter(Object[] => Tuple由Pyrolite負責):
 
 
PySpark使用這個Converter讀取數據:
 
 
同(1),咱們須要將com.sina.dip.spark.converter.IntArrayWritable、com.sina.dip.spark.converter.IntArrayWritableToObjectArrayConverter編譯打包爲獨立的Jar:converter.jar,並經過參數指定,以下:
 
/usr/lib/spark-1.5.1-bin-2.5.0-cdh5.3.2/bin/spark-submit --jars converter.jar 1.5.1/examples/app/spark_app_read_data_from_seqfile.py
 
輸出結果:
 
 
能夠看出,經過自定義擴展的Converter:com.sina.dip.spark.converter.ObjectArrayToIntArrayWritableConverter、com.sina.dip.spark.converter.IntArrayWritableToObjectArrayConverter,咱們實現了IntArrayWritable(com.sina.dip.spark.converter.IntArrayWritable)與Tuple(Python)之間的轉換。
相關文章
相關標籤/搜索