摘要java
咱們在使用spark的一個流程是:利用spark.sql()函數把數據讀入到內存造成DataSet[Row](DataFrame)因爲Row是新的spark數據集中沒法實現自動的編碼,須要對這個數據集進行編碼,才能利用這些算子進行相關的操做,如何編碼是一個問題,在這裏就把這幾個問題進行總結一下。報的錯誤:error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.sql
報這個錯誤通常就是咱們在使用算子時其返回值的數據類型每每不是spark經過自身的反射能完成的自動編碼部分,好比經過map算子,咱們在map算子的函數的返回值類型是Map類型的,就會出現上面的問題,由於Map集合類不在:基本的類型和String,case class和元組的範圍以內,spark內部不能經過反射完成自動編碼。apache
出現這個問題的緣由api
spark2.0之後的版本採用的是新的分佈式數據集DataSet,其中DataFrame是DataSet[Row]的別名形式。而新的數據集採用了不少的優化,其中一個就是利用了Tungsten execution engine的計算引擎,這個計算引擎採用了不少的優化。其中一個就是本身維護了一個內存管理器,從而使計算從java jvm解脫出來了,使得內存的優化獲得了很大的提高。同時新的計算引擎,把數據存儲在內存中是以二進制的形式存儲的,大部分全部的計算都是在二進制數據流上進行的,不須要把二進制數據流反序列化成java對象,而後再把計算的結果序列化成二進制數據流,而是直接在二進制流上進行操做,這樣的狀況就須要咱們存在一種機制就是java對象到二進制數據流的映射關係,否則咱們不知道二進制流對應的數據對象是幾個字節,spark這個過程是經過Encoders來完成的,spark自身經過反射完成了一部分的自動編碼過程:基本的類型和String,case class和元組,對於其餘的集合類型或者咱們自定義的類,他是沒法完成這樣的編碼的。須要咱們本身定義這樣的編碼也就是讓其擁有一個schema。jvm
解決這個問題方式分佈式
方法一:函數
這樣就是把其轉化爲RDD,利用RDD進行操做,可是不建議用這個,相對於RDD,DataSet進行了不少的底層優化,擁有很不錯性能性能
val orderInfo1 = spark.sql( """ |SELECT |o.id, |o.user_id |FROM default.api_order o |limit 100 """.stripMargin).rdd.map(myfunction)
方法二:優化
讓其自動把DataSet[Row]轉化爲DataSet[P],若是Row裏面有複雜的類型出現的話。編碼
case class Orders(id: String, user_id: String) //這個case class要定義在咱們的單例對象的外面 object a { def main(args: Array[String]): Unit ={ import spark.implicits._ val orderInfo1 = spark.sql( """ |SELECT |o.id, |o.user_id |FROM default.api_order o |limit 100 """.stripMargin).as[Orders].map(myfunction) } }
方式三:
自定義一個schema,而後利用RowEncoder進行編碼。這只是一個例子,裏面的類型其實均可以經過spark的反射自動完成編碼過程。
import spark.implicits._ val schema = StructType(StructType(Seq(StructField("id",StringType,true),StructField("user_id",StringType,true)))) val encoders = RowEncoder(schema) val orderInfo1 = spark.sql( """ |SELECT |o.id, |o.user_id |FROM default.api_order o |limit 100 """.stripMargin).map(row => row)(encoders)
方法四:
直接利用scala的模式匹配的策略case Row來進行是能夠經過的,緣由是case Row()scala模式匹配的知識,這樣能夠知道集合Row裏面擁有多少個基本的類型,則能夠經過scala就能夠完成對Row的自動編碼,而後能夠進行相應的處理。
import spark.implicits._ val orderInfo1 = spark.sql( """ |SELECT |o.id, |o.user_id |FROM default.api_order o |limit 100 """.stripMargin).map{case Row(id: String, user_id: String) => (id,user_id)} 這個獲得的schema爲: orderInfo1: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string] 若是換成這樣: val orderInfo1 = spark.sql( """ |SELECT |o.id, |o.user_id |FROM default.api_order o |limit 100 """.stripMargin).map{case Row(id: String, user_id: String) => List(id,user_id)} 獲得的schema爲: orderInfo1: org.apache.spark.sql.Dataset[List[String]] = [value: array<string>] 能夠看出:spark是把元祖當作case class一種特殊形式擁有,schame的字段名稱爲_1,_2這樣的特殊case clase