sparksql

Spark SQL
1、sparkSQL的特色
1.支持多種數據源:hive RDD Partquet JSON JDBC
2.多種性能優化技術:in-memory columnar storage \ byte-code generation \ cost model 動態評估
3.組件擴展性:對於SQL的語法解析器、分析器、以及優化器,用戶均可以本身從新開發,而且動態擴展

Spark sql 的性能優化技術簡介

1.內存列存儲(in-memory columnar storage)
內存列存儲意味着spark sql的數據,不是使用java對象的方式進行存儲,而是使用面向列存儲的方式進行存儲,也就是說,每一列,做爲一個數據存儲的單位,從而大大優化內存使用的效率。使用列存儲以後,減小對內存的消耗,也就避免了對GC(垃圾回收)大量數據的性能開銷

2.字節碼生成技術(byte-code generation )
Spark sql在其catalyst模塊的Expressions中增長一codegen模塊,對於sql語句中的計算表達式,好比select num + num from t 這種sql,就可使用動態字節碼生成技術來優化其性能。

3.Scala代碼編寫的優化
對於Scala代碼編寫中,可能會形成大量的性能的開銷,本身重寫,使用更加複雜的方式,來獲取更好的性能。好比option樣例類、for循環、map/filter/foreach等高階函數,以及不可變對象,都改爲用null,while循環來實現,而且重用可變的對象


2、dataframe的使用
1.spark sql 和 dataframe引言
Spark sql 是spark中的一個模塊,主要是進行結構化數據的處理。他提供的最核心的編程抽象,就是dataframe。同時spark sql 還能夠做爲分佈式的sql查詢引擎。Spark sql最重要的功能之一就是從hive中查詢數據

Dataframe,能夠理解爲時,以列的形式組織的,分佈式的數據集合,他其實和關係型數據庫中的表很是相似,可是底層作了很對的優化。Dataframe能夠經過不少來源,包括:結構化數據文件,hive表,外部關係型數據庫以及RDD


2.SQLContext
要使用spark sql ,首先就得建立一個SQLContext對象,或者是他的子類的對象(HiveContext),好比HiveContext對象;

Java版本:
JavaSparkContext sc = ....;
SQLContext SQLContext = new SQLContext(sc);

Scala版本:
Val sc = SparkContext..
Val SQLContext = new SQLContext(sc)
Import SQLContext.implicits._

3.HiveContext 
除了基本的SQLContext之外,還可使用它的子類---HiveContext。HiveContext的功能除了包含SQLContext提供的全部的功能外,還包括額外的專門針對hive的一些功能。這些額外的功能包括:使用hive語法編寫和執行sql,使用hive的UDF函數,從hive表中讀取數據

要使用HiveContext,就必須預先安裝好hive,SQLContext支持的數據源,HiveContext也一樣支持,而不僅是支持hive,對spark1.3.x以上的版本,都推薦使用HiveContext,由於其功能更加豐富和完善

Spark sql 還支持使用spark.sql.dialect參數設置sql方言,使用SQLContext的serConf()便可進行設置。對與SQLContext,他只支持」sql」一種方言。對於HiveContext,它默認的方言是「hiveql」




4.建立Dataframe
使用SQLContext,能夠從RDD、hive表中或者其餘額數據源,來建立dataframe。

Java版本:
package com.spark.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * 使用json文件建立dataframe
 * @author Administrator
 */
public class DataFrameCreate {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DataFrameCreate");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext SQLContext = new SQLContext(sc);
        DataFrame df = SQLContext.read().json("hdfs://hadoop01:8020/spark_input/students.json");
        df.show();

    }
}


提交至集羣:運行打包,上傳。編寫腳本進行提交
【這裏是一臺機器測試】
bin/spark-submit \
--class com.spark.spark_sql.DataFrameCreate \
--files /opt/modules/apache-hive-0.13.1-bin/conf/hive-site.xml \
--driver-class-path /opt/softwares/mysql-connector-java-5.1.27-bin.jar \
/opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/sql/sparksql_01.jar

 運行的結果
+---+---+--------+
|age| id|    name|
+---+---+--------+
| 10|  1|     leo|
| 25|  2|    kity|
| 30|  4|    lucy|
| 20|  3|     tom|
| 18|  7|    jack|
| 23| 10|  edison|
| 36|  5|    owen|
| 20|  8|    jiny|
| 40|  6|    lisi|
| 45|  9|zhangsan|
+---+---+--------+












Scala版本:
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object DataFrameCreate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DataFrameCreate")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("hdfs://hadoop01:8020/spark_input/student.json")
    df.show
  }
}


Dataframe經常使用的操做
java
package com.spark.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * Dataframe的經常使用的操做
 * @author Administrator
 *
 */
public class DataFrameOperation {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DataFrameOperation");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext SQLContext = new SQLContext(sc);
        //建立出來的dataframe徹底能夠能夠理解爲一張表
        DataFrame df = SQLContext.read().json("hdfs://hadoop01:8020/input/students.txt");
        //打印dataframe中全部的數據
        df.show();
        //打印dataframe中元數據的信息
        df.printSchema();
        //查詢某列全部的數據
        df.select("name").show();
        //查詢某幾列全部的數據,並對列進行計算
        df.select(df.col("name"), df.col("age").plus(1)).show();//將查詢出來的age進行加一
        //根據某一列的值進行過濾
        df.filter(df.col("age").gt(30)).show();                //年齡大於30的進行過濾
        //根據某一列進行分組聚合
        df.groupBy(df.col("age")).count().show();
    }
}

Scala
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object DataframeOperation {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DataframeOperation")
    val sc = new SparkContext(conf)
    val SQLContext = new SQLContext(sc)
    val df = SQLContext.read.json("hdfs://hadoop01:8020/spark_input/student.json" 
    df.show
    df.printSchema()
    df.select("name").show
    df.select(df("name"),df("age")+1).show
    df.filter(df("age") > 30).show
    df.groupBy("age").count().show
 }
}


5.RDD與Dataframe之間轉換
爲何要將RDD轉換爲Dataframe?由於這樣的話,咱們就能夠直接針對hdfs上的任何能夠構建爲RDD的數據,使用spark sql 進行sql查詢,這個功能無比強大。想象一下,針對hdfs中的數據,直接就可使用sql進行查詢

Spark sql 支持兩種方式來將RDD轉換爲Dataframe

第一種方式,是使用反射來推斷包含了特定數據類型的RDD的元數據,這種基於反射的方式,代碼比較簡單,當你已經知道你的RDD的元數據時,是一種很是不錯的方式。

第二種方式,是經過編程接口來建立dataframe,你能夠在程序運行的時候動態構建一份元數據,而後將其應用到已經存在的RDD上,這種方式的代碼比較冗長,可是若是在編寫程序時,還不知道RDD的元數據,只有在程序運行時,才能動態得知元數據,那麼只能經過這種動態構建元數據的方式。


1.使用反射的方式推斷元數據
Java版本:spark sql是支持將包含javaBean的RDD轉換爲dataframe的,Javabean的信息,就定義了元數據。Spark sql 如今是不支持包含嵌套javabean或者list等複雜元數據的Javabean。

Scala版本:而scala因爲具備隱式轉換的特性,因此spark sql的scala接口,是支持自動將包含case class 的RDD轉換爲Dataframe的。Case class 就定義了元數據。Spark sql 會經過反射讀取傳遞給 case class 的參數的名稱,而後將其做爲列名。與java不一樣的是,Spark sql是支持將包含了嵌套的數據結構的case class做爲元數據的,好比包含了Array等。
Java版本:
package com.spark.spark_sql;
import java.io.Serializable;
public class Student implements Serializable {
    private static final long serialVersionUID = 1L;
    private int id;
    private String name;
    private int age;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public Student() {
    }
    public Student(int id, String name, int age) {
        super();
        this.id = id;
        this.name = name;
        this.age = age;
    }
    @Override
    public String toString() {
        return "Student [id=" + id + ", name=" + name + ", age=" + age + "]";
    }

}


