Spark SQL Notes (3): Loading/Saving Data and Spark SQL Functions

Loading and Saving Data

Loading data (JSON files, JDBC) and saving it (JSON, JDBC)

The test code is as follows:

package cn.xpleaf.bigdata.spark.scala.sql.p1

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

/**
  * Hands-on examples of loading and saving data with Spark SQL
  */
object _03SparkSQLLoadAndSaveOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
//        readOps(sqlContext)
        writeOps(sqlContext)
        sc.stop()
    }

    /**
      * Watch out for this exception when writing results to a directory:
      *     org.apache.spark.sql.AnalysisException: path file:/D:/data/spark/sql/people-1.json already exists
      * If you want to reuse the directory, set an explicit SaveMode:
      *     ErrorIfExists: the default; throws an exception if the directory exists
      *     Append:        appends to the existing data
      *     Ignore:        does nothing, as if the write were never executed
      *     Overwrite:     overwrites the existing data
      */
    def writeOps(sqlContext:SQLContext): Unit = {
        val df = sqlContext.read.json("D:/data/spark/sql/people.json")
        df.registerTempTable("people")
        val retDF = sqlContext.sql("select * from people where age > 20")
//        retDF.show()
        // Save the result to a directory
//        retDF.coalesce(1).write.mode(SaveMode.Overwrite).json("D:/data/spark/sql/people-1.json")
        // Save the result to a database
        val url = "jdbc:mysql://localhost:3306/test"
        val table = "people1"       // a new table will be created
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "root")
        retDF.coalesce(1).write.jdbc(url, table, properties)
    }

    /*
        // Reading data with Spark SQL
        // java.lang.RuntimeException: file:/D:/data/spark/sql/people.json is not a Parquet file
        The default format for read.load is parquet (parquet.apache.org).
        To load another file format, specify it explicitly with .format("json").
     */
    def readOps(sqlContext:SQLContext): Unit = {
//        val df = sqlContext.read.load("D:/data/spark/sql/users.parquet")
//        val df = sqlContext.read.format("json").load("D:/data/spark/sql/people.json")
//        val df = sqlContext.read.json("D:/data/spark/sql/people.json")
        val url = "jdbc:mysql://localhost:3306/test"
        val table = "people"
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "root")
        val df = sqlContext.read.jdbc(url, table, properties)
        df.show()
    }
}

When the read operation is executed, the output is as follows:

+---+----+---+------+
| id|name|age|height|
+---+----+---+------+
|  1| 小甜甜| 18| 168.0|
|  2| 小丹丹| 19| 167.0|
|  3|  大神| 25| 181.0|
|  4|  團長| 38| 158.0|
|  5|  記者| 22| 169.0|
+---+----+---+------+

When the write operation is executed:

1. When saving to a JSON file
Note the various save modes (see the sketch below); also note that what gets written is a directory, in an HDFS-compatible directory layout.

2. When saving via JDBC
A table with the DataFrame's columns is created in the database; note that this table must not already exist.
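
As a hedged illustration of the two points above (not part of the original code), the sketch below writes a DataFrame with an explicit SaveMode; the paths and JDBC settings are the same placeholders used in the example earlier:

import java.util.Properties

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

object SaveModeSketch {
    def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("SaveModeSketch"))
        val sqlContext = new SQLContext(sc)
        val df = sqlContext.read.json("D:/data/spark/sql/people.json")

        // Overwrite replaces the target directory instead of throwing "path ... already exists"
        df.coalesce(1).write.mode(SaveMode.Overwrite).json("D:/data/spark/sql/people-1.json")

        // Append adds rows to an existing JDBC table; the default ErrorIfExists fails when the table exists
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "root")
        df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test", "people1", properties)

        sc.stop()
    }
}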

Integrating Spark SQL with Hive

Hive must be started first, before carrying out the operations below.

Writing the code

The test code is as follows:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import cn.xpleaf.bigdata.spark.scala.sql.p1._01SparkSQLOps
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
  * Use a HiveContext to work with data in Hive tables
  * Data sources:
  *     teacher_info.txt
  *     name(String)    height(double)
  *     zhangsan,175
  *     lisi,180
  *     wangwu,175
  *     zhaoliu,195
  *     zhouqi,165
  *     weiba,185
  *
  *     create table teacher_info(
  *     name string,
  *     height double
  *     ) row format delimited
  *     fields terminated by ',';
  *
  *     teacher_basic.txt
  *     name(String)    age(int)    married(boolean)    children(int)
  *     zhangsan,23,false,0
  *     lisi,24,false,0
  *     wangwu,25,false,0
  *     zhaoliu,26,true,1
  *     zhouqi,27,true,2
  *     weiba,28,true,3
  *
  *     create table teacher_basic(
  *     name string,
  *     age int,
  *     married boolean,
  *     children int
  *     ) row format delimited
  *     fields terminated by ',';
  *
  * Requirements:
  * 1. Use Spark SQL to create the corresponding tables in Hive and load the data into them
  * 2. Run a Spark SQL job that joins teacher_info and teacher_basic and stores the result in a table named teacher
  *
  * When running Hive operations on the cluster, the following configuration is required:
  * 1. Copy hive-site.xml into spark/conf and copy the MySQL connector jar into spark/lib
  * 2. Add the following line to $SPARK_HOME/conf/spark-env.sh:
  *    export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar
  */
object _01HiveContextOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
//            .setMaster("local[2]")
            .setAppName(_01SparkSQLOps.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val hiveContext = new HiveContext(sc)

        // Step 1: create the tables and load the data
        hiveContext.sql("CREATE TABLE teacher_info(" +
            "name string, " +
            "height double) " +
            "ROW FORMAT DELIMITED " +
            "FIELDS TERMINATED BY ','")

        hiveContext.sql("CREATE TABLE teacher_basic(" +
            "name string, " +
            "age int, " +
            "married boolean, " +
            "children int) " +
            "ROW FORMAT DELIMITED " +
            "FIELDS TERMINATED BY ','")

        // Load the data into the tables
        hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_info.txt' INTO TABLE teacher_info")
        hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_basic.txt' INTO TABLE teacher_basic")

        // Step 2: join the two tables
        val joinDF = hiveContext.sql("SELECT " +
            "b.name, " +
            "b.age, " +
            "if(b.married, '已婚', '未婚') as married, " +
            "b.children, " +
            "i.height " +
            "FROM teacher_info i " +
            "INNER JOIN teacher_basic b ON i.name = b.name")

        joinDF.collect().foreach(println)
        joinDF.write.saveAsTable("teacher")

        sc.stop()
    }
}

Packaging, uploading, and configuration

After packaging, upload the jar to the cluster and then apply the following Spark configuration:

When running Hive operations on the cluster, the following configuration is required:
    1. Copy hive-site.xml into spark/conf and copy the MySQL connector jar into spark/lib
    2. Add the following line to $SPARK_HOME/conf/spark-env.sh: export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar

Submitting the Spark job

The script used to submit the Spark job is as follows:

[uplooking@uplooking01 spark]$ cat spark-submit-standalone.sh 
#export HADOOP_CONF_DIR=/home/uplooking/app/hadoop/etc/hadoop
/home/uplooking/app/spark/bin/spark-submit \
--class $2 \
--master spark://uplooking02:7077 \
--executor-memory 1G \
--num-executors 1 \
$1 \

Run the following command:

./spark-submit-standalone.sh spark-hive.jar cn.xpleaf.bigdata.spark.scala.sql.p2._01HiveContextOps

Verification

The expected output can be seen in the job's execution output, and it can also be verified directly from Hive:

hive> show tables;
OK
hpeople
people
t1
teacher
teacher_basic
teacher_info
Time taken: 0.03 seconds, Fetched: 6 row(s)
hive> select * from teacher;
OK
zhangsan    23    未婚    0    175.0
lisi        24    未婚    0    180.0
wangwu      25    未婚    0    175.0
zhaoliu     26    已婚    1    195.0
zhouqi      27    已婚    2    165.0
weiba       28    已婚    3    185.0
Time taken: 0.369 seconds, Fetched: 6 row(s)

Integrating Spark with Elasticsearch

Make sure the Elasticsearch environment has already been set up.

The test code is as follows:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._
import org.elasticsearch.spark._

/**
  * Integrating Spark with Elasticsearch
  * Add the Spark/ES Maven dependency:
  *     elasticsearch-hadoop
  *     2.3.0
  * Load account.json into the ES index/type spark/account
  * See the official docs: https://www.elastic.co/guide/en/elasticsearch/hadoop/2.3/spark.html
  */
object _02SparkElasticSearchOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_02SparkElasticSearchOps.getClass().getSimpleName)
            .setMaster("local[2]")
        /**
          * Spark/ES integration settings
          */
        conf.set("es.index.auto.create", "true")
        conf.set("es.nodes", "uplooking01")
        conf.set("es.port", "9200")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
//        write2ES(sqlContext)
        readFromES(sc)
        sc.stop()
    }

    /**
      * Read data from ES
      * (using the SparkContext)
      */
    def readFromES(sc:SparkContext): Unit = {
        val resources = "spark/account"  // index/type
        val jsonRDD = sc.esJsonRDD(resources)
        jsonRDD.foreach(println)
    }

    /**
      * Write data to ES
      * (using the SQLContext)
      */
    def write2ES(sqlContext:SQLContext): Unit = {
        val jsonDF = sqlContext.read.json("D:/data/spark/sql/account.json")
        val resources = "spark/account"  // index/type
        jsonDF.saveToEs(resources)
    }
}

Spark SQL Functions

Overview (built-in functions in Spark 1.5.x ~ 1.6.x)

Spark SQL's built-in functions can be used to analyse data directly. Unlike the RDD API, a built-in function applied to a DataFrame returns a Column object, and since a DataFrame is by definition "A distributed collection of data organized into named columns.", this lays a solid foundation for complex analysis and is very convenient: built-in functions can be invoked at any point while operating on a DataFrame, which greatly reduces the time spent wiring up business logic and lets us focus on the analysis itself, a real productivity gain for engineers. Starting with Spark 1.5.x a large number of built-in functions are provided, including max, mean, min, sum, avg, explode, size, sort_array, day, to_date, abs, acos, asin, and atan.
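
For example, the aggregate functions in org.apache.spark.sql.functions can be called directly on a DataFrame, each returning a Column that composes with select/agg/orderBy. The following is a minimal sketch (not from the original post) that reuses the people.json file from the earlier examples:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.{SparkConf, SparkContext}

object BuiltinFunctionSketch {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("BuiltinFunctionSketch")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val df = sqlContext.read.json("D:/data/spark/sql/people.json")

        // max/avg/count return Column objects, so they compose inside agg()
        df.groupBy("age")
          .agg(max("height").as("max_height"), avg("height").as("avg_height"), count("name").as("cnt"))
          .orderBy(desc("age"))
          .show()

        sc.stop()
    }
}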

Broadly speaking, the built-in functions fall into the following basic categories:

1. Aggregate functions, e.g. countDistinct, sumDistinct
2. Collection functions, e.g. sort_array, explode
3. Date and time functions, e.g. hour, quarter, next_day
4. Math functions, e.g. asin, atan, sqrt, tan, round
5. Window functions, e.g. rowNumber
6. String functions, e.g. concat, format_number, regexp_extract
7. Other functions, e.g. isNaN, sha, randn, callUDF

The following concepts come from Hive, but Spark SQL clearly has the same notions:

UDF (User Defined Function)
    One row in, one row out, e.g. a ---> A, strlen("adbad") = 5
UDAF (User Defined Aggregation Function)
    Many rows in, one row out, e.g. sum(a, b, c, d) ----> an aggregated result (a sketch of a UDAF follows below)
UDTF (User Defined Table Function)
    Many rows in, many rows out, e.g. "hello you", "hello me" ----> split(" ") ----> ["hello", "you"] ----> one row per word (row/column transformation)
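
The post demonstrates a UDF later on; for completeness, here is a minimal sketch (not from the original post) of a UDAF in Spark 1.5.x/1.6.x, written by extending UserDefinedAggregateFunction. It sums double values and, once registered, is called from SQL like any built-in aggregate; the function name "mySum" and the "height" column are illustrative only.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// A sketch of a UDAF that sums doubles (many rows in, one value out)
class SumDoubleUDAF extends UserDefinedAggregateFunction {
    // Schema of the input column(s)
    override def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
    // Schema of the intermediate aggregation buffer
    override def bufferSchema: StructType = StructType(StructField("sum", DoubleType) :: Nil)
    // Type of the final result
    override def dataType: DataType = DoubleType
    override def deterministic: Boolean = true
    // Initialize the buffer for each partition
    override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0.0
    // Fold one input row into the buffer
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
        if (!input.isNullAt(0)) buffer(0) = buffer.getDouble(0) + input.getDouble(0)
    // Merge two partial buffers
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
        buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    // Produce the final value
    override def evaluate(buffer: Row): Any = buffer.getDouble(0)
}

// Register and use it like a built-in aggregate (assumes a "people" temp table with a height column):
// sqlContext.udf.register("mySum", new SumDoubleUDAF)
// sqlContext.sql("select mySum(height) from people").show()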

A basic example using the built-in functions:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
  * Spark SQL built-in function operations
  */
object _03SparkSQLFunctionOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_03SparkSQLFunctionOps.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val pdf = sqlContext.read.json("D:/data/spark/sql/people.json")
        pdf.show()

        pdf.registerTempTable("people")

        // Count the number of people
        sqlContext.sql("select count(1) from people").show()

        // Per-age statistics: max, min, avg and count
        sqlContext.sql("select age, " +
            "max(age) as max_age, " +
            "min(age) as min_age, " +
            "avg(age) as avg_age, " +
            "count(age) as count " +
            "from people group by age order by age desc").show()

        sc.stop()
    }
}

The output is as follows:

+---+------+-------+
|age|height|   name|
+---+------+-------+
| 10| 168.8|Michael|
| 30| 168.8|   Andy|
| 19| 169.8| Justin|
| 32| 188.8|   Jack|
| 10| 158.8|   John|
| 19| 179.8|   Domu|
| 13| 179.8|     袁帥|
| 30| 175.8|     殷傑|
| 19| 179.9|     孫瑞|
+---+------+-------+

18/05/09 17:53:23 INFO FileInputFormat: Total input paths to process : 1
+---+
|_c0|
+---+
|  9|
+---+

18/05/09 17:53:24 INFO FileInputFormat: Total input paths to process : 1
+---+-------+-------+-------+-----+
|age|max_age|min_age|avg_age|count|
+---+-------+-------+-------+-----+
| 32|     32|     32|   32.0|    1|
| 30|     30|     30|   30.0|    2|
| 19|     19|     19|   19.0|    3|
| 13|     13|     13|   13.0|    1|
| 10|     10|     10|   10.0|    2|
+---+-------+-------+-------+-----+

Spark SQL Window Functions

1. Starting with Spark 1.5.x, window functions were introduced in Spark SQL and DataFrames. The classic example is row_number(), which makes it possible to implement group-wise top-N logic.

2. As a case study, top-N can be computed with Spark's window functions. You may recall that we did a top-N computation very early on and it was quite cumbersome at the time; with Spark SQL it is now very straightforward, as the sketch below shows.
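
A minimal sketch (not from the original post) of group-wise top-N with row_number(), reusing the teacher_basic table created in the Hive example above; the "top 2 oldest per marital status" requirement is purely illustrative:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object RowNumberTopNSketch {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(new SparkConf().setAppName("RowNumberTopNSketch"))
        // In Spark 1.5.x/1.6.x, window functions in SQL require a HiveContext
        val hiveContext = new HiveContext(sc)

        // For each value of married, keep the 2 oldest teachers
        val topN = hiveContext.sql(
            "SELECT name, age, married FROM (" +
            "SELECT name, age, married, " +
            "row_number() OVER (PARTITION BY married ORDER BY age DESC) AS rn " +
            "FROM teacher_basic) t " +
            "WHERE t.rn <= 2")
        topN.show()

        sc.stop()
    }
}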

Spark SQL UDF Operations

The test code is as follows:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark SQL UDF operations
  */
object _04SparkSQLFunctionOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_04SparkSQLFunctionOps.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        /**
          * User-defined functions (UDFs), analogous to Hive UDFs (both Hive and Spark SQL support interactive SQL):
          * 1. Create an ordinary function
          * 2. Register it (with the SQLContext)
          * 3. Use it directly in SQL
          *
          * Example: a UDF that returns the length of a string
          */
        // 1. Create an ordinary function
        def strLen(str:String):Int = str.length

        // 2. Register it (with the SQLContext)
        sqlContext.udf.register[Int, String]("myStrLen", strLen)

        val list = List("Hello you", "Hello he", "Hello me")

        // Convert the RDD to a DataFrame
        val rowRDD = sqlContext.sparkContext.parallelize(list).flatMap(_.split(" ")).map(word => {
            Row(word)
        })
        val scheme = StructType(List(
            StructField("word", DataTypes.StringType, false)
        ))
        val df = sqlContext.createDataFrame(rowRDD, scheme)

        df.registerTempTable("test")

        // 3. Use it directly in SQL
        sqlContext.sql("select word, myStrLen(word) from test").show()

        sc.stop()
    }
}

The output is as follows:

+-----+---+
| word|_c1|
+-----+---+
|Hello|  5|
|  you|  3|
|Hello|  5|
|   he|  2|
|Hello|  5|
|   me|  2|
+-----+---+

Word Count with Spark SQL

The test code is as follows:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}

/**
  * Two important topics:
  * 1. Word count with Spark SQL
  * 2. Using window functions
  */
object _05SparkSQLFunctionOps2 {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_05SparkSQLFunctionOps2.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val list = List("Hello you", "Hello he", "Hello me")

        // Convert the RDD to a DataFrame
        val rowRDD = sqlContext.sparkContext.parallelize(list).map(line => {
            Row(line)
        })
        val scheme = StructType(List(
            StructField("line", DataTypes.StringType, false)
        ))
        val df = sqlContext.createDataFrame(rowRDD, scheme)

        df.registerTempTable("test")
        df.show()

        // Run the word count
        val sql = "select t.word, count(1) as count " +
                  "from " +
                  "(select " +
                  "explode(split(line, ' ')) as word " +
                  "from test) as t " +
                  "group by t.word order by count desc"
        sqlContext.sql(sql).show()

        sc.stop()
    }
}

The output is as follows:

+---------+
|     line|
+---------+
|Hello you|
| Hello he|
| Hello me|
+---------+

+-----+-----+
| word|count|
+-----+-----+
|Hello|    3|
|   me|    1|
|   he|    1|
|  you|    1|
+-----+-----+


Original article: http://blog.51cto.com/xpleaf/2114584