第0章 預備知識0.1 Scala0.1.1 Scala 操做符0.1.2 拉鍊操做0.2 Spark Core0.2.1 Spark RDD 持久化0.2.2 Spark 共享變量0.3 Spark SQL0.3.1 RDD、DataFrame 與 DataSet0.3.2 DataSet 與 RDD 互操做0.3.3 RDD、DataFrame 與 DataSet 之間的轉換0.3.4 用戶自定義聚合函數(UDAF)0.3.5 開窗函數0.4 Spark Streaming0.4.1 Dstream transformation 算子概覽0.4.2 Dstream updataStateByKey 算子概覽0.4.3 窗口操做0.4.4 Receiver 與 Direct0.5 Java0.5.1 對象池php
object ListDemo01 {
def main(args: Array[String]): Unit = {
// 說明
// 1. 在默認狀況下 List 是 scala.collection.immutable.List 即不可變
// 2. 在 scala 中,List 就是不可變的,如須要使用可變的 List,則須要使用 ListBuffer
// 3. List 在 package object scala 中作了聲明 val List = scala.collection.immutable.List
// 4. val Nil = scala.collection.immutable.Nil // List()
val list01 = List(1, 2, 3, "Hello") // 建立時,直接分配元素
println(list01) // List(1, 2, 3, Hello)
val list02 = Nil // 空集合
println(list02) // List()
// 訪問 List 的元素
val value1 = list01(1) // 1是索引,表示取出第2個元素
println("value1=" + value1) // 2
println("====================list追加元素後的效果====================")
// 經過 :+ 和 +: 給 list 追加元素(自己的集合並無變化)
val list1 = List(1, 2, 3, "abc")
// :+ 運算符表示在列表的最後增長數據
val list2 = list1 :+ 4 // (1,2,3,"abc", 4)
println(list1) // list1 沒有變化 (1, 2, 3, "abc"),說明 list1 仍是不可變
println(list2) // 新的列表結果是 [1, 2, 3, "abc", 4]
val list3 = 10 +: list1 // (10, 1, 2, 3, "abc")
println("list3=" + list3)
// :: 符號的使用
val list4 = List(1, 2, 3, "abc")
// 說明 val list5 = 4 :: 5 :: 6 :: list4 :: Nil 步驟:
// 1. List()
// 2. List(List(1, 2, 3, "abc"))
// 3. List(6, List(1, 2, 3, "abc"))
// 4. List(5, 6, List(1, 2, 3, "abc"))
// 5. List(4, 5, 6, List(1, 2, 3, "abc"))
val list5 = 4 :: 5 :: 6 :: list4 :: Nil
println("list5=" + list5)
// ::: 符號的使用
// 說明 val list6 = 4 :: 5 :: 6 :: list4 ::: Nil 步驟:
// 1. List()
// 2. List(1, 2, 3, "abc")
// 3. List(6, 1, 2, 3, "abc")
// 4. List(5, 6, 1, 2, 3, "abc")
// 5. List(4, 5, 6, 1, 2, 3, "abc")
// 下面等價 4 :: 5 :: 6 :: list4
val list6 = 4 :: 5 :: 6 :: list4 ::: Nil
println("list6=" + list6)
}
}
輸出結果以下:css
List(1, 2, 3, Hello)
List()
value1=2
====================list追加元素後的效果====================
List(1, 2, 3, abc)
List(1, 2, 3, abc, 4)
list3=List(10, 1, 2, 3, abc)
list5=List(4, 5, 6, List(1, 2, 3, abc))
list6=List(4, 5, 6, 1, 2, 3, abc)
把一對集合 A 和 B 的包含的元素合成到一個集合中:java
object zipTest01 {
def main(args: Array[String]): Unit = {
val prices1 = List(5.0, 20.0, 9.95)
val quantities1 = List(10, 2, 1)
println(prices1.zip(quantities1))
println("----------------------------------")
val prices2 = List(5.0, 20.0, 9.95)
val quantities2 = List(10, 2)
println(prices2.zip(quantities2))
println("----------------------------------")
val prices3 = List(5.0, 20.0, 9.95)
val quantities3 = List(10, 2)
println(prices3.zipAll(quantities3, 9.95, 1))
println("----------------------------------")
val prices4 = List(5.0, 20.0, 9.95)
val quantities4 = List(10, 2, 1)
println(prices4.zipWithIndex)
println(quantities4.zipWithIndex)
}
}
運行結果:mysql
List((5.0,10), (20.0,2), (9.95,1))
----------------------------------
List((5.0,10), (20.0,2))
----------------------------------
List((5.0,10), (20.0,2), (9.95,1))
----------------------------------
List((5.0,0), (20.0,1), (9.95,2))
List((10,0), (2,1), (1,2))
這個方法之因此叫「拉鍊(zip)」,是由於它就像拉鍊的齒狀結構同樣將兩個集合結合在一塊兒。注意
:若是一個集合比另外一個集合短, 那麼結果中的對偶數量和較短的那個集合的元素數量相同。程序員
zipAll 方法可讓你指定較短列表的缺省值。
zipWithIndex 方法返回對偶的列表,其中每一個對偶中第二個組成部分是每一個元素的下標。算法
Spark 很是重要的一個功能特性就是能夠將 RDD 持久化在內存中,當對 RDD 執行持久化操做時,每一個節點都會將本身操做的 RDD 的 partition 持久化到內存中,而且在以後對該 RDD 的反覆使用中,直接使用內存的 partition。這樣的話,對於針對一個 RDD 反覆執行多個操做的場景, 就只要對 RDD 計算一次便可,後面直接使用該 RDD,而不須要反覆計算屢次該 RDD
。
巧妙使用 RDD 持久化,甚至在某些場景下,能夠將 Spark 應用程序的性能提升 10 倍。對於迭代式算法和快速交互式應用來講,RDD 持久化是很是重要的。
例如,讀取一個有着數十萬行數據的 HDFS 文件,造成 linesRDD,這一讀取過程會消耗大量時間,在 count 操做結束後,linesRDD 會被丟棄,會被後續的數據覆蓋,當第二次再次使用 count 時,又須要從新讀取 HDFS 文件數據,再次造成新的 linesRDD,這會致使反覆消耗大量時間,會嚴重下降系統性能。
若是在讀取完成後將 linesRDD 緩存起來,那麼下一次執行 count 操做時將會直接使用緩存起來的 linesRDD,這會節省大量的時間。
要持久化一個 RDD,只要調用其 cache() 或者 persist() 方法便可。在該 RDD 第一次被計算出來時,就會直接緩存在每一個節點中,並且 Spark 的持久化機制仍是自動容錯
的,若是持久化的 RDD 的任何 partition 丟失了,那麼 Spark 會自動經過其源 RDD,使用 transformation 操做從新計算該 partition。
cache() 和 persist() 的區別在於,cache() 是 persist() 的一種簡化方式,cache() 的底層就是調用的 persist() 的無參版本,同時就是調用 persist(MEMORY_ONLY),將輸入持久化到內存中
。若是須要從內存中清除緩存,那麼可使用 unpersist() 方法。
Spark 本身也會在 shuffle 操做時,進行數據的持久化,好比寫入磁盤,主要是爲了在節點失敗時,避免須要從新計算整個過程。
sql
Spark 一個很是重要的特性就是共享變量
。
默認狀況下,若是在一個算子的函數中使用到了某個外部的變量,那麼這個變量的值會被拷貝到每一個 task 中
,此時每一個 task 只能操做本身的那份變量副本。若是多個 task 想要共享某個變量,那麼這種方式是作不到的。
Spark 爲此提供了兩種共享變量,一種是 Broadcast Variable(廣播變量)
,另外一種是 Accumulator(累加變量)
。Broadcast Variable 會將用到的變量,僅僅爲每一個節點拷貝一份
,更大的用途是優化性能,減小網絡傳輸以及內存損耗。Accumulator 則能夠讓多個 task 共同操做一份變量
,主要能夠進行累加操做。Broadcast Variable 是共享讀變量,task 不能去修改它,而 Accumulator 可讓多個 task 操做一個變量。數據庫
1.廣播變量
廣播變量容許程序員在每一個機器上保留緩存的只讀變量,而不是給每一個任務發送一個副本
。例如,可使用它們以有效的方式爲每一個節點提供一個大型輸入數據集的副本。Spark 還嘗試使用高效的廣播算法分發廣播變量,以下降通訊成本。
Spark action 被劃分爲多個 Stages,被多個 「shuffle」 操做(寬依賴)所分割。Spark 自動廣播每一個階段任務所需的公共數據(一個 Stage 中多個 task 使用的數據),以這種方式廣播的數據以序列化形式緩存,並在運行每一個任務以前反序列化
。這意味着,顯式建立廣播變量僅在跨多個階段的任務須要相同數據或者以反序列化格式緩存數據很重要時纔有用。
Spark 提供的 Broadcast Variable 是只讀的
,而且在每一個節點上只會有一個副本,而不會爲每一個 task 都拷貝一份副本,所以,它的最大做用,就是減小變量到各個節點的網絡傳輸消耗,以及在各個節點上的內存消耗
。此外,Spark 內部也使用了高效的廣播算法來減小網絡消耗。
能夠經過調用 SparkContext 的 broadcast() 方法來針對每一個變量建立廣播變量。而後在算子的函數內,使用到廣播變量時,每一個節點只會拷貝一份副本了,每一個節點可使用廣播變量的 value() 方法獲取值。
2.累加器
累加器(accumulator):Accumulator 是僅僅被相關操做累加的變量
,所以能夠在並行中被有效地支持。它們可用於實現計數器
(如 MapReduce)或總和計數
。
Accumulator 是存在於 Driver 端的,從節點不斷把值發到 Driver 端,在 Driver端計數
(Spark UI 在 SparkContext 建立時被建立, 即在 Driver 端被建立,所以它能夠讀取 Accumulator 的數值),存在於 Driver 端的一個值,從節點是讀取不到的
。
Spark 提供的 Accumulator 主要用於多個節點對一個變量進行共享性的操做
。
Accumulator 只提供了累加的功能,可是卻給咱們提供了多個 task 對於同一個變量並行操做的功能,可是 task 只能對 Accumulator 進行累加操做,不能讀取它的值
,只有 Driver 程序能夠讀取 Accumulator 的值。
自定義累加器類型的功能在 1.X 版本中就已經提供了,可是使用起來比較麻煩,在 2.0 版本後, 累加器的易用性有了較大的改進,並且官方還提供了一個新的抽象類:AccumulatorV2
來提供更加友好的自定義類型累加器的實現方式。
官方同時給出了一個實現的示例: CollectionAccumulator 類, 這個類容許以集合的形式收集 spark 應用執行過程當中的一些信息。例如,咱們能夠用這個類收集 Spark 處理數據時的一些細節,固然,因爲累加器的值最終要匯聚到 driver 端,爲了不 driver 端的 outofmemory 問題,須要對收集的信息的規模要加以控制,不宜過大。express
package com.atguigu.session
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
/**
* 自定義累加器
*/
class SessionStatisticAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]]() {
// 自定義累加器:要求要在類的裏面維護一個 mutable.HashMap 結構
val countMap = new mutable.HashMap[String, Int]()
// 判斷累加器是否爲空
override def isZero: Boolean = {
this.countMap.isEmpty
}
// 複製一個如出一轍的累加器
override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
val acc = new SessionStatisticAccumulator
acc.countMap ++= this.countMap // 將兩個 Map 拼接在一塊兒
acc
}
// 重置累加器
override def reset(): Unit = {
this.countMap.clear()
}
// 向累加器中添加 KV 對(K 存在,V 累加1,K 不存在,從新建立)
override def add(k: String): Unit = {
if (!this.countMap.contains(k)) {
this.countMap += (k -> 0)
}
this.countMap.update(k, this.countMap(k) + 1)
}
// 兩個累加器進行合併(先判斷兩個累加器是不是同一類型的,再將兩個 Map 進行合併(是個小難點))
override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
other match {
// (1 : 100).foldLeft(0) 等價於 (0 : (1 to 100))(_+_) 又等價於 { case (int1, int2) => int1 + int2 }
// acc.countMap.foldLeft(this.countMap) 等價於 this.countMap : acc.countMap 又等價於 this.countMap 和 acc.countMap 的每個 KV 作操做
case acc: SessionStatisticAccumulator => acc.countMap.foldLeft(this.countMap) {
case (map, (k, v)) => map += (k -> (map.getOrElse(k, 0) + v))
}
}
}
override def value: mutable.HashMap[String, Int] = {
this.countMap
}
}
一、RDD
RDD,全稱爲 Resilient Distributed Datasets,即分佈式數據集,是 Spark 中最基本的數據抽象,它表明一個不可變、可分區、裏面的元素能夠並行計算的集合。在 Spark 中,對數據的全部操做不外乎建立 RDD、轉化已有 RDD 以及調用 RDD 操做進行求值
。每一個 RDD 都被分爲多個分區, 這些分區運行在集羣中的不一樣的節點上。RDD 能夠包含 Python、Java、Scala 中任意類型的對象,甚至能夠包含用戶自定義的對象。RDD 具備數據流模型的特色:自動容錯、位置感知性調度和可伸縮性。RDD 容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高查詢速度
。
RDD 支持兩種操做:transformation
操做和 action
操做。RDD 的 transformation 操做是返回一個新的 RDD 的操做,好比 map 和 filter(),而 action 操做則是向驅動器程序返回結果或者把結果寫入外部系統的操做,好比 count()和 first()。apache
二、DataFrame
DataFrame 是一個分佈式數據容器
。相比於 RDD,DataFrame 更像傳統數據庫中的二維表格,除了數據以外,還記錄數據的結構信息,即 schema。同時,與 Hive 相似,DataFrame 也支持嵌套數據類型(struct,array 和 map)。從 API 易用性的角度上看,DataFrame API 提供的是一套高層的關係操做,比函數式的 RDD API 要更加友好,門檻更低。因爲與 R 和 Pandas 中的 DataFrame 相似, Spark DataFrame 很好地繼承了傳統單機數據分析的開放和體驗。
三、DataSet
DataSet 是 DataFrame API 的一個拓展,是 Spark 最新的數據抽象。DataSet 具備用戶友好的 API 風格,既具備類型安全檢查也具備 DataFrame 的查詢優化特性。
DataSet 支持編解碼器,當須要訪問非堆上的數據時能夠避免反序列化整個對象,提升了效率。
樣例類被用來在 DataSet 中定義數據的結構信息,樣例類中每一個屬性的名稱直接映射到 DataSet 中的字段名稱。
DataSet 是強類型的
。好比能夠有 DataSet[Car],DataSet[Person]。
DataFrame 只知道字段,可是不知道字段的類型
,因此在執行這些操做的時候是沒有辦法在編譯的時候檢查是否類型失敗的,好比你能夠對一個 String 類型進行加減法操做,在執行的時候纔會報錯,而 DataSet 不只僅知道字段,並且知道字段類型,因此有更爲嚴格的錯誤檢查
。就像 JSON 對象和類對象之間的類比。
介紹一下 Spark 將 RDD 轉換成 DataFrame 的兩種方式:
1.經過反射獲取 Schema:使用 case class 的方式,不過在 scala 2.10 中最大支持 22 個字段的 case class,這點須要注意;
2.經過編程獲取 Schema:經過 spark 內部的 StructType 方式,將普通的 RDD 轉換成 DataFrame。
DataSet與RDD互操做代碼:
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
case class Person(name: String, age: Int)
object SparkRDDtoDF {
def main(agrs: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]")
conf.set("spark.sql.warehouse.dir", "file:D:\\learn\\JetBrains\\workspace_idea3\\commerce_basic\\analyse\\product\\src\\test\\")
conf.set("spark.sql.shuffle.partitions", "20")
val sparkSession = SparkSession.builder().appName("RDD to DataFrame").config(conf).getOrCreate()
// 經過代碼的方式,設置 Spark log4j 的級別
sparkSession.sparkContext.setLogLevel("WARN")
import sparkSession.implicits._
// use case class convert RDD to DataFrame
// 經過反射的方式完成 RDD 向 DataFrame 的轉換
// val peopleDF = rddToDFCase(sparkSession)
// 經過編程的方式完成 RDD 向 DataFrame 的轉換
val peopleDF = rddToDF(sparkSession)
peopleDF.show()
peopleDF.select($"name", $"age").filter($"age" > 20).show()
}
// 1. 經過編程獲取 Schema
def rddToDF(sparkSession: SparkSession): DataFrame = {
// 設置 schema 結構
val schema = StructType(Seq(StructField("name", StringType, true), StructField("age", IntegerType, true)))
val rowRDD = sparkSession.sparkContext
.textFile("file:D:\\learn\\JetBrains\\workspace_idea3\\commerce_basic\\analyse\\product\\src\\test\\people.txt", 2)
.map(x => x.split(",")).map(x => Row(x(0), x(1).trim().toInt))
sparkSession.createDataFrame(rowRDD, schema)
}
// 2. 經過反射獲取 Schema
def rddToDFCase(sparkSession: SparkSession): DataFrame = {
// 導入隱飾操做,不然 RDD 沒法調用 toDF 方法
import sparkSession.implicits._
val peopleRDD = sparkSession.sparkContext
.textFile("file:D:\\learn\\JetBrains\\workspace_idea3\\commerce_basic\\analyse\\product\\src\\test\\people.txt", 2)
.map(x => x.split(",")).map(x => Person(x(0), x(1).trim().toInt)).toDF()
peopleRDD
}
}
一、DataFrame/DataSet 轉 RDD
val rdd1=testDF.rdd
val rdd2=testDS.rdd
二、RDD 轉 DataFrame
import spark.implicits._
val testDF = rdd.map {
line =>
(line._1, line._2)
}.toDF("col1", "col2")
三、RDD 轉 DataSet
參見 DataSet 與 RDD 互操做代碼。
四、DataFrame 轉 DataSet
import spark.implicits._
val testDF = testDS.toDF
五、DataSet 轉 DataFrame
import spark.implicits._
// 定義字段名和類型
case class Coltest(col1: String, col2: Int) extends Serializable
val testDS = testDF.as[Coltest]
一、弱類型 UDAF 函數
經過繼承 UserDefinedAggregateFunction 來實現用戶自定義聚合函數。
package com.atguigu.product
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
/**
* 自定義弱類型的聚合函數(UDAF)
*/
class GroupConcatDistinct extends UserDefinedAggregateFunction {
// 設置 UDAF 函數的輸入類型爲 String
override def inputSchema: StructType = StructType(StructField("cityInfoInput", StringType) :: Nil)
// 設置 UDAF 函數的緩衝區類型爲 String
override def bufferSchema: StructType = StructType(StructField("cityInfoBuffer", StringType) :: Nil)
// 設置 UDAF 函數的輸出類型爲 String
override def dataType: DataType = StringType
// 設置一致性檢驗,若是爲 true,那麼輸入不變的狀況下計算的結果也是不變的
override def deterministic: Boolean = true
// 初始化自定義的 UDAF 函數
// 設置聚合中間 buffer 的初始值
// 須要保證這個語義:兩個初始 buffer 調用下面實現的 merge 方法後也應該爲初始 buffer,
// 即若是你初始值是1,而後你 merge 是執行一個相加的動做,兩個初始 buffer 合併以後等於 2,不會等於初始 buffer 了,
// 這樣的初始值就是有問題的,因此初始值也叫"zero value"
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = ""
}
// 設置 UDAF 函數的緩衝區更新:實現一個字符串帶去重的拼接
// 用輸入數據 input 更新 buffer 值,相似於 combineByKey
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
// 緩衝中的已經拼接過的城市信息串
var cityInfoBuffer = buffer.getString(0)
// 剛剛傳遞進來的某個城市信息
val cityInfoInput = input.getString(0)
// 在這裏要實現去重的邏輯
// 判斷:以前沒有拼接過某個城市信息,那麼這裏才能夠接下去拼接新的城市信息
if (!cityInfoBuffer.contains(cityInfoInput)) {
if ("".equals(cityInfoBuffer)) {
cityInfoBuffer += cityInfoInput
} else {
// 好比 1:北京
// 1:北京,2:上海
cityInfoBuffer += "," + cityInfoInput
}
}
buffer.update(0, cityInfoBuffer)
}
// 把兩個自定義的 UDAF 函數的值合併在一塊兒
// 合併兩個 buffer, 將 buffer2 合併到 buffer1. 在合併兩個分區聚合結果的時候會被用到, 相似於 reduceByKey
// 這裏要注意該方法沒有返回值,在實現的時候是把 buffer2 合併到 buffer1 中去,你須要實現這個合併細節
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
// cityInfoBuffer1: cityId1:cityName1, cityId2:cityName2, cityId3:cityName3, ...
var cityInfoBuffer1 = buffer1.getString(0)
// cityInfoBuffer2: cityId1:cityName1, cityId2:cityName2, cityId3:cityName3, ...
val cityInfoBuffer2 = buffer2.getString(0)
// 將 cityInfoBuffer2 中的數據帶去重的加入到 cityInfoBuffer1 中
for (cityInfo <- cityInfoBuffer2.split(",")) {
if (!cityInfoBuffer1.contains(cityInfo)) {
if ("".equals(cityInfoBuffer1)) {
cityInfoBuffer1 += cityInfo
} else {
cityInfoBuffer1 += "," + cityInfo
}
}
}
buffer1.update(0, cityInfoBuffer1)
}
// 計算並返回最終的聚合結果
override def evaluate(buffer: Row): Any = {
buffer.getString(0)
}
}
二、強類型 UDAF 函數
經過繼承 Aggregator 來實現強類型自定義聚合函數。
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
// 定義 case 類
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
/**
* 計算並返回最終的聚合結果
*/
def zero: Average = Average(0L, 0L)
/**
* 根據傳入的參數值更新 buffer 值
*/
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1 buffer
}
/**
* 合併兩個 buffer 值,將 buffer2 的值合併到 buffer1
*/
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count b1
}
/**
* 計算輸出
*/
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
/**
* 設定中間值類型的編碼器,要轉換成 case 類
* Encoders.product 是進行 scala 元組和 case 類轉換的編碼器
*/
def bufferEncoder: Encoder[Average] = Encoders.product
/**
* 設定最終輸出值的編碼器
*/
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
開窗函數與聚合函數同樣,都是對行的集合組進行聚合計算
。
開窗用於爲行定義一個窗口(這裏的窗口是指運算將要操做的行的集合),它對一組值進行操做,不須要使用 GROUP BY 子句對數據進行分組,可以在同一行中同時返回基礎行的列和聚合列
。
開窗函數的調用格式爲:函數名(列) OVER(選項)
第一大類:聚合開窗函數
-> 聚合函數(列) OVER(選項),這裏的選項能夠是 PARTITION BY 子句,但不能夠是 ORDER BY 子句。
第二大類:排序開窗函數
-> 排序函數(列) OVER(選項), 這裏的選項能夠是 ORDER BY 子句, 也能夠是 OVER(PARTITION BY 子句 ORDER BY 子句),但不能夠只是 PARTITION BY 子句。
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("score").setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import sparkSession.implicits._
val scoreDF = sparkSession.sparkContext
.makeRDD(Array(
Score("a1", 1, 80),
Score("a2", 1, 78),
Score("a3", 1, 95),
Score("a4", 2, 74),
Score("a5", 2, 92),
Score("a6", 3, 99),
Score("a7", 3, 99),
Score("a8", 3, 45),
Score("a9", 3, 55),
Score("a10", 3, 78)
))
.toDF("name", "grade", "score")
scoreDF.createOrReplaceTempView("score")
scoreDF.show()
}
執行結果以下所示:
+----+-----+-----+
|name|grade|score|
+----+-----+-----+
| a1| 1| 80|
| a2| 1| 78|
| a3| 1| 95|
| a4| 2| 74|
| a5| 2| 92|
| a6| 3| 99|
| a7| 3| 99|
| a8| 3| 45|
| a9| 3| 55|
| a10| 3| 78|
+----+-----+-----+
一、聚合開窗函數
OVER 關鍵字表示把聚合函數當成聚合開窗函數而不是聚合函數。SQL 標準容許將全部聚合函數用作聚合開窗函數。
sparkSession.sql("select name, grade, score, count(name) over() name_count from score").show()
查詢結果以下所示:
+----+-----+-----+----------+
|name|grade|score|name_count|
+----+-----+-----+----------+
| a1| 1| 80| 10|
| a2| 1| 78| 10|
| a3| 1| 95| 10|
| a4| 2| 74| 10|
| a5| 2| 92| 10|
| a6| 3| 99| 10|
| a7| 3| 99| 10|
| a8| 3| 45| 10|
| a9| 3| 55| 10|
| a10| 3| 78| 10|
+----+-----+-----+----------+
在上邊的例子中,開窗函數 COUNT(*) OVER() 對於查詢結果的每一行都返回全部符合條件的行的條數。OVER 關鍵字後的括號中還常常添加選項用以改變進行聚合運算的窗口範圍
。若是 OVER 關鍵字後的括號中的選項爲空,則開窗函數會對結果集中的全部行進行聚合運算。
開窗函數的 OVER 關鍵字後括號中的可使用 PARTITION BY 子句來定義行的分區來供進行聚合計算。與 GROUP BY 子句不一樣,PARTITION BY 子句建立的分區是獨立於結果集的,建立的分區只是供進行聚合計算的,並且不一樣的開窗函數所建立的分區也不互相影響
。下面的 SQL 語句用於顯示按照班級分組後每組的人數:
sparkSession.sql("select name, grade, score, count(name) over(partition by grade) name_count from score").show()
執行結果以下所示:
+----+-----+-----+----------+
|name|grade|score|name_count|
+----+-----+-----+----------+
| a1| 1| 80| 3|
| a2| 1| 78| 3|
| a3| 1| 95| 3|
| a6| 3| 99| 5|
| a7| 3| 99| 5|
| a8| 3| 45| 5|
| a9| 3| 55| 5|
| a10| 3| 78| 5|
| a4| 2| 74| 2|
| a5| 2| 92| 2|
+----+-----+-----+----------+
OVER (PARTITION BY grade) 表示對結果集按照 grade 進行分區,而且計算當前行所屬的組的聚合計算結果。在同一個 SELECT 語句中能夠同時使用多個開窗函數,並且這些開窗函數並不會相互干擾
。
二、排序開窗函數
對於排序開窗函數來說,它支持的開窗函數分別爲:ROW_NUMBER(行號)
、RANK(排名:會跳躍)
、DENSE_RANK(密集排名)
和 NTILE(分組排名)
。
sparkSession.sql("select name, grade, score, row_number() over(order by score) as row_number from score").show()
sparkSession.sql("select name, grade, score, rank() over(order by score) as rank from score").show()
sparkSession.sql("select name, grade, score, dense_rank() over(order by score) as dense_rank from score").show()
sparkSession.sql("select name, grade, score, ntile(6) over(order by score) as ntile from score").show()
執行的結果以下:
+----+-----+-----+----------+
|name|grade|score|row_number|
+----+-----+-----+----------+
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a4| 2| 74| 3|
| a2| 1| 78| 4|
| a10| 3| 78| 5|
| a1| 1| 80| 6|
| a5| 2| 92| 7|
| a3| 1| 95| 8|
| a6| 3| 99| 9|
| a7| 3| 99| 10|
+----+-----+-----+----------+
+----+-----+-----+----+
|name|grade|score|rank|
+----+-----+-----+----+
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a4| 2| 74| 3|
| a2| 1| 78| 4|
| a10| 3| 78| 4|
| a1| 1| 80| 6|
| a5| 2| 92| 7|
| a3| 1| 95| 8|
| a6| 3| 99| 9|
| a7| 3| 99| 9|
+----+-----+-----+----+
+----+-----+-----+----------+
|name|grade|score|dense_rank|
+----+-----+-----+----------+
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a4| 2| 74| 3|
| a2| 1| 78| 4|
| a10| 3| 78| 4|
| a1| 1| 80| 5|
| a5| 2| 92| 6|
| a3| 1| 95| 7|
| a6| 3| 99| 8|
| a7| 3| 99| 8|
+----+-----+-----+----------+
+----+-----+-----+-----+
|name|grade|score|ntile|
+----+-----+-----+-----+
| a8| 3| 45| 1|
| a9| 3| 55| 1|
| a4| 2| 74| 2|
| a2| 1| 78| 2|
| a10| 3| 78| 3|
| a1| 1| 80| 3|
| a5| 2| 92| 4|
| a3| 1| 95| 4|
| a6| 3| 99| 5|
| a7| 3| 99| 6|
+----+-----+-----+-----+
看到上面的結果了吧,下面來介紹下相關的內容。咱們獲得的最終結果是按照 score 進行升序顯示的。
對於 row_number() over(order by score) as row_number
來講,這個排序開窗函數是按 score 升序的方式來排序,並得出排序結果的序號。
對於 rank() over(order by score) as rank
來講,這個排序形容函數是按 score 升序的方式來排序,並得出排序結果的排名號。這個函數求出來的排名結果能夠並列, 並列排名以後的排名將是並列的排名加上並列數(簡單說每一個人只有一種排名,而後出現兩個並列第一名的狀況,這時候排在兩個第一名後面的人將是第三名,也就是沒有了第二名,可是有兩個第一名)。
對於 dense_rank() over(order by score) as dense_rank
來講,這個排序函數是按 score 升序的方式來排序,並得出排序結果的排名號。這個函數與 rank() 函數不一樣在於,並列排名以後的排名只是並列排名加 1(簡單說每一個人只有一種排名,而後出現兩個並列第一名的狀況,這時候排在兩個第一名後面的人將是第二名,也就是兩個第一名,一個第二名)。
對於 ntile(6) over(order by score) as ntile
來講,這個排序函數是按 score 升序的方式來排序,而後 6 等分紅 6 個組, 並顯示所在組的序號。
排序函數和聚合開窗函數相似,也支持在 OVER 子句中使用 PARTITION BY 語句。例如:
sparkSession.sql("select name, grade, score, row_number() over(partition by grade order by score) as row_number from score").show()
sparkSession.sql("select name, grade, score, rank() over(partition by grade order by score) as rank from score").show()
sparkSession.sql("select name, grade, score, dense_rank() over(partition by grade order by score) as dense_rank from score").show()
sparkSession.sql("select name, grade, score, ntile(6) over(partition by grade order by score) as ntile from score").show()
須要注意一點,在排序開窗函數中使用 PARTITION BY 子句須要放置在 ORDER BY 子句以前
。
完整的測試代碼以下:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
case class Score(name: String, grade: Int, score: Int)
object WindowFunctionTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("score").setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import sparkSession.implicits._
val scoreDF = sparkSession.sparkContext
.makeRDD(Array(
Score("a1", 1, 80),
Score("a2", 1, 78),
Score("a3", 1, 95),
Score("a4", 2, 74),
Score("a5", 2, 92),
Score("a6", 3, 99),
Score("a7", 3, 99),
Score("a8", 3, 45),
Score("a9", 3, 55),
Score("a10", 3, 78)
))
.toDF("name", "grade", "score")
scoreDF.createOrReplaceTempView("score")
scoreDF.show()
// 一、聚合開窗函數
// sparkSession.sql("select name, grade, score, count(name) over() name_count from score").show()
// 顯示按照班級分組後每組的人數
// sparkSession.sql("select name, grade, score, count(name) over(partition by grade) name_count from score").show()
// 二、排序開窗函數
sparkSession.sql("select name, grade, score, row_number() over(order by score) as row_number from score").show()
sparkSession.sql("select name, grade, score, rank() over(order by score) as rank from score").show()
sparkSession.sql("select name, grade, score, dense_rank() over(order by score) as dense_rank from score").show()
sparkSession.sql("select name, grade, score, ntile(6) over(order by score) as ntile from score").show()
sparkSession.sql("select name, grade, score, row_number() over(partition by grade order by score) as row_number from score").show()
sparkSession.sql("select name, grade, score, rank() over(partition by grade order by score) as rank from score").show()
sparkSession.sql("select name, grade, score, dense_rank() over(partition by grade order by score) as dense_rank from score").show()
sparkSession.sql("select name, grade, score, ntile(6) over(partition by grade order by score) as ntile from score").show()
}
}
updateStateByKey 操做,可讓咱們爲每個 key 維護一個 state,並持續不斷地更新該 state。
1.首先,要定義一個 state,能夠是任意的數據類型。
2.其次,要定義 state 更新函數 -- 指定一個函數如何使用以前的 state 和新值來更新 state。
對於每一個 batch,Spark 都會爲每一個以前已經存在的 key 去應用一次 state 更新函數,不管這個 key 在 batch 中是否有新的數據。若是 state 更新函數返回 none,那麼 key 對應的 state 就會被刪除。
固然,對於每個新出現的 key,也會執行 state 更新函數。
注意
:updateStateByKey 操做,要求必須開啓 checkpoint 機制。
package com.atguigu.stream
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
object UpdateStateByKeyWordCount {
def main(args: Array[String]): Unit = {
// 構建 Spark 上下文
val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
// 建立 Spark 客戶端
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(5))
// 設置檢查點目錄
ssc.checkpoint("./streaming_checkpoint")
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCount = pairs.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
var newValue = state.getOrElse(0)
for (value <- values) {
newValue += value
}
Option(newValue)
})
wordCount.print()
ssc.start()
ssc.awaitTermination()
}
}
Spark Streaming 提供了窗口計算,容許在數據的滑動窗口上應用轉換,下圖說明了這個滑動窗口:
這兩個參數必須是源 DStream 的 batch 間隔的倍數
(上圖中 batch 間隔爲 1)。
batch 間隔爲切割 RDD 的間隔,滑動間隔爲每隔多長時間來計算一次,窗口長度爲每次計算的數據量是多少
。
1.Receiver
Receiver 是使用 Kafka 的高層次 Consumer API 來實現的。Receiver 每隔一段 batch 時間去 Kafka 獲取那段時間最新的消息數據,Receiver 從 Kafka 獲取的數據都是存儲在 Spark Executor 的內存中的,而後 Spark Streaming 啓動的 job 會去處理那些數據
。
對於高階消費者,誰來消費分區不是由 Spark Streaming 決定的,也不是 Storm 決定的,有一個高階消費者 API, 由高階消費者決定分區向消費者的分配,即由高階消費者 API 決定消費者消費哪一個分區,而消費者讀取數據後何時提交 offset 也不是由它們本身決定的,高階消費者 API 會根據參數配置隔幾秒提交一次
。
這會引發一個問題,當 Spark Streaming 中的 Receiver 讀取 Kafka 分區數據時,假設讀取了 100 條數據,高階消費者 API 會執行 offset 的提交,例如每隔 3 秒,這 100 條數據就是 RDD,假設此 RDD 尚未處理完, 高階消費者 API 執行了 offset 提交,可是 Spark Streaming 掛掉了,因爲 RDD 在內存中,那麼 RDD 的數據就丟失了,若是想從新拿數據,從哪裏去拿不是由 Spark Streaming 說了算的,是由高階 API 決定的,因爲 offset 已經提交,高階 API 認爲這個數據 Spark Streaming 已經拿過了,再拿要拿 100 條之後的數據,那麼以前丟失的 100 條數據就永遠丟失了。
針對這一問題,Spark Streaming 設計了一個規則,即 Spark Streaming 預寫日誌規則(Write Ahead Log,WAL)
,每讀取一批數據,會寫一個 WAL 文件,在 WAL文件中,讀了多少條就寫多少條,WAL 文件存儲於 HDFS 上。假設 RDD 中有 100 條數據,那麼 WAL 文件中也有 100 條數據,此時若是 Spark Streaming 掛掉,那麼回去讀取 HDFS 上的 WAL 文件,把 WAL 文件中的 100 條數據取出再生成 RDD,而後再去消費。因爲這一設計須要寫 HDFS,會對總體性能形成影響
。
假設有 6 個分區,高階消費者的話會在 Spark 集羣的 Worker 上啓動 Receiver,有 6 個分區則會用 6 個線程去讀取分區數據,這是在一個 Worker 的一個 Receiver中有 6 個線程同時讀取 6 個分區的數據,隨着數據量愈來愈大, 數據讀取會成爲瓶頸,此時能夠建立多個 Receiver 分散讀取分區數據,而後每一個 Receiver 建立一個 Dstream,再把這些流所有都合併起來,而後進行計算。讀取時,一方面把 RDD 放在內存中,一方面寫 HDFS 中的 WAL 文件。
根據上面的情景,又要建立多個 Receiver,又要進行合併,又要在內存中存儲 RDD,又要寫 HDFS 上的 WAL 文件,高級 API 的缺點仍是比較多的
。
高階消費者是由高階消費者 API 本身提交 offset 到 ZooKeeper 中。
2.Direct
低階消費者須要本身維護 offset,Spark Streaming 從分區裏讀一部分數據,而後將 offset 保存到 CheckpointPath 目錄中
,好比 5s 生成一個 Spark Streaming job(每一個 action 操做啓動一次 job),每一個 job 生成的時候,會寫一次 CheckpointPath 下的文件,Checkpoint 中有 job 信息和 offset 信息(固然還有 RDD 依賴關係等其餘信息),即保存了未完成的 job 和分區讀取的 offset,一旦 Spark Streaming 掛掉後重啓,能夠經過從 CheckpointPath 中的文件中反序列化來讀取 Checkpoint 的數據
。
假設有 5 個分區,第一次 Spark Streaming 讀取 100 條數據,那麼每一個 partition 都會讀取 100 條數據,這 100 條數據對應 offset 是 0~99,這 5 個分區的 100 條數據數據直接對應 RDD 的 5 個分區,針對這一 RDD 會啓動一個 job 進行處理,job 啓動時會將 job 信息和 offset(0~99)寫入 CheckpointPath,處理完成前保存 job 和 offset,一旦處理完成,job 信息會被刪除,可是 offset 信息會被保留,經過此次的 offset 肯定下一次的讀取範圍,即 100~199,新的 job 信息會被寫入,新的 offset 100~199 覆蓋原來的 0~99。若是處理第二批次的時候掛掉了, offset 還在,就能夠重讀這塊數據。
在學習 MySQL 時,咱們接觸到了數據庫鏈接池技術,數據庫鏈接池負責分配、管理和釋放數據庫鏈接,它容許應用程序重複使用一個現有的數據庫鏈接,而不是再從新創建一個
;釋放空閒時間超過最大空閒時間的數據庫鏈接來避免由於沒有釋放數據庫鏈接而引發的數據庫鏈接遺漏。這項技術能明顯提升對數據庫操做的性能。
在實際開發時,對象的建立和銷燬操做也是很是消耗資源的,所以,咱們考慮使用對象池技術。當咱們須要建立對象時,向對象池申請一個對象,若是對象池裏有空閒的可用節點,就會把節點返回給用戶;當咱們須要銷燬對象時,將對象返回給對象池便可。
咱們經常使用的數據庫鏈接池是 C3P0 等數據庫鏈接池,根據對象池的概念,咱們發現對象池與數據庫鏈接池有很大的類似之處,其實,不少數據庫鏈接池就是藉助對象池技術實現的,所以,咱們能夠經過對象池實現本身的數據庫鏈接池。
MySQL代理代碼:
/**
* MySQL 客戶端代理對象
*
* @param jdbcUrl MySQL URL
* @param jdbcUser MySQL 用戶
* @param jdbcPassword MySQL 密碼
* @param client 默認客戶端實現
*/
case class MySqlProxy(jdbcUrl: String, jdbcUser: String, jdbcPassword: String, client: Option[Connection] = None) {
// 獲取客戶端鏈接對象
private val mysqlClient = client getOrElse {
DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword)
}
/**
* 執行增刪改 SQL 語句
*
* @param sql
* @param params
* @return 影響的行數
*/
def executeUpdate(sql: String, params: Array[Any]): Int = {
var rtn = 0
var pstmt: PreparedStatement = null
try {
// 第一步:關閉自動提交
mysqlClient.setAutoCommit(false)
// 第二步:根據傳入的 sql 語句建立 prepareStatement
pstmt = mysqlClient.prepareStatement(sql)
// 第三步:爲 prepareStatement 中的每一個參數填寫數值
if (params != null && params.length > 0) {
for (i <- 0 until params.length) {
pstmt.setObject(i + 1, params(i))
}
}
// 第四步:執行增刪改操做
rtn = pstmt.executeUpdate()
// 第五步:手動提交
mysqlClient.commit()
} catch {
case e: Exception => e.printStackTrace
}
rtn
}
/**
* 執行查詢 SQL 語句
*
* @param sql
* @param params
*/
def executeQuery(sql: String, params: Array[Any], queryCallback: QueryCallback) {
var pstmt: PreparedStatement = null
var rs: ResultSet = null
try {
// 第一步:根據傳入的 sql 語句建立 prepareStatement
pstmt = mysqlClient.prepareStatement(sql)
// 第二步:爲 prepareStatement 中的每一個參數填寫數值
if (params != null && params.length > 0) {
for (i <- 0 until params.length) {
pstmt.setObject(i + 1, params(i))
}
}
// 第三步:執行查詢操做
rs = pstmt.executeQuery()
// 第四步:處理查詢後的結果
queryCallback.process(rs)
} catch {
case e: Exception => e.printStackTrace
}
}
/**
* 批量執行 SQL 語句
*
* @param sql
* @param paramsList
* @return 每條SQL語句影響的行數
*/
def executeBatch(sql: String, paramsList: Array[Array[Any]]): Array[Int] = {
var rtn: Array[Int] = null
var pstmt: PreparedStatement = null
try {
// 第一步:關閉自動提交
mysqlClient.setAutoCommit(false)
pstmt = mysqlClient.prepareStatement(sql)
// 第二步:爲 prepareStatement 中的每一個參數填寫數值
if (paramsList != null && paramsList.length > 0) {
for (params <- paramsList) {
for (i <- 0 until params.length) {
pstmt.setObject(i + 1, params(i))
}
pstmt.addBatch()
}
}
// 第三步:執行批量的 SQL 語句
rtn = pstmt.executeBatch()
// 第四步:手動提交
mysqlClient.commit()
} catch {
case e: Exception => e.printStackTrace
}
rtn
}
// 關閉 MySQL 客戶端
def shutdown(): Unit = mysqlClient.close()
}
如上代碼所示,咱們完成了 MySQL 代理類 MySqlProxy 的建立,每一個 MySqlProxy 對象都會完成一次與 MySQL 的鏈接並提供操做 MySQL 數據庫的接口,那麼若是咱們將 MySqlProxy 對象建立的工做交給對象池,那麼就能夠實現重複利用與 MySQL 創建的鏈接,這與數據庫鏈接池的功能是同樣的。
在本項目中,咱們使用了 Apache common-pool2 框架,Apache common-pool2 包提供了一個通用的對象池技術的實現
。能夠很方便的基於它來實現本身的對象池,好比 DBCP 和 Jedis 他們的內部對象池的實現就是依賴於 common-pool2。
common-pool2 有四個核心:
一、工做類:要經過對象池建立對象的類,例如 MySqlProxy 類。
二、工廠類:生產工做類的工廠,工廠類是基於 BasePooledObjectFactory 的。
三、配置類:對象池活躍對象個數、最大空閒數等信息都須要配置,基於 GenericObjectPoolConfig。
四、對象池:實際的對象池類,基於 GenericObjectPool,其對象的建立須要傳入工廠類對象和配置類對象。
common-pool2 的對應關係以下圖所示:
對象池實現代碼:
/**
* 擴展知識:將 MySqlProxy 實例視爲對象,MySqlProxy 實例的建立使用對象池進行維護
*
* 建立自定義工廠類,繼承 BasePooledObjectFactory 工廠類,負責對象的建立、包裝和銷燬
*
* @param jdbcUrl
* @param jdbcUser
* @param jdbcPassword
* @param client
*/
class PooledMySqlClientFactory(jdbcUrl: String, jdbcUser: String, jdbcPassword: String, client: Option[Connection] = None)
extends BasePooledObjectFactory[MySqlProxy] with Serializable {
// 用於池來建立對象
override def create(): MySqlProxy = MySqlProxy(jdbcUrl, jdbcUser, jdbcPassword, client)
// 用於池來包裝對象
override def wrap(obj: MySqlProxy): PooledObject[MySqlProxy] = new DefaultPooledObject(obj)
// 用於池來銷燬對象
override def destroyObject(p: PooledObject[MySqlProxy]): Unit = {
p.getObject.shutdown()
super.destroyObject(p)
}
}
/**
* 建立 MySQL 池工具類
*/
object CreateMySqlPool {
// 加載 JDBC 驅動,只須要一次
Class.forName("com.mysql.jdbc.Driver")
// 在 org.apache.commons.pool2.impl 中預設了三個能夠直接使用的對象池:GenericObjectPool、GenericKeyedObjectPool 和 SoftReferenceObjectPool
// 建立 genericObjectPool 爲 GenericObjectPool
// GenericObjectPool 的特色是能夠設置對象池中的對象特徵,包括 LIFO 方式、最大空閒數、最小空閒數、是否有效性檢查等等
private var genericObjectPool: GenericObjectPool[MySqlProxy] = null
// 伴生對象經過 apply 完成對象的建立
def apply(): GenericObjectPool[MySqlProxy] = {
// 單例模式
if (this.genericObjectPool == null) {
this.synchronized {
// 獲取 MySQL 配置參數
val jdbcUrl = ConfigurationManager.config.getString(Constants.JDBC_URL)
val jdbcUser = ConfigurationManager.config.getString(Constants.JDBC_USER)
val jdbcPassword = ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)
val size = ConfigurationManager.config.getInt(Constants.JDBC_DATASOURCE_SIZE)
val pooledFactory = new PooledMySqlClientFactory(jdbcUrl, jdbcUser, jdbcPassword)
val poolConfig = {
// 建立標準對象池配置類的實例
val c = new GenericObjectPoolConfig
// 設置配置對象參數
// 設置最大對象數
c.setMaxTotal(size)
// 設置最大空閒對象數
c.setMaxIdle(size)
c
}
// 對象池的建立須要工廠類和配置類
// 返回一個 GenericObjectPool 對象池
this.genericObjectPool = new GenericObjectPool[MySqlProxy](pooledFactory, poolConfig)
}
}
genericObjectPool
}
}
GenericObjectPool 的核心方法以下: borrowObject():從對象池中取出一個對象。 returnObject():將使用完成的對象還給對象池。在每次使用 CreateMySqlPool 時,經過 borrowObject() 提取對象,經過 returnObject() 歸還對象。