Spark SQL使用時須要有若干「表」的存在,這些「表」能夠來自於Hive,也能夠來自「臨時表」。若是「表」來自於Hive,它的模式(列名、列類型等)在建立時已經肯定,通常狀況下咱們直接經過Spark SQL分析表中的數據便可;若是「表」來自「臨時表」,咱們就須要考慮兩個問題:
(1)「臨時表」的數據是哪來的?
(2)「臨時表」的模式是什麼?
經過Spark的官方文檔能夠了解到,生成一張「臨時表」須要兩個要素:
(1)關聯着數據的RDD;
(2)數據模式;
也就是說,咱們須要將數據模式應用於關聯着數據的RDD,而後就能夠將該RDD註冊爲一張「臨時表」。在這個過程當中,最爲重要的就是數據(模式)的數據類型,它直接影響着Spark SQL計算過程以及計算結果的正確性。
目前pyspark.sql.types支持的數據類型:NullType、StringType、BinaryType、BooleanType、DateType、TimestampType、DecimalType、DoubleType、FloatType、ByteType、IntegerType、LongType、ShortType、ArrayType、MapType、StructType(StructField),其中ArrayType、MapType、StructType咱們稱之爲「複合類型」,其他稱之爲「基本類型」,「複合類型」在是「基本類型」的基礎上構建而來的。
這些「基本類型」與Python數據類型的對應關係以下:
NullType |
None |
StringType |
basestring |
BinaryType |
bytearray |
BooleanType |
bool |
DateType |
datetime.date |
TimestampType |
datetime.datetime |
DecimalType |
decimal.Decimal |
DoubleType |
float(double precision floats) |
FloatType |
float(single precision floats) |
ByteType |
int(a signed integer) |
IntegerType |
int(a signed 32-bit integer) |
LongType |
long(a signed 64-bit integer) |
ShortType |
int(a signed 16-bit integer) |
下面咱們分別介紹這幾種數據類型在Spark SQL中的使用。
1. 數字類型(ByteType、ShortType、IntegerType、LongType、FloatType、DoubleType、DecimalType)
數字類型可分爲兩類,整數類型:ByteType、ShortType、IntegerType、LongType,使用時須要注意各自的整數表示範圍;浮點類型:FloatType、DoubleType、DecimalType,使用時不但須要注意各自的浮點數表示範圍,還須要注意各自的精度範圍。
咱們以常見的數據類型IntegerType來講明數字類型的使用方法:
a. 模擬「一行兩列」的數據,並經過parallelize方法將其轉換爲一個RDD source,這個RDD就是關聯着數據的RDD;
b. 建立數據模式,須要分別爲這兩列指定列名、列類型、能否包含空(Null)值;其中模式須要使用StructType表示,每一列的各個屬性(列名稱、列類型、能否包含空(Null)值)須要使用StructField表示;第一列的列名爲col1,列類型爲IntegerType,不可包含空(Null)值(False);第二列的列名爲col2,列類型爲IntegerType,不可包含空(Null)值(False);(注意:實際使用中每列的數據類型並不必定相同)
c. 經過applySchema方法將數據模式schema應用於RDD source,這會產生一個SchemaRDD(具備模式的RDD) table;
d. 將SchemaRDD table註冊爲一張表:temp_table;
到此咱們就完成了建立RDD、建立Schema、註冊Table的整個過程,接下來就可使用這張表(temp_table)經過Spark(Hive) SQL完成分析。其它數字類型的使用方式相似。
實際上本例中「一行兩列」的數據實際就是IntergerType的表示範圍:[-2147483648, 2147483647],其它數字類型的表示範圍以下:
ByteType |
[-128, 127] |
ShortType |
[-32768, 32767] |
IntegerType |
[-2147483648, 2147483647] |
LongType |
[-9223372036854775808, 9223372036854775807] |
FloatType |
[1.4E-45, 3.4028235E38] |
DoubleType |
[4.9E-324, 1.7976931348623157E308] |
能夠看出,雖然咱們使用Python編寫程序,這些數據類型的表示範圍與Java中的Byte、Short、Integer、Long、Float、Double是一致的,由於Spark是Scala實現的,而Scala運行於Java虛擬機之上,所以Spark SQL中的數據類型ByteType、ShortType、IntegerType、LongType、FloatType、DoubleType、DecimalType在運行過程當中對應的數據其實是由Java中的Byte、Short、Integer、Long、Float、Double表示的。
在使用Python編寫Spark Application時須要牢記:爲分析的數據選擇合適的數據類型,避免由於數據溢出致使輸入數據異常,但這僅僅可以解決數據輸入的溢出問題,還不能解決數據在計算過程當中可能出現的溢出問題。
咱們將上述例子中的示例數據修改成(9223372036854775807, 9223372036854775807),數據類型修改成LongType,如今的示例數據實際是LongType所能表示的最大值,若是咱們將這兩例值相加,是否會出現溢出的狀況呢?
輸出結果:
能夠看出,實際計算結果與咱們預想的徹底同樣,這是由於col1與col2的類型爲LongType,那麼col1 + col2的類型也應爲LongType(緣由見後),然而col1 + col2的結果值18446744073709551614已經超過LongType所能表示的範圍([-9223372036854775808, 9223372036854775807]),必然致使溢出。
由於咱們使用的是HiveContext(SQLContext目前不被推薦使用),不少時候咱們會想到使用「bigint」,
輸出結果依然是:
要解釋這個緣由,須要瞭解一下Hive中數字類型各自的表示範圍:
經過比對能夠發現Hive BIGINT的表示範圍與LongType是一致的,畢竟Hive是Java實現的,所以咱們能夠猜測Hive tinyint、smallint、int、bigint、float、double與Java Byte、Short、Integer、Long、Float、Double是一一對應的(僅僅是猜測,並無實際查看源碼驗證),因此咱們將LongType的數據類型轉換爲BIGINT的方式是行不通的,它們的數值範圍是同樣的。
那麼咱們應該如何解決溢出問題呢?注意到Hive Numeric Types中的最後一個數字類型:DECIMAL,從Hive 0.11.0引入,Hive 0.13.0開始支持用戶能夠自定義「precision」和「scale」。Decimal基於Java BigDecimal實現,能夠表示不可變的任務精度的十進制數字,支持常規的數學運算(+,-,*,/)和UDF(floor、ceil、round等),也能夠與其它數字類型相互轉換(cast)。使用示例以下:
使用Decimal時須要注意「precision」和「scale」值的選取,Java BigDecimal(BigInteger,後續會提到)取值範圍理論上取決於(虛擬)內存的大小,可見它們是比較消耗內存資源的,所以咱們須要根據咱們的實際須要爲它們選取合適的值,而且須要知足下述條件:
整數部分位數(precision - scale) + 小數部分位數(scale) = precision
LongType所能表示的最大位數:19,由於在咱們的示例中會致使溢出問題,所以咱們將數值轉換爲Decimal,並指定precision爲38,scale爲0,這樣咱們即可以獲得正確的結果:
須要注意的是計算結果類型也變成decimal.Decimal(Python),使用Python編寫Spark Application時,pyspark也提供了DecimalType,它是一種比較特殊的數據類型,它不是Python內建的數據類型,使用時須要導入模塊decimal,使用方式以下:
使用數據類型DecimalType時有兩個地方須要注意:
(1)建立RDD時須要使用模塊decimal中的Decimal生成數據;
(2)DecimalType在Spark 1.2.0環境下使用時會出現異常:java.lang.ClassCastException: java.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal,在Spark 1.5.0環境下能夠正常使用,但須要將模塊名稱由「pyspark.sql」修改成「pyspark.sql.types」。
咱們明確指定數據的類型是什麼,那麼什麼決定咱們常規數學運算(+,-,*,/)以後的結果類型呢?這些數學運行在Hive中實際都是由UDF實現的(org.apache.hadoop.hive.ql.exec.FunctionRegistry),
(1)+
(2)-
(3)*
(4)/
(5)%
能夠看出,「+」,「-」,「*」,「%」經過重載支持的數據類型:byte、short、int、long、float、double、decimal,「/」經過重載僅僅支持數據類型:double、decimal,計算的結果類型與輸入類型是相同的,這也意味着:
(1)數學運算「+」、「-」,「*」,「%」時可能會出現隱式轉換(如int + long => long + long);
(2)數學運算「/」則統一將輸入數據轉換爲數據類型double或decimal進行運算,這一點也意味着,計算結果相應地爲數據類型double或decimal。
2. 時間類型(DateType,TimestampType)
DateType能夠理解爲年、月、日,TimestampType能夠理解爲年、月、日、時、分、秒,它們分別對着着Python datetime中的date,datetime,使用示例以下:
輸出結果:
3. StringType、BooleanType、BinaryType、NoneType
這幾種數據類型的使用方法大體相同,就不一一講解了,注意BinaryType對應着使用了Python中的bytearray。
輸出結果:
4. 複合數據類型(ArrayType、MapType、StructType)
複合數據類型共有三種:數組(ArrayType)、字典(MapType)、結構體(StructType),其中數組(ArrayType)要求數組元素類型一致;字典(MapType)要求全部「key」的類型一致,全部「value」的類型一致,但「key」、「value」的類型能夠不一致;結構體(StructType)的元素類型能夠不一致。
(1)ArrayType
ArrayType要求指定數組元素類型。
(2)MapType
MapType要求指定鍵(key)類型和值(value)類型。
(3)StructType
StructType包含的元素類型可不一致,須要根據元素的次序依次爲其指定合適的名稱與數據類型。
綜上所述,Spark(Hive)SQL爲咱們提供了豐富的數據類型,咱們須要根據分析數據的實際狀況爲其選取合適的數據類型(基本類型、複合類型)、尤爲是數據類型各自的表示(精度)範圍以及數據溢出的狀況處理。