Spark SQL is a Spark module for processing structured data. The core programming abstraction it provides is the DataFrame.
DataFrame: it can be built from many sources, including structured data files, Hive tables, external relational databases, and RDDs.
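The unified read API on SQLContext covers several of these sources. A minimal sketch (the paths, table name, and JDBC settings below are placeholders, not part of this walkthrough; reading Hive tables additionally requires Hive support, e.g. a HiveContext):

val fromJson = sqlContext.read.json("path/to/file.json")   // structured data file
val fromHive = sqlContext.read.table("some_hive_table")    // Hive table
val fromJdbc = sqlContext.read.jdbc("jdbc:mysql://host:3306/db", "some_table", new java.util.Properties())  // external relational database

Building a DataFrame from an RDD is shown in the two standalone programs later in this post.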
Data file students.json
{"id":1, "name":"leo", "age":18} {"id":2, "name":"jack", "age":19} {"id":3, "name":"marry", "age":17}
Creating a DataFrame in spark-shell
// Upload the file to an HDFS directory
hadoop@master:~/wujiadong$ hadoop fs -put students.json /student/2016113012/spark
// Start the spark shell
hadoop@slave01:~$ spark-shell
// Import SQLContext
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
// Create an SQLContext object for working with the data
scala> val sql = new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@27acd9a7
// Read the data
scala> val students = sql.read.json("hdfs://master:9000/student/2016113012/spark/students.json")
students: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field]
// Show the data
scala> students.show
+---+---+-----+
|age| id| name|
+---+---+-----+
| 18|  1|  leo|
| 19|  2| jack|
| 17|  3|marry|
+---+---+-----+
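The deprecation warning appears because Spark 2.x folds SQLContext into the new SparkSession entry point; in a Spark 2.x spark-shell, the prebuilt spark object does the same job (equivalent to the sql.read.json call above):

scala> val students = spark.read.json("hdfs://master:9000/student/2016113012/spark/students.json")

Some common DataFrame operations: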
scala> students.printSchema
root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

scala> students.select("name").show
+-----+
| name|
+-----+
|  leo|
| jack|
|marry|
+-----+

scala> students.select(students("name"),students("age")+1).show
+-----+---------+
| name|(age + 1)|
+-----+---------+
|  leo|       19|
| jack|       20|
|marry|       18|
+-----+---------+

scala> students.filter(students("age")>18).show
+---+---+----+
|age| id|name|
+---+---+----+
| 19|  2|jack|
+---+---+----+

scala> students.groupBy("age").count().show
+---+-----+
|age|count|
+---+-----+
| 19|    1|
| 17|    1|
| 18|    1|
+---+-----+
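The same DataFrame can also be queried with SQL by first registering it as a temporary table. A minimal sketch continuing the session above (the table name students_t is my own choice; in Spark 2.x, createOrReplaceTempView is the preferred replacement for registerTempTable):

scala> students.registerTempTable("students_t")
scala> sql.sql("select name, age from students_t where age >= 18").show

The next program shows the same temp-table pattern in a standalone application, converting an RDD to a DataFrame via reflection on a case class.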
package wujiadong_sparkSQL

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/3/5.
  */
object RDDDataFrameReflection {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("rdddatafromareflection")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val fileRDD = sc.textFile("hdfs://master:9000/student/2016113012/data/students.txt")
    val lineRDD = fileRDD.map(line => line.split(","))
    // Map the RDD onto the case class
    val studentsRDD = lineRDD.map(x => Students(x(0).toInt, x(1), x(2).toInt))
    // Converting an RDD to a DataFrame via reflection in Scala requires
    // importing the implicit conversions manually
    import sqlContext.implicits._
    val studentsDF = studentsRDD.toDF()
    // Register a temporary table
    studentsDF.registerTempTable("t_students")
    val df = sqlContext.sql("select * from t_students")
    df.rdd.foreach(row => println(row(0) + "," + row(1) + "," + row(2)))
    df.rdd.saveAsTextFile("hdfs://master:9000/student/2016113012/data/out")
  }
}

// The case class must be defined outside the object
case class Students(id: Int, name: String, age: Int)
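The input students.txt is a plain comma-separated file of id,name,age records; judging from the run output below, it contains:

1,leo,17
2,marry,17
3,jack,18
4,tom,19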
Execution output
hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.RDDDataFrameReflection --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar
17/03/05 22:46:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/05 22:46:48 INFO Slf4jLogger: Slf4jLogger started
17/03/05 22:46:48 INFO Remoting: Starting remoting
17/03/05 22:46:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.131:34921]
17/03/05 22:46:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
17/03/05 22:46:51 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
17/03/05 22:47:00 INFO FileInputFormat: Total input paths to process : 1
17/03/05 22:47:07 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/03/05 22:47:07 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/03/05 22:47:07 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/03/05 22:47:07 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/03/05 22:47:07 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
1,leo,17
2,marry,17
3,jack,18
4,tom,19
17/03/05 22:47:10 INFO FileOutputCommitter: Saved output of task 'attempt_201703052247_0001_m_000000_1' to hdfs://master:9000/student/2016113012/data/out/_temporary/0/task_201703052247_0001_m_000000
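Reflection keeps the conversion short, but it requires the columns to be known at compile time so they can be written into a case class. When the schema is only known at runtime, it can instead be built programmatically from StructType and Row objects, as the next program does.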
package wujiadong_sparkSQL

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/3/5.
  */
object RDDDataFrameBianchen {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDDataFrameBianchen")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Create an RDD from the given path
    val studentsRDD = sc.textFile("hdfs://master:9000/student/2016113012/data/students.txt").map(_.split(","))
    // Map the RDD to an RDD of Rows
    val RowRDD = studentsRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    // Construct the schema (metadata) programmatically
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // Apply the schema to the RDD of Rows
    val studentsDF = sqlContext.createDataFrame(RowRDD, schema)
    // Register a temporary table
    studentsDF.registerTempTable("t_students")
    val df = sqlContext.sql("select * from t_students order by age")
    df.rdd.collect().foreach(row => println(row))
  }
}
Execution output
hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.RDDDataFrameBianchen --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar
17/03/06 11:07:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/06 11:07:27 INFO Slf4jLogger: Slf4jLogger started
17/03/06 11:07:27 INFO Remoting: Starting remoting
17/03/06 11:07:28 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.131:49756]
17/03/06 11:07:32 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
17/03/06 11:07:38 INFO FileInputFormat: Total input paths to process : 1
17/03/06 11:07:44 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/03/06 11:07:44 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/03/06 11:07:44 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/03/06 11:07:44 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/03/06 11:07:44 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
[1,leo,17]
[2,marry,17]
[3,jack,18]
[4,tom,19]
17/03/06 11:07:47 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
17/03/06 11:07:47 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
17/03/06 11:07:47 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
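Because the programmatic approach builds the StructType as an ordinary value, the schema can be derived from runtime data. A small sketch, assuming (hypothetically) that the column names arrive as a space-separated string and every column is treated as a string:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical runtime input: column names only known when the job runs
val schemaString = "id name age"
val dynamicSchema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
)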
DataFrame vs. RDD
1) In Spark, a DataFrame is a distributed dataset built on top of RDDs, similar to a two-dimensional table in a traditional database.
2) The main difference between a DataFrame and an RDD is that the former carries schema metadata: each column of the two-dimensional dataset a DataFrame represents has a name and a type.