package com.spark.spark_sql;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

/**
 * 使用反射的方式將RDD轉換爲dataframe
 * @author Administrator
 *
 */
public class RDD2DataframeReflection {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataframeReflection");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext SQLContext = new SQLContext(sc);
        JavaRDD<String> lines = sc.textFile("E://student.txt");
        JavaRDD<Student> students = lines.map(new Function<String, Student>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Student call(String line) throws Exception {
                String[] split = line.split(",");
                Student stu = new Student();
                stu.setId(Integer.parseInt(split[0]));
                stu.setName(split[1]);
                stu.setAge(Integer.parseInt(split[2]));
                return stu;
            }
        });
        //使用反射的方式將RDD轉換爲dataframe
        
        //將student.class傳入進去其實就是經過反射的方式來建立dataframe
        //由於student.class 自己就是反射的一個應用
        //而後底層還得經過student class 進行反射。來獲取其中的fields
        //這裏要求Javabean要實現Serializable接口,能夠序列化
DataFrame studentDF = sqlContext.createDataFrame(students, Student.class);
        
        //拿到一個dataframe以後,就能夠將其註冊爲一張臨時表,而後針對其中的數據進行sql語句
        studentDF.registerTempTable("student");
        
        //針對student臨時表執行sql語句,查詢年齡大於20歲的學生
        DataFrame df =sqlContext.sql("select * from student where age > 20");
        
        //將查詢出來的dataframe再次轉換爲RDD
        JavaRDD<Row> teenagerRDD = df.javaRDD();
        
        //將RDD中的數據,進行映射,給每一個人的年齡,而後映射爲student
        JavaRDD<Student> teenagerStudentRDD = teenagerRDD.map(new Function<Row, Student>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Student call(Row row) throws Exception {
                //row中的順序是按照字典順序進行排列的
                Student student = new Student();
                student.setAge(row.getInt(0));
                student.setId(row.getInt(1));
                student.setName(row.getString(2));
                return student;
            }
        });
        
        //將數據collect 回來,打印出來
        List<Student> studentList = teenagerStudentRDD.collect();
        for (Student student : studentList) {
            System.out.println(student);
        }
        
    }

}

Scala版本:
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
/**
 * 若是要使用scala開發spark程序
 * 而後在其中還要實現基於反射的RDD到dataframe的轉換,就必須得用object  extends App的方式
 * 不能使用def main()方法的方式來運行程序,不然就會報錯no typetag for ... class
 */
object RDD2DaframeReflection extends App{
  
    val conf = new SparkConf().setMaster("local").setAppName("RDD2DaframeReflection")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val lines = sc.textFile("e://student.txt", 1)
    
    //在scala中使用反射的方式,進行RDD到Dataframe的轉換,須要手動的導入一個隱士轉換
    import SQLContext.implicits._
    case class Student(id : Int ,name : String ,age : Int)
    
    //這裏其實就是一個普通的,元素是case class 的rdd
    val students = lines.map(line =>line.split(",")).map(arr => Student(arr(0).trim().toInt,arr(1),arr(2).trim().toInt))
    
    //直接使用RDD的toDF,便可將其轉換爲dataframe
    val studentDF=students.toDF()
    
    //註冊爲一個臨時表
    studentDF.registerTempTable("student")
    
    //
    val teenagerDF = sqlContext.sql("select * from student where age > 20")
    
    val teenagerRDD = teenagerDF.rdd
    
    //在scala中,row中的數據的順序,反而是按照咱們指望的來排列的,這個是跟java是不同的
    teenagerRDD.map{row => Student(row(0).toString().toInt,row(1).toString(),row(2).toString().toInt )
    
    }.collect().foreach(stu => print(stu.id+ ":" +stu.name+":"+stu.age))
    
    
    //在scala中,對row的使用比java中的row更加的豐富
    //在scala中,能夠用row的getAs()方法,獲取指定列名的列
    teenagerRDD.map(row =>Student(row.getAs[Int]("id"),row.getAs("name"),row.getAs("age"))).collect()
      .foreach(stu => print(stu.id+ ":" +stu.name+":"+stu.age))
    
      //還能夠經過row的getValuesMap,方法,獲取指定幾列的值,返回的是一個map
    teenagerRDD.map(row => {
      val map = row.getValuesMap(Array("id","name","age"))
      Student(map("id").toString().toInt,map("name").toString(),map("age").toString().toInt)
    }).collect().foreach(stu => print(stu.id+ ":" +stu.name+":"+stu.age))
}




2.經過編程接口來建立dataframe
Java版本:
package com.spark.spark_sql;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * 以編程的方式動態的執行元數據,將RDD轉化爲dataframe
 * @author Administrator
 *
 */
public class RDD2DataProgrammatically {
public static void main(String[] args) {
    
    //建立sparkconf
    SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataProgrammatically");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    
    //第一步,建立一個普通的RDD,可是必須將其轉換爲RDD<Row>的格式
        JavaRDD<String> lines = sc.textFile("e://student.txt");
        
        JavaRDD<Row> rows = lines.map(new Function<String, Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Row call(String line) throws Exception {
                String[] split = line.split(",");
                
                return RowFactory.create(Integer.parseInt(split[0]),split[1],Integer.parseInt(split[2]));
            }
        });
        //第二步,動態元數構造據
        //好比說,id name age 等fields的名稱和類型都是在程序運行過程當中,
        //動態的從MySQL等DB中或者是配置文件中,加載出來的,是不固定的
        //因此特別適合這種編程的方式來構造元數據
        
        List<StructField> fields = new ArrayList<StructField>();
        
        fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        
        StructType structType = DataTypes.createStructType(fields);
        
        //第三部,使用動態構造元數據,將RDD轉換爲dataframe
        DataFrame studentDF = sqlContext.createDataFrame(rows, structType);
        
        //後面就能夠直接使用這個df了
        //註冊臨時表
        studentDF.registerTempTable("student");
        DataFrame stus = sqlContext.sql("select * from student where age > 20");
        
        List<Row> list = stus.javaRDD().collect();
        
        for (Row row : list) {
            System.out.println(row);
        }    
}
}
Scala版本:
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType

object RDD2DataframeProgramatically extends App{
 val conf = new SparkConf().setMaster("local").setAppName("RDD2DataframeProgramatically");
  
  val sc = new SparkContext(conf)
  
  val sqlContext = new SQLContext(sc)
  
  //第一步:構造出元素爲Row的普通的RDD
  val studentRDD = sc.textFile("e://student.txt", 1)
      .map(line => Row(
            line.split(",")(0).toInt,
            line.split(",")(1),
            line.split(",")(2).toInt))
  
   //第二步:以編程的方式構造元數據
   val structType =StructType(Array(
       StructField("id",IntegerType,true),
       StructField("name",StringType,true),
       StructField("age",IntegerType,true)))
  
  //第三步:進行RDD到dataframe的轉換
       
   val studentDF = SQLContext.createDataFrame(studentRDD,structType )  
   
   studentDF.registerTempTable("student");
  
  val teenagerDF = sqlContext.sql("select * from student where age > 20")
  
   teenagerDF.rdd.collect.foreach(row => println(row))
  

  
}


6.通用的load和save操做
1.dataframe的load和save

對於的spark sql的dataframe來講,不管是從什麼數據源建立出來的dataframe,都有一些共同的load和save操做,load操做主要是用於加載數據,建立出來的dataframe:save操做,主要用於將dataframe中的數據集保存到文件中(保存到的是一個目錄中)

Java版本:
Dataframe df = sqlContext.read().load(「users.parquet」);
df.select(「name」,」facourite_color」).write().save(「nameAndFav_color_dir」);


