DataFrames
1、Create DataFrames
val sc: SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.json")
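Once the DataFrame is created it can be inspected and queried directly with the DataFrame API. A minimal sketch of common operations on the df created above (the name and age columns come from people.json; the calls are the standard Spark 1.x DataFrame API):

// print the schema inferred from the JSON file
df.printSchema()

// show the contents of the DataFrame
df.show()

// select only the "name" column
df.select("name").show()

// select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()

// select people older than 21
df.filter(df("age") > 21).show()

// count people by age
df.groupBy("age").count().show()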
2、Interoperating with RDDs
Spark supports two ways of converting existing RDDs into DataFrames.
1. Inferring the schema using reflection
val sc: SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

case class Person(name: String, age: Int)

/*
 * [hadoop@hftclclw0001 resources]$ cat people.txt
 * Michael, 29
 * Andy, 30
 * Justin, 19
 */

// people_rdd_1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[19] at map at <console>:24
val people_rdd_1 = sc.textFile("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt").map(_.split(","))

// people_rdd_2: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[23] at map at <console>:26
val people_rdd_2 = sc.textFile("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))

// people_df: org.apache.spark.sql.DataFrame = [name: string, age: int]
val people_df = sc.textFile("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

people_df.registerTempTable("people")

val teenagers = sqlContext.sql("select name,age from people where age >= 13 and age <= 19")

teenagers.map(t => "Name:" + t(0)).collect().foreach(println)
teenagers.map(t => "Name:" + t.getAs[String]("name")).collect().foreach(println)
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
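The SQL query above can also be expressed directly with the DataFrame API on people_df; a minimal sketch under the same Person/people.txt setup (teenagers_df is an illustrative name):

// equivalent of "select name,age from people where age >= 13 and age <= 19"
val teenagers_df = people_df.filter(people_df("age") >= 13 && people_df("age") <= 19).select("name", "age")
teenagers_df.show()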
2. Programmatically specifying the schema
a. Create an RDD of Rows from the original RDD.
b. Create the schema represented by a StructType matching the structure of Rows in the RDD created in step a.
c. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.
val sc: SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// a. create an RDD
val people = sc.textFile("file:///home/hadoop/spark-1.5.1-bin-hadoop2.6/examples/src/main/resources/people.txt")

val schemastring = "name age"

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// b. create the schema based on the string of schema
val schema = StructType(schemastring.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// convert the records of the original RDD to Rows
val rowRdd = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// c. create the DataFrame by applying the schema to the RDD of Rows
val peopleDataFrame = sqlContext.createDataFrame(rowRdd, schema)

peopleDataFrame.registerTempTable("people")

val results = sqlContext.sql("select name from people")
results.map(t => "Name:" + t(0)).collect().foreach(println)
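The sketch above maps every field to StringType because the schema is built from a plain string. If the column types are known in advance, the StructType can carry them explicitly; a minimal sketch assuming the same people.txt layout (typedSchema, typedRowRdd and typedPeopleDF are illustrative names):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val typedSchema = StructType(Seq(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)))

val typedRowRdd = people.map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val typedPeopleDF = sqlContext.createDataFrame(typedRowRdd, typedSchema)

// age is now typed as int instead of string
typedPeopleDF.printSchema()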
Data Sources