DataFrame的理解

DataFrame不是Spark SQL提出,而是在Pandas就有
DataSet:分佈式的數據集
DataFrame:以列的形式構成的分佈式數據集(RDD with schema)
能夠從各類source轉換成,如RDD、SQL、noSQL等
作了抽象的處理
 
DataFrame對比RDD
DataFrame有具體的列信息
 
運行效率上:
RDD:java/scala => jvm
           Python 本身的運行環境
DataFrame:不管哪一種語言都是同一個logic plan
 
 
DataFrame 的 API:
 
printschema() 輸出一個樹形結構
show() 輸出內容。括號內可限制輸出的條數
Select(COLUMN_NAME) 查詢某一列全部的數據
綜合應用:
peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 5).as("age after 5 years")).show()
查找兩列,並對其中一列進行運算後,更改其列名
 
過濾:
filter()
peopleDF.filter(peopleDF.col("age") > 24).show()
 
分組:
groupBy()
peopleDF.groupBy("age").count().show()
 
轉成臨時視圖(進行SQL操做):
createOrReplaceTempView()  便可轉成sql API進行操做
 
 
DataFrame 與 RDD 的相互操做:
兩種
 
都是要首先導入SparkSession,做爲入口
val spark = SparkSession.builder().appName("DataFrameRDD").master("local[2]").getOrCreate()
 
第一種:反射
代碼簡潔,前提是須要知道schema的構成
藉助case class,在這個類裏定義好schema對應的字段
  1. 建立case class,根據schema來寫
  2. 生成RDD,藉助SparkContext的textFile,獲取文件而後轉成RDD,String類型
  3. 導入Spark.Implicits._  隱式轉換包
  4. 分割RDD,split方法,分割後變成String數組,並和case class相對應起來(也就是把對應的變量傳入class中,記得傳入前進行類型轉換)
  5. toDF方法生成DataFrame
 
代碼:
//定義case class
case class Info(id: Int, name: String, age: Int) {}
//生成RDD
val rdd = spark.sparkContext.textFile("file:////usr/local/mycode/info.txt")
//切割,分類,轉換
val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()
ps:若分隔符是|或者其餘,有可能要加上轉義字符\\
 
 
第二種:直接構建Dataset
不知道schema的條件下使用
先轉成Rows,結合StructType,代碼量大一點
  1. 生成RDD
  2. 分割RDD,和第一種方法的第4步同樣,而後轉換成RowsRDD
  3. 定義StructType,用一個數組Array來定義,每一個變量的Type用StructField來定義
  4. 用createDataFrame方法關聯RDD和StructType
 
代碼:
//生成RDD
val rdd = spark.sparkContext.textFile("file:////usr/local/mycode/info.txt")
//分割,轉成rowRDD
val rowRdd = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))
//定義StructType
val structType = StructType(Array(StructField("id", IntegerType,true),
  StructField("name", StringType, true),
  StructField("age", IntegerType,true)))
//關聯rowRDD和StructType
val infoDF = spark.createDataFrame(rowRdd, structType)
 
 
 
 
DataFrame API詳細:
Show方法:
默認只顯示前20條,可指定更大
若信息太多,默認截取顯示一部分,設置成false的話就不截取了
 
take方法:
take() 返回前面n行記錄
take().foreach 分行顯示
 
first、head方法:
頭幾行
 
select方法:
能夠選擇多列
 
filter方法:
條件裏能夠加其餘字段,好比說substring,可搜索行值中某幾個字符等於指定值的行
studentDF.filter("substr(name, 0, 1) = 'M'").show
 
sort方法:
有desc排序
studentDF.sort(studentDF.col("name").desc, studentDF.col("id").desc).show
 
As方法:
studentDF.select(studentDF.col("name").as("studentName")).show
 
Join方法:
studentDF.join(studentDF2, studentDF.col("id") === studentDF2.col("id」))
判斷相等時用三個=號
 
 
 
 
Dataset:
 
初次出如今1.6版本 有Spark SQL優化 能使用lambda表達式,但不能用python語言使用Dataset的API
 
DF = DS[Row]
DS 強類型 typed     case class
DF:弱類型   Row
 
 
讀取csv文件變成DataFrame的方法:
val salesDF = spark.read.option("header", "true").option("inferSchema", "true」).csv(path)
header是指解析頭文件,這樣能知道列名
inferSchema是獲取每一列的屬性
 
DF轉DS的方法:
  1. 建立case class
  2. as方法
 
val salesDS = salesDF.as[Sales]
case class Sales(transactionId: Int, customerId: Int, itemId: Int, amountPaid: Double)
選擇某列輸出:
salesDS.map(line => line.itemId).show()
 
SQL、DF、DS的區別
報錯的時機不一樣,DS最敏感,可以更早發現錯誤,即便列名寫錯了也會立刻發現
(編譯時,SQL是命令和列名寫錯都不會報錯;DF命令寫錯會報錯,但列名寫錯不會報錯。前面不報錯的狀況會在運行時報錯)
相關文章
相關標籤/搜索