將代碼從 spark 1.x 移植到 spark 2.x

1. SparkSession
sparkSession能夠視爲sqlContext和hiveContext以及StreamingContext的結合體,這些Context的API均可以經過sparkSession使用。
建立SparkSession
 
val spark = SparkSession.builder
    .master("local[2]")
    .appName("spark session example")
    .getOrCreate()1234
使用enableHiveSupport就可以支持hive,至關於hiveContext
 
val spark = SparkSession.builder
    .master("local[2]")
    .appName("spark session example")
    .enableHiveSupport()
    .getOrCreate()12345
API操做,與以前的Context基本一致
 
//讀取csv數據
val df0 = spark.read
  .option("header","true")
  .csv("src/main/resources/test.csv")
//讀取parquet數據
val df1 = spark.read.parquet("...")
//讀取json數據
val df2 = spark.read.json("...")
//sql查詢
val df3 = spark.sql("xxx")
1234567891011121314
Spark 2.0向後兼容,因此hiveContext以及sqlContext依舊可用,不過官方建議開發者開始使用SparkSession。
 
2. DataSet,RDD,DataFrame

RDD
類型安全,面向對象編程風格,序列化以及反序列化開銷大。
DataFrame
提供了查詢優化器,分區剪枝,自動判斷是否使用broadcast join等功能,對rdd進行了大量的優化。對spark瞭解不深的編程/分析人員很是友好。
能夠視爲Row類型的Dataset (Dataset[Row]),非類型安全,不是面向對象編程風格。
DataSet
繼承了RDD和DataFrame的優勢。數據以編碼的二進制形式存儲,將對象的schema映射爲SparkSQL類型,不須要反序列化就能夠進行shuffle等操做,每條記錄存儲的則是強類型值,類型安全,面向對象編程風格。
 

Dataset的建立
dataset能夠從rdd,dataFrame轉化,也能夠從原始數據直接生成。
經過toDS方法建立
 
val ds1 = Seq("a","b").toDS()
ds1.show
//+-----+
//|value|
//+-----+
//|    a|
//|    b|
//+-----+123456789
經過createDataSet建立
 
case class Person(name: String, age: Int)
val data = Seq(Person("lsw", 23), Person("yyy", 22))
val ds2 = spark.createDataset(data)
ds2.show
//+----+---+
//|name|age|
//+----+---+
//| lsw| 23|
//| yyy| 22|
//+----+---+1234567891011
 
DataSet與RDD使用上的區別
Dataset 結合了 rdd 和 DataFrame 上大多數的API,因此spark 1.x基於 rdd 或 DataFrame 的代碼能夠很容易的改寫成spark 2.x版本

數據讀取
RDDs
sparkContext.textFile("/path/to/data.txt")1
Datasets
//返回 DataFrame
val df = spark.read.text("/path/to/data.txt")
//返回 DataSet[String]
val ds1 = spark.read.textFile("/path/to/data.txt")
//或者讀取成DataFram再轉化成Dataset
val ds2 = spark.read.text("/path/to/data.txt").as[String]123456
經常使用API
RDDs
//flatMap,filter
val lines = sc.textFile("/path/to/data.txt")
val res = lines
  .flatMap(_.split(" "))
  .filter(_ != "")
//reduce
val rdd = sc.makeRDD(Seq(1, 2, 3, 4))
rdd.reduce((a, b) => a + b)123456789
Datasets
//flatMap,filter
val lines = spark.read.textFile("/path/to/data.txt")
val res = lines
  .flatMap(_.split(" "))
  .filter(_ != "")
//reduce
val ds = Seq(1, 2, 3, 4).toDs
ds.reduce((a, b) => a + b)123456789
reduceByKey
RDDs
val reduceCountByRDD = wordsPair
  .reduceByKey(_+_)12
Datasets
val reduceCountByDs = wordsPairDs
  .mapGroups((key,values) =>(key,values.length))12
RDD,DataFrame,Dataset的相互轉化
import spark.implicits._
//Dataset轉化爲RDD
val ds2rdd = ds.rdd
//Dataset轉爲DataFrame
val ds2df = ds.toDF
//RDD轉化爲Dataset
val rdd2ds = rdd.toDS
//RDD轉化爲DataFrame
val rdd2df = rdd.toDF
//DataFrame轉化爲RDD
val df2rdd = df.rdd
//DataFrame轉化爲DataSet
val df2ds = df.as[Type]
12345678910111213141516
wordCount
data.txt
hello world
hello spark12
RDDs
val rdd = sc.textFile("src/main/resources/data.txt")
val wordCount = rdd
  .map(word => (word,1))
  .reduceByKey(_+_)1234
Datasets
import spark.implicits._
val wordCount1 = lines
  .flatMap(r => r.split(" "))
  .groupByKey(r => r)
  .mapGroups((k, v) => (k, v.length))
wordCount1.show
//  +-----+--------+
//  |value|count(1)|
//  +-----+--------+
//  |hello|       2|
//  |spark|       1|
//  |world|       1|
//  +-----+--------+
//也能夠直接使用count函數
val wordCount2 = lines
  .flatMap(r => r.split(" "))
  .groupByKey(v => v)
  .count()
wordCount2.show
//  +-----+---+
//  |   _1| _2|
//  +-----+---+
//  |hello|  2|
//  |spark|  1|
//  |world|  1|
//  +-----+---+123456789101112131415161718192021222324252627

Dataset性能提高(來自官方)
 
 
 

3. Catalog
Spark 2.0中添加了標準的API(稱爲catalog)來訪問Spark SQL中的元數據。這個API既能夠操做Spark SQL,也能夠操做Hive元數據。
 
獲取catalog
從SparkSession中獲取catalog
 
val catalog = spark.catalog1
 
查詢臨時表和元數據中的表
返回Dataset[Table]
catalog.listTable.show
//  +----+--------+-----------+---------+-----------+
//  |name|database|description|tableType|isTemporary|
//  +----+--------+-----------+---------+-----------+
//  |table|   null|      null|TEMPORARY|        true|
//  |act | default|      null| EXTERNAL|       false|
//  +----+--------+-----------+---------+-----------+
1234567
 
建立臨時表
使用createTempView和createOrReplaceTempView取代registerTempTable。
例如   
df.createTempView("table")
df.createOrReplaceTempView("table")
12

createTempView
建立臨時表,若是已存在同名表則報錯。
createOrReplaceTempView
建立臨時表,若是存在則進行替換,與老版本的registerTempTable功能相同。
 

銷燬臨時表
使用dropTempView取代dropTempTable,銷燬臨時表的同事會清除緩存的數據。
spark.dropTempView("table")
1
 
緩存表
對數據進行緩存
//緩存表有兩種方式
df.cache
catalog.cacheTable("table")
//判斷數據是否緩存
catalog.isCached("table")
123456
catalog相較於以前的API,對metadata的操做更加的簡單,直觀。
相關文章
相關標籤/搜索