Spark SQL學習——DataFrame和DataSet

其餘更多java基礎文章:
java基礎學習(目錄)html


學習資料:Spark-SQL之DataFrame基本操做
Spark SQL 函數全集
再談RDD、DataFrame、DataSet關係以及相互轉換(JAVA API)
DataFrame經常使用方法(代碼詳細)
DataSet經常使用方法(代碼詳細)java

DataFrame基本原理

咱們知道,RDD是spark早期很重要的一個概念,是數據的immutable distributed的集合,由不一樣節點上的partition組成。DataFrame和RDD相似,也是數據的不可變分佈式集合。不一樣的是,數據被組織成帶名字的列,就像關係型數據庫中的表。是一種有結構的高級別抽象,與之相應的提供了一種領域特定語言(DSL)API來操做這些分佈式數據。sql

DataFrame直觀上很像是RDDs的增強版,它和RDDs在數據存儲上最大的區別就在於,DataFrame是有Schema的,通俗的講,就是上圖中藍色框住的那個表頭。不要小看這一點,對於複雜的數據類型,DataFrame的這種結構可使編程大大簡化。

在spark2.0後,DataFrame的API和DataSet的API合併統一了,DataFrame至關於DataSet[Row]。如今只須要處理DataSet相關API便可。數據庫

DataFrame的限制

  • 沒有編譯階段的類型檢查: 不能在編譯時刻對安全性作出檢查,並且限制了用戶對於未知結構的數據進行操做。好比下面代碼在編譯時沒有錯誤,可是在執行時會出現異常:
case class Person(name : String , age : Int) 
val dataframe = sqlContect.read.json("people.json") 
dataframe.filter("salary > 10000").show 
=> throws Exception : cannot resolve 'salary' given input age , name
複製代碼
  • 不能保留類對象的結構: 一旦把一個類結構的對象轉成了Dataframe,就不能轉回去了。下面這個栗子就是指出了:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContect.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
複製代碼

DataSet

Dataset API是對DataFrame的一個擴展,使得能夠支持類型安全的檢查,而且對類結構的對象支持程序接口。它是強類型的,不可變collection,並映射成一個相關的schema。 Dataset API的核心是一個被稱爲Encoder的概念。它是負責對JVM的對象以及表格化的表達(tabular representation)之間的相互轉化。 表格化的表達在存儲時使用了Spark內置的Tungsten二進制形式,容許對序列化數據操做並改進了內存使用。在Spark 1.6版本以後,支持自動化生成Encoder,能夠對普遍的primitive類型(好比String,Integer,Long等)、Scala的case class以及Java Bean自動生成對應的Encoder。編程

DataFrame也能夠叫Dataset[Row],每一行的類型是Row,不解析,每一行究竟有哪些字段,各個字段又是什麼類型都無從得知,只能用上面提到的getAS方法或者共性中的第七條提到的模式匹配拿出特定字段json

而Dataset中,每一行是什麼類型是不必定的,在自定義了case class以後能夠很自由的得到每一行的信息安全

DataFrame和DataSet的建立

park2.0之前,使用不一樣的數據類型,須要採用不一樣的入口類,如RDD對應的入口類是SparkContext,DataFrame對應的入口類是SQLContext或者HiveContext,入口類用於指定Spark集羣的各類參數,並與資源管理器作交互。bash

Spark2.0開始及以後,提出了一個統一的入口類SparkSession,封裝了SparkContext、SQLContext和HiveContext。具體代碼能夠看學習資料中的再談RDD、DataFrame、DataSet關係以及相互轉換(JAVA API)分佈式

  • 若是要建立RDD,須要經過SparkSession入口類獲取SparkContext入口類
  • 若是要建立DataFrame和DataSet,經過 SparkSession入口類便可。

建立DataFrame的幾種方式

SparkSQL初始和建立DataFrame的幾種方式函數

  • 一、讀取json格式的文件建立DataFrame
  • 二、經過json格式的RDD建立DataFrame
  • 非json格式的RDD建立DataFrame(重要)
    • 經過反射的方式將非json格式的RDD轉換成DataFrame
    • 動態建立Schema將非json格式的RDD轉換成DataFrame(建議使用)

經過反射的方式將非json格式的RDD轉換成DataFrame

/**
* 傳入進去Person.class的時候,sqlContext是經過反射的方式建立DataFrame
* 在底層經過反射的方式得到Person的全部field,結合RDD自己,就生成了DataFrame
*/
DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
df.show();
df.registerTempTable("person");
sqlContext.sql("select name from person where id = 2").show();
複製代碼

動態建立Schema將非json格式的RDD轉換成DataFrame

/**
* 動態構建DataFrame中的元數據,通常來講這裏的字段能夠來源自字符串,也能夠來源於外部數據庫
*/
List<StructField> asList =Arrays.asList(//這裏字段順序必定要和上邊對應起來
   DataTypes.createStructField("id", DataTypes.StringType, true),
   DataTypes.createStructField("name", DataTypes.StringType, true),
   DataTypes.createStructField("age", DataTypes.IntegerType, true)
);

StructType schema = DataTypes.createStructType(asList);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);

df.show();
複製代碼
相關文章
相關標籤/搜索