Scala版本:
Val df = sqlContext.read.load(「users.parquet」)
Df.select(「name」,」facourite_color」).write().save(「nameAndFav_color_dir」)


Java版本:
package com.spark.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
 * 通用的load和save操做
 * @author Administrator
 */
public class GenericLoadSave {
    public static void main(String[] args) {
        
    SparkConf conf = new SparkConf().setMaster("local").setAppName("GenericLoadSave");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame usersDF = SQLContext.read().load("C://Users//Administrator//Desktop//users.parquet");
    
    usersDF.printSchema();
    usersDF.show();
    usersDF.select("name","favourite_color").write().save("e://users2");
    }
}

Scala版本:
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import scala.tools.scalap.Main
import org.apache.spark.sql.DataFrame

object GenericLoadSave {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("GenericLoadSave")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val  usersDF = sqlContext.read.load("hdfs://hadoop01:8020/spark_input/users.parquet")
    usersDF.select("name","favourite_color").write.save("hdfs://hadoop01:8020/spark_output")

  }
}

2.手動指定數據源:

也能夠手動指定用來操做的數據源,數據源一般是使用其權限定名來指定,好比parquet是org.apache.spark.sql.parquet。可是spark sql內置了一些數據源類型,好比json,parquet.jdbc等等,實際上,經過這個功能,就能夠在不一樣類型的數據源之間進行轉換了。好比將json文件中的數據存儲到parquet文件中。默認狀況下,若是不指定數據源,默認就是parquet
Java版本:
DataFrame df =SQLContext.read().format(「json」).load(「people,json」)
Df.select(「name」,」age」).write().format(「parquet」).save(「out」)

Scala版本:
Val df = SQLContext.read.format(「json」).load(「people,json」)
Df.select(「name」,」age」).write.format(「parquet」).save(「out」) 


Java版本
     
package com.spark.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * 手動指定數據源
 * @author Administrator
 */
public class ManuallySpecifyOptions {
    public static void main(String[] args) {
        
        SparkConf conf = new SparkConf().setMaster("local").setAppName("ManuallySpecifyOptions");
        
        SparkContext sc = new SparkContext(conf);
        
        SQLContext SQLContext = new SQLContext(sc);
        
        DataFrame df = SQLContext.read().format("json").load("e://users.parquet");
        
        df.write().format("json").save("e://out");
        
    }
}


3.Save mode 

Spark sql 對於save操做,提供了不一樣的save mode.主要用來處理,當目標位置已經有數據時,應該如何處理。並且save操做並不會執行鎖操做,而且不是原子的,所以是有必定風險出現髒數據的

Save mode    意義
SaveMode.ErrorIfExists(默認)    若是目標位置存在數據,那麼就拋出異常
SaveMode.Append    若是目標位置存在數據,那麼就將數據追加進去
SaveMode.Overwrite    若是目標位置存在數據,那麼就將已經存在的數據刪除,用新的數據進行覆蓋
SaveMode.Ignore    若是目標位置存在數據,那麼就忽略,不作任何的操做
Java版本:
package com.spark.spark_sql;
import org.apache.derby.impl.tools.sysinfo.Main;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
/**
 * savemode 示例
 * @author Administrator
 */
public class SaveModeDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SaveModeDemo");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame df = sqlContext.read().format("json").load("e://users.parquet");
        df.save("e://out", SaveMode.Append);

    }
}

3、Parquet數據源
1.使用編程方式加載數據
Parquet是面向分析型業務的列式存儲格式,由Twitter和cloudera合做開發。2015年成爲Apache的頂級項目

列式存儲和行式存儲相比較有哪些優勢;
1.能夠跳過不符合條件的數據,只讀取須要的數據,下降IO數據量
2.壓縮編碼能夠下降磁盤存儲空間。因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼(例如Run Length Encoding 和 Delta Encoding)進一步節約磁盤空間。
3.只讀取須要的列,支持向量運算,可以獲取更好的掃描性能
Java版本:

package com.spark.spark_sql;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

/**
 * parquet數據源之使用編程方式加載數據
 */
public class ParquetLoadData {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("ParquetLoadData");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        //讀取parquet文件中的數據,建立一個dataframe
        DataFrame userDF = sqlContext.read().parquet("e://users.parquet");
        
        //將其註冊爲臨時表,使用sql查詢所須要的數據
        userDF.registerTempTable("users");
        
        DataFrame userNamesDF = sqlContext.sql("select name from users");
        
        //對查詢出來的dataframe進行transformation操做,處理數據,而後打印出來
        List<String> userNames = userNamesDF.javaRDD().map(new Function<Row, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String call(Row row) throws Exception {
                
                return "Name: "+row.getString(0);
            }
        }).collect();
        
        for (String name : userNames) {
            System.out.println(name);
        }
    }
}



Scala版本:
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object PatquetLoadData {
  
  val conf = new SparkConf().setAppName("PatquetLoadData")
  val sc = new SparkContext(conf)
  val sqlContext =new SQLContext(sc)
  val usersDF = sqlContext.read.parquet("hdfs://hadoop01:8020/spark_input/users.parquet");
  usersDF.registerTempTable("user");
  val userNameDF = sqlContext.sql("select * from user")
  userNameDF.rdd.map(row => "Name: "+row(0)).collect.foreach(userName =>println(userName))
}
2.自動分區推斷
表分區是一張常見的優化的方式,好比hive中就提供分區表的特性。在一個分區表中,不一樣分區的數據一般存儲在不一樣的目錄中,分區列的值一般就包含在了分區的目錄名中。Spark sql中的parquet數據源,支持自動根據目錄名推斷出分區信息。例如,若是將入口數據存儲在分區表中,而且使用性別和國家做爲分區列。那麼目錄結構多是以下所示:

|----tableName
|----gender=male
  |----country=US
          |.....
  |----country=ZH
|----gender=female
  |--country=...
           |...
若是將tableName傳入SQLContext.read.parquet()或者SQLContext.read.load()方法,那麼sparksql就會自動根據目錄的結構,推斷出分區的信息,是gender和country。即便數據文件中包含兩列的值name和age,可是sparksql返回的dataframe,調用printSchema()方法時,會打印出四個列的值:name age country gender .這就是自動分區推斷的功能

此外,分區的列的數據類型,也是自動被推斷出來的。目前,spark sql僅支持自動推斷出數字類型和字符串類型。有時,用戶也許不但願spark sql自動推斷分區列的數據類型。此時只要設置一個配置便可,spark.sql.sources.partitionColumnTypeInference.enabled,默認是true,即自動推斷分區列的類型,設置爲false,即不會自動推斷類型。進行自定推斷分區列的類型時,全部的分區列的類型,就統一默認的是String

案列:
1.hdfs上建立相對應的目錄結構:
bin/hdfs dfs -mkdir -p /spark_input/gender=male/country=US
2.將文件上傳到目錄下
bin/hdfs dfs -put users.parquet /spark_input/gender=male/country=US/

3.查詢出schema
package com.spark.spark_sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
 *parquet數據源之 自動推斷分區
 */
public class ParquetPartitionDiscovery {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ParquetPartitionDiscovery").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame userDF = sqlContext.read().parquet("hdfs://hadoop01:8020/spark_input/gender=male/country=US/users.parquet");
        userDF.printSchema();
         userDF.show();
    }
}


4.結果:
root
 |-- name: binary (nullable = true)
 |-- favourite_color: binary (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)

+----------------+-------------------+------+-------+
|            name|    favourite_color|gender|country|
+----------------+-------------------+------+-------+
|      [6C 65 6F]|         [72 65 64]|  male|     US|
|   [6A 61 63 6B]|[79 65 6C 6C 6F 77]|  male|     US|
|[6B 69 74 74 79]|   [77 68 69 74 65]|  male|     US|
|      [74 6F 6D]|   [67 72 65 65 6E]|  male|     US|
|      [61 6C 6C]|      [70 69 6E 6B]|  male|     US|
+----------------+-------------------+------+-------+
【注意】前面是由於parquet數據的問題,本身從hive中導出來的。能夠利用json格式的數據load,而後save的時候以parquet的方式生成一個parquet文件,用於測試

