SparkSession is the entry point to Spark SQL. From a SparkSession you obtain a DataFrameReader (via `spark.read`) to load source data and produce a DataFrame. Note that in Spark 2.x there is no longer a separate DataFrame API: DataFrame is now simply an alias for Dataset[Row]. (A small reader-configuration sketch follows the setup code below.)
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local")
  .appName("Spark SQL Basic Examples")
  .getOrCreate()

import spark.implicits._              // enables $-syntax and Dataset encoders
import org.apache.spark.sql.types._   // needed when defining a schema manually
```
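For context, `spark.read` returns a DataFrameReader that can be configured before loading. A minimal sketch, assuming a CSV file with a header row (the CSV path here is hypothetical, not from the original examples):

```scala
// spark.read returns a DataFrameReader; format/option/load are its standard methods
val csvDF = spark.read
  .format("csv")
  .option("header", "true")                          // treat the first line as column names
  .load("src/main/resources/general/people.csv")     // hypothetical path
```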
Create a DataFrame, i.e. data of type Dataset[Row]:
```scala
val df = spark.read.json("src/main/resources/general/people.json")

df.show()                                      // prints the first 20 rows by default
df.printSchema()
df.select("name").show()                       // select the "name" column
df.select($"name", $"age" + 1).show()          // select "name" and "age" incremented by 1
df.filter($"age" > 25).select("name").show()   // names of people older than 25
df.groupBy($"age").count().show()              // row count per age
```
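As a side note, the same operations can also be written with the column helpers in `org.apache.spark.sql.functions`. A small sketch (the alias names are illustrative):

```scala
import org.apache.spark.sql.functions._   // col, lit, count, ...

df.select(col("name"), (col("age") + 1).alias("age_plus_one")).show()
df.groupBy(col("age")).agg(count(lit(1)).alias("cnt")).show()   // same as groupBy(...).count()
```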
A DataFrame can also be built from an RDD. One way is to let Spark infer the schema by reflection from a case class:

```scala
// schema inferred by reflection from the case class
case class Person(name: String, age: Int)

val personDF = spark.sparkContext
  .textFile("src/main/resources/general/people.txt")   // RDD[String]
  .map(_.split(","))                                   // RDD[Array[String]]: Array(name, age)
  .map(attr => Person(attr(0), attr(1).trim.toInt))
  .toDF()
```
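For comparison, a minimal sketch of a typed Dataset[Person] built from the same file and case class, assuming the two-column name,age layout shown above:

```scala
// read the file as Dataset[String], then map it into Dataset[Person]
val personDS = spark.read
  .textFile("src/main/resources/general/people.txt")
  .map(_.split(","))
  .map(attr => Person(attr(0), attr(1).trim.toInt))
personDS.show()
```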
The other way is to specify the schema programmatically with StructType:

```scala
import org.apache.spark.sql.Row

// build the schema programmatically
val schemaString = "name,age"
val fields = schemaString.split(",")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

val personRDD = spark.sparkContext.textFile("src/main/resources/general/people.txt")
val rowRDD = personRDD
  .map(_.split(","))
  .map(attr => Row(attr(0), attr(1).trim))   // both fields kept as strings to match the schema
val personDF = spark.createDataFrame(rowRDD, schema)
```
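As a hedged variant (the `typedSchema` name is mine, not from the original), the same approach with age declared as an integer; the values placed in each Row must then match the declared types:

```scala
// hypothetical variant: declare age as an integer instead of a string
val typedSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
val typedRows = personRDD
  .map(_.split(","))
  .map(attr => Row(attr(0), attr(1).trim.toInt))   // Int value to match IntegerType
val typedDF = spark.createDataFrame(typedRows, typedSchema)
typedDF.printSchema()
```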
Alternatively, call createOrReplaceTempView to register the DataFrame as a temporary view and query it with SQL:
```scala
personDF.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.map(people => "Name: " + people(0)).show()   // Row fields are accessed by index
```
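Relatedly, a sketch of a global temporary view (the view name `people_global` is illustrative): unlike a session-scoped temp view, it is shared across SparkSessions and lives in the `global_temp` database:

```scala
// a global temp view survives across SparkSessions in the same application
personDF.createGlobalTempView("people_global")
spark.sql("SELECT name, age FROM global_temp.people_global").show()
```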
The material above comes from the official Spark website.