首先咱們要建立SparkSessionjava
val spark = SparkSession.builder() .appName("test") .master("local") .getOrCreate() import spark.implicits._ //將RDD轉化成爲DataFrame並支持SQL操做
而後咱們經過SparkSession來建立DataFramemysql
1.使用toDF
函數建立DataFramesql
經過導入(importing)spark.implicits, 就能夠將本地序列(seq), 數組或者RDD轉爲DataFrame。shell
只要這些數據的內容能指定數據類型便可。數據庫
import spark.implicits._ val df = Seq( (1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")), (2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15")) ).toDF("id", "name", "created_time")
注意:若是直接用toDF()而不指定列名字,那麼默認列名爲"_1", "_2"apache
能夠經過df.withColumnRenamed("_1", "newName1").withColumnRenamed("_2", "newName2")進行修改列名json
2.使用createDataFrame
函數建立DataFrame數組
經過schema + row 來建立app
咱們能夠通俗的理解爲schema爲表的表頭,row爲表的數據記錄函數
import org.apache.spark.sql.types._ //定義dataframe的結構的schema val schema = StructType(List( StructField("id", IntegerType, nullable = false), StructField("name", StringType, nullable = true), StructField("create_time", DateType, nullable = true) )) //定義dataframe內容的rdd val rdd = sc.parallelize(Seq( Row(1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")), Row(2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15")) )) //建立dataframe val df = spark.createDataFrame(rdd, schema)
不過,咱們能夠把文件結構當作參數來使用,經過rdd自動產生schema和row,不用本身手動生成。
import org.apache.spark.sql.types._ //傳入屬性參數 val schemaString = " id name create_time" //解析參數變成StructField val fields = schemaString.split(" ") .map(fieldName => StructField(fieldname, StringType, nullable = true)) //定義dataframe的結構的schema val schema = StructType(fields) //定義dataframe內容的rdd val lines = sc.textFile("file:///people.txt") val rdd = lines.spilt(_.split(",")) .map(attributes=>ROW(attributes(0),attributes(1).trim) ) //建立dataframe val df = spark.createDataFrame(rdd, schema)
3.經過反射機制建立DataFrame
首先要定義一個case class,由於只有case class才能被Spark隱式轉化爲DataFrame
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.Encoder import spark.implicits._ //建立匹配類 case class Person(id:Int,name:String,age:Long) //讀取文件生成rdd val rdd = sc.textFile("file:///") //經過匹配類把rdd轉化成dataframe val df = rdd.map(_.split(",")) .map(attributes => Person(attributes(0),attributes(1),attributes(2).trim.toInt)) .toDF()
4.經過文件直接建立DataFrame
(1)使用parquet文件read建立
val df = spark.read.parquet("hdfs:/path/to/file")
(2)使用json文件read建立
val df = spark.read.json("examples/src/main/resources/people.json")
(3)使用csv文件load建立
val df = spark.read .format("com.databricks.spark.csv") .option("header", "true") //reading the headers .option("mode", "DROPMALFORMED") .load("csv/file/path")
(4)使用Hive表建立
spark.table("test.person") // 庫名.表名 的格式 .registerTempTable("person") // 註冊成臨時表 spark.sql( """ | select * | from person | limit 10 """.stripMargin).show()
記得,最後咱們要調用spark.stop()來關閉SparkSession。
5.保存
(1)經過df.write.format().save("file:///")保存
write.format()支持輸出的格式有 JSON、parquet、JDBC、orc、csv、text等文件格式
,save()定義保存的位置
當咱們保存成功後能夠在保存位置的目錄下看到文件,可是這個文件並非一個文件而是一個目錄。
裏面的內容通常爲
不用擔憂,這是沒錯的。
咱們讀取的時候,並不須要使用文件夾裏面的part-xxxx文件,直接讀取目錄便可。
(2)經過df.rdd.saveAsTextFile("file:///")轉化成rdd再保存
咱們對於不一樣格式的文件讀寫來講,咱們通常使用兩套對應方式
val df = spark.read.格式("file:///")//讀取文件 df.write.格式("file:///")//保存文件
val df = spark.read.format("").load("file:///")//讀取文件 df.write.save("file:///")//保存文件
具體read和load方法有什麼不一樣,我還不是很清楚,弄明白了回來補充。
6.經過JDBC建立DataFrame
咱們在啓動Spark-shell或者提交任務的時候須要添加相應的jar包
spark-shell(spark-submit)
--jars /usr/local/spark/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \
--driver-class-path /usr/local/spark/mysql-connector-java-5.1.40-bin.jar
val jdbcDf = spark.read.format("jdbc") .option("driver", "com.mysql.jdbc.Driver") //驅動 .option("url", "jdbc:mysql://ip:3306") //數據庫地址 .option("dbtable", "db.user_test") //表名:數據庫名.表名 .option("user", "test") //用戶名 .option("password", "123456") //密碼 .load() jdbcDf.show()