3.合併元數據
如同ProtocolBuffer,Avro,Thrift同樣,Parquet也是支持元數據合併的。用戶能夠一開始就定義一個簡單的元數據,而後隨着業務須要。逐漸往元數據中添加更多的列,在這種狀況下,用戶可能會建立多個parquet文件,有着多個不一樣的卻互相兼容的元數據。Parquet數據源支持自動推斷這種狀況,而且進行多個parquet文件的元數據的合併

由於元數據合併是一種相對耗時的操做,並且在大多數的狀況下不是一種必要的特性,從spark1.5.0版本開始,默認是關閉parquet文件的自動合併元數據的。能夠經過如下的兩種方式開啓parquet數據源的自動合併元數據的特性

1.讀取parquet文件時,將數據源的選項,mergeSchema,設置爲true

2.根據sqlContext.setConf()方法,將spark.paquet.mergeSchema參數設置爲true

案例:
合併學生的基本信息和成績信息的元數據
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode

object ParquetMergeSchema {
  def main(args: Array[String]): Unit = {
    
 
  val conf = new SparkConf().setAppName("ParquetMergeSchema")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  
  import SQLContext.implicits._
  //建立一個dataframe,做爲學生的基本信息,並寫入一個parquet文件中
  val studentsWithNameAge =Array(("leo",23),("jack",25))
  
  val studentsWithNameAgeDF = sc.parallelize(studentsWithNameAge, 2).toDF("name","age")
  studentsWithNameAgeDF.save("hdfs://hadoop01:8020/spark_out/students","parquet", SaveMode.Append)
  
  //建立一個dataframe,做爲學生的成績信息,並寫入一個parquet文件中
   val studentsWithNameGrade =Array(("marry","A"),("jack","B"))
  
  val studentsWithNameGradeDF = sc.parallelize(studentsWithNameGrade, 2).toDF("name","age")
  studentsWithNameGradeDF.save("hdfs://hadoop01:8020/spark_out/students","parquet", SaveMode.Append)
  
  
  //首先,第一個dataframe和第二個dataframe的元數據確定是不同的
  //一個包含了name和age兩個列,一個是包含name和grade兩個列
  //因此,這指望的是,讀取出來的表數據,自動合併兩個文件的元數據,出現三列,name age grade 
  
  
  //用mergeSchema的方式,讀取students表中的數據,進行元數據的合併
  val studentsDF = sqlContext.read.option("mergeSchema", "true").parquet("hdfs://hadoop01:8020/spark_out/students")
  
  studentsDF.printSchema();
  studentsDF.show();
  
  } 
}
4、JSON數據源
Spark sql 能夠自動推斷JSON文件的元數據,而且加載其數據,建立一個dataframe。可使用SQLContext.read.json()方法,針對一個元素爲String的RDD,或者是一個JSON文件

可是要注意的是,這裏使用的JSON文件與傳統意義上的JSON文件是不同的。每行都必須也只能包含一個,單獨的,自包含的,有效的JSON對象。不能讓json對象分散在多行。不然會報錯

綜合性複雜案例:查詢成績爲80分以上的學生的基本信息與成績信息
Java 版本
package com.spark.spark_sql;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;
/**
 * json數據源
 */
public class JSONDataSource {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JSONDataSource").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // 針對json文件,建立出dataframe
        DataFrame studentDF = sqlContext.read().json("e://student.json");    //hdfs://hadoop01:8020/spark_input/student.json
        // 針對學生成績信息的dataframe,註冊臨時表,查詢分數大於80分的學生的姓名和分數
        studentDF.registerTempTable("student");

        DataFrame stuNameDF = sqlContext.sql("select name,score from student where score > 80");

        List<String> goodName = stuNameDF.javaRDD().map(new Function<Row, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String call(Row row) throws Exception {

                return row.getString(0);
            }
        }).collect();

        // 針對JavaRDD<String> 建立dataframe
        List<String> studentInfoJSONs = new ArrayList<String>();
        studentInfoJSONs.add("{\"name\":\"leo\",\"age\":18}");
        studentInfoJSONs.add("{\"name\":\"marry\",\"age\":15}");
        studentInfoJSONs.add("{\"name\":\"jack\",\"age\":30}");
        JavaRDD<String> studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs);

        DataFrame studentInfoDF = sqlContext.read().json(studentInfoJSONsRDD);

        // 針對學生的基本信息的dataframe,註冊臨時表,而後查詢分數大於80分的學生的基本信息

        studentInfoDF.registerTempTable("info");
        String sql = "select name , age  from  info where name in (";

        for (int i = 0; i < goodName.size(); i++) {
            sql += "'" + goodName.get(i) + "'";
            if (i < goodName.size() - 1) {
                sql += ",";
            }
        }
        sql += ")";
        DataFrame goodStuInfosDF = sqlContext.sql(sql);

        // 將兩份數據的dataframe轉換爲javapairRDD,執行join transformation

        JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = goodStuInfosDF.javaRDD()
                .mapToPair(new PairFunction<Row, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Row row) throws Exception {

                        return new Tuple2<String, Integer>(row.getString(0), Integer.valueOf((int) row.getLong(1)));
                    }
                }).join(stuNameDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Row row) throws Exception {
                        return new Tuple2<String, Integer>(row.getString(0), Integer.valueOf((int) row.getLong(1)));
                    }
                }));
        
        //將封裝在RDD中好學生的所有信息,轉換爲一個javaRDD<Row>的格式
        JavaRDD<Row> rows = pairs.map(new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> t) throws Exception {
                
                return RowFactory.create(t._1,t._2._2,t._2._1);
            }
        });
        
        //建立一份元數據,將JavaRDD<Row>轉換爲dataframe
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(fields);
        DataFrame goodStudentsDF =sqlContext.createDataFrame(rows, structType);
                
        //將好學生的所有信息保存到json文件中去
        goodStudentsDF.write().format("json").save("e://good");    //hdfs://hadoop01:8020/spark_out/goodStudent
        
    }
}

 
Scala版本:
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType

object JSONDatasource {
  val conf = new SparkConf().setAppName("JSONDatasource")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  val studentScoreDF =sqlContext.read.json("hdfs://hadoop01:8020/spark_input/student.json")
  studentScoreDF.registerTempTable("stu_score")
  val goodStuScoreDF = sqlContext.sql("select name , score from stu_score where score > 80")
  
  val goodStuNames = goodStuScoreDF.map(row => row(0)).collect()
  
  
  val studentInfoJSON =Array("{\"name\":\"Leo\",\"age\":18}","{\"name\":\"marry\",\"age\":15}","{\"name\":\"jack\",\"age\":30}") 
  
  
  val studentInfoRDD =sc.parallelize(studentInfoJSON, 3)
  
  val studentInfoDF = sqlContext.read.json(studentInfoRDD)//json格式特有的,能夠接受RDD
  
  studentInfoDF.registerTempTable("stu_info")
  
  var sql = "select name , age from stu_info where name in ("
  
  for(i <- 0 until goodStuNames.length){
    
    sql += "'"+goodStuNames(i)+"'"
    if(i < goodStuNames.length-1){
      sql += ","
    }
    
  }
  sql += ")"
  
  val goodStuInfoDF = sqlContext.sql(sql)
  
  val goodStuRDD = goodStuInfoDF.rdd.map{row => (row.getAs[String]("name"),row.getAs[Long]("age"))}.join (goodStuScoreDF.rdd.map{row => (row.getAs[String]("name"),row.getAs[Long]("score"))})
  
