Like a table in a relational database, a DataFrame is Spark's abstraction for row/column data with a schema. DataFrames are widely used wherever SQL is applied to large datasets. There are many ways to build a DataFrame, for example from a local Seq, from an RDD, or from a data source; the three approaches are briefly introduced below.
By importing the Spark SQL implicits, a local sequence (Seq), an array, or an RDD can be converted to a DataFrame, as long as the element types of the data can be determined.
import sqlContext.implicits._

val df = Seq(
  (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")
Note: if you call toDF() without specifying column names, the default column names are "_1", "_2", ...
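A minimal sketch of that default naming, assuming the same sqlContext implicits are in scope; the schema printed below is roughly what Spark assigns when no names are given:

import sqlContext.implicits._

// toDF() without arguments falls back to the positional names "_1", "_2", ...
val dfDefault = Seq((1, "First Value"), (2, "Second Value")).toDF()
dfDefault.printSchema()
// root
//  |-- _1: integer (nullable = false)
//  |-- _2: string (nullable = true)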
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
people.registerTempTable("people")

// Run SQL statements with sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Note: sql() also returns a DataFrame, which supports the usual RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
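Columns of a Row can also be read by name rather than by ordinal; a short sketch using Row.getAs, which avoids relying on column order:

// Equivalent to the ordinal access above, but keyed on the column name.
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)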
A DataFrame can also be created with createDataFrame on the SQLContext. As with toDF, the source data can be a local array or an RDD.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(List(
  StructField("integer_column", IntegerType, nullable = false),
  StructField("string_column", StringType, nullable = true),
  StructField("date_column", DateType, nullable = true)
))

val rdd = sc.parallelize(Seq(
  Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))

val df = sqlContext.createDataFrame(rdd, schema)
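createDataFrame also accepts local data directly, as mentioned above. A minimal sketch, assuming the same sqlContext, that passes a local collection of tuples and lets Spark infer the schema, then renames the columns with toDF:

// A Seq of Products (tuples or case classes) works without an explicit StructType;
// the schema is inferred from the element types.
val localDf = sqlContext.createDataFrame(Seq(
  (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
)).toDF("int_column", "string_column", "date_column")
localDf.show()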
The third way to build a DataFrame is to load it directly from a data source, for example a Parquet file on HDFS:

val df = sqlContext.read.parquet("hdfs:/path/to/file")

or, using a Spark 2.0 SparkSession, a JSON file:

val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
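When loading from a self-describing source such as Parquet or JSON, the schema is read (or inferred) from the file itself. A quick way to inspect it, assuming the df loaded from people.json above; the output is roughly:

// Print the inferred schema in a tree format
df.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)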
// First, initialize a SparkSession object
val spark = org.apache.spark.sql.SparkSession.builder
  .master("local")
  .appName("Spark CSV Reader")
  .getOrCreate()

// Then use the SparkSession object to load the CSV file as a DataFrame
val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header", "true")          // read the header row as column names
  .option("mode", "DROPMALFORMED")   // drop rows that fail to parse
  .load("csv/file/path")             // or .csv("csv/file/path") with the Spark 2.0 API
df.show()
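Since Spark 2.0 the CSV data source is built in, so the external spark-csv package is no longer required. A minimal sketch under that assumption, reusing the same hypothetical file path:

// Spark 2.x: the csv format is bundled, so no external format() call is needed
val df2 = spark.read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .csv("csv/file/path")
df2.show()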