1、 Executing Spark SQL Queries Programmatically
1. Writing a Spark SQL Program to Convert an RDD into a DataFrame
Previously we learned how to run SQL queries in the Spark shell; now we will write Spark SQL query programs in IDEA.
The Spark documentation describes two ways to convert an RDD into a DataFrame. The first uses reflection to infer the schema of an RDD that contains objects of a specific type; it is suitable when the RDD's schema is known in advance. The second builds the schema through a programmatic interface and applies it to an existing RDD, creating the DataFrame dynamically so that the columns and their types are decided at runtime.
First, add the Spark SQL dependency to the Maven project's pom.xml.
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.3</version>
</dependency>
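If the project does not already declare Spark core, it needs to be added as well; a sketch matching the Spark and Scala versions used above:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.3</version>
</dependency>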
1.1. Inferring the Schema via Reflection
Scala supports converting an RDD of case class instances into a DataFrame. The case class defines the schema, and its parameter names are read via reflection and become the column names. Such an RDD can be efficiently converted into a DataFrame and registered as a table.
The code is as follows:
Java version
package com.hzk.sparksql;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;

public class ReflectTransform {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
        JavaRDD<String> lines = spark.read().textFile("D:\\Bigdata\\20.sparksql\\二、以編程方式執行sparksql\\person.txt").javaRDD();

        JavaRDD<Person> rowRDD = lines.map(line -> {
            String[] parts = line.split(" ");
            return new Person(Integer.valueOf(parts[0]), parts[1], Integer.valueOf(parts[2]));
        });

        Dataset<Row> df = spark.createDataFrame(rowRDD, Person.class);
        // df.select("id", "name", "age")
        //   .coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");

        // Columns derived from a JavaBean are ordered alphabetically, so access fields by name.
        df.foreach(new ForeachFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {
                System.out.println("id:" + row.getAs("id") + ",name:" + row.getAs("name") + ",age:" + row.getAs("age"));
            }
        });
    }

    static class Person implements Serializable {
        private int id;
        private String name;
        private int age;

        public Person(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }
}
Scala version
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Converting an RDD into a DataFrame using reflection
 */
//todo: define a case class Person
case class Person(id: Int, name: String, age: Int)

object CaseClassSchema {

  def main(args: Array[String]): Unit = {
    //todo: 1. build a SparkSession, specifying the appName and master address
    val spark: SparkSession = SparkSession.builder()
      .appName("CaseClassSchema")
      .master("local[2]").getOrCreate()
    //todo: 2. get the SparkContext from the SparkSession
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN") // set the log level
    //todo: 3. load the data
    val dataRDD: RDD[String] = sc.textFile("D:\\person.txt")
    //todo: 4. split each line
    val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    //todo: 5. associate the RDD with the Person case class
    val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    //todo: 6. create the DataFrame; the implicit conversions must be imported
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF()

    //todo-------------------DSL operations start--------------
    //1. show the DataFrame's data, 20 rows by default
    personDF.show()
    //2. print the DataFrame's schema
    personDF.printSchema()
    //3. print the number of rows in the DataFrame
    println(personDF.count())
    //4. print all column names of the DataFrame
    personDF.columns.foreach(println)
    //5. take the first row of the DataFrame
    println(personDF.head())
    //6. show all values of the name column
    personDF.select("name").show()
    //7. filter the rows whose age is greater than 30
    personDF.filter($"age" > 30).show()
    //8. count the rows whose age is greater than 30
    println(personDF.filter($"age" > 30).count())
    //9. group the DataFrame by age and count each group
    personDF.groupBy("age").count().show()
    //todo-------------------DSL operations end-------------

    //todo--------------------SQL style start-----------
    //todo: register the DataFrame as a table
    personDF.createOrReplaceTempView("t_person")
    //todo: run SQL statements against it
    spark.sql("select * from t_person").show()
    spark.sql("select * from t_person where name='zhangsan'").show()
    spark.sql("select * from t_person order by age desc").show()
    //todo--------------------SQL style end-------------
    sc.stop()
    spark.stop()
  }
}
1.2. Specifying the Schema Directly with StructType
When a case class cannot be defined in advance, a DataFrame can be created in the following three steps:
(1) Convert the RDD into an RDD of Row objects.
(2) Create a StructType schema that matches the Rows produced in step 1.
(3) Apply the schema to the Row RDD with SparkSession's createDataFrame method to build the DataFrame.
Java version
package com.hzk.sparksql;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;

public class DynamicTransform {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
        JavaRDD<String> lines = spark.read().textFile("D:\\Bigdata\\20.sparksql\\二、以編程方式執行sparksql\\person.txt").javaRDD();

        // step 1: convert each line into a Row
        JavaRDD<Row> personMaps = lines.map(new Function<String, Row>() {
            @Override
            public Row call(String s) throws Exception {
                String[] personString = s.split(" ");
                return RowFactory.create(Integer.valueOf(personString[0]), personString[1], Integer.valueOf(personString[2]));
            }
        });

        // step 2: build a schema that matches the Rows
        ArrayList<StructField> fields = new ArrayList<StructField>();
        StructField field = null;
        field = DataTypes.createStructField("id", DataTypes.IntegerType, true);
        fields.add(field);
        field = DataTypes.createStructField("name", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("age", DataTypes.IntegerType, true);
        fields.add(field);

        StructType schema = DataTypes.createStructType(fields);

        // step 3: apply the schema to the Row RDD
        Dataset<Row> df = spark.createDataFrame(personMaps, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
        df.foreach(new ForeachFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {
                System.out.println("id:" + row.get(0) + ",name:" + row.get(1) + ",age:" + row.get(2));
            }
        });
    }
}
Scala version
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * Converting an RDD into a DataFrame by specifying the schema explicitly
 */
object SparkSqlSchema {
  def main(args: Array[String]): Unit = {
    //todo: 1. create a SparkSession, specifying the appName and master
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkSqlSchema")
      .master("local[2]")
      .getOrCreate()
    //todo: 2. get the SparkContext
    val sc: SparkContext = spark.sparkContext
    //todo: 3. load the data
    val dataRDD: RDD[String] = sc.textFile("d:\\person.txt")
    //todo: 4. split each line
    val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    //todo: 5. load the data into Row objects
    val personRDD: RDD[Row] = dataArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    //todo: 6. create the schema
    val schema: StructType = StructType(Seq(
      StructField("id", IntegerType, false),
      StructField("name", StringType, false),
      StructField("age", IntegerType, false)
    ))

    //todo: 7. create the DataFrame from personRDD and the schema
    val personDF: DataFrame = spark.createDataFrame(personRDD, schema)

    //todo: 8. show the DataFrame's data with the DSL
    personDF.show()

    //todo: 9. register the DataFrame as a table
    personDF.createOrReplaceTempView("t_person")
    //todo: 10. run SQL statements
    spark.sql("select * from t_person").show()
    spark.sql("select count(*) from t_person").show()

    sc.stop()
    spark.stop()
  }
}
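The schema above is still hard-coded. When even the column names are only known at runtime (for example, read from configuration), the StructType can be assembled from a string, following the programmatic-schema pattern in the Spark SQL guide. A minimal sketch, assuming every column is treated as a string:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// column names arriving at runtime, e.g. from a config value
val schemaString = "id name age"
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)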
2. Writing a Spark SQL Program That Uses HiveContext
HiveContext corresponds to the spark-hive module. It is partially coupled with Hive, supports HQL, and is a subclass of SQLContext. Since Spark 2.0, HiveContext and SQLContext have been unified under SparkSession, so both can be used through the SparkSession API.
2.1. Adding the pom Dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.1.3</version>
</dependency>
2.2. Implementation
package gec.sql

import org.apache.spark.sql.SparkSession

/**
 * todo: using Spark SQL to run Hive SQL
 */
object HiveSupport {
  def main(args: Array[String]): Unit = {
    //todo: 1. create a SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("HiveSupport")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", "d:\\spark-warehouse")
      .enableHiveSupport() // enable Hive support
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN") // set the log level

    //todo: 2. run the SQL statements
    spark.sql("CREATE TABLE IF NOT EXISTS person (id int, name string, age int) row format delimited fields terminated by ' '")
    spark.sql("LOAD DATA LOCAL INPATH './data/student.txt' INTO TABLE person")
    spark.sql("select * from person ").show()
    spark.stop()
  }
}

A data directory must be created under the current project, with a student.txt data file inside it.
3. Writing a Spark SQL Program That Operates on MySQL
1. JDBC
Spark SQL can create a DataFrame by reading data from a relational database over JDBC; after a series of computations on the DataFrame, the result can also be written back to the relational database.
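As a compact overview of that round trip before the detailed versions below, a minimal Scala sketch; the URL, table names, and credentials are placeholders:

import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("JdbcRoundTrip").master("local[*]").getOrCreate()

val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")

// read a table into a DataFrame over JDBC
val srcDF = spark.read.jdbc("jdbc:mysql://localhost:3306/spark", "student", props)

// any DataFrame computation
val adults = srcDF.filter("age >= 18")

// write the result back to another table
adults.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/spark", "student_adult", props)

spark.stop()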
1.1. Loading Data from MySQL with Spark SQL
1.1.1 Writing the Spark SQL Code in IDEA
Java version
public static void dataFromMysql() {
    SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
    Properties properties = new Properties();
    properties.setProperty("user", "root");
    properties.setProperty("password", "123456");
    // read the data from MySQL
    Dataset<Row> df = spark.read().jdbc("jdbc:mysql://localhost:3306/baidu", "student", properties);
    df.show();
}
Scala version
package gec.sql

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * todo: loading data from MySQL with Spark SQL
 */
object DataFromMysql {
  def main(args: Array[String]): Unit = {
    //todo: 1. create a SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("DataFromMysql")
      .master("local[2]")
      .getOrCreate()
    //todo: 2. create a Properties object with the MySQL user name and password
    val properties: Properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    //todo: 3. read the data from MySQL
    val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.200.100:3306/spark", "iplocation", properties)
    //todo: 4. show the data of the MySQL table
    mysqlDF.show()
    spark.stop()
  }
}
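Running either version locally also requires the MySQL JDBC driver on the classpath; a pom sketch, assuming the same 5.1.x driver version that the spark-shell examples below pass with --jars:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.35</version>
</dependency>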
Run it and check the result:
(Screenshot: query output)
1.1.2 Running via spark-shell
(1) Start spark-shell (the MySQL connector JAR must be specified, so that it is available to both the driver and the executors)
spark-shell \
--master spark://node1:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar \
--driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar
(2) Load the data from MySQL
val mysqlDF = spark.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:mysql://192.168.200.100:3306/spark",
    "driver" -> "com.mysql.jdbc.Driver",
    "dbtable" -> "iplocation",
    "user" -> "root",
    "password" -> "123456"))
  .load()
(3) Run a query
(Screenshot: query results)
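For example, the DataFrame loaded in step (2) can be queried directly in the shell:

mysqlDF.printSchema()
mysqlDF.show()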
1.2. Writing Data to MySQL with Spark SQL
1.2.1 Writing the Spark SQL Code in IDEA
(1) Write the code
Java version
public static void sparkSqlToMysql() {
    SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
    Properties properties = new Properties();
    properties.setProperty("user", "root");
    properties.setProperty("password", "123456");
    JavaRDD<String> lines = spark.read().textFile("D:\\Bigdata\\20.sparksql\\二、以編程方式執行sparksql\\person.txt").javaRDD();
    JavaRDD<Person> personRDD = lines.map(new Function<String, Person>() {
        @Override
        public Person call(String s) throws Exception {
            String[] strings = s.split(" ");
            return new Person(Integer.valueOf(strings[0]), strings[1], Integer.valueOf(strings[2]));
        }
    });
    Dataset<Row> df = spark.createDataFrame(personRDD, Person.class);
    df.createOrReplaceTempView("person");
    Dataset<Row> resultDF = spark.sql("select * from person order by age desc");
    // write the result back to MySQL using the connection properties defined above
    resultDF.write().jdbc("jdbc:mysql://localhost:3306/baidu", "person", properties);
    spark.stop();
}

public static class Person implements Serializable {
    private int id;
    private String name;
    private int age;

    public Person(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}
Scala version
package gec.sql

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * todo: writing data to MySQL with Spark SQL
 */
object SparkSqlToMysql {
  def main(args: Array[String]): Unit = {
    //todo: 1. create a SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkSqlToMysql")
      .getOrCreate()
    //todo: 2. read the data
    val data: RDD[String] = spark.sparkContext.textFile(args(0))
    //todo: 3. split each line
    val arrRDD: RDD[Array[String]] = data.map(_.split(" "))
    //todo: 4. associate the RDD with the Student case class
    val studentRDD: RDD[Student] = arrRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))
    //todo: import the implicit conversions
    import spark.implicits._
    //todo: 5. convert the RDD into a DataFrame
    val studentDF: DataFrame = studentRDD.toDF()
    //todo: 6. register the DataFrame as a table
    studentDF.createOrReplaceTempView("student")
    //todo: 7. query the student table, sorted by age in descending order
    val resultDF: DataFrame = spark.sql("select * from student order by age desc")

    //todo: 8. save the result into a MySQL table
    //todo: create a Properties object with the MySQL user name and password
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")

    resultDF.write.jdbc("jdbc:mysql://192.168.200.150:3306/spark", "student", prop)

    //todo: when writing to MySQL, a save mode can be configured: overwrite, append, ignore,
    //      or error (the default, which fails if the table already exists)
    //resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark", "student", prop)
    spark.stop()
  }
}

//todo: define the Student case class
case class Student(id: Int, name: String, age: Int)
(2) Package the program with Maven
Packaging through IDEA is sufficient.
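Alternatively, a standard Maven build from the command line produces the JAR (assuming the default packaging configured in the pom):

mvn clean package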
(3) Submit the JAR to the Spark cluster
spark-submit \
--class gec.sql.SparkSqlToMysql \
--master spark://node1:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar \
--driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar \
/root/original-spark-2.0.2.jar /person.txt
(4) Check the data in the MySQL table