  val goodStuRows = goodStuRDD.map(info => Row(info._1,info._2._2.toInt,info._2._1.toInt))
  
  val structType = StructType(Array(StructField("name",StringType,true),StructField("score",IntegerType,true),StructField("age",IntegerType,true)))
  
  val goodStudentDF = sqlContext.createDataFrame(goodStuRows, structType)
  
  goodStudentDF.write.format("json").save("hdfs://hadoop01:8020/spark_out/good_scala")
  
  
}


5、Hive數據源
Spark sql 支持對hive中存儲的數據進行讀寫,操做hive中的數據時,就必須建立HiveContext,而不是SQLContext。HiveContext繼承SQLContext,可是增長了在hive元數據庫中查找表,以及用hiveql語法編寫SQL的功能。處理sql()方法外,HiveContext還提供hql()方法,從而用hive語法來編譯sql。

使用HiveContext,能夠執行Hive的大部分功能,包括建立表、往表裏導入數據以及用SQL語句查詢表中的數據,查詢出來的數據是一個row數組

將hive-site.xml拷貝到spark/conf目錄下,將mysql connector拷貝到spark/lib目錄下

HiveContext sqlContext = new HiveContext(sc);
sqlContext.sql(「create table if not exists student (name string ,age int)」)
sqlContext.sql(「load data local inpath ‘/usr/local/student.txt’into table students」);
Row[] teenagers = sqlContext.sql(「select name ,age from students where age <= 18」).collect();


將數據保存到表中
Spark sql還容許將數據保存到hive表中。調用Dataframe的saveAsTable,便可將dataframe中的數據保存到hive表中。與registerTempTable不一樣,saveAsTable是會將dataframe彙總的數據物化到hive表中,並且還會在hive元數據庫中建立表的元數據

默認狀況下,saveAsTable會建立一張hive managed table,也就是說,數據的位置都是有元數據庫中的信息控制的。當managed table 被刪除時,表中的數據也會一併內物理刪除

RegisterTempTable只是註冊一個臨時的表,只要spark application重啓或者中止了,那麼表就沒有了。而SaveAsTable建立的是物化的表,不管是spark application重啓仍是中止,表都會一直存在

調用HiveContext.table()方法,還能夠直接針對hive中的表,建立一個dataframe

案例:查詢分數大於80分的學生的信息
package com.spark.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
/**
 * hive數據源
 * @author Administrator
 */
public class HiveDataSource {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("HiveDataSource");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //建立HiveContext,注意,這裏接收的是sparkContext做爲參數,不是Javasparkcontext
        HiveContext hiveContext = new HiveContext(sc.sc());
        
        //第一個功能,使用HiveContext的sql()/hql()方法,能夠執行hive中能夠執行的hiveQL語句
        //判斷是否存在student_info表,若是存在則刪除
        hiveContext.sql("drop  table if exists student_info");
        //判斷student_info表是否不存在,不存在就建立該表
        hiveContext.sql("create  table if not exists student_info(name string,age int) row format delimited fields terminated by '\t'");
        
        //將學生基本信息導入到Student_info表
        hiveContext.sql("load data local inpath '/home/hadoop/student_info.txt' into table student_info");
        //用一樣的方式向student_scores導入數據
        hiveContext.sql("drop  table if exists student_score");
        
        hiveContext.sql("create  table if not exists student_score(name string,score int)row format delimited fields terminated by '\t'");
        
        hiveContext.sql("load data local inpath '/home/hadoop/student_score.txt' into table student_score");
        
        //第二個功能個,執行sql還能夠返回dataframe,用於查詢
        
        //執行sql查詢,關聯兩種表,查詢成績大於80 分的學生信息
        DataFrame goodstudentDF  = hiveContext.sql("select i.name,i.age,s.score from student_info i join student_score s on i.name=s.name where s.score >=80");
        
        //第三個功能,能夠將dataframe中的數據,理論上來講,dataframe對應的RDD的元素是ROW就能夠
        //將Dataframe中的數據保存到hive表中
        
        //將dataframe中的數據保存到good_student_info表中
        hiveContext.sql("drop  table if exists good_student_info");
        goodstudentDF.saveAsTable("good_student_info");
        
        //第四個功能,可使用table()方法,針對hive表直接建立dataframe
        
        //針對good_student_info表,直接建立dataframe
        Row[] rows = hiveContext.table("good_student_info").collect();
        for (Row row : rows) {
            System.out.println(row);
        }
        
        sc.close();
    }
}



Scala版本:
package com.spark.spark_sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.SparkContext

object HiveDataSource {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HiveDataSource");
        val sc = new SparkContext(conf)
        //建立HiveContext,注意,這裏接收的是sparkContext做爲參數,不是Javasparkcontext
        val hiveContext = new HiveContext(sc);
        
        //第一個功能,使用HiveContext的sql()/hql()方法,能夠執行hive中能夠執行的hiveQL語句
        //判斷是否存在student_info表,若是存在則刪除
        hiveContext.sql("drop  table if exists student_info");
        //判斷student_info表是否不存在,不存在就建立該表
        hiveContext.sql("create  table if not exists student_info(name string,age int) row format delimited fields terminated by '\t'");
        
        //將學生基本信息導入到Student_info表
        hiveContext.sql("load data local inpath '/home/hadoop/student_info.txt' into table student_info");
        
        //用一樣的方式向student_scores導入數據
        hiveContext.sql("drop  table if exists student_score");
        
        hiveContext.sql("create  table if not exists student_score(name string,score int)row format delimited fields terminated by '\t'");
        
        hiveContext.sql("load data local inpath '/home/hadoop/student_score.txt' into table student_score");
        
        //第二個功能個,執行sql還能夠返回dataframe,用於查詢
        
        //執行sql查詢,關聯兩種表,查詢成績大於80 分的學生信息
        val goodstudentDF  = hiveContext.sql("select i.name,i.age,s.score from student_info i join student_score s on i.name=s.name where s.score >=80");
        
        //第三個功能,能夠將dataframe中的數據,理論上來講,dataframe對應的RDD的元素是ROW就能夠
        //將Dataframe中的數據保存到hive表中
        
        //將dataframe中的數據保存到good_student_info表中
        hiveContext.sql("drop  table if exists good_student_info");
        goodstudentDF.saveAsTable("good_student_info");
        
        //第四個功能,可使用table()方法,針對hive表直接建立dataframe
        
        //針對good_student_info表,直接建立dataframe
        val rows = hiveContext.table("good_student_info").collect();
        for ( row <- rows) {
            println(row);
        }
  }
}

6、內置函數
在spark 1.5.x版本,增長了一系列內置函數到dataframe API中,而且實現了code-generation的優化。與普通的函數不一樣,dataframe的函數並不會執行後當即返回一個結果值,而是返回一個Column對象,用於在並行做業中進行求值。Column能夠用在dataframe的操做之中,好比select, filter,groupBy等操做。函數的輸入值,也能夠是Column。

種類    函數
聚合函數    approxCountDistinct,avg,count,
countDistinct,first,last,max,mean,min,sum,sumDistinct
集合函數    Array_contains,explode,size,sort_array
日期/時間函數    日期時間轉換
Unix_timestamp,from_unixtime,to_date
Quarter,day,dayofyear,weekofyear,
From_utc_timestamp,to_utc_timestamp
從日期中提取字段
Year,month,dayofmonth,hour,minute,
second
日期/時間函數    日期時間計算
Dateiff,date_add,date_sub,add_months,
last_day,next_day,months_between
獲取當前時間
Current_date,current_timestamp,trunc,
date_format
數學函數    Abs,scros,asin,atan,atan2,bin,cbrt,
ceil,conv,cos,sosh,exp,expm1,factorial,floor,hex,hypot,log,log10,log1p,log2,
Pmod,pow,rint,round,shiftLeft,
shiftRight,shiftRightUnsigned,sifnum,
Sin,sinh,sqrt,tan,tanh,toDegrees,
toRadians,unhex
混合函數    Array,bitwiseNOT,callUDF,coalesce,
crc32,greatest,if,inputFileName,isNaN,
isnotnull,isnull,least,lit,md5,
monotonicalliIncreasingId,nanvl,negate,not,rand,randn,randn
sha,sha1,sparkPartitionId,struct,when
字符串函數    Ascii,base64,concat,concat_ws,decode,encode,format_number,format_string,get_json_object,initcap,instr,length,levenshtein,locate,lower,lpad,ltrim,printf,redexp_extract,regexp_replace,repeat,reverse,rpad,rtrim,soundex,space,split,substring,substring_index,translate,trim,unbase64,upper
窗口函數     cumeDist,denseRank,lag,lead,ntile,percentRank,rank,rowNumber

