在處理時間序列或者是有序數據時候,常常會越到這樣的情形:一、求客戶最近一個月的平均消費金額;二、求客戶最近一個月的消費次數;三、求與客戶在最近一個月內發生大額消費的客戶數量apache
上述問題中,問題1與問題2是典型的求指定客戶在指定時間段的行爲統計值,能夠經過客戶進行分組而後過濾統計便可,而問題三則是求解與該客戶在指定時間段內發生某種行爲的客戶數量,即沒有具體的聚合Key,從而不一樣按照問題1與問題2進行分組的方式求解,而經過相似時間序列中求一個序列的移動平均方法來求解函數
針對上述兩類情形,下面分別提供了兩個函數,這個兩個函數,一個是aggregateByKey,根據key聚合,並對聚合後的每一條記錄採用窗口函數獲取聚合數據,即該函數是對每個key對應的value進行移動聚合操做。另外一個是aggregateByAll,根據key進行排序,經過窗口函數獲取結果進行聚合。這兩個函數都是根據窗口函數進行聚合操做,但不一樣點在於做用範圍不一樣。spa
窗口函數與聚合函數用戶定義能夠自由定義,經過定義不一樣的窗口函數與聚合函數,能夠實現不一樣的移動邏輯以及聚合運算。scala
aggregateByKey實現起來比較簡單,由於通常狀況先,單個key對應的value不會很大(單個executor的內存是能夠裝得下的),全部能夠經過簡單的reduceByKey把全部相同key對應的value混洗到相同的分區,而後對分區內的每條數據經過窗口函數移動,把移動獲取到的數據根據聚合函數執行組合操做便可code
aggregateByAll因爲要對整個RDD內的數據進行移動聚合,全部不可以像aggregateByKey那樣把待聚合數據放在一塊兒(由於是全部,executor通常裝不下),因此要經過分區分別操做。簡要步驟以下:排序
技術難點在於移動窗口是跨分區時候如何解決?即當前數據須要聚合的數據在另一個分區中。索引
package com.jiamz.aidp.bigdata.utils import com.jiamz.aidp.bigdata.SparkHelper import org.apache.spark.rdd.RDD import org.apache.spark.{HashPartitioner, RangePartitioner} import scala.reflect.ClassTag /** * Created by zhoujiamu on 2020/3/11. */ object MovingAggregate { private val lastPart: Byte = -1 private val currPart: Byte = 0 private val nextPart: Byte = 1 /** * 根據key對value進行聚合操做, 與reduceByKey不一樣的是, 該聚合操做是對同一key對應的value根據窗口函數進行滑動, * 而後針對窗口移動獲取的結果進行聚合操做 * @param rdd 待聚合key-value形式的RDD * @param winFunc 移動窗口函數 * @param aggFunc 聚合函數 * @tparam K key類型 * @tparam V value類型 * @tparam U 聚合結果類型 * @return 移動聚合結果RDD */ def aggregateByKey[K: ClassTag, V: ClassTag, U: ClassTag](rdd: RDD[(K, V)], winFunc: (V, V) => Boolean, aggFunc: Seq[V] => U): RDD[(K, (V, U))] ={ val result = rdd.mapPartitions(iter => iter.map{case(k, v) => k -> Seq(v)}) .reduceByKey(_++_) .mapPartitions(iter => iter.flatMap{case(k, seq) => { val aggResult = seq.flatMap(s1 => { val aggSeq = seq.filter(s2 => winFunc(s1, s2)) if (aggSeq.nonEmpty) Iterator(s1 -> aggFunc(aggSeq)) else Iterator.empty }) aggResult.map(res => k -> res) }} ) result } /** * 根據key對數據進行排序, 經過winFunc函數來滑動截取須要聚合的數據進行聚合操做, * @param rdd 待聚合key-value形式的RDD * @param winFunc 移動窗口函數 * @param aggFunc 聚合函數 * @tparam K key類型 * @tparam V value類型 * @tparam U 聚合結果類型 * @return 移動聚合結果RDD */ def aggregateByAll[K: Ordering: ClassTag, V: ClassTag, U: ClassTag](rdd: RDD[(K, V)], winFunc: (K, K) => Boolean, aggFunc: Seq[V] => U ): RDD[(K, (V, U))] ={ val partitioner = new RangePartitioner(rdd.getNumPartitions, rdd) val newRdd = rdd.partitionBy(partitioner).cache() /** * 獲取當前數據近鄰的記錄 * @param index 當前數據索引 * @param seq 數據列表 * @return 當前數據近鄰記錄 */ def getNbrs(index: Int, seq: Seq[(K, V)]): Seq[(K, V)] ={ val center = seq(index)._1 val len = seq.length var start = index var end = index while (end < len && winFunc(center, seq(end)._1)) end += 1 while (start >= 0 && winFunc(center, seq(start)._1)) start -= 1 seq.slice(start+1, end) } val numPartition = rdd.getNumPartitions def getPartTail(pid: Int, seq: Seq[(K, V)]): Seq[(Int, ((K, V), Byte))] ={ val center = seq.last._1 val length = seq.length var ind = length - 1 while (ind >= 0 && winFunc(center, seq(ind)._1)) ind -= 1 seq.slice(ind+1, length).map(data => (pid+1, (data, lastPart))) } def getPartHead(pid: Int, seq: Seq[(K, V)]): Seq[(Int, ((K, V), Byte))] ={ val center = seq.head._1 val length = seq.length var ind = 1 while (ind < length && winFunc(center, seq(ind)._1)) ind += 1 seq.slice(0, ind).map(data => (pid-1, (data, nextPart))) } // 相鄰分區的數據中符合窗口函數的須要進行copy到相鄰分區中,使得對每一條數據的鄰居數據(設定窗口內)都在同一分區 val rddWithShuffle = newRdd.mapPartitionsWithIndex((pid, iter) => { if (numPartition == 1 || iter.isEmpty){ iter.map(data => (pid, (data, currPart))) } else { val seq = iter.toSeq.sortBy(_._1) val moved = if (pid == 0){ // 第一個分區的尾部數據日後一個分區移動 getPartTail(pid, seq) } else if (pid == numPartition-1){ // 最後一個分區的尾部數據往前一個分區移動 getPartHead(pid, seq) } else { // 中間分區先後的數據都往相鄰分區移動 getPartHead(pid, seq) ++ getPartTail(pid, seq) } val fixed = seq.map(data => (pid, (data, currPart))) (fixed ++ moved).toIterator } }).partitionBy(new HashPartitioner(numPartition)) val aggregateResult = rddWithShuffle.mapPartitions(part => { val seq = part.toSeq.sortBy(_._1) val data = seq.map(_._2._1) seq.zipWithIndex.filter(x => x._1._2._2.equals(currPart)) .map{case(_, i) => { val center = seq(i)._2._1 val nbrs = getNbrs(i, data) val aggRes = aggFunc(nbrs.map(_._2)) (center._1, (center._2, aggRes)) }}.toIterator }) aggregateResult } def main(args: Array[String]): Unit = { SparkHelper.setLogLevel("WARN") val sc = SparkHelper.getSparkContext("MovingAggregate", "spark.master" -> "local") val array = Array( ("id1", (1, 10.0)), ("id1", (2, 20.0)), ("id1", (3, 30.0)), ("id1", (7, 40.0)), ("id1", (8, 50.0)), ("id1", (9, 60.0)), ("id2", (1, 70.0)), ("id2", (2, 80.0)), ("id2", (3, 30.0)), ("id2", (4, 20.0)), ("id2", (7, 50.0)), ("id2", (9, 60.0)) ) /** 針對key作滑動平均 */ // 定義移動窗口函數 def winFunc(v1: (Int, Double), v2: (Int, Double)) = { v2._1 - v1._1 <= 3 && v2._1 >= v1._1 } // 定義聚合函數, 求平均 def aggFunc(seq: Seq[(Int, Double)]) = { val values = seq.map(_._2) values.sum / values.length } val rdd = sc.makeRDD(array) val res = aggregateByKey(rdd, winFunc, aggFunc) // 打印按照key計算移動平均結果 res.collect().foreach(println) println("-"*50) val array1 = Array( (1, 10.0), (2, 20.0), (3, 30.0), (7, 40.0), (8, 50.0), (9, 60.0), (20, 70.0), (22, 80.0), (23, 30.0), (31, 20.0), (33, 50.0), (36, 60.0) ) val rdd1 = sc.makeRDD(array1, numSlices = 3) def winFunc1(s1: Int, s2: Int) = { math.abs(s2 - s1) <= 1 } def aggFunc1(seq: Seq[Double]) = { seq.sum / seq.length } val res1 = aggregateByAll(rdd1, winFunc1, aggFunc1) println("-"*30) // 打印按照key指定的窗口計算移動平均結果 res1.collect().foreach(println) } }
(id1,((1,10.0),20.0)) (id1,((2,20.0),25.0)) (id1,((3,30.0),30.0)) (id1,((7,40.0),50.0)) (id1,((8,50.0),55.0)) (id1,((9,60.0),60.0)) (id2,((1,70.0),50.0)) (id2,((2,80.0),43.333333333333336)) (id2,((3,30.0),25.0)) (id2,((4,20.0),35.0)) (id2,((7,50.0),55.0)) (id2,((9,60.0),60.0)) -------------------------------------------------- ------------------------------ (1,(10.0,15.0)) (2,(20.0,20.0)) (3,(30.0,25.0)) (7,(40.0,45.0)) (8,(50.0,50.0)) (9,(60.0,55.0)) (20,(70.0,70.0)) (22,(80.0,55.0)) (23,(30.0,55.0)) (31,(20.0,20.0)) (33,(50.0,50.0)) (36,(60.0,60.0))