This article aims to take you from beginner to expert with Spark SQL. It is long and covers a lot of ground, so you may want to bookmark it and read it carefully.
Anyone familiar with Spark SQL knows that it evolved from Shark. To stay compatible with Hive, Shark reused Hive's logic for HQL parsing, logical-plan translation, and plan optimization; roughly speaking, it only swapped the physical execution from MapReduce jobs to Spark jobs (plus in-memory columnar storage and other optimizations that have little to do with Hive).
It also depended on the Hive Metastore and Hive SerDes (for compatibility with the various existing Hive storage formats).
Spark SQL's Hive-compatibility layer depends only on the HQL parser, the Hive Metastore, and Hive SerDes. In other words, from the moment HQL is parsed into an abstract syntax tree (AST), Spark SQL takes over entirely; execution-plan generation and optimization are handled by Catalyst. Thanks to Scala features such as pattern matching, writing plan-optimization rules with Catalyst is far more concise than in Hive.
Spark SQL
Spark SQL provides several interfaces:
1). Pure SQL text
2). The Dataset/DataFrame API
Correspondingly, there are several clients:
1). SQL text: submitted through the thriftserver or the spark-sql shell
2). Code: DataFrame / Dataset / SQL
A DataFrame/Dataset is also a distributed collection, but unlike an RDD it carries schema information, making it similar to a table.
(Figure: a detailed comparison of Dataset/DataFrame and RDD.)
Dataset was introduced in Spark 1.6 to provide the strong typing and powerful lambda functions of RDDs together with Spark SQL's optimized execution engine. Since Spark 2.0, DataFrame is simply a Dataset whose element type is Row:
type DataFrame = Dataset[Row]
As a result, much code written for Spark 1.6 and earlier fails when ported to Spark 2+ because the DataFrame class can no longer be found. A small typed-Dataset sketch follows.
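Because a DataFrame is just a Dataset[Row], it can be converted into a strongly typed Dataset with a case class and as[T]. A minimal sketch, assuming a people.json with name and age fields (the Person case class and the age-as-Option choice are illustrative):

case class Person(name: String, age: Option[Long])   // Option because some records may lack an age

val peopleRows = spark.read.json("examples/src/main/resources/people.json")  // untyped: Dataset[Row]
import spark.implicits._
val people = peopleRows.as[Person]                    // typed: Dataset[Person]
people.filter(p => p.age.exists(_ > 21)).show()       // lambdas now operate on Person objects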
val df = spark.read.json("file:///opt/meitu/bigdata/src/main/data/people.json")
df.show()
import spark.implicits._
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()
spark.stop()
Save as a bucketed, sorted Hive table:
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
Write partitioned Parquet output to a directory:
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
Save as a partitioned and bucketed Hive table:
df.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("users_partitioned_bucketed")
cube:
sales.cube("city", "year").agg(sum("amount") as "amount").show()
rollup:
sales.rollup("city", "year").agg(sum("amount") as "amount").show()
pivot (can only follow a groupBy):
sales.groupBy("year").pivot("city", Seq("Warsaw", "Boston", "Toronto")).agg(sum("amount") as "amount").show()
Spark SQL also lets users submit SQL text directly. The following three ways of writing SQL text are supported:
First declare and build a SQLContext or SparkSession, which is the entry point for Spark SQL code. Early versions used SQLContext or HiveContext; from Spark 2 onward, SparkSession is recommended.
1. SQLContext
new SQLContext(sparkContext)
2. HiveContext
new HiveContext(spark.sparkContext)
3. SparkSession
Without the Hive metastore:
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
With the Hive metastore:
val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
Usage
val df = spark.read.json("examples/src/main/resources/people.json")
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people").show()
spark-sql starts up much like spark-submit: you can set the deploy mode, resources, and so on. Run bin/spark-sql --help to see the available configuration options.
Place hive-site.xml under ${SPARK_HOME}/conf/, and then you can test it:
show tables;
select count(*) from student;
The thriftserver JDBC/ODBC server is an implementation similar to HiveServer2 in Hive 1.2.1; you can use Spark's beeline command to test the JDBC server.
Installation and deployment:
1). Start the Hive metastore: bin/hive --service metastore
2). Copy the configuration files to spark/conf/
3). Start the thriftserver: sbin/start-thriftserver.sh --master yarn --deploy-mode client (on YARN only client mode is supported)
4). Start bin/beeline
5). Connect to the thriftserver: !connect jdbc:hive2://localhost:10001
Defining a UDF is simple. For example, here is a custom UDF that computes the length of a string.
val len = udf { (str: String) => str.length }
spark.udf.register("len", len)
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.createOrReplaceTempView("employees")
ds.show()
spark.sql("select len(name) from employees").show()
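The same UDF can also be applied through the DataFrame API rather than SQL. A minimal sketch, reusing the len and ds defined above (the name_len alias is illustrative):

import org.apache.spark.sql.functions.col
ds.select(len(col("name")).as("name_len")).show()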
Defining a UDAF
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

object MyAverageUDAF extends UserDefinedAggregateFunction {
  // Data types of the input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of the values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that, in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
Using the UDAF
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.createOrReplaceTempView("employees")
ds.show()
spark.udf.register("myAverage", MyAverageUDAF)
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
Defining an Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverageAggregator extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
Usage
import spark.implicits._
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json").as[Employee]
ds.show()
val averageSalary = MyAverageAggregator.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
The default format is Parquet; this can be changed via spark.sql.sources.default.
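A minimal sketch of relying on and changing that default (the users.parquet path mirrors the Spark example data, and it is assumed the option can be set on the session at runtime):

// With no format() given, the current default source is used for both read and write
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.write.save("users_copy.parquet")

// Switch the default source for this session, e.g. to JSON
spark.conf.set("spark.sql.sources.default", "json")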
peopleDF.write.parquet("people.parquet")
val parquetFileDF = spark.read.parquet("people.parquet")
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.write.mode("append").orc("/opt/outputorc/")
spark.read.orc("/opt/outputorc/*").show(1)
ds.write.mode("overwrite").json("/opt/outputjson/")
spark.read.json("/opt/outputjson/*").show()
In Spark 1.6 and earlier, accessing Hive tables required a HiveContext.
From Spark 2 onward, you only need to create a SparkSession with enableHiveSupport().
val spark = SparkSession
  .builder()
  .config(sparkConf)
  .enableHiveSupport()
  .getOrCreate()
spark.sql("select count(*) from student").show()
Writing to MySQL
import java.util.Properties

wcdf.repartition(1).write.mode("append")
  .option("user", "root")
  .option("password", "mdh2018@#")
  .jdbc("jdbc:mysql://localhost:3306/test", "alluxio", new Properties())
Reading from MySQL
val fromMysql = spark.read
  .option("user", "root")
  .option("password", "mdh2018@#")
  .jdbc("jdbc:mysql://localhost:3306/test", "alluxio", new Properties())
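For a large table, the JDBC source can also read in parallel if it knows how to split the data. A hedged sketch, reusing the connection settings above and assuming the table has a numeric column id (the bounds and partition count are illustrative):

val connProps = new Properties()
connProps.put("user", "root")
connProps.put("password", "mdh2018@#")

// Split the table into 8 partitions on the numeric column `id`
val partitionedDF = spark.read.jdbc(
  "jdbc:mysql://localhost:3306/test", "alluxio",
  "id",      // partition column (assumed to exist)
  1L,        // lower bound of id
  100000L,   // upper bound of id
  8,         // number of partitions
  connProps)
partitionedDF.show()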
Writing a custom source is fairly simple. First we need to understand how a source is loaded: in the package name passed to format(), Spark looks for a class named DefaultSource. So we only need to define a DefaultSource class in that package and implement the custom source inside it.
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}

class DefaultSource extends DataSourceV2 with ReadSupport {
  def createReader(options: DataSourceOptions) = new SimpleDataSourceReader()
}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class SimpleDataSourceReader extends DataSourceReader {
  def readSchema() = StructType(Array(StructField("value", StringType)))
  def createDataReaderFactories = {
    val factoryList = new java.util.ArrayList[DataReaderFactory[Row]]
    factoryList.add(new SimpleDataSourceReaderFactory())
    factoryList
  }
}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}

class SimpleDataSourceReaderFactory extends DataReaderFactory[Row] with DataReader[Row] {
  def createDataReader = new SimpleDataSourceReaderFactory()
  val values = Array("1", "2", "3", "4", "5")
  var index = 0
  def next = index < values.length
  def get = {
    val row = Row(values(index))
    index = index + 1
    row
  }
  def close(): Unit = {}
}
Usage
val simpleDf = spark.read
  .format("bigdata.spark.SparkSQL.DataSources")
  .load()
simpleDf.show()
The overall execution flow is as follows: starting from the input APIs (SQL, Dataset, DataFrame), a query passes through an unresolved logical plan, an analyzed logical plan, an optimized logical plan, and candidate physical plans; cost-based optimization then selects one physical plan to execute.
Simplified, there are four stages:
1). Analysis: since Spark 2.0 the syntax tree is generated with ANTLR4; earlier versions used a Scala parser combinator.
2). Logical optimization: constant folding, predicate pushdown, column pruning, boolean-expression simplification, and other rules.
3). Physical planning: e.g. SortExec.
4). Codegen: code generation uses Scala string interpolation to produce source code, which Janino then compiles to Java bytecode. E.g. SortExec.
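A quick way to observe these stages for a concrete query (a minimal sketch; the query itself is illustrative):

val q = spark.range(100).selectExpr("id * 2 AS doubled").filter("doubled > 10")
q.explain(true)            // prints the parsed, analyzed, and optimized logical plans plus the physical plan
println(q.queryExecution)  // the same stages, exposed as a QueryExecution object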
Adding a custom optimizer rule takes three steps:
1). Implementation
Extend Rule[LogicalPlan]; a minimal sketch of such a rule is shown below.
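MultiplyOptimizationRule is registered in the next step but its body is not shown in the text. A hedged sketch of what such a rule might look like, assuming Spark 2.x Catalyst APIs (it removes the useless multiplication by 1 that the usage example relies on):

import org.apache.spark.sql.catalyst.expressions.{Literal, Multiply}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Rewrites `expr * 1` into `expr`, dropping the pointless multiplication
object MultiplyOptimizationRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Multiply(left, Literal(one, _)) if one == 1 => left
  }
}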
2). Registration
spark.experimental.extraOptimizations = Seq(MultiplyOptimizationRule)
3). Usage
selectExpr("amountPaid * 1")
A custom physical-planning strategy can be plugged in the same way; a skeleton of countStrategy is sketched below.
Registration:
spark.experimental.extraStrategies = Seq(countStrategy)
Usage:
spark.sql("select count(*) from test")
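countStrategy itself is not shown above. A minimal skeleton of what such a strategy could look like, assuming the Spark 2.x org.apache.spark.sql.Strategy developer API (returning Nil simply defers to the built-in strategies):

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object countStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // Match the logical plans you want to handle and return custom SparkPlan nodes for them;
    // returning Nil means "not handled", so Spark falls back to its built-in strategies
    case _ => Nil
  }
}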