案例:
根據天天的用戶訪問和購買日誌,統計每日的UV和銷售額

統計每日的UV
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.functions._

object DailyUV {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DailyUV")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //構造用戶 訪問日誌數據,並建立dataframe
    //要使用spark內置函數,就必須在這裏代入sparkcontext下的隱式轉換
    import sqlContext.implicits._
    //模擬用戶訪問日誌,日誌用逗號隔開,第一列是日期,第二列是用戶id
    val userAccessLog = Array("2015-10-01,1122","2015-10-01,1122", "2015-10-01,1123", "2015-10-01,1125","2015-10-01,1125", "2015-10-02,1122", "2015-10-02,1124","2015-10-02,1122",  "2015-10-02,1123")
    val userAccessLogRDD= sc.parallelize(userAccessLog, 1);
    //將模擬出來的用戶日誌RDD,轉換爲dataframe
    //首先將普通的RDD轉換爲row的RDD
    val userAccessLogRowRDD = userAccessLogRDD.map(log =>Row(log.split(",")(0),log.split(",")(1).toInt))
  
    
    //構造dataframe元數據
    val structType = StructType(Array(StructField("date",StringType,true),StructField("userid",IntegerType,true)))
    val userAccessLogRowDF=sqlContext.createDataFrame(userAccessLogRowRDD, structType)
    
    //這裏正式使用spark1.5.x版本的提供的最新特性,內置函數,countDistinct
    //天天都有不少用戶來訪問,可是每一個用戶可能天天都會訪問不少次
    //UV:指的是對用戶進行去重之後的訪問總數
    //聚合函數的用法;
    //首先對dataframe調用groupBy()方法,對某一列進行分組
    //而後調用agg()方法,第一個參數,必須必須傳入以前在groupBy方法中出現的字段
    //第二個參數,傳入countDistinct、sum,first等,spark提供的內置函數
    //內置函數中,傳入的參數也是用單引號做爲前綴的,其餘的字段
    
    
    userAccessLogRowDF.groupBy("date").agg('date, countDistinct('userid)).map(row => Row(row(1),row(2))).collect.foreach(println)
        
  }
}


統計每日的銷售額
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.functions._

object DailySale {
  
  val conf = new SparkConf().setMaster("local").setAppName("DailySale")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  
  //模擬元數據
  //實際上,有些時候,會出現日誌的上報錯誤,好比日誌裏丟了用戶的信息,那麼這種,就一概不統計了
  val userSaleLog = Array("2015-10-01,100,1122","2015-10-01,100,1133","2015-10-01,100,","2015-10-02,300,1122","2015-10-02,200,1122","2015-10-02,100,","2015-10-02,100,1122")

  val userSaleLogRDD = sc.parallelize(userSaleLog, 1);
  
  //進行過濾
  
  val filteredUserSaleRowRDD = userSaleLogRDD.filter(log => if(log.split(",").length ==3) true else false)
  
  
  val userSaleLogRowRDD = filteredUserSaleRowRDD.map(log => Row(log.split(",")(0),log.split(",")(1).toDouble))
  
  val structType = StructType(Array(StructField("date",StringType,true),StructField("sale_amount",DoubleType,true)))
  
  val userSaleLogDF = sqlContext.createDataFrame(userSaleLogRowRDD, structType)
  //開始統計每日銷售額的統計
  userSaleLogDF.groupBy("date").agg('date, sum('sale_amount)).map(row => Row(row(1),row(2))).collect().foreach(println)
  
  
}
7、開窗函數
Spark1.4.x版本之後,爲spark sql和dataframe引入了開窗函數,好比最經典最經常使用的row_number(),可讓咱們實現分組取topN的邏輯

案例:統計每一個種類的銷售額排名前3 的產品

package com.spark.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

/**
 * 開窗函數row_number
 * @author Administrator
 */
public class RowNumberWindowFunction {
public static void main(String[] args) {
    
    SparkConf conf = new SparkConf().setAppName("NewNumberWindowFunction");
    JavaSparkContext sc = new JavaSparkContext(conf);
    HiveContext hiveContext = new HiveContext(sc.sc());
    
    //建立銷售表sales表
    hiveContext.sql("drop table if exists sales");
    hiveContext.sql("create table if not exists sales (product string,category string ,revenue bigint) row format delimited fields terminated by '\t'");
    hiveContext.sql("load data local inpath '/home/hadoop/sales.txt' into table sales");
    //開始編寫咱們的統計邏輯,使用row_number()開窗函數
    //先說明一下,row_number()開窗函數的做用
    //其實就是給每個分組的數據,按照其排序順序,打上一個分組內的行號
    //好比說,有一個分組date=20151001,裏面有三條數據,1122 1121 1124 
    //那麼對這個分組的每一行使用row_number()開窗函數之後,三行依次會得到一個組內的行號
    //行號從1開始遞增,好比 1122 11121 21124 3
    
    DataFrame top3SalesDF = hiveContext.sql("select product,category,revenue from "
            + "(select product,category,revenue ,row_number() over (partition by category order by revenue desc ) rank from sales) tmp_sales  where rank <=3");
    
    //row_number()開窗函數的語法說明
    //首先,能夠在select查詢時,使用row_number()函數
    //其次,row_number()函數後面先跟上over關鍵字
    //而後括號中是partition by 也就是根據那個字段進行分組
    //其次是可使用order by進行組內排序
    //而後row_number()就能夠給每個組內的行,一個組一個行號
    
    //將每組排名前三的數據,保存到一個表中
    hiveContext.sql("drop table if exists top3_sales");
    top3SalesDF.saveAsTable("top3_sales");
    sc.close();
}
}

函數名(列) OVER(選項) 
8、UDF自定義函數
UDF:User Defined Function.用戶自定義函數
package com.spark.spark_sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType

object UDF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("UDF")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    
    //構造模擬數據
    val names =Array("Leo","Marry","Jack","Tom")
    val namesRDD = sc.parallelize(names, 1)
    val namesRowRDD = namesRDD.map(name => Row(name))
    val structType =StructType(Array(StructField("name",StringType)))
    val namesRowDF =sqlContext.createDataFrame(namesRowRDD, structType)
    
    //註冊一張name表
    namesRowDF.registerTempTable("names")
    
    //定義和註冊自定義函數
    //定義函數
    //註冊函數
    sqlContext.udf.register("strLen",(str:String) => str.length)
    
    //使用本身的函數
    sqlContext.sql("select name,strLen(name) from names").collect.foreach(println)

  }
}
9、UDAF自定義聚合函數
UDAF:User Defined Aggregate Function。用戶自定義聚合函數,是spark1.5.x引入的最新特性。
UDF針對的是單行輸入,返回一個輸出,
UDAF,則能夠針對多行輸入,進行聚合計算,返回一個輸出,功能更加的強大

Scala代碼:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._


