df.select("id","name").show();
df.select($"id",$"name").where($"name" === "bbb").show()
orderBy/sort($"列名") 升序排列java
orderBy/sort($"列名".desc) 降序排列linux
orderBy/sort($"列1" , $"列2".desc) 按兩列排序sql
例如:apache
df.select($"id",$"name").orderBy($"name".desc).show df.select($"id",$"name").sort($"name".desc).show tabx.select($"id",$"name").sort($"id",$"name".desc).show
groupBy("列名", ...).max(列名) 求最大值json
groupBy("列名", ...).min(列名) 求最小值bash
groupBy("列名", ...).avg(列名) 求平均值服務器
groupBy("列名", ...).sum(列名) 求和spa
groupBy("列名", ...).count() 求個數.net
groupBy("列名", ...).agg 能夠將多個方法進行聚合scala
例如:
scala>val rdd = sc.makeRDD(List((1,"a","bj",100),(2,"b","sh",80),(3,"c","gz",50),(4,"d","bj",45),(5,"e","gz",90)))
scala>val df = rdd.toDF("id","name","addr","score")
scala>df.groupBy("addr").count().show()
scala>df.groupBy("addr").agg(max($"score"), min($"score"), count($"*")).show
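The avg and sum aggregations listed above are not shown in the example; a minimal sketch reusing the same df, assuming the same spark-shell session:
scala>df.groupBy("addr").avg("score").show()  // average score per addr
scala>df.groupBy("addr").sum("score").show()  // total score per addr
scala>df.groupBy("addr").agg(avg($"score"), sum($"score")).show  // or combined in a single agg call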
scala>val dept=sc.parallelize(List((100,"caiwubu"),(200,"yanfabu"))).toDF("deptid","deptname")
scala>val emp=sc.parallelize(List((1,100,"zhang"),(2,200,"li"),(3,300,"wang"))).toDF("id","did","name")
scala>dept.join(emp,$"deptid" === $"did").show
scala>dept.join(emp,$"deptid" === $"did","left").show
The result set of a left outer join includes all rows of the left table named in the LEFT OUTER clause, not just the rows that satisfy the join condition. If a row of the left table has no matching row in the right table, all select-list columns from the right table are null in the corresponding result row.
scala>dept.join(emp,$"deptid" === $"did","right").show
val df = sc.makeRDD(List(1,2,3,4,5)).toDF("num")
df.select($"num" * 100).show
val df = sc.makeRDD(List(("zhang",Array("bj","sh")),("li",Array("sz","gz")))).toDF("name","addrs") df.selectExpr("name","addrs[0]").show
Using a struct:
{"name":"王二小","address":{"city":"大土坡","street":"南二環甲字1號"}} {"name":"流放","address":{"city":"天涯海角","street":"南二環甲字2號"}}
val df = sqlContext.read.json("file:///root/work/users.json")
df.select("name","address.street").show
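Nested struct fields can also be inspected and selected with the Column syntax; a minimal sketch, assuming the df read above:
df.printSchema()  // shows address as a struct containing city and street
df.select($"name", $"address.city").show  // dot notation also works on column references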
Other operations
df.count  // get the total number of records
val row = df.first()  // get the first record
val value = row.getString(1)  // get the value of the given column in that row
df.collect  // get all data in the DataFrame as an Array; this simply calls collect on the underlying RDD
df.registerTempTable("tabName")
sqlContext.sql("show tables").show
val sqc = new org.apache.spark.sql.SQLContext(sc)
val df = sc.makeRDD(List((1,"a","bj"),(2,"b","sh"),(3,"c","gz"),(4,"d","bj"),(5,"e","gz"))).toDF("id","name","addr")
df.registerTempTable("stu")
sqc.sql("select * from stu").show()
val df = sc.makeRDD(List((1,"a","bj"),(2,"b","sh"),(3,"c","gz"),(4,"d","bj"),(5,"e","gz"))).toDF("id","name","addr"); df.registerTempTable("stu"); sqc.sql("select * from stu where addr = 'bj'").show()
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sc.makeRDD(List((1,"a","bj"),(2,"b","sh"),(3,"c","gz"),(4,"d","bj"),(5,"e","gz"))).toDF("id","name","addr")
df.registerTempTable("stu")
sqlContext.sql("select * from stu order by addr").show()
sqlContext.sql("select * from stu order by addr desc").show()
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sc.makeRDD(List((1,"a","bj"),(2,"b","sh"),(3,"c","gz"),(4,"d","bj"),(5,"e","gz"))).toDF("id","name","addr")
df.registerTempTable("stu")
sqlContext.sql("select addr,count(*) from stu group by addr").show()
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val dept=sc.parallelize(List((100,"財務部"),(200,"研發部"))).toDF("deptid","deptname")
val emp=sc.parallelize(List((1,100,"張財務"),(2,100,"李會計"),(3,300,"王研發"))).toDF("id","did","name")
dept.registerTempTable("deptTab")
emp.registerTempTable("empTab")
sqlContext.sql("select deptname,name from deptTab inner join empTab on deptTab.deptid = empTab.did").show()
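The left and right outer joins shown earlier with the DataFrame API can be written in SQL as well; a minimal sketch reusing the deptTab and empTab temp tables registered above:
sqlContext.sql("select deptname,name from deptTab left join empTab on deptTab.deptid = empTab.did").show()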
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sc.makeRDD(List(1,2,3,4,5)).toDF("num")
df.registerTempTable("tabx")
sqlContext.sql("select * from tabx limit 3").show()
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sc.makeRDD(List(1,2,3,4,5)).toDF("num")
df.registerTempTable("tabx")
sqlContext.sql("select num * 100 from tabx").show()
scala>val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala>hiveContext.sql("create table if not exists zzz (key int, value string) row format delimited fields terminated by '|'")
scala>hiveContext.sql("load data local inpath 'file:///home/software/hdata.txt' into table zzz")
scala>hiveContext.sql("select key,value from zzz").show
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sc.textFile("file:///root/work/words.txt").flatMap{ _.split(" ") }.toDF("word")
df.registerTempTable("wordTab")
sqlContext.sql("select word,count(*) from wordTab group by word").show
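If the word counts should come back sorted by frequency, the group by can be combined with an order by; a minimal sketch on the same wordTab table, assuming the SQL dialect accepts ordering by the alias:
sqlContext.sql("select word,count(*) as cnt from wordTab group by word order by cnt desc").show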
SparkSQL can also be used programmatically through its API (the example below uses Scala).
Open the Scala IDE development environment and create a Scala project.
Import the Spark dependency jars.
Create the package path and the object class.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object Driver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("sql")
    val sc = new SparkContext(conf)
    // get the SparkSQL context object
    val sqc = new SQLContext(sc)
    val r1 = sc.makeRDD(List(("tom", 23), ("rose", 25), ("jim", 15), ("jary", 30)))
    // import the SQL context's implicits so that the RDD gains the toDF method
    import sqc.implicits._
    val t1 = r1.toDF("name", "age")
    t1.registerTempTable("stu")
    val result = sqc.sql("select * from stu")
    // convert the DataFrame back to an RDD, typically for storing the result
    val resultRDD = result.toJavaRDD
    resultRDD.saveAsTextFile("D://sqlresult")
  }
}
Package the project as a jar, upload it to the Linux virtual machine, and run the following command in Spark's bin directory:
sh spark-submit --class cn.tedu.sparksql.Demo01 ./sqlDemo01.jar
Finally, verify the result.