Spark SQL: Writing Spark SQL Programs in Java and Scala to Convert RDDs to DataFrames, Work with HiveContext, and Work with MySQL

1. Running Spark SQL Queries Programmatically

1. Writing a Spark SQL Program to Convert an RDD into a DataFrame

Earlier we learned how to run SQL queries in the Spark shell; now we will write Spark SQL query programs in IDEA.

The Spark documentation provides two ways to convert an RDD into a DataFrame. The first uses reflection: for an RDD containing objects of a known type, reflection infers the schema and converts the RDD into a DataFrame of that type; this is suitable when the RDD's schema is known in advance. The second uses a programmatic interface: you build the schema yourself, apply it to the RDD, and create the DataFrame dynamically, deciding the columns and their types at runtime.
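As a compact side-by-side sketch of the two approaches (a sketch only, assuming a SparkSession named spark and an RDD[String] named lines holding space-delimited "id name age" records; the full programs follow below):

case class Person(id: Int, name: String, age: Int)

// Approach 1 (reflection): the case class supplies the schema
import spark.implicits._
val byReflection = lines.map(_.split(" ")).map(a => Person(a(0).toInt, a(1), a(2).toInt)).toDF()

// Approach 2 (programmatic): build a StructType at runtime and pair it with an RDD[Row]
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val schema = StructType(Seq(
  StructField("id", IntegerType), StructField("name", StringType), StructField("age", IntegerType)))
val rowRDD = lines.map(_.split(" ")).map(a => Row(a(0).toInt, a(1), a(2).toInt))
val bySchema = spark.createDataFrame(rowRDD, schema)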

First, add the Spark SQL dependency to the Maven project's pom.xml.

 

 

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.3</version>
</dependency>

 

1.1. Inferring the Schema via Reflection

Scala supports converting an RDD of case class instances into a DataFrame: the case class defines the schema, and its parameter names become the column names via reflection. Such an RDD can be converted into a DataFrame efficiently and registered as a table.

The code is as follows:

Java version

    

package com.hzk.sparksql;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;

public class ReflectTransform {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
        JavaRDD<String> lines = spark.read().textFile("D:\\Bigdata\\20.sparksql\\二、以編程方式執行sparksql\\person.txt").javaRDD();

        JavaRDD<Person> rowRDD = lines.map(line -> {
            String[] parts = line.split(" ");
            return new Person(Integer.valueOf(parts[0]), parts[1], Integer.valueOf(parts[2]));
        });

        Dataset<Row> df = spark.createDataFrame(rowRDD, Person.class);
        // df.select("id", "name", "age").
        //     coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
        df.foreach(new ForeachFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {
                System.out.println("id:" + row.get(0) + ",name:" + row.get(1) + ",age:" + row.get(2));
            }
        });
    }

    static class Person implements Serializable {
        private int id;
        private String name;
        private int age;

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }

        public Person(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }
}

 

Scala version


import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * RDD to DataFrame: using reflection
  */
//todo: define a case class Person
case class Person(id: Int, name: String, age: Int)

object CaseClassSchema {

  def main(args: Array[String]): Unit = {
    //todo: 1. build a SparkSession, specifying the appName and the master address
    val spark: SparkSession = SparkSession.builder()
      .appName("CaseClassSchema")
      .master("local[2]")
      .getOrCreate()

    //todo: 2. get the SparkContext from the SparkSession
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN") // set the log level

    //todo: 3. load the data
    val dataRDD: RDD[String] = sc.textFile("D:\\person.txt")

    //todo: 4. split each line
    val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))

    //todo: 5. map the RDD onto the Person case class
    val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

    //todo: 6. create the DataFrame; the implicit conversions must be imported
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF()

    //todo ------------------- DSL style start --------------
    // 1. show the DataFrame's data, 20 rows by default
    personDF.show()
    // 2. print the DataFrame's schema
    personDF.printSchema()
    // 3. print the number of rows in the DataFrame
    println(personDF.count())
    // 4. print all column names of the DataFrame
    personDF.columns.foreach(println)
    // 5. take the first row of the DataFrame
    println(personDF.head())
    // 6. show all values of the name column
    personDF.select("name").show()
    // 7. filter the rows whose age is greater than 30
    personDF.filter($"age" > 30).show()
    // 8. count the rows whose age is greater than 30
    println(personDF.filter($"age" > 30).count())
    // 9. group by age and count the rows in each group
    personDF.groupBy("age").count().show()
    //todo ------------------- DSL style end -------------

    //todo -------------------- SQL style start -----------
    //todo: register the DataFrame as a table
    personDF.createOrReplaceTempView("t_person")

    //todo: run SQL statements against the table
    spark.sql("select * from t_person").show()
    spark.sql("select * from t_person where name='zhangsan'").show()
    spark.sql("select * from t_person order by age desc").show()
    //todo -------------------- SQL style end -------------

    sc.stop()
    spark.stop()
  }
}
 

1.2. Specifying the Schema Directly with StructType

When a case class cannot be defined in advance, a DataFrame can be created in the following three steps:

(1) Convert the RDD into an RDD of Row objects.

(2) Create a schema with StructType that matches the RDD built in step 1.

(3) Apply the schema to the RDD from step 1 with SparkSession's createDataFrame method to create the DataFrame.

Java version

 

package com.hzk.sparksql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.io.Serializable;
import java.util.ArrayList;

public class DynamicTransform {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
        JavaRDD<String> lines = spark.read().textFile("D:\\Bigdata\\20.sparksql\\二、以編程方式執行sparksql\\person.txt").javaRDD();

        JavaRDD<Row> personMaps = lines.map(new Function<String, Row>() {
            @Override
            public Row call(String s) throws Exception {
                String[] personString = s.split(" ");
                return RowFactory.create(Integer.valueOf(personString[0]), personString[1], Integer.valueOf(personString[2]));
            }
        });

        ArrayList<StructField> fields = new ArrayList<StructField>();
        StructField field = null;
        field = DataTypes.createStructField("id", DataTypes.IntegerType, true);
        fields.add(field);
        field = DataTypes.createStructField("name", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("age", DataTypes.IntegerType, true);
        fields.add(field);

        StructType schema = DataTypes.createStructType(fields);

        Dataset<Row> df = spark.createDataFrame(personMaps, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
        df.foreach(new ForeachFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {
                System.out.println("id:" + row.get(0) + ",name:" + row.get(1) + ",age:" + row.get(2));
            }
        });
    }
}
 

Scala version



import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
  * RDD to DataFrame: building the DataFrame from an explicitly specified schema
  */
object SparkSqlSchema {

  def main(args: Array[String]): Unit = {
    //todo: 1. create a SparkSession, specifying the appName and the master
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkSqlSchema")
      .master("local[2]")
      .getOrCreate()

    //todo: 2. get the SparkContext
    val sc: SparkContext = spark.sparkContext

    //todo: 3. load the data
    val dataRDD: RDD[String] = sc.textFile("d:\\person.txt")

    //todo: 4. split each line
    val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))

    //todo: 5. load the data into Row objects
    val personRDD: RDD[Row] = dataArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))

    //todo: 6. create the schema
    val schema: StructType = StructType(Seq(
      StructField("id", IntegerType, false),
      StructField("name", StringType, false),
      StructField("age", IntegerType, false)
    ))

    //todo: 7. create the DataFrame from personRDD and the schema
    val personDF: DataFrame = spark.createDataFrame(personRDD, schema)

    //todo: 8. show the DataFrame's data with the DSL
    personDF.show()

    //todo: 9. register the DataFrame as a table
    personDF.createOrReplaceTempView("t_person")

    //todo: 10. run SQL statements
    spark.sql("select * from t_person").show()
    spark.sql("select count(*) from t_person").show()

    sc.stop()
    spark.stop()
  }
}
 

 

 

 

2. Writing a Spark SQL Program that Works with HiveContext

 

HiveContext comes from the spark-hive module, is partially coupled to Hive, supports HQL, and is a subclass of SQLContext. Since Spark 2.0, HiveContext and SQLContext have been unified into SparkSession, so both can be used through SparkSession.
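A minimal sketch of what this unification looks like in practice (illustrative names only; the complete example follows in section 2.2): instead of constructing a HiveContext from a SparkContext, you enable Hive support on the SparkSession builder, and the legacy SQLContext stays reachable for old code.

import org.apache.spark.sql.SparkSession

// Spark 2.x: one entry point instead of new SQLContext(sc) / new HiveContext(sc)
val spark = SparkSession.builder()
  .appName("UnifiedEntryPoint")
  .master("local[2]")
  .enableHiveSupport() // provides what HiveContext used to provide
  .getOrCreate()

spark.sql("show tables").show()          // HQL/SQL goes through the session itself
val legacySqlContext = spark.sqlContext  // the old SQLContext is still exposed for backward compatibility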

2.1. Adding the pom Dependency

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.1.3</version>
</dependency>

2.2. Code Implementation

package gec.sql

import org.apache.spark.sql.SparkSession

/**
  * todo: Spark SQL running SQL against Hive
  */
object HiveSupport {

  def main(args: Array[String]): Unit = {
    //todo: 1. create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("HiveSupport")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", "d:\\spark-warehouse")
      .enableHiveSupport() // enable Hive support
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN") // set the log level

    //todo: 2. run the SQL statements
    spark.sql("CREATE TABLE IF NOT EXISTS person (id int, name string, age int) row format delimited fields terminated by ' '")
    spark.sql("LOAD DATA LOCAL INPATH './data/student.txt' INTO TABLE person")
    spark.sql("select * from person ").show()

    spark.stop()
  }
}

Create a data directory under the current project, and then create a student.txt data file inside it.
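The file is expected to hold one record per line with the fields separated by a single space, matching the person(id int, name string, age int) table above; a sample with illustrative values might look like this:

1 zhangsan 20
2 lisi 29
3 wangwu 25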

 

 

 

3. Writing a Spark SQL Program that Works with MySQL

 

1. JDBC

Spark SQL can create a DataFrame by reading data from a relational database over JDBC, and after a series of computations on the DataFrame it can write the data back to the relational database.
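Note that running the JDBC examples below from the IDE also requires the MySQL JDBC driver on the classpath; a pom snippet for it might look like the following (the version here is an assumption, chosen to match the mysql-connector-java-5.1.35.jar used with spark-shell and spark-submit later in this section):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.35</version>
</dependency>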

1.1. Loading Data from MySQL with Spark SQL

1.1.1 Writing the Spark SQL Code in IDEA

Java version

 

public static void dataFromMysql() {
    SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
    Properties properties = new Properties();
    properties.setProperty("user", "root");
    properties.setProperty("password", "123456");
    // todo: read the data from MySQL
    Dataset<Row> df = spark.read().jdbc("jdbc:mysql://localhost:3306/baidu", "student", properties);
    df.show();
}
 

 

Scala version

package gec.sql

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * todo: Spark SQL loading data from MySQL
  */
object DataFromMysql {

  def main(args: Array[String]): Unit = {
    //todo: 1. create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("DataFromMysql")
      .master("local[2]")
      .getOrCreate()

    //todo: 2. create a Properties object with the MySQL user name and password
    val properties: Properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")

    //todo: 3. read the data from MySQL
    val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.200.100:3306/spark", "iplocation", properties)

    //todo: 4. show the data of the MySQL table
    mysqlDF.show()

    spark.stop()
  }
}

Run it and view the result:

 

 

1.1.2 Running via spark-shell

(1) Start spark-shell (the MySQL connector driver jar must be specified):

 

spark-shell \
--master spark://node1:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar \
--driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar

 

(2) Load the data from MySQL:

val mysqlDF = spark.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:mysql://192.168.200.100:3306/spark",
    "driver" -> "com.mysql.jdbc.Driver",
    "dbtable" -> "iplocation",
    "user" -> "root",
    "password" -> "123456"))
  .load()

 

(3) Run queries:
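For example, the following sketch continues from the mysqlDF loaded in the previous step:

mysqlDF.printSchema()
mysqlDF.show()
mysqlDF.createOrReplaceTempView("iplocation")
spark.sql("select count(*) from iplocation").show()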

 

 

1.2. Writing Data to MySQL with Spark SQL

1.2.1 Writing the Spark SQL Code in IDEA

(1) Write the code

Java version

 

public static void sparkSqlToMysql() {
    SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
    Properties properties = new Properties();
    properties.setProperty("user", "root");
    properties.setProperty("password", "123456");
    JavaRDD<String> lines = spark.read().textFile("D:\\Bigdata\\20.sparksql\\二、以編程方式執行sparksql\\person.txt").javaRDD();
    JavaRDD<Person> personRDD = lines.map(new Function<String, Person>() {
        @Override
        public Person call(String s) throws Exception {
            String[] strings = s.split(" ");
            return new Person(Integer.valueOf(strings[0]), strings[1], Integer.valueOf(strings[2]));
        }
    });
    Dataset<Row> df = spark.createDataFrame(personRDD, Person.class);
    df.createOrReplaceTempView("person");
    Dataset<Row> resultDF = spark.sql("select * from person order by age desc");
    resultDF.write().jdbc("jdbc:mysql://localhost:3306/baidu", "person", properties);
    spark.stop();
}

public static class Person implements Serializable {
    private int id;
    private String name;
    private int age;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Person(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }
}

 

Scala version

package gec.sql

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

/**
  * todo: Spark SQL writing data to MySQL
  */
object SparkSqlToMysql {

  def main(args: Array[String]): Unit = {
    //todo: 1. create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkSqlToMysql")
      .getOrCreate()

    //todo: 2. read the data
    val data: RDD[String] = spark.sparkContext.textFile(args(0))

    //todo: 3. split each line
    val arrRDD: RDD[Array[String]] = data.map(_.split(" "))

    //todo: 4. map the RDD onto the Student case class
    val studentRDD: RDD[Student] = arrRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))

    //todo: import the implicit conversions
    import spark.implicits._

    //todo: 5. convert the RDD into a DataFrame
    val studentDF: DataFrame = studentRDD.toDF()

    //todo: 6. register the DataFrame as a table
    studentDF.createOrReplaceTempView("student")

    //todo: 7. query the student table, sorting by age in descending order
    val resultDF: DataFrame = spark.sql("select * from student order by age desc")

    //todo: 8. save the result into a MySQL table
    //todo: create a Properties object with the MySQL user name and password
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")

    resultDF.write.jdbc("jdbc:mysql://192.168.200.150:3306/spark", "student", prop)

    //todo: when writing to MySQL a save mode can be configured: overwrite replaces the table, append adds to it,
    //      ignore skips the write, and error (the default) fails if the table already exists
    //resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark", "student", prop)

    spark.stop()
  }
}

//todo: define the Student case class
case class Student(id: Int, name: String, age: Int)
 

 

(2) Package the program with Maven

It can be packaged directly from IDEA.
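Alternatively, the jar can also be built with a standard Maven command from the project root:

mvn clean package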

 

(3) Submit the jar to the Spark cluster

spark-submit \
--class gec.sql.SparkSqlToMysql \
--master spark://node1:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar \
--driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar \
/root/original-spark-2.0.2.jar /person.txt

 

 

(4) Check the data in the MySQL table
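For example, the result can be checked from the mysql command-line client (database and table names as used in the code above):

mysql> use spark;
mysql> select * from student order by age desc;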