class StringCount extends UserDefinedAggregateFunction {
  //inputschema   指的是,輸入數據的數據
  override def inputSchema: StructType = {
    StructType(Array(StructField("str",StringType,true)))
  }
  //bufferSchema,指的是,中間進行聚合時,所處理的數據類型
  override def bufferSchema: StructType = {
    StructType(Array(StructField("count",IntegerType,true)))
  }

  //dataType,指的是,函數的返回值的類型
  override def dataType: DataType = {
   IntegerType
  }
  override def deterministic: Boolean = {
    true
  }
  //爲每個分組的數據執行初始化操做
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0)=0
  }
  //指的是,每一個分組,有新的值進來的時候,如何進行分組,對應的聚合值的計算
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0)=buffer.getAs[Int](0)+1

  }
  //因爲spark是分佈式的,因此一個分組的數據,可能會在不一樣的節點上進行局部的聚合,就是update
  //可是,最後一個分組,在各個節點上的聚合值,要執行merge,也就是合併
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0)=buffer1.getAs[Int](0)+buffer2.getAs[Int](0)
  }
  //最後,指的是,一個分組的聚合,如何經過中間的緩存聚合值,隨後返回一個最終的聚合值
  override def evaluate(buffer: Row): Any = {
    buffer.getAs[Int](0)
  }
}


10、工做原理以及性能調優

1.工做原理
Sqlparse、Analyser、Optimizer、SparkPlan



2.性能調優
1.設置shuffle過程當中的並行度spark.sql.shuffle.partitions(sqlContext.setConf())
2.在hive數據倉庫建設過程當中,合理設置數據類型,好比能設置爲int的就不要設置爲bigint.減小數據類型致使的沒必要要的內存開銷
3.編寫sql時,儘可能給出明確的列名,好比select name from students。不要寫select * 的方式
4.並行處理查詢結果:對於spark sql查詢的結果,若是數據量比較大的話,好比超多1000條,那麼就不要一次collect到driver在處理。使用foreach()算子,並行處理查詢結果
5.緩存表:對於一條sql語句中可能屢次使用到表,能夠對其進行緩存沒使用sqlContext.cache Table(tableName),或者DataFrame.cache()便可spark sql會用內存列存儲的格式進行表的緩存。而後將spark sql就能夠僅僅掃描須要使用的列,而且自動優化壓縮,來最小化內存使用和GC開銷。sqlcontext.uncacheTable(tableName)能夠將表從緩存中移除。用sqlcontext.setConf(),設置spark.sql.inMemoryColumnarStorage.batchSize參數(默認10000),能夠配置列存儲單位

6.廣播join表:spark.sql.autoBroadcastJoinThreshold.默認是10485760(10MB),也就是在10M之內的表將其廣播出去,在內存足夠用的狀況下,增長其大小,讓更多的表廣播出去,就能夠將join中的較小的表廣播出去,而不是進行網絡數據傳輸了。
7.鎢絲計劃:spark.sql.tungsten.enable,默認是true 自動管理內存

最有效的就是第四點,緩存表和廣播join表也是很是不錯的



11、Hive on spark 
背景:
Hive是目前大數據領域,事實上的sql標準。其底層默認是基於MapReduce實現的。可是因爲mapreduce速度是在比較慢。所以這兩年,陸續出來了新的sql查詢引擎。包括spark sql,hive on Taz ,hive on spark 

Spark sql 與hive on spark 是不同的,spark sql 本身研發出來的針對各類數據源,包括hive、json、parquet、JDBC、RDD等均可以執行查詢的,一套基於spark計算引擎的查詢引擎。所以它是spark的一個項目,只不過提供了針對hive執行查詢的功能而已。適合在一些使用spark技術棧的大數據應用類系統中使用。

而hive on spark是hive 的一個項目,他是指,不經過mapreudce做爲惟一的引擎,而是將spark做爲底層的查詢引擎,hive on spark ,只適用於hive,在可預見的將來,頗有可能hive默認的底層引擎從mapreduce切換爲spark,適合於將原有的hive數據倉庫以及數據分析替換爲spark引擎,做爲公司通用大數據統計計算分析引擎

Hive基本的工做原理:
hiveQL語句==>
語法分==>AST==>
生成邏輯執行計劃==>operator Tree==>
優化邏輯執行計劃==>optimized operator Tree==>
生成物理執行計劃==>Task Tree==>
優化物理執行計劃==>Optimized Task Tree==>
執行優化後的Optimized  Task Tree



Hive on spark 的計算原理有以下的幾個要點:
1.將hive表做爲spark RDD來進行操做

2.使用hive 原語
對於一些針對RDD的操做,好比,groupByKey(),sortByKey()等等。不使用spark的transformation操做和原語。若是那樣作的話,那麼就須要從新實現一套hive的原語,並且hive增長了新功能,那麼又要實現新的spark 原語。所以,選擇將hive的原語包裝爲針對RDD的操做便可

3.新的物理執行計劃生成機制
使用sparkCompiler將邏輯執行計劃,集=即Operator Tree,轉換爲Task tree。提交spark task給spark進行執行。Sparktask包裝了DAG,DAG包裝爲sparkwork,Sparktask根據sparkwork表示的DAG計算

4.sparkContext生命週期
Hive on spark 會爲每個用戶的會話,好比執行一次sql語句,建立一個sparkcontext,可是spark不容許在一個JVM內建立多個sparkcontext,所以,須要在單獨額JVM中啓動每個會話的sparkcontext,而後經過RPC與遠程JVM中的sparkContext進行通訊。

5.本地和遠程運行模式
Hive on spark 提供兩種運行模式,本地和遠程。若是將spark master設置爲local






12、Spark sql與spark core整合
案例:每日top3熱點搜索詞統計案例
日誌格式:
日期、用戶、搜索詞、城市、平臺、版本

需求:
1.篩選出符合條件(城市、平臺、版本))的數據
2.統計出天天搜索的UV排名前三的搜索詞
3.按照天天的top3搜索詞的UV搜總次數,倒序排列
4.將數據保存到hive表中

實現思路分析
1.針對原始的數據(HDFS文件),獲取輸入RDD
2.使用filter算子,去針對輸入RDD中的數據,進行數據過濾,過濾出符合條件的數據
2.1普通的作法:直接在filter算子函數中,使用外部的查詢條件(map),可是這樣的話,是否是查詢條件map,會發送到每個task上一份副本(性能並很差)
2.2優化後的作法:將查詢條件,封裝爲Broadcast廣播變量,在filter算子中使用Broadcast廣播變量

3.將數據轉換爲「日期_搜索詞,用戶」的格式,而後對他進行分組,再次進行映射。對天天每一個搜索詞的搜索用戶進行去重操做,並統計去重後的數量,即爲天天每一個搜索詞的UV,最後得到(日期_搜索詞,UV)

4.將獲得的天天的搜索詞的UV,RDD映射爲元素類型的Row的RDD,將RDD轉換爲dataframe
5.將dataframe註冊爲臨時表,使用spark sql的開窗函數,來統計天天的UV排名前三的搜索詞,以及他的搜索UV,最後獲知,是一個dateframe
6.將dateframe轉換爲RDD。繼續操做,按照天天日期進行分組。並進行映射。計算出天天的top3所搜詞的搜索uv的總數,而後將UV總數做爲key,將天天的top3搜索詞以及搜索次數,拼接爲一個字符串
7.按照天天的top3搜索總UV,進行排序,倒序排序
8.將排好序的數據,再次映射回來,變成「日期_搜索詞_UV」的格式
9.再次映射爲dataframe,並將數據保存到hive表中

package com.spark.spark_sql;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

/**
 * 每日top3熱點搜索詞統計
 * @author Administrator
 *
 */
