基於 Algebird 談一談代數數據類型在數據聚合中的應用

此文已由做者肖乃同受權網易雲社區發佈。
java

歡迎訪問網易雲社區,瞭解更多網易技術產品運營經驗。android




代數數據類型是指知足必定數學特性的數據類型, 這些特性使得計算可以很方便的並行化,在Scalding和 Spark等數據計算框架中有着普遍的應用。代數數據類型是一個通用的概念, 其實現不限於Algebird, 本文主要結合近期處理的一個數據任務, 介紹一下這一技術及Algebird這個函數庫。 文中代碼示例都是基於Scala, 若有紕漏歡迎指正。git

應用場景:雲閱讀用戶流失模型特徵體取

近期接到這樣一個任務, 提取一個特定時間窗口登錄用戶的95個特徵,用於訓練預測用戶流失的模型。抽取 任何一個單獨特徵並不複雜, 不過特徵衆多,數據分佈在多個數據源。我計劃延續前期的代碼, 使用scalding 處理這一任務。 這些特徵大體分爲四類:github

  1. 功能使用的次數sql

  2. 功能使用的天數express

  3. 用戶某一屬性最新的非空值apache

  4. 用戶某一屬性的集合數量api

使用次數就是常規的數值累加,2和4都要考慮集合的去重,3是時間上的maxBy同時要考慮空值處理問題。 爲了方便統一處理,我考慮將這些數據都轉化成可加的代數數據類型, 而後基於這些類型作聚合。安全

Scalding 及 Spark 類型安全聚合接口介紹

先來看一下Scalding提供的聚合接口,直接使用Algebird提供的聚合器:bash

import com.twitter.algebird.Aggregator.count
  val users: TypedPipe[User] = ???
  users.groupBy(_.userId).aggregate(count)複製代碼

Spark在2.0後增長了DataSet這一新的API, 簡單講就是類型安全的DataFrame (基本等同於與scalding type safe api 之於 field based api)。

import org.apache.spark.sql.expressions.scalalang.count
 spark.createDataset(Seq(1,2,3))
      .groupByKey(_)
      .agg(count)複製代碼

Spark 沒有直接使用 algebird, 其參考algebird代碼(看了一下spark的代碼註釋),寫了一套相似接口。 後面會講到如何在Spark使用Algebird。

Aggregator提供了可複用的聚合組件,再也不限於特定的字段。Algebird的Aggregator是基於半羣和幺半羣的代數數據類型聚合。

代數數據類型理論

先來補習一下數學知識,羣是一個二元操做下知足必定特徵的集合:

  1. 閉包性: 集合中的任意兩個元素A和B, A op B 結果依然是集合的元素

  2. 結合律: 集合中的任意兩個元素A、B和C, (A op B) op C 等價於 A op (B op C)

  3. 幺元(也能夠叫零元 Unit):集合存在元素e, 使得任意的元素A有 e op A 等價於 A op e 等價於 A

  4. 逆元: 任意元素A, 存在集合元素B, 使得 A op B = e (e 爲幺元)

知足所有條件的是羣, 知足1和2的是半羣, 知足一、2和3的是幺半羣(有幺元存在)

舉例幾個具體的例子說明一下:

  • 天然數在加法下是一個羣,知足閉包和結合律,0是幺元,負數是逆元

  • 偶數在加法下是一個羣, 奇數不是, 不知足閉包性,奇數相加爲偶數

  • 奇數在乘法下是一個幺半羣, 不存在逆元

  • 正整數在加法下是一個半羣, 不存在幺元,0不屬於正整數

Algebird 實現介紹

Algebird 是twitter開源的scala的抽象代數庫,實現了常見數據類型的半羣、幺半羣等支持, 是從scalding分離出來的通用庫。

經過例子比較好理解:

import com.twitter.algebird._ import com.twitter.algebird.Operators._ Max(3) + Max(5) + Max(10)  // result: Max(10)
 Map(1 -> 2) + Map(1 -> 3) // result: Map(1 -> 5) 
 Map(1 -> Max(3), 2 -> Max(7)) + Map(1 -> Max(-10), 2 -> Max(20)) 
 // result: Map(1 -> Max(3), 2 -> Max(20))複製代碼

Max是個Semigroup(半羣), Map是個Monoid(幺半羣), Algebird有很大的靈活性,從上面示例能夠看到Map的值是半羣,能夠實現相同key的值的聚合。

Algebird除了基本Semigroup和Monoid, Map、IndexedSeq、Tuple等高階的羣 (參數類型是羣的羣,我這樣稱謂),能夠組合出很是靈活的使用。

用戶流失模型中的應用

回到我要處理的問題上來, 須要按照用戶去計算4類不一樣的特徵值, 這些值很稀疏, 能夠把上述問題轉化成聚合問題。

以搜索這個事件來講明, 假設要統計用戶搜索的次數、天數、關鍵詞數量,那麼

Map("搜索次數" -> 1)
  Map("搜索天數" -> date)
  Map("搜索關鍵詞數量" -> keyword)複製代碼

Map是幺半羣,須要值類型是半羣, date和keyword須要轉換成半羣的數據結構。 關鍵詞數量須要去重, 能夠使用Set來作, 使用Set求集合, 最後取集合數量, 聚合器以下:

import com.twitter.algebird.Aggregator.{const, toSet, prepareMonoid => sumAfter}

  val searchCountAgg = sumAfter[MdaEvent, Map[String, Int]](_.searchCount)

  val keywordCountAgg = toSet[String]
   .composePrepare[ClientEvent](_.keyword)
   .andThenPresent(_.size)複製代碼

搜索天數統計,日期也能夠使用上述集合,不過天數的統計很是多, 集合開銷比較大, 我把它轉成一個bitset, , 我統計的窗口只有1個月, 因此用Long型記下相對於開始日期, 這一天是否是有使用:

import com.twitter.algebird.Monoid
  import com.github.nscala_time.time.Imports._
  import org.joda.time.Days

  class Bits(val value: Long) extends AnyVal {
    def count: Int = java.lang.Long.bitCount(value)
    def get(b: Int): Int = if((value & (1 << b)) > 0) 1 else 0
    override def toString: String = value.toBinaryString
  }

  object BitsMonoid extends Monoid[Bits] {
    override def zero = new Bits(0L)
    override def plus(left: Bits, right: Bits) = new Bits(left.value | right.value)
    override def sumOption(iter: TraversableOnce[Bits]): Option[Bits] = {
      if(iter.isEmpty) None
      Some(iter.reduce((a, b) => new Bits(a.value | b.value)))
    }
  }

  def dateDiffToBits(fromDate: DateTime): Long => Bits = {
    val base = fromDate.withTimeAtStartOfDay()
    (timestamp: Long) => {
      val theDay = new DateTime(timestamp).withTimeAtStartOfDay()
      val days = Days.daysBetween(base, theDay).getDays
      require(days < 64, s"only 64 bits long is supported, got day diff: $days")
      new Bits(1 << days)
    }
  }

  val toBitsFun = dateDiffToBits(sampleStartDate)

  val searchDaysAgg = {
      implicit val m = BitsMonoid
      sumAfter[MdaEvent, Map[String, Bits]] { event =>
        searchTime(event).mapValues(toBitsFun)
      }.andThenPresent(_.mapValues(b => b.count))
    }複製代碼

最後來處理第三類特徵非空最新屬性,這個屬性是取按時間的最大值,空值須要特別處理一下, 使用Max, 把排序函數修改一下:

import com.twitter.algebird.Aggregator.max
  def latestStringProperty[U <: ClientEvent](fn: U => String): Aggregator[U, U, String] = {
      import com.twitter.algebird.Aggregator.max
      implicit val ordU = Ordering.by { u: U =>
        val p = fn(u)
        val isEmpty = if (p.isEmpty) 0 else 1
        (isEmpty, u.opTime) // empty property always be covered by value property
      }
      max[U].andThenPresent(e => fn(e))
  }複製代碼

最後就可以使用這些聚合器, 提取所需的特徵值了

val multiOps = MultiAggregator(
    searchCountAgg,
    keywordCountAgg,
    searchDaysAgg,
    latestStringProperty(_.productVersion)
 )
  val daReport = daEvents.groupBy(_.userId).aggregate(multiOps)複製代碼

如何在Spark中的使用

最後來說講若是在Spark中使用Algebird聚合器, 這個特徵提取原本應該在Spark處理更爲方便, 來研究了一下Spark的聚合器。

Spark沒有直接使用Algebird, 但其聚合器基本參照Algebird的, 我寫了一個適配的類來方便直接在 Spark中使用上述的聚合器:

import com.twitter.algebird.MonoidAggregator
  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
  import org.apache.spark.sql.expressions.Aggregator
  import org.apache.spark.sql.{Encoder, SparkSession, TypedColumn}

  implicit class MonoidToTypedColumn[-A,B: Encoder,C: Encoder](val m: MonoidAggregator[A,B,C]) {
    def toColumn: TypedColumn[A,C] = new MonoidAggregatorAdaptor(m).toColumn
  }

  class MonoidAggregatorAdaptor[-A,B: Encoder,C: Encoder](val m: MonoidAggregator[A,B,C]) extends  Aggregator[A,B,C] {
      override def zero = m.monoid.zero
      override def reduce(b: B, a: A) = m.reduce(b, m.prepare(a))
      override def finish(reduction: B) =  m.present(reduction)
      override def merge(b1: B, b2: B) = m.reduce(b1, b2)

      override def bufferEncoder = implicitly[Encoder[B]]
      override def outputEncoder = implicitly[Encoder[C]]
 }複製代碼

這裏只貼了適配Monoid的聚合器, Semigroup的會稍麻煩,代碼比較多, 基本參考org.apache.spark.sql.expressions.ReduceAggregator。

最後咱們就能夠直接再Spark使用Algebird:

val latest = maxBy[DeviceEvent, Long](_.timestamp).toColumn.name("latest")
  val count = size.toColumn.name("count")

  spark.createDataset(Seq(DeviceEvent("a", "iphone", 10L), DeviceEvent("a", "android", 100L), DeviceEvent("a", "iphone", 123L)))
      .groupByKey(_.id)
      .agg(count, latest)
      .collect複製代碼

總結

使用代數數據類型, 咱們數據計算的代碼更接近於問題描述語言, 表達力更強,避免了命令式的操做,bug更少。


網易雲免費體驗館,0成本體驗20+款雲產品!

更多網易技術、產品、運營經驗分享請點擊




相關文章:
【推薦】 Android TV 開發 (1)

相關文章
相關標籤/搜索