【sparkSQL】建立DataFrame及保存

首先咱們要建立SparkSessionjava

val spark = SparkSession.builder()
                        .appName("test")
                        .master("local")
                        .getOrCreate()
import spark.implicits._ //將RDD轉化成爲DataFrame並支持SQL操做        

而後咱們經過SparkSession來建立DataFramemysql

1.使用toDF函數建立DataFramesql

 經過導入(importing)spark.implicits, 就能夠將本地序列(seq), 數組或者RDD轉爲DataFrame。shell

 只要這些數據的內容能指定數據類型便可。數據庫

import spark.implicits._
val df = Seq(
  (1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
  (2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
).toDF("id", "name", "created_time")

注意:若是直接用toDF()而不指定列名字,那麼默認列名爲"_1", "_2"apache

能夠經過df.withColumnRenamed("_1", "newName1").withColumnRenamed("_2", "newName2")進行修改列名json

2.使用createDataFrame函數建立DataFrame數組

經過schema + row 來建立app

咱們能夠通俗的理解爲schema爲表的表頭,row爲表的數據記錄函數

import org.apache.spark.sql.types._
//定義dataframe的結構的schema
val schema = StructType(List(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = true),
    StructField("create_time", DateType, nullable = true)
))
//定義dataframe內容的rdd
val rdd = sc.parallelize(Seq(
  Row(1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
  Row(2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
))
//建立dataframe
val df = spark.createDataFrame(rdd, schema)

不過,咱們能夠把文件結構當作參數來使用,經過rdd自動產生schema和row,不用本身手動生成。

import org.apache.spark.sql.types._

//傳入屬性參數
val schemaString = " id name create_time" 
//解析參數變成StructField
val fields = schemaString.split(" ")
                         .map(fieldName => StructField(fieldname, StringType, nullable = true))
//定義dataframe的結構的schema
val schema = StructType(fields)

//定義dataframe內容的rdd
val lines = sc.textFile("file:///people.txt")
val rdd = lines.spilt(_.split(","))
               .map(attributes=>ROW(attributes(0),attributes(1).trim) )

//建立dataframe
val df = spark.createDataFrame(rdd, schema)       

3.經過反射機制建立DataFrame

首先要定義一個case class,由於只有case class才能被Spark隱式轉化爲DataFrame

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._
//建立匹配類
case class Person(id:Int,name:String,age:Long)
//讀取文件生成rdd
val rdd = sc.textFile("file:///")
//經過匹配類把rdd轉化成dataframe
val df = rdd.map(_.split(","))
            .map(attributes => Person(attributes(0),attributes(1),attributes(2).trim.toInt)) .toDF()  

4.經過文件直接建立DataFrame

 (1)使用parquet文件read建立  

val df = spark.read.parquet("hdfs:/path/to/file")

 (2)使用json文件read建立

val df = spark.read.json("examples/src/main/resources/people.json")

 (3)使用csv文件load建立

val df = spark.read
        .format("com.databricks.spark.csv")
        .option("header", "true") //reading the headers
        .option("mode", "DROPMALFORMED")
        .load("csv/file/path")

 (4)使用Hive表建立

spark.table("test.person") // 庫名.表名 的格式
     .registerTempTable("person")  // 註冊成臨時表
spark.sql(
      """
        | select *
        | from person
        | limit 10
      """.stripMargin).show()

記得,最後咱們要調用spark.stop()來關閉SparkSession。  

5.保存

 (1)經過df.write.format().save("file:///")保存

  write.format()支持輸出的格式有 JSON、parquet、JDBC、orc、csv、text等文件格式

  ,save()定義保存的位置

  當咱們保存成功後能夠在保存位置的目錄下看到文件,可是這個文件並非一個文件而是一個目錄。

  裏面的內容通常爲

  

  不用擔憂,這是沒錯的。

  咱們讀取的時候,並不須要使用文件夾裏面的part-xxxx文件,直接讀取目錄便可。

 (2)經過df.rdd.saveAsTextFile("file:///")轉化成rdd再保存

咱們對於不一樣格式的文件讀寫來講,咱們通常使用兩套對應方式

val df = spark.read.格式("file:///")//讀取文件
df.write.格式("file:///")//保存文件
val df = spark.read.format("").load("file:///")//讀取文件
df.write.save("file:///")//保存文件

具體read和load方法有什麼不一樣,我還不是很清楚,弄明白了回來補充。

6.經過JDBC建立DataFrame 

咱們在啓動Spark-shell或者提交任務的時候須要添加相應的jar包

spark-shell(spark-submit)

--jars /usr/local/spark/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \

--driver-class-path /usr/local/spark/mysql-connector-java-5.1.40-bin.jar

val jdbcDf = spark.read.format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")   //驅動
    .option("url", "jdbc:mysql://ip:3306")  //數據庫地址
    .option("dbtable", "db.user_test") //表名:數據庫名.表名
    .option("user", "test") //用戶名
    .option("password", "123456")  //密碼
    .load()
jdbcDf.show()
相關文章
相關標籤/搜索