public class DailyTop3Keyword {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DailyTop3Keyword");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(jsc.sc());
        //針對hdfs上的文件中的日誌,獲取輸入的RDD
        JavaRDD<String> rawRDD = jsc.textFile("hdfs://hadoop01:8020/keyword.txt");
        //僞造一份數據,查詢條件
        //備註:實際上,在實際的企業的項目開發中,極可能,這個查詢條件是J2EE平臺發送到MySQL表中的
        //而後,這裏實際上一般是會用Spring框架和ORM框架的,去提取MySQL表中的查詢條件
        Map<String,List<String>> queryParamMap =new HashMap<String, List<String>>();
        queryParamMap.put("city", Arrays.asList("beijing"));
        queryParamMap.put("platform", Arrays.asList("android"));
        queryParamMap.put("version", Arrays.asList("1.0","1.2","1.5","2.0"));
        
        //將map封裝爲廣播變量,每一個work節點只拷貝一份數據便可,這樣能夠進行優化
        final Broadcast<Map<String, List<String>>> queryParamMapBroadcast = jsc.broadcast(queryParamMap);
        
        //使用廣播變量進行篩選
        JavaRDD<String> filterRDD = rawRDD.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String log) throws Exception {
                String[] logSplited = log.split("\t");
                String city =logSplited[3];
                String platform =logSplited[4];
                String version = logSplited[5];
                //與查詢條件進行比較,任何一個
                Map<String, List<String>> queryParamMap = queryParamMapBroadcast.value();
                
                List<String> cities = queryParamMap.get("city");
                if(cities.size()>0 && !cities.contains(city)){
                    return false;
                }
                
                List<String> platforms = queryParamMap.get("city");
                if(platforms.size()>0 && !platforms.contains(platform)){
                    return false;
                }
                List<String> versions = queryParamMap.get("city");
                if(versions.size()>0 && !versions.contains(version)){
                    return false;
                }
                
                return true;
            } 
        });
        
        //將過濾出來的原始的日誌進行映射爲(日期_搜索詞,用戶)的格式
        JavaPairRDD<String, String> dateKeywordUserRDD = filterRDD.mapToPair(new PairFunction<String, String, String>() {

            
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(String t) throws Exception {
                 String[] logSplited = t.split("\t");
                 String date =logSplited[0];
                 String user =logSplited[1];
                 String keyword =logSplited[2];
    
                return new Tuple2<String, String>(date+"_"+keyword, user);
            }
        });
        
        //進行分組,獲取天天的搜索詞,有哪些用戶進行搜索了(沒有去重)
        JavaPairRDD<String, Iterable<String>> dateKeywordUsersRDD = dateKeywordUserRDD.groupByKey();
        
        //對天天每一個搜索詞的搜索用戶,執行去重操做,得到其UV值
        JavaPairRDD<String, Long> dateKeywordUVRDD = dateKeywordUsersRDD.mapToPair(new PairFunction<Tuple2<String,Iterable<String>>, String, Long>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Long> call(Tuple2<String, Iterable<String>> t) throws Exception {
                String  dateKeyword = t._1;
                Iterator<String> users = t._2.iterator();
                //對用戶進行去重,而且統計數量
                List<String> distinctUsers = new ArrayList<String>();
                
                while(users.hasNext()){
                    String user = users.next();
                    if(!distinctUsers.contains(user)){
                        distinctUsers.add(user);
                    }
                }
                //獲取uv
                long uv = distinctUsers.size();
                return new Tuple2<String, Long>(dateKeyword, uv);
            }
        });
        
        //將天天每一個搜索詞的UV數據轉換成dataframe
        JavaRDD<Row> dateKeywordUVRowRDD = dateKeywordUVRDD.map(new Function<Tuple2<String,Long>, Row>() {

            @Override
            public Row call(Tuple2<String, Long> v1) throws Exception {
                String date = v1._1.split("_")[0];
                String keyword = v1._1.split("_")[1];
                long uv = v1._2;
                return RowFactory.create(date,keyword,uv);
            }
        });
        
        List<StructField> structFields = Arrays.asList(
                DataTypes.createStructField("date", DataTypes.StringType, true),
                DataTypes.createStructField("keyword", DataTypes.StringType,true),
                DataTypes.createStructField("uv", DataTypes.LongType,true)
                );
        
        StructType structType =DataTypes.createStructType(structFields);
        DataFrame dateKeywordUVDF = hiveContext.createDataFrame(dateKeywordUVRowRDD, structType);
        
        //使用spark sql的開窗函數,統計天天的搜索排名前三的熱點搜索詞
        dateKeywordUVDF.registerTempTable("daily_keyword_uv");
        
        
        DataFrame dailyTop3KeywordDF = hiveContext.sql("select date,keyword,uv from "
                + "(select date,keyword,uv,row_number over (partition by date order by uv desc ) rank from daily_keyword_uv )tmp "
                + "where rank <=3");
        
        //將dateframe 轉換爲RDD,而後映射,計算出天天的top3搜索詞的搜索uv的總數
        JavaPairRDD<String, String> Top3DateKeywordUvRDD = dailyTop3KeywordDF.javaRDD().mapToPair(new PairFunction<Row, String, String>() {

            @Override
            public Tuple2<String, String> call(Row row) throws Exception {
                String date = String.valueOf(row.get(0));
                String keyword =String.valueOf(row.get(1));
                Long uv = Long.valueOf(String.valueOf(row.get(2)));
                
                return new Tuple2<String, String>(date, keyword+"_"+uv);
            }
        });
        
        JavaPairRDD<String, Iterable<String>> Top3DateKeywordRDD = Top3DateKeywordUvRDD.groupByKey();
        JavaPairRDD<Long, String> uvDateKeywordsRDD = Top3DateKeywordRDD.mapToPair(new PairFunction<Tuple2<String,Iterable<String>>, Long, String>() {

            @Override
            public Tuple2<Long, String> call(Tuple2<String, Iterable<String>> t) throws Exception {
                String date = t._1;
                
                Iterator<String> keywordUvIterator = t._2.iterator();
                Long totalUv = 0L;
                String dateKeywordUv = date;
                while(keywordUvIterator.hasNext()){
                    String keywordUv= keywordUvIterator.next();
                    Long uv = Long.valueOf(keywordUv.split("_")[1]);
                    totalUv+=uv;
                    dateKeywordUv+=","+keywordUv;
                }
                return new Tuple2<Long, String>(totalUv, dateKeywordUv);
            }
        });
        //按照天天的總搜索進行倒序排序
        
        JavaPairRDD<Long, String> sortedUvDateKeywordsRDD = uvDateKeywordsRDD.sortByKey(false);
        
        //在此進行映射,將排序後的數據,映射回原始的格式Iterable<ROW>
        JavaRDD<Row> sortedRowRDD = sortedUvDateKeywordsRDD.flatMap(new FlatMapFunction<Tuple2<Long,String>, Row>() {

            @Override
            public Iterable<Row> call(Tuple2<Long, String> t) throws Exception {
                String dateKeywords=t._2;
                String[] dateKeywordsSplited = dateKeywords.split(",");
                String date = dateKeywordsSplited[0];
                List<Row> rows= new ArrayList<Row>();
                rows.add(RowFactory.create(date,
                        dateKeywordsSplited[1].split("_")[0],
                        Long.valueOf(dateKeywordsSplited[1].split("_")[1])
                        ));
                rows.add(RowFactory.create(date,
                        dateKeywordsSplited[1].split("_")[0],
                        Long.valueOf(dateKeywordsSplited[2].split("_")[1])
                        ));
                rows.add(RowFactory.create(date,
                        dateKeywordsSplited[1].split("_")[0],
                        Long.valueOf(dateKeywordsSplited[3].split("_")[1])
                        ));    
                return rows;
            }
        });
        
        //將最終的數據轉換爲dateframe
        DataFrame finalDF = hiveContext.createDataFrame(sortedRowRDD, structType);
        
        finalDF.saveAsTable("daily_top3_keyword_uv");
        jsc.close();
    }
}
相關文章
相關標籤/搜索