Spark SQL 1、sparkSQL的特色 1.支持多種數據源:hive RDD Partquet JSON JDBC 2.多種性能優化技術:in-memory columnar storage \ byte-code generation \ cost model 動態評估 3.組件擴展性:對於SQL的語法解析器、分析器、以及優化器,用戶均可以本身從新開發,而且動態擴展 Spark sql 的性能優化技術簡介 1.內存列存儲(in-memory columnar storage) 內存列存儲意味着spark sql的數據,不是使用java對象的方式進行存儲,而是使用面向列存儲的方式進行存儲,也就是說,每一列,做爲一個數據存儲的單位,從而大大優化內存使用的效率。使用列存儲以後,減小對內存的消耗,也就避免了對GC(垃圾回收)大量數據的性能開銷 2.字節碼生成技術(byte-code generation ) Spark sql在其catalyst模塊的Expressions中增長一codegen模塊,對於sql語句中的計算表達式,好比select num + num from t 這種sql,就可使用動態字節碼生成技術來優化其性能。 3.Scala代碼編寫的優化 對於Scala代碼編寫中,可能會形成大量的性能的開銷,本身重寫,使用更加複雜的方式,來獲取更好的性能。好比option樣例類、for循環、map/filter/foreach等高階函數,以及不可變對象,都改爲用null,while循環來實現,而且重用可變的對象 2、dataframe的使用 1.spark sql 和 dataframe引言 Spark sql 是spark中的一個模塊,主要是進行結構化數據的處理。他提供的最核心的編程抽象,就是dataframe。同時spark sql 還能夠做爲分佈式的sql查詢引擎。Spark sql最重要的功能之一就是從hive中查詢數據 Dataframe,能夠理解爲時,以列的形式組織的,分佈式的數據集合,他其實和關係型數據庫中的表很是相似,可是底層作了很對的優化。Dataframe能夠經過不少來源,包括:結構化數據文件,hive表,外部關係型數據庫以及RDD 2.SQLContext 要使用spark sql ,首先就得建立一個SQLContext對象,或者是他的子類的對象(HiveContext),好比HiveContext對象; Java版本: JavaSparkContext sc = ....; SQLContext SQLContext = new SQLContext(sc); Scala版本: Val sc = SparkContext.. Val SQLContext = new SQLContext(sc) Import SQLContext.implicits._ 3.HiveContext 除了基本的SQLContext之外,還可使用它的子類---HiveContext。HiveContext的功能除了包含SQLContext提供的全部的功能外,還包括額外的專門針對hive的一些功能。這些額外的功能包括:使用hive語法編寫和執行sql,使用hive的UDF函數,從hive表中讀取數據 要使用HiveContext,就必須預先安裝好hive,SQLContext支持的數據源,HiveContext也一樣支持,而不僅是支持hive,對spark1.3.x以上的版本,都推薦使用HiveContext,由於其功能更加豐富和完善 Spark sql 還支持使用spark.sql.dialect參數設置sql方言,使用SQLContext的serConf()便可進行設置。對與SQLContext,他只支持」sql」一種方言。對於HiveContext,它默認的方言是「hiveql」 4.建立Dataframe 使用SQLContext,能夠從RDD、hive表中或者其餘額數據源,來建立dataframe。 Java版本: package com.spark.spark_sql; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; /** * 使用json文件建立dataframe * @author Administrator */ public class DataFrameCreate { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("DataFrameCreate"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext SQLContext = new SQLContext(sc); DataFrame df = SQLContext.read().json("hdfs://hadoop01:8020/spark_input/students.json"); df.show(); } } 提交至集羣:運行打包,上傳。編寫腳本進行提交 【這裏是一臺機器測試】 bin/spark-submit \ --class com.spark.spark_sql.DataFrameCreate \ --files /opt/modules/apache-hive-0.13.1-bin/conf/hive-site.xml \ --driver-class-path /opt/softwares/mysql-connector-java-5.1.27-bin.jar \ /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/sql/sparksql_01.jar 運行的結果 +---+---+--------+ |age| id| name| +---+---+--------+ | 10| 1| leo| | 25| 2| kity| | 30| 4| lucy| | 20| 3| tom| | 18| 7| jack| | 23| 10| edison| | 36| 5| owen| | 20| 8| jiny| | 40| 6| lisi| | 45| 9|zhangsan| +---+---+--------+ Scala版本: package com.spark.spark_sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext object DataFrameCreate { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("DataFrameCreate") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df = sqlContext.read.json("hdfs://hadoop01:8020/spark_input/student.json") df.show } } Dataframe經常使用的操做 java package com.spark.spark_sql; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; /** * Dataframe的經常使用的操做 * @author Administrator * */ public class DataFrameOperation { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("DataFrameOperation"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext SQLContext = new SQLContext(sc); //建立出來的dataframe徹底能夠能夠理解爲一張表 DataFrame df = SQLContext.read().json("hdfs://hadoop01:8020/input/students.txt"); //打印dataframe中全部的數據 df.show(); //打印dataframe中元數據的信息 df.printSchema(); //查詢某列全部的數據 df.select("name").show(); //查詢某幾列全部的數據,並對列進行計算 df.select(df.col("name"), df.col("age").plus(1)).show();//將查詢出來的age進行加一 //根據某一列的值進行過濾 df.filter(df.col("age").gt(30)).show(); //年齡大於30的進行過濾 //根據某一列進行分組聚合 df.groupBy(df.col("age")).count().show(); } } Scala package com.spark.spark_sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext object DataframeOperation { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("DataframeOperation") val sc = new SparkContext(conf) val SQLContext = new SQLContext(sc) val df = SQLContext.read.json("hdfs://hadoop01:8020/spark_input/student.json" df.show df.printSchema() df.select("name").show df.select(df("name"),df("age")+1).show df.filter(df("age") > 30).show df.groupBy("age").count().show } } 5.RDD與Dataframe之間轉換 爲何要將RDD轉換爲Dataframe?由於這樣的話,咱們就能夠直接針對hdfs上的任何能夠構建爲RDD的數據,使用spark sql 進行sql查詢,這個功能無比強大。想象一下,針對hdfs中的數據,直接就可使用sql進行查詢 Spark sql 支持兩種方式來將RDD轉換爲Dataframe 第一種方式,是使用反射來推斷包含了特定數據類型的RDD的元數據,這種基於反射的方式,代碼比較簡單,當你已經知道你的RDD的元數據時,是一種很是不錯的方式。 第二種方式,是經過編程接口來建立dataframe,你能夠在程序運行的時候動態構建一份元數據,而後將其應用到已經存在的RDD上,這種方式的代碼比較冗長,可是若是在編寫程序時,還不知道RDD的元數據,只有在程序運行時,才能動態得知元數據,那麼只能經過這種動態構建元數據的方式。 1.使用反射的方式推斷元數據 Java版本:spark sql是支持將包含javaBean的RDD轉換爲dataframe的,Javabean的信息,就定義了元數據。Spark sql 如今是不支持包含嵌套javabean或者list等複雜元數據的Javabean。 Scala版本:而scala因爲具備隱式轉換的特性,因此spark sql的scala接口,是支持自動將包含case class 的RDD轉換爲Dataframe的。Case class 就定義了元數據。Spark sql 會經過反射讀取傳遞給 case class 的參數的名稱,而後將其做爲列名。與java不一樣的是,Spark sql是支持將包含了嵌套的數據結構的case class做爲元數據的,好比包含了Array等。 Java版本: package com.spark.spark_sql; import java.io.Serializable; public class Student implements Serializable { private static final long serialVersionUID = 1L; private int id; private String name; private int age; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public Student() { } public Student(int id, String name, int age) { super(); this.id = id; this.name = name; this.age = age; } @Override public String toString() { return "Student [id=" + id + ", name=" + name + ", age=" + age + "]"; } } package com.spark.spark_sql; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; /** * 使用反射的方式將RDD轉換爲dataframe * @author Administrator * */ public class RDD2DataframeReflection { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataframeReflection"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext SQLContext = new SQLContext(sc); JavaRDD<String> lines = sc.textFile("E://student.txt"); JavaRDD<Student> students = lines.map(new Function<String, Student>() { private static final long serialVersionUID = 1L; @Override public Student call(String line) throws Exception { String[] split = line.split(","); Student stu = new Student(); stu.setId(Integer.parseInt(split[0])); stu.setName(split[1]); stu.setAge(Integer.parseInt(split[2])); return stu; } }); //使用反射的方式將RDD轉換爲dataframe //將student.class傳入進去其實就是經過反射的方式來建立dataframe //由於student.class 自己就是反射的一個應用 //而後底層還得經過student class 進行反射。來獲取其中的fields //這裏要求Javabean要實現Serializable接口,能夠序列化 DataFrame studentDF = sqlContext.createDataFrame(students, Student.class); //拿到一個dataframe以後,就能夠將其註冊爲一張臨時表,而後針對其中的數據進行sql語句 studentDF.registerTempTable("student"); //針對student臨時表執行sql語句,查詢年齡大於20歲的學生 DataFrame df =sqlContext.sql("select * from student where age > 20"); //將查詢出來的dataframe再次轉換爲RDD JavaRDD<Row> teenagerRDD = df.javaRDD(); //將RDD中的數據,進行映射,給每一個人的年齡,而後映射爲student JavaRDD<Student> teenagerStudentRDD = teenagerRDD.map(new Function<Row, Student>() { private static final long serialVersionUID = 1L; @Override public Student call(Row row) throws Exception { //row中的順序是按照字典順序進行排列的 Student student = new Student(); student.setAge(row.getInt(0)); student.setId(row.getInt(1)); student.setName(row.getString(2)); return student; } }); //將數據collect 回來,打印出來 List<Student> studentList = teenagerStudentRDD.collect(); for (Student student : studentList) { System.out.println(student); } } } Scala版本: package com.spark.spark_sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext /** * 若是要使用scala開發spark程序 * 而後在其中還要實現基於反射的RDD到dataframe的轉換,就必須得用object extends App的方式 * 不能使用def main()方法的方式來運行程序,不然就會報錯no typetag for ... class */ object RDD2DaframeReflection extends App{ val conf = new SparkConf().setMaster("local").setAppName("RDD2DaframeReflection") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val lines = sc.textFile("e://student.txt", 1) //在scala中使用反射的方式,進行RDD到Dataframe的轉換,須要手動的導入一個隱士轉換 import SQLContext.implicits._ case class Student(id : Int ,name : String ,age : Int) //這裏其實就是一個普通的,元素是case class 的rdd val students = lines.map(line =>line.split(",")).map(arr => Student(arr(0).trim().toInt,arr(1),arr(2).trim().toInt)) //直接使用RDD的toDF,便可將其轉換爲dataframe val studentDF=students.toDF() //註冊爲一個臨時表 studentDF.registerTempTable("student") // val teenagerDF = sqlContext.sql("select * from student where age > 20") val teenagerRDD = teenagerDF.rdd //在scala中,row中的數據的順序,反而是按照咱們指望的來排列的,這個是跟java是不同的 teenagerRDD.map{row => Student(row(0).toString().toInt,row(1).toString(),row(2).toString().toInt ) }.collect().foreach(stu => print(stu.id+ ":" +stu.name+":"+stu.age)) //在scala中,對row的使用比java中的row更加的豐富 //在scala中,能夠用row的getAs()方法,獲取指定列名的列 teenagerRDD.map(row =>Student(row.getAs[Int]("id"),row.getAs("name"),row.getAs("age"))).collect() .foreach(stu => print(stu.id+ ":" +stu.name+":"+stu.age)) //還能夠經過row的getValuesMap,方法,獲取指定幾列的值,返回的是一個map teenagerRDD.map(row => { val map = row.getValuesMap(Array("id","name","age")) Student(map("id").toString().toInt,map("name").toString(),map("age").toString().toInt) }).collect().foreach(stu => print(stu.id+ ":" +stu.name+":"+stu.age)) } 2.經過編程接口來建立dataframe Java版本: package com.spark.spark_sql; import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; /** * 以編程的方式動態的執行元數據,將RDD轉化爲dataframe * @author Administrator * */ public class RDD2DataProgrammatically { public static void main(String[] args) { //建立sparkconf SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataProgrammatically"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); //第一步,建立一個普通的RDD,可是必須將其轉換爲RDD<Row>的格式 JavaRDD<String> lines = sc.textFile("e://student.txt"); JavaRDD<Row> rows = lines.map(new Function<String, Row>() { private static final long serialVersionUID = 1L; @Override public Row call(String line) throws Exception { String[] split = line.split(","); return RowFactory.create(Integer.parseInt(split[0]),split[1],Integer.parseInt(split[2])); } }); //第二步,動態元數構造據 //好比說,id name age 等fields的名稱和類型都是在程序運行過程當中, //動態的從MySQL等DB中或者是配置文件中,加載出來的,是不固定的 //因此特別適合這種編程的方式來構造元數據 List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true)); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); StructType structType = DataTypes.createStructType(fields); //第三部,使用動態構造元數據,將RDD轉換爲dataframe DataFrame studentDF = sqlContext.createDataFrame(rows, structType); //後面就能夠直接使用這個df了 //註冊臨時表 studentDF.registerTempTable("student"); DataFrame stus = sqlContext.sql("select * from student where age > 20"); List<Row> list = stus.javaRDD().collect(); for (Row row : list) { System.out.println(row); } } } Scala版本: package com.spark.spark_sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StringType object RDD2DataframeProgramatically extends App{ val conf = new SparkConf().setMaster("local").setAppName("RDD2DataframeProgramatically"); val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //第一步:構造出元素爲Row的普通的RDD val studentRDD = sc.textFile("e://student.txt", 1) .map(line => Row( line.split(",")(0).toInt, line.split(",")(1), line.split(",")(2).toInt)) //第二步:以編程的方式構造元數據 val structType =StructType(Array( StructField("id",IntegerType,true), StructField("name",StringType,true), StructField("age",IntegerType,true))) //第三步:進行RDD到dataframe的轉換 val studentDF = SQLContext.createDataFrame(studentRDD,structType ) studentDF.registerTempTable("student"); val teenagerDF = sqlContext.sql("select * from student where age > 20") teenagerDF.rdd.collect.foreach(row => println(row)) } 6.通用的load和save操做 1.dataframe的load和save 對於的spark sql的dataframe來講,不管是從什麼數據源建立出來的dataframe,都有一些共同的load和save操做,load操做主要是用於加載數據,建立出來的dataframe:save操做,主要用於將dataframe中的數據集保存到文件中(保存到的是一個目錄中) Java版本: Dataframe df = sqlContext.read().load(「users.parquet」); df.select(「name」,」facourite_color」).write().save(「nameAndFav_color_dir」); Scala版本: Val df = sqlContext.read.load(「users.parquet」) Df.select(「name」,」facourite_color」).write().save(「nameAndFav_color_dir」) Java版本: package com.spark.spark_sql; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; /** * 通用的load和save操做 * @author Administrator */ public class GenericLoadSave { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("GenericLoadSave"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); DataFrame usersDF = SQLContext.read().load("C://Users//Administrator//Desktop//users.parquet"); usersDF.printSchema(); usersDF.show(); usersDF.select("name","favourite_color").write().save("e://users2"); } } Scala版本: package com.spark.spark_sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import scala.tools.scalap.Main import org.apache.spark.sql.DataFrame object GenericLoadSave { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("GenericLoadSave") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val usersDF = sqlContext.read.load("hdfs://hadoop01:8020/spark_input/users.parquet") usersDF.select("name","favourite_color").write.save("hdfs://hadoop01:8020/spark_output") } } 2.手動指定數據源: 也能夠手動指定用來操做的數據源,數據源一般是使用其權限定名來指定,好比parquet是org.apache.spark.sql.parquet。可是spark sql內置了一些數據源類型,好比json,parquet.jdbc等等,實際上,經過這個功能,就能夠在不一樣類型的數據源之間進行轉換了。好比將json文件中的數據存儲到parquet文件中。默認狀況下,若是不指定數據源,默認就是parquet Java版本: DataFrame df =SQLContext.read().format(「json」).load(「people,json」) Df.select(「name」,」age」).write().format(「parquet」).save(「out」) Scala版本: Val df = SQLContext.read.format(「json」).load(「people,json」) Df.select(「name」,」age」).write.format(「parquet」).save(「out」) Java版本 package com.spark.spark_sql; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; /** * 手動指定數據源 * @author Administrator */ public class ManuallySpecifyOptions { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("ManuallySpecifyOptions"); SparkContext sc = new SparkContext(conf); SQLContext SQLContext = new SQLContext(sc); DataFrame df = SQLContext.read().format("json").load("e://users.parquet"); df.write().format("json").save("e://out"); } } 3.Save mode Spark sql 對於save操做,提供了不一樣的save mode.主要用來處理,當目標位置已經有數據時,應該如何處理。並且save操做並不會執行鎖操做,而且不是原子的,所以是有必定風險出現髒數據的 Save mode 意義 SaveMode.ErrorIfExists(默認) 若是目標位置存在數據,那麼就拋出異常 SaveMode.Append 若是目標位置存在數據,那麼就將數據追加進去 SaveMode.Overwrite 若是目標位置存在數據,那麼就將已經存在的數據刪除,用新的數據進行覆蓋 SaveMode.Ignore 若是目標位置存在數據,那麼就忽略,不作任何的操做 Java版本: package com.spark.spark_sql; import org.apache.derby.impl.tools.sysinfo.Main; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SaveMode; /** * savemode 示例 * @author Administrator */ public class SaveModeDemo { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("SaveModeDemo"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); DataFrame df = sqlContext.read().format("json").load("e://users.parquet"); df.save("e://out", SaveMode.Append); } } 3、Parquet數據源 1.使用編程方式加載數據 Parquet是面向分析型業務的列式存儲格式,由Twitter和cloudera合做開發。2015年成爲Apache的頂級項目 列式存儲和行式存儲相比較有哪些優勢; 1.能夠跳過不符合條件的數據,只讀取須要的數據,下降IO數據量 2.壓縮編碼能夠下降磁盤存儲空間。因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼(例如Run Length Encoding 和 Delta Encoding)進一步節約磁盤空間。 3.只讀取須要的列,支持向量運算,可以獲取更好的掃描性能 Java版本: package com.spark.spark_sql; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; /** * parquet數據源之使用編程方式加載數據 */ public class ParquetLoadData { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("ParquetLoadData"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); //讀取parquet文件中的數據,建立一個dataframe DataFrame userDF = sqlContext.read().parquet("e://users.parquet"); //將其註冊爲臨時表,使用sql查詢所須要的數據 userDF.registerTempTable("users"); DataFrame userNamesDF = sqlContext.sql("select name from users"); //對查詢出來的dataframe進行transformation操做,處理數據,而後打印出來 List<String> userNames = userNamesDF.javaRDD().map(new Function<Row, String>() { private static final long serialVersionUID = 1L; @Override public String call(Row row) throws Exception { return "Name: "+row.getString(0); } }).collect(); for (String name : userNames) { System.out.println(name); } } } Scala版本: package com.spark.spark_sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext object PatquetLoadData { val conf = new SparkConf().setAppName("PatquetLoadData") val sc = new SparkContext(conf) val sqlContext =new SQLContext(sc) val usersDF = sqlContext.read.parquet("hdfs://hadoop01:8020/spark_input/users.parquet"); usersDF.registerTempTable("user"); val userNameDF = sqlContext.sql("select * from user") userNameDF.rdd.map(row => "Name: "+row(0)).collect.foreach(userName =>println(userName)) } 2.自動分區推斷 表分區是一張常見的優化的方式,好比hive中就提供分區表的特性。在一個分區表中,不一樣分區的數據一般存儲在不一樣的目錄中,分區列的值一般就包含在了分區的目錄名中。Spark sql中的parquet數據源,支持自動根據目錄名推斷出分區信息。例如,若是將入口數據存儲在分區表中,而且使用性別和國家做爲分區列。那麼目錄結構多是以下所示: |----tableName |----gender=male |----country=US |..... |----country=ZH |----gender=female |--country=... |... 若是將tableName傳入SQLContext.read.parquet()或者SQLContext.read.load()方法,那麼sparksql就會自動根據目錄的結構,推斷出分區的信息,是gender和country。即便數據文件中包含兩列的值name和age,可是sparksql返回的dataframe,調用printSchema()方法時,會打印出四個列的值:name age country gender .這就是自動分區推斷的功能 此外,分區的列的數據類型,也是自動被推斷出來的。目前,spark sql僅支持自動推斷出數字類型和字符串類型。有時,用戶也許不但願spark sql自動推斷分區列的數據類型。此時只要設置一個配置便可,spark.sql.sources.partitionColumnTypeInference.enabled,默認是true,即自動推斷分區列的類型,設置爲false,即不會自動推斷類型。進行自定推斷分區列的類型時,全部的分區列的類型,就統一默認的是String 案列: 1.hdfs上建立相對應的目錄結構: bin/hdfs dfs -mkdir -p /spark_input/gender=male/country=US 2.將文件上傳到目錄下 bin/hdfs dfs -put users.parquet /spark_input/gender=male/country=US/ 3.查詢出schema package com.spark.spark_sql; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; /** *parquet數據源之 自動推斷分區 */ public class ParquetPartitionDiscovery { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("ParquetPartitionDiscovery").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); DataFrame userDF = sqlContext.read().parquet("hdfs://hadoop01:8020/spark_input/gender=male/country=US/users.parquet"); userDF.printSchema(); userDF.show(); } } 4.結果: root |-- name: binary (nullable = true) |-- favourite_color: binary (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true) +----------------+-------------------+------+-------+ | name| favourite_color|gender|country| +----------------+-------------------+------+-------+ | [6C 65 6F]| [72 65 64]| male| US| | [6A 61 63 6B]|[79 65 6C 6C 6F 77]| male| US| |[6B 69 74 74 79]| [77 68 69 74 65]| male| US| | [74 6F 6D]| [67 72 65 65 6E]| male| US| | [61 6C 6C]| [70 69 6E 6B]| male| US| +----------------+-------------------+------+-------+ 【注意】前面是由於parquet數據的問題,本身從hive中導出來的。能夠利用json格式的數據load,而後save的時候以parquet的方式生成一個parquet文件,用於測試 3.合併元數據 如同ProtocolBuffer,Avro,Thrift同樣,Parquet也是支持元數據合併的。用戶能夠一開始就定義一個簡單的元數據,而後隨着業務須要。逐漸往元數據中添加更多的列,在這種狀況下,用戶可能會建立多個parquet文件,有着多個不一樣的卻互相兼容的元數據。Parquet數據源支持自動推斷這種狀況,而且進行多個parquet文件的元數據的合併 由於元數據合併是一種相對耗時的操做,並且在大多數的狀況下不是一種必要的特性,從spark1.5.0版本開始,默認是關閉parquet文件的自動合併元數據的。能夠經過如下的兩種方式開啓parquet數據源的自動合併元數據的特性 1.讀取parquet文件時,將數據源的選項,mergeSchema,設置爲true 2.根據sqlContext.setConf()方法,將spark.paquet.mergeSchema參數設置爲true 案例: 合併學生的基本信息和成績信息的元數據 package com.spark.spark_sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SaveMode object ParquetMergeSchema { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("ParquetMergeSchema") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import SQLContext.implicits._ //建立一個dataframe,做爲學生的基本信息,並寫入一個parquet文件中 val studentsWithNameAge =Array(("leo",23),("jack",25)) val studentsWithNameAgeDF = sc.parallelize(studentsWithNameAge, 2).toDF("name","age") studentsWithNameAgeDF.save("hdfs://hadoop01:8020/spark_out/students","parquet", SaveMode.Append) //建立一個dataframe,做爲學生的成績信息,並寫入一個parquet文件中 val studentsWithNameGrade =Array(("marry","A"),("jack","B")) val studentsWithNameGradeDF = sc.parallelize(studentsWithNameGrade, 2).toDF("name","age") studentsWithNameGradeDF.save("hdfs://hadoop01:8020/spark_out/students","parquet", SaveMode.Append) //首先,第一個dataframe和第二個dataframe的元數據確定是不同的 //一個包含了name和age兩個列,一個是包含name和grade兩個列 //因此,這指望的是,讀取出來的表數據,自動合併兩個文件的元數據,出現三列,name age grade //用mergeSchema的方式,讀取students表中的數據,進行元數據的合併 val studentsDF = sqlContext.read.option("mergeSchema", "true").parquet("hdfs://hadoop01:8020/spark_out/students") studentsDF.printSchema(); studentsDF.show(); } } 4、JSON數據源 Spark sql 能夠自動推斷JSON文件的元數據,而且加載其數據,建立一個dataframe。可使用SQLContext.read.json()方法,針對一個元素爲String的RDD,或者是一個JSON文件 可是要注意的是,這裏使用的JSON文件與傳統意義上的JSON文件是不同的。每行都必須也只能包含一個,單獨的,自包含的,有效的JSON對象。不能讓json對象分散在多行。不然會報錯 綜合性複雜案例:查詢成績爲80分以上的學生的基本信息與成績信息 Java 版本 package com.spark.spark_sql; import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import scala.Tuple2; /** * json數據源 */ public class JSONDataSource { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("JSONDataSource").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); // 針對json文件,建立出dataframe DataFrame studentDF = sqlContext.read().json("e://student.json"); //hdfs://hadoop01:8020/spark_input/student.json // 針對學生成績信息的dataframe,註冊臨時表,查詢分數大於80分的學生的姓名和分數 studentDF.registerTempTable("student"); DataFrame stuNameDF = sqlContext.sql("select name,score from student where score > 80"); List<String> goodName = stuNameDF.javaRDD().map(new Function<Row, String>() { private static final long serialVersionUID = 1L; @Override public String call(Row row) throws Exception { return row.getString(0); } }).collect(); // 針對JavaRDD<String> 建立dataframe List<String> studentInfoJSONs = new ArrayList<String>(); studentInfoJSONs.add("{\"name\":\"leo\",\"age\":18}"); studentInfoJSONs.add("{\"name\":\"marry\",\"age\":15}"); studentInfoJSONs.add("{\"name\":\"jack\",\"age\":30}"); JavaRDD<String> studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs); DataFrame studentInfoDF = sqlContext.read().json(studentInfoJSONsRDD); // 針對學生的基本信息的dataframe,註冊臨時表,而後查詢分數大於80分的學生的基本信息 studentInfoDF.registerTempTable("info"); String sql = "select name , age from info where name in ("; for (int i = 0; i < goodName.size(); i++) { sql += "'" + goodName.get(i) + "'"; if (i < goodName.size() - 1) { sql += ","; } } sql += ")"; DataFrame goodStuInfosDF = sqlContext.sql(sql); // 將兩份數據的dataframe轉換爲javapairRDD,執行join transformation JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = goodStuInfosDF.javaRDD() .mapToPair(new PairFunction<Row, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Row row) throws Exception { return new Tuple2<String, Integer>(row.getString(0), Integer.valueOf((int) row.getLong(1))); } }).join(stuNameDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Row row) throws Exception { return new Tuple2<String, Integer>(row.getString(0), Integer.valueOf((int) row.getLong(1))); } })); //將封裝在RDD中好學生的所有信息,轉換爲一個javaRDD<Row>的格式 JavaRDD<Row> rows = pairs.map(new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() { private static final long serialVersionUID = 1L; @Override public Row call(Tuple2<String, Tuple2<Integer, Integer>> t) throws Exception { return RowFactory.create(t._1,t._2._2,t._2._1); } }); //建立一份元數據,將JavaRDD<Row>轉換爲dataframe List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true)); fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); StructType structType = DataTypes.createStructType(fields); DataFrame goodStudentsDF =sqlContext.createDataFrame(rows, structType); //將好學生的所有信息保存到json文件中去 goodStudentsDF.write().format("json").save("e://good"); //hdfs://hadoop01:8020/spark_out/goodStudent } } Scala版本: package com.spark.spark_sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.IntegerType object JSONDatasource { val conf = new SparkConf().setAppName("JSONDatasource") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val studentScoreDF =sqlContext.read.json("hdfs://hadoop01:8020/spark_input/student.json") studentScoreDF.registerTempTable("stu_score") val goodStuScoreDF = sqlContext.sql("select name , score from stu_score where score > 80") val goodStuNames = goodStuScoreDF.map(row => row(0)).collect() val studentInfoJSON =Array("{\"name\":\"Leo\",\"age\":18}","{\"name\":\"marry\",\"age\":15}","{\"name\":\"jack\",\"age\":30}") val studentInfoRDD =sc.parallelize(studentInfoJSON, 3) val studentInfoDF = sqlContext.read.json(studentInfoRDD)//json格式特有的,能夠接受RDD studentInfoDF.registerTempTable("stu_info") var sql = "select name , age from stu_info where name in (" for(i <- 0 until goodStuNames.length){ sql += "'"+goodStuNames(i)+"'" if(i < goodStuNames.length-1){ sql += "," } } sql += ")" val goodStuInfoDF = sqlContext.sql(sql) val goodStuRDD = goodStuInfoDF.rdd.map{row => (row.getAs[String]("name"),row.getAs[Long]("age"))}.join (goodStuScoreDF.rdd.map{row => (row.getAs[String]("name"),row.getAs[Long]("score"))}) val goodStuRows = goodStuRDD.map(info => Row(info._1,info._2._2.toInt,info._2._1.toInt)) val structType = StructType(Array(StructField("name",StringType,true),StructField("score",IntegerType,true),StructField("age",IntegerType,true))) val goodStudentDF = sqlContext.createDataFrame(goodStuRows, structType) goodStudentDF.write.format("json").save("hdfs://hadoop01:8020/spark_out/good_scala") } 5、Hive數據源 Spark sql 支持對hive中存儲的數據進行讀寫,操做hive中的數據時,就必須建立HiveContext,而不是SQLContext。HiveContext繼承SQLContext,可是增長了在hive元數據庫中查找表,以及用hiveql語法編寫SQL的功能。處理sql()方法外,HiveContext還提供hql()方法,從而用hive語法來編譯sql。 使用HiveContext,能夠執行Hive的大部分功能,包括建立表、往表裏導入數據以及用SQL語句查詢表中的數據,查詢出來的數據是一個row數組 將hive-site.xml拷貝到spark/conf目錄下,將mysql connector拷貝到spark/lib目錄下 HiveContext sqlContext = new HiveContext(sc); sqlContext.sql(「create table if not exists student (name string ,age int)」) sqlContext.sql(「load data local inpath ‘/usr/local/student.txt’into table students」); Row[] teenagers = sqlContext.sql(「select name ,age from students where age <= 18」).collect(); 將數據保存到表中 Spark sql還容許將數據保存到hive表中。調用Dataframe的saveAsTable,便可將dataframe中的數據保存到hive表中。與registerTempTable不一樣,saveAsTable是會將dataframe彙總的數據物化到hive表中,並且還會在hive元數據庫中建立表的元數據 默認狀況下,saveAsTable會建立一張hive managed table,也就是說,數據的位置都是有元數據庫中的信息控制的。當managed table 被刪除時,表中的數據也會一併內物理刪除 RegisterTempTable只是註冊一個臨時的表,只要spark application重啓或者中止了,那麼表就沒有了。而SaveAsTable建立的是物化的表,不管是spark application重啓仍是中止,表都會一直存在 調用HiveContext.table()方法,還能夠直接針對hive中的表,建立一個dataframe 案例:查詢分數大於80分的學生的信息 package com.spark.spark_sql; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.hive.HiveContext; /** * hive數據源 * @author Administrator */ public class HiveDataSource { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("HiveDataSource"); JavaSparkContext sc = new JavaSparkContext(conf); //建立HiveContext,注意,這裏接收的是sparkContext做爲參數,不是Javasparkcontext HiveContext hiveContext = new HiveContext(sc.sc()); //第一個功能,使用HiveContext的sql()/hql()方法,能夠執行hive中能夠執行的hiveQL語句 //判斷是否存在student_info表,若是存在則刪除 hiveContext.sql("drop table if exists student_info"); //判斷student_info表是否不存在,不存在就建立該表 hiveContext.sql("create table if not exists student_info(name string,age int) row format delimited fields terminated by '\t'"); //將學生基本信息導入到Student_info表 hiveContext.sql("load data local inpath '/home/hadoop/student_info.txt' into table student_info"); //用一樣的方式向student_scores導入數據 hiveContext.sql("drop table if exists student_score"); hiveContext.sql("create table if not exists student_score(name string,score int)row format delimited fields terminated by '\t'"); hiveContext.sql("load data local inpath '/home/hadoop/student_score.txt' into table student_score"); //第二個功能個,執行sql還能夠返回dataframe,用於查詢 //執行sql查詢,關聯兩種表,查詢成績大於80 分的學生信息 DataFrame goodstudentDF = hiveContext.sql("select i.name,i.age,s.score from student_info i join student_score s on i.name=s.name where s.score >=80"); //第三個功能,能夠將dataframe中的數據,理論上來講,dataframe對應的RDD的元素是ROW就能夠 //將Dataframe中的數據保存到hive表中 //將dataframe中的數據保存到good_student_info表中 hiveContext.sql("drop table if exists good_student_info"); goodstudentDF.saveAsTable("good_student_info"); //第四個功能,可使用table()方法,針對hive表直接建立dataframe //針對good_student_info表,直接建立dataframe Row[] rows = hiveContext.table("good_student_info").collect(); for (Row row : rows) { System.out.println(row); } sc.close(); } } Scala版本: package com.spark.spark_sql import org.apache.spark.SparkConf import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row import org.apache.spark.sql.DataFrame import org.apache.spark.SparkContext object HiveDataSource { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("HiveDataSource"); val sc = new SparkContext(conf) //建立HiveContext,注意,這裏接收的是sparkContext做爲參數,不是Javasparkcontext val hiveContext = new HiveContext(sc); //第一個功能,使用HiveContext的sql()/hql()方法,能夠執行hive中能夠執行的hiveQL語句 //判斷是否存在student_info表,若是存在則刪除 hiveContext.sql("drop table if exists student_info"); //判斷student_info表是否不存在,不存在就建立該表 hiveContext.sql("create table if not exists student_info(name string,age int) row format delimited fields terminated by '\t'"); //將學生基本信息導入到Student_info表 hiveContext.sql("load data local inpath '/home/hadoop/student_info.txt' into table student_info"); //用一樣的方式向student_scores導入數據 hiveContext.sql("drop table if exists student_score"); hiveContext.sql("create table if not exists student_score(name string,score int)row format delimited fields terminated by '\t'"); hiveContext.sql("load data local inpath '/home/hadoop/student_score.txt' into table student_score"); //第二個功能個,執行sql還能夠返回dataframe,用於查詢 //執行sql查詢,關聯兩種表,查詢成績大於80 分的學生信息 val goodstudentDF = hiveContext.sql("select i.name,i.age,s.score from student_info i join student_score s on i.name=s.name where s.score >=80"); //第三個功能,能夠將dataframe中的數據,理論上來講,dataframe對應的RDD的元素是ROW就能夠 //將Dataframe中的數據保存到hive表中 //將dataframe中的數據保存到good_student_info表中 hiveContext.sql("drop table if exists good_student_info"); goodstudentDF.saveAsTable("good_student_info"); //第四個功能,可使用table()方法,針對hive表直接建立dataframe //針對good_student_info表,直接建立dataframe val rows = hiveContext.table("good_student_info").collect(); for ( row <- rows) { println(row); } } } 6、內置函數 在spark 1.5.x版本,增長了一系列內置函數到dataframe API中,而且實現了code-generation的優化。與普通的函數不一樣,dataframe的函數並不會執行後當即返回一個結果值,而是返回一個Column對象,用於在並行做業中進行求值。Column能夠用在dataframe的操做之中,好比select, filter,groupBy等操做。函數的輸入值,也能夠是Column。 種類 函數 聚合函數 approxCountDistinct,avg,count, countDistinct,first,last,max,mean,min,sum,sumDistinct 集合函數 Array_contains,explode,size,sort_array 日期/時間函數 日期時間轉換 Unix_timestamp,from_unixtime,to_date Quarter,day,dayofyear,weekofyear, From_utc_timestamp,to_utc_timestamp 從日期中提取字段 Year,month,dayofmonth,hour,minute, second 日期/時間函數 日期時間計算 Dateiff,date_add,date_sub,add_months, last_day,next_day,months_between 獲取當前時間 Current_date,current_timestamp,trunc, date_format 數學函數 Abs,scros,asin,atan,atan2,bin,cbrt, ceil,conv,cos,sosh,exp,expm1,factorial,floor,hex,hypot,log,log10,log1p,log2, Pmod,pow,rint,round,shiftLeft, shiftRight,shiftRightUnsigned,sifnum, Sin,sinh,sqrt,tan,tanh,toDegrees, toRadians,unhex 混合函數 Array,bitwiseNOT,callUDF,coalesce, crc32,greatest,if,inputFileName,isNaN, isnotnull,isnull,least,lit,md5, monotonicalliIncreasingId,nanvl,negate,not,rand,randn,randn sha,sha1,sparkPartitionId,struct,when 字符串函數 Ascii,base64,concat,concat_ws,decode,encode,format_number,format_string,get_json_object,initcap,instr,length,levenshtein,locate,lower,lpad,ltrim,printf,redexp_extract,regexp_replace,repeat,reverse,rpad,rtrim,soundex,space,split,substring,substring_index,translate,trim,unbase64,upper 窗口函數 cumeDist,denseRank,lag,lead,ntile,percentRank,rank,rowNumber 案例: 根據天天的用戶訪問和購買日誌,統計每日的UV和銷售額 統計每日的UV package com.spark.spark_sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.functions._ object DailyUV { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("DailyUV") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //構造用戶 訪問日誌數據,並建立dataframe //要使用spark內置函數,就必須在這裏代入sparkcontext下的隱式轉換 import sqlContext.implicits._ //模擬用戶訪問日誌,日誌用逗號隔開,第一列是日期,第二列是用戶id val userAccessLog = Array("2015-10-01,1122","2015-10-01,1122", "2015-10-01,1123", "2015-10-01,1125","2015-10-01,1125", "2015-10-02,1122", "2015-10-02,1124","2015-10-02,1122", "2015-10-02,1123") val userAccessLogRDD= sc.parallelize(userAccessLog, 1); //將模擬出來的用戶日誌RDD,轉換爲dataframe //首先將普通的RDD轉換爲row的RDD val userAccessLogRowRDD = userAccessLogRDD.map(log =>Row(log.split(",")(0),log.split(",")(1).toInt)) //構造dataframe元數據 val structType = StructType(Array(StructField("date",StringType,true),StructField("userid",IntegerType,true))) val userAccessLogRowDF=sqlContext.createDataFrame(userAccessLogRowRDD, structType) //這裏正式使用spark1.5.x版本的提供的最新特性,內置函數,countDistinct //天天都有不少用戶來訪問,可是每一個用戶可能天天都會訪問不少次 //UV:指的是對用戶進行去重之後的訪問總數 //聚合函數的用法; //首先對dataframe調用groupBy()方法,對某一列進行分組 //而後調用agg()方法,第一個參數,必須必須傳入以前在groupBy方法中出現的字段 //第二個參數,傳入countDistinct、sum,first等,spark提供的內置函數 //內置函數中,傳入的參數也是用單引號做爲前綴的,其餘的字段 userAccessLogRowDF.groupBy("date").agg('date, countDistinct('userid)).map(row => Row(row(1),row(2))).collect.foreach(println) } } 統計每日的銷售額 package com.spark.spark_sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.DoubleType import org.apache.spark.sql.functions._ object DailySale { val conf = new SparkConf().setMaster("local").setAppName("DailySale") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ //模擬元數據 //實際上,有些時候,會出現日誌的上報錯誤,好比日誌裏丟了用戶的信息,那麼這種,就一概不統計了 val userSaleLog = Array("2015-10-01,100,1122","2015-10-01,100,1133","2015-10-01,100,","2015-10-02,300,1122","2015-10-02,200,1122","2015-10-02,100,","2015-10-02,100,1122") val userSaleLogRDD = sc.parallelize(userSaleLog, 1); //進行過濾 val filteredUserSaleRowRDD = userSaleLogRDD.filter(log => if(log.split(",").length ==3) true else false) val userSaleLogRowRDD = filteredUserSaleRowRDD.map(log => Row(log.split(",")(0),log.split(",")(1).toDouble)) val structType = StructType(Array(StructField("date",StringType,true),StructField("sale_amount",DoubleType,true))) val userSaleLogDF = sqlContext.createDataFrame(userSaleLogRowRDD, structType) //開始統計每日銷售額的統計 userSaleLogDF.groupBy("date").agg('date, sum('sale_amount)).map(row => Row(row(1),row(2))).collect().foreach(println) } 7、開窗函數 Spark1.4.x版本之後,爲spark sql和dataframe引入了開窗函數,好比最經典最經常使用的row_number(),可讓咱們實現分組取topN的邏輯 案例:統計每一個種類的銷售額排名前3 的產品 package com.spark.spark_sql; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.hive.HiveContext; /** * 開窗函數row_number * @author Administrator */ public class RowNumberWindowFunction { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("NewNumberWindowFunction"); JavaSparkContext sc = new JavaSparkContext(conf); HiveContext hiveContext = new HiveContext(sc.sc()); //建立銷售表sales表 hiveContext.sql("drop table if exists sales"); hiveContext.sql("create table if not exists sales (product string,category string ,revenue bigint) row format delimited fields terminated by '\t'"); hiveContext.sql("load data local inpath '/home/hadoop/sales.txt' into table sales"); //開始編寫咱們的統計邏輯,使用row_number()開窗函數 //先說明一下,row_number()開窗函數的做用 //其實就是給每個分組的數據,按照其排序順序,打上一個分組內的行號 //好比說,有一個分組date=20151001,裏面有三條數據,1122 1121 1124 //那麼對這個分組的每一行使用row_number()開窗函數之後,三行依次會得到一個組內的行號 //行號從1開始遞增,好比 1122 1, 1121 2, 1124 3 DataFrame top3SalesDF = hiveContext.sql("select product,category,revenue from " + "(select product,category,revenue ,row_number() over (partition by category order by revenue desc ) rank from sales) tmp_sales where rank <=3"); //row_number()開窗函數的語法說明 //首先,能夠在select查詢時,使用row_number()函數 //其次,row_number()函數後面先跟上over關鍵字 //而後括號中是partition by 也就是根據那個字段進行分組 //其次是可使用order by進行組內排序 //而後row_number()就能夠給每個組內的行,一個組一個行號 //將每組排名前三的數據,保存到一個表中 hiveContext.sql("drop table if exists top3_sales"); top3SalesDF.saveAsTable("top3_sales"); sc.close(); } } 函數名(列) OVER(選項) 8、UDF自定義函數 UDF:User Defined Function.用戶自定義函數 package com.spark.spark_sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StringType object UDF { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("UDF") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //構造模擬數據 val names =Array("Leo","Marry","Jack","Tom") val namesRDD = sc.parallelize(names, 1) val namesRowRDD = namesRDD.map(name => Row(name)) val structType =StructType(Array(StructField("name",StringType))) val namesRowDF =sqlContext.createDataFrame(namesRowRDD, structType) //註冊一張name表 namesRowDF.registerTempTable("names") //定義和註冊自定義函數 //定義函數 //註冊函數 sqlContext.udf.register("strLen",(str:String) => str.length) //使用本身的函數 sqlContext.sql("select name,strLen(name) from names").collect.foreach(println) } } 9、UDAF自定義聚合函數 UDAF:User Defined Aggregate Function。用戶自定義聚合函數,是spark1.5.x引入的最新特性。 UDF針對的是單行輸入,返回一個輸出, UDAF,則能夠針對多行輸入,進行聚合計算,返回一個輸出,功能更加的強大 Scala代碼: import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ class StringCount extends UserDefinedAggregateFunction { //inputschema 指的是,輸入數據的數據 override def inputSchema: StructType = { StructType(Array(StructField("str",StringType,true))) } //bufferSchema,指的是,中間進行聚合時,所處理的數據類型 override def bufferSchema: StructType = { StructType(Array(StructField("count",IntegerType,true))) } //dataType,指的是,函數的返回值的類型 override def dataType: DataType = { IntegerType } override def deterministic: Boolean = { true } //爲每個分組的數據執行初始化操做 override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0)=0 } //指的是,每一個分組,有新的值進來的時候,如何進行分組,對應的聚合值的計算 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0)=buffer.getAs[Int](0)+1 } //因爲spark是分佈式的,因此一個分組的數據,可能會在不一樣的節點上進行局部的聚合,就是update //可是,最後一個分組,在各個節點上的聚合值,要執行merge,也就是合併 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0)=buffer1.getAs[Int](0)+buffer2.getAs[Int](0) } //最後,指的是,一個分組的聚合,如何經過中間的緩存聚合值,隨後返回一個最終的聚合值 override def evaluate(buffer: Row): Any = { buffer.getAs[Int](0) } } 10、工做原理以及性能調優 1.工做原理 Sqlparse、Analyser、Optimizer、SparkPlan 2.性能調優 1.設置shuffle過程當中的並行度spark.sql.shuffle.partitions(sqlContext.setConf()) 2.在hive數據倉庫建設過程當中,合理設置數據類型,好比能設置爲int的就不要設置爲bigint.減小數據類型致使的沒必要要的內存開銷 3.編寫sql時,儘可能給出明確的列名,好比select name from students。不要寫select * 的方式 4.並行處理查詢結果:對於spark sql查詢的結果,若是數據量比較大的話,好比超多1000條,那麼就不要一次collect到driver在處理。使用foreach()算子,並行處理查詢結果 5.緩存表:對於一條sql語句中可能屢次使用到表,能夠對其進行緩存沒使用sqlContext.cache Table(tableName),或者DataFrame.cache()便可spark sql會用內存列存儲的格式進行表的緩存。而後將spark sql就能夠僅僅掃描須要使用的列,而且自動優化壓縮,來最小化內存使用和GC開銷。sqlcontext.uncacheTable(tableName)能夠將表從緩存中移除。用sqlcontext.setConf(),設置spark.sql.inMemoryColumnarStorage.batchSize參數(默認10000),能夠配置列存儲單位 6.廣播join表:spark.sql.autoBroadcastJoinThreshold.默認是10485760(10MB),也就是在10M之內的表將其廣播出去,在內存足夠用的狀況下,增長其大小,讓更多的表廣播出去,就能夠將join中的較小的表廣播出去,而不是進行網絡數據傳輸了。 7.鎢絲計劃:spark.sql.tungsten.enable,默認是true 自動管理內存 最有效的就是第四點,緩存表和廣播join表也是很是不錯的 11、Hive on spark 背景: Hive是目前大數據領域,事實上的sql標準。其底層默認是基於MapReduce實現的。可是因爲mapreduce速度是在比較慢。所以這兩年,陸續出來了新的sql查詢引擎。包括spark sql,hive on Taz ,hive on spark Spark sql 與hive on spark 是不同的,spark sql 本身研發出來的針對各類數據源,包括hive、json、parquet、JDBC、RDD等均可以執行查詢的,一套基於spark計算引擎的查詢引擎。所以它是spark的一個項目,只不過提供了針對hive執行查詢的功能而已。適合在一些使用spark技術棧的大數據應用類系統中使用。 而hive on spark是hive 的一個項目,他是指,不經過mapreudce做爲惟一的引擎,而是將spark做爲底層的查詢引擎,hive on spark ,只適用於hive,在可預見的將來,頗有可能hive默認的底層引擎從mapreduce切換爲spark,適合於將原有的hive數據倉庫以及數據分析替換爲spark引擎,做爲公司通用大數據統計計算分析引擎 Hive基本的工做原理: hiveQL語句==> 語法分==>AST==> 生成邏輯執行計劃==>operator Tree==> 優化邏輯執行計劃==>optimized operator Tree==> 生成物理執行計劃==>Task Tree==> 優化物理執行計劃==>Optimized Task Tree==> 執行優化後的Optimized Task Tree Hive on spark 的計算原理有以下的幾個要點: 1.將hive表做爲spark RDD來進行操做 2.使用hive 原語 對於一些針對RDD的操做,好比,groupByKey(),sortByKey()等等。不使用spark的transformation操做和原語。若是那樣作的話,那麼就須要從新實現一套hive的原語,並且hive增長了新功能,那麼又要實現新的spark 原語。所以,選擇將hive的原語包裝爲針對RDD的操做便可 3.新的物理執行計劃生成機制 使用sparkCompiler將邏輯執行計劃,集=即Operator Tree,轉換爲Task tree。提交spark task給spark進行執行。Sparktask包裝了DAG,DAG包裝爲sparkwork,Sparktask根據sparkwork表示的DAG計算 4.sparkContext生命週期 Hive on spark 會爲每個用戶的會話,好比執行一次sql語句,建立一個sparkcontext,可是spark不容許在一個JVM內建立多個sparkcontext,所以,須要在單獨額JVM中啓動每個會話的sparkcontext,而後經過RPC與遠程JVM中的sparkContext進行通訊。 5.本地和遠程運行模式 Hive on spark 提供兩種運行模式,本地和遠程。若是將spark master設置爲local 12、Spark sql與spark core整合 案例:每日top3熱點搜索詞統計案例 日誌格式: 日期、用戶、搜索詞、城市、平臺、版本 需求: 1.篩選出符合條件(城市、平臺、版本))的數據 2.統計出天天搜索的UV排名前三的搜索詞 3.按照天天的top3搜索詞的UV搜總次數,倒序排列 4.將數據保存到hive表中 實現思路分析 1.針對原始的數據(HDFS文件),獲取輸入RDD 2.使用filter算子,去針對輸入RDD中的數據,進行數據過濾,過濾出符合條件的數據 2.1普通的作法:直接在filter算子函數中,使用外部的查詢條件(map),可是這樣的話,是否是查詢條件map,會發送到每個task上一份副本(性能並很差) 2.2優化後的作法:將查詢條件,封裝爲Broadcast廣播變量,在filter算子中使用Broadcast廣播變量 3.將數據轉換爲「日期_搜索詞,用戶」的格式,而後對他進行分組,再次進行映射。對天天每一個搜索詞的搜索用戶進行去重操做,並統計去重後的數量,即爲天天每一個搜索詞的UV,最後得到(日期_搜索詞,UV) 4.將獲得的天天的搜索詞的UV,RDD映射爲元素類型的Row的RDD,將RDD轉換爲dataframe 5.將dataframe註冊爲臨時表,使用spark sql的開窗函數,來統計天天的UV排名前三的搜索詞,以及他的搜索UV,最後獲知,是一個dateframe 6.將dateframe轉換爲RDD。繼續操做,按照天天日期進行分組。並進行映射。計算出天天的top3所搜詞的搜索uv的總數,而後將UV總數做爲key,將天天的top3搜索詞以及搜索次數,拼接爲一個字符串 7.按照天天的top3搜索總UV,進行排序,倒序排序 8.將排好序的數據,再次映射回來,變成「日期_搜索詞_UV」的格式 9.再次映射爲dataframe,並將數據保存到hive表中 package com.spark.spark_sql; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.hive.HiveContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.DateType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import scala.Tuple2; /** * 每日top3熱點搜索詞統計 * @author Administrator * */ public class DailyTop3Keyword { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("DailyTop3Keyword"); JavaSparkContext jsc = new JavaSparkContext(conf); HiveContext hiveContext = new HiveContext(jsc.sc()); //針對hdfs上的文件中的日誌,獲取輸入的RDD JavaRDD<String> rawRDD = jsc.textFile("hdfs://hadoop01:8020/keyword.txt"); //僞造一份數據,查詢條件 //備註:實際上,在實際的企業的項目開發中,極可能,這個查詢條件是J2EE平臺發送到MySQL表中的 //而後,這裏實際上一般是會用Spring框架和ORM框架的,去提取MySQL表中的查詢條件 Map<String,List<String>> queryParamMap =new HashMap<String, List<String>>(); queryParamMap.put("city", Arrays.asList("beijing")); queryParamMap.put("platform", Arrays.asList("android")); queryParamMap.put("version", Arrays.asList("1.0","1.2","1.5","2.0")); //將map封裝爲廣播變量,每一個work節點只拷貝一份數據便可,這樣能夠進行優化 final Broadcast<Map<String, List<String>>> queryParamMapBroadcast = jsc.broadcast(queryParamMap); //使用廣播變量進行篩選 JavaRDD<String> filterRDD = rawRDD.filter(new Function<String, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(String log) throws Exception { String[] logSplited = log.split("\t"); String city =logSplited[3]; String platform =logSplited[4]; String version = logSplited[5]; //與查詢條件進行比較,任何一個 Map<String, List<String>> queryParamMap = queryParamMapBroadcast.value(); List<String> cities = queryParamMap.get("city"); if(cities.size()>0 && !cities.contains(city)){ return false; } List<String> platforms = queryParamMap.get("city"); if(platforms.size()>0 && !platforms.contains(platform)){ return false; } List<String> versions = queryParamMap.get("city"); if(versions.size()>0 && !versions.contains(version)){ return false; } return true; } }); //將過濾出來的原始的日誌進行映射爲(日期_搜索詞,用戶)的格式 JavaPairRDD<String, String> dateKeywordUserRDD = filterRDD.mapToPair(new PairFunction<String, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(String t) throws Exception { String[] logSplited = t.split("\t"); String date =logSplited[0]; String user =logSplited[1]; String keyword =logSplited[2]; return new Tuple2<String, String>(date+"_"+keyword, user); } }); //進行分組,獲取天天的搜索詞,有哪些用戶進行搜索了(沒有去重) JavaPairRDD<String, Iterable<String>> dateKeywordUsersRDD = dateKeywordUserRDD.groupByKey(); //對天天每一個搜索詞的搜索用戶,執行去重操做,得到其UV值 JavaPairRDD<String, Long> dateKeywordUVRDD = dateKeywordUsersRDD.mapToPair(new PairFunction<Tuple2<String,Iterable<String>>, String, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Long> call(Tuple2<String, Iterable<String>> t) throws Exception { String dateKeyword = t._1; Iterator<String> users = t._2.iterator(); //對用戶進行去重,而且統計數量 List<String> distinctUsers = new ArrayList<String>(); while(users.hasNext()){ String user = users.next(); if(!distinctUsers.contains(user)){ distinctUsers.add(user); } } //獲取uv long uv = distinctUsers.size(); return new Tuple2<String, Long>(dateKeyword, uv); } }); //將天天每一個搜索詞的UV數據轉換成dataframe JavaRDD<Row> dateKeywordUVRowRDD = dateKeywordUVRDD.map(new Function<Tuple2<String,Long>, Row>() { @Override public Row call(Tuple2<String, Long> v1) throws Exception { String date = v1._1.split("_")[0]; String keyword = v1._1.split("_")[1]; long uv = v1._2; return RowFactory.create(date,keyword,uv); } }); List<StructField> structFields = Arrays.asList( DataTypes.createStructField("date", DataTypes.StringType, true), DataTypes.createStructField("keyword", DataTypes.StringType,true), DataTypes.createStructField("uv", DataTypes.LongType,true) ); StructType structType =DataTypes.createStructType(structFields); DataFrame dateKeywordUVDF = hiveContext.createDataFrame(dateKeywordUVRowRDD, structType); //使用spark sql的開窗函數,統計天天的搜索排名前三的熱點搜索詞 dateKeywordUVDF.registerTempTable("daily_keyword_uv"); DataFrame dailyTop3KeywordDF = hiveContext.sql("select date,keyword,uv from " + "(select date,keyword,uv,row_number over (partition by date order by uv desc ) rank from daily_keyword_uv )tmp " + "where rank <=3"); //將dateframe 轉換爲RDD,而後映射,計算出天天的top3搜索詞的搜索uv的總數 JavaPairRDD<String, String> Top3DateKeywordUvRDD = dailyTop3KeywordDF.javaRDD().mapToPair(new PairFunction<Row, String, String>() { @Override public Tuple2<String, String> call(Row row) throws Exception { String date = String.valueOf(row.get(0)); String keyword =String.valueOf(row.get(1)); Long uv = Long.valueOf(String.valueOf(row.get(2))); return new Tuple2<String, String>(date, keyword+"_"+uv); } }); JavaPairRDD<String, Iterable<String>> Top3DateKeywordRDD = Top3DateKeywordUvRDD.groupByKey(); JavaPairRDD<Long, String> uvDateKeywordsRDD = Top3DateKeywordRDD.mapToPair(new PairFunction<Tuple2<String,Iterable<String>>, Long, String>() { @Override public Tuple2<Long, String> call(Tuple2<String, Iterable<String>> t) throws Exception { String date = t._1; Iterator<String> keywordUvIterator = t._2.iterator(); Long totalUv = 0L; String dateKeywordUv = date; while(keywordUvIterator.hasNext()){ String keywordUv= keywordUvIterator.next(); Long uv = Long.valueOf(keywordUv.split("_")[1]); totalUv+=uv; dateKeywordUv+=","+keywordUv; } return new Tuple2<Long, String>(totalUv, dateKeywordUv); } }); //按照天天的總搜索進行倒序排序 JavaPairRDD<Long, String> sortedUvDateKeywordsRDD = uvDateKeywordsRDD.sortByKey(false); //在此進行映射,將排序後的數據,映射回原始的格式Iterable<ROW> JavaRDD<Row> sortedRowRDD = sortedUvDateKeywordsRDD.flatMap(new FlatMapFunction<Tuple2<Long,String>, Row>() { @Override public Iterable<Row> call(Tuple2<Long, String> t) throws Exception { String dateKeywords=t._2; String[] dateKeywordsSplited = dateKeywords.split(","); String date = dateKeywordsSplited[0]; List<Row> rows= new ArrayList<Row>(); rows.add(RowFactory.create(date, dateKeywordsSplited[1].split("_")[0], Long.valueOf(dateKeywordsSplited[1].split("_")[1]) )); rows.add(RowFactory.create(date, dateKeywordsSplited[1].split("_")[0], Long.valueOf(dateKeywordsSplited[2].split("_")[1]) )); rows.add(RowFactory.create(date, dateKeywordsSplited[1].split("_")[0], Long.valueOf(dateKeywordsSplited[3].split("_")[1]) )); return rows; } }); //將最終的數據轉換爲dateframe DataFrame finalDF = hiveContext.createDataFrame(sortedRowRDD, structType); finalDF.saveAsTable("daily_top3_keyword_uv"); jsc.close(); } }