1. SparkSession
sparkSession能夠視爲sqlContext和hiveContext以及StreamingContext的結合體,這些Context的API均可以經過sparkSession使用。
建立SparkSession
val spark = SparkSession.builder
.master("local[2]")
.appName("spark session example")
.getOrCreate()1234
使用enableHiveSupport就可以支持hive,至關於hiveContext
val spark = SparkSession.builder
.master("local[2]")
.appName("spark session example")
.enableHiveSupport()
.getOrCreate()12345
API操做,與以前的Context基本一致
//讀取csv數據
val df0 = spark.read
.option("header","true")
.csv("src/main/resources/test.csv")
//讀取parquet數據
val df1 = spark.read.parquet("...")
//讀取json數據
val df2 = spark.read.json("...")
//sql查詢
val df3 = spark.sql("xxx")
1234567891011121314
Spark 2.0向後兼容,因此hiveContext以及sqlContext依舊可用,不過官方建議開發者開始使用SparkSession。
2. DataSet,RDD,DataFrame
RDD
類型安全,面向對象編程風格,序列化以及反序列化開銷大。
DataFrame
提供了查詢優化器,分區剪枝,自動判斷是否使用broadcast join等功能,對rdd進行了大量的優化。對spark瞭解不深的編程/分析人員很是友好。
能夠視爲Row類型的Dataset (Dataset[Row]),非類型安全,不是面向對象編程風格。
DataSet
繼承了RDD和DataFrame的優勢。數據以編碼的二進制形式存儲,將對象的schema映射爲SparkSQL類型,不須要反序列化就能夠進行shuffle等操做,每條記錄存儲的則是強類型值,類型安全,面向對象編程風格。
Dataset的建立
dataset能夠從rdd,dataFrame轉化,也能夠從原始數據直接生成。
經過toDS方法建立
val ds1 = Seq("a","b").toDS()
ds1.show
//+-----+
//|value|
//+-----+
//| a|
//| b|
//+-----+123456789
經過createDataSet建立
case class Person(name: String, age: Int)
val data = Seq(Person("lsw", 23), Person("yyy", 22))
val ds2 = spark.createDataset(data)
ds2.show
//+----+---+
//|name|age|
//+----+---+
//| lsw| 23|
//| yyy| 22|
//+----+---+1234567891011
DataSet與RDD使用上的區別
Dataset 結合了 rdd 和 DataFrame 上大多數的API,因此spark 1.x基於 rdd 或 DataFrame 的代碼能夠很容易的改寫成spark 2.x版本
數據讀取
RDDs
sparkContext.textFile("/path/to/data.txt")1
Datasets
//返回 DataFrame
val df = spark.read.text("/path/to/data.txt")
//返回 DataSet[String]
val ds1 = spark.read.textFile("/path/to/data.txt")
//或者讀取成DataFram再轉化成Dataset
val ds2 = spark.read.text("/path/to/data.txt").as[String]123456
經常使用API
RDDs
//flatMap,filter
val lines = sc.textFile("/path/to/data.txt")
val res = lines
.flatMap(_.split(" "))
.filter(_ != "")
//reduce
val rdd = sc.makeRDD(Seq(1, 2, 3, 4))
rdd.reduce((a, b) => a + b)123456789
Datasets
//flatMap,filter
val lines = spark.read.textFile("/path/to/data.txt")
val res = lines
.flatMap(_.split(" "))
.filter(_ != "")
//reduce
val ds = Seq(1, 2, 3, 4).toDs
ds.reduce((a, b) => a + b)123456789
reduceByKey
RDDs
val reduceCountByRDD = wordsPair
.reduceByKey(_+_)12
Datasets
val reduceCountByDs = wordsPairDs
.mapGroups((key,values) =>(key,values.length))12
RDD,DataFrame,Dataset的相互轉化
import spark.implicits._
//Dataset轉化爲RDD
val ds2rdd = ds.rdd
//Dataset轉爲DataFrame
val ds2df = ds.toDF
//RDD轉化爲Dataset
val rdd2ds = rdd.toDS
//RDD轉化爲DataFrame
val rdd2df = rdd.toDF
//DataFrame轉化爲RDD
val df2rdd = df.rdd
//DataFrame轉化爲DataSet
val df2ds = df.as[Type]
12345678910111213141516
wordCount
data.txt
hello world
hello spark12
RDDs
val rdd = sc.textFile("src/main/resources/data.txt")
val wordCount = rdd
.map(word => (word,1))
.reduceByKey(_+_)1234
Datasets
import spark.implicits._
val wordCount1 = lines
.flatMap(r => r.split(" "))
.groupByKey(r => r)
.mapGroups((k, v) => (k, v.length))
wordCount1.show
// +-----+--------+
// |value|count(1)|
// +-----+--------+
// |hello| 2|
// |spark| 1|
// |world| 1|
// +-----+--------+
//也能夠直接使用count函數
val wordCount2 = lines
.flatMap(r => r.split(" "))
.groupByKey(v => v)
.count()
wordCount2.show
// +-----+---+
// | _1| _2|
// +-----+---+
// |hello| 2|
// |spark| 1|
// |world| 1|
// +-----+---+123456789101112131415161718192021222324252627
Dataset性能提高(來自官方)
3. Catalog
Spark 2.0中添加了標準的API(稱爲catalog)來訪問Spark SQL中的元數據。這個API既能夠操做Spark SQL,也能夠操做Hive元數據。
獲取catalog
從SparkSession中獲取catalog
val catalog = spark.catalog1
查詢臨時表和元數據中的表
返回Dataset[Table]
catalog.listTable.show
// +----+--------+-----------+---------+-----------+
// |name|database|description|tableType|isTemporary|
// +----+--------+-----------+---------+-----------+
// |table| null| null|TEMPORARY| true|
// |act | default| null| EXTERNAL| false|
// +----+--------+-----------+---------+-----------+
1234567
建立臨時表
使用createTempView和createOrReplaceTempView取代registerTempTable。
例如
df.createTempView("table")
df.createOrReplaceTempView("table")
12
createTempView
建立臨時表,若是已存在同名表則報錯。
createOrReplaceTempView
建立臨時表,若是存在則進行替換,與老版本的registerTempTable功能相同。
銷燬臨時表
使用dropTempView取代dropTempTable,銷燬臨時表的同事會清除緩存的數據。
spark.dropTempView("table")
1
緩存表
對數據進行緩存
//緩存表有兩種方式
df.cache
catalog.cacheTable("table")
//判斷數據是否緩存
catalog.isCached("table")
123456
catalog相較於以前的API,對metadata的操做更加的簡單,直觀。