【異常檢測】Isolation forest 的spark 分佈式實現

1.算法簡介html

  算法的原始論文 http://cs.nju.edu.cn/zhouzh/zhouzh.files/publication/icdm08b.pdf 。python的sklearn中已經實現了相關的api,對於單機的數據已經足夠使用了,連接以下 http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html 。若是你想探究分佈式下該算法怎麼實現,下面細看。java

  按照慣例先講一下算法的思想,對於已經瞭解的小夥伴來講,這段跳過。它的思路有點相似隨機森林,併發訓練N棵樹,每棵樹是沒有關聯的,且每棵樹用到的樣本和屬性也是隨機的,所不一樣的是,isolation forest (下面簡稱IF)是非監督的算法,經過構建二叉樹,而後在構建好的樹上,來預測樣本的深度,若是深度太淺,則是疑似異常的樣本。更加詳細的論斷和細節請查看論文,或者參考國內各大博客主寫的我的感悟,咱們把重點放在分佈式實現上面。python

2.分佈式實現git

在實現以前重點關注一點,IF算法並不須要全部的樣本,甚至不能使用太多的樣本,使用小樣本的狀況,算法效果更優。這一點在論文中有論斷:github

如上圖所示,若是使用所有的樣本做爲訓練集,則異常的樣本,未必能識別出來,而在小樣本下能夠輕鬆識別。論文中比較了這兩種方式,前者AUC達到0.67,然後者能達到0.91。算法

基於上面的論斷,每棵樹的樣本大小不能太大,固然下面實現的方式既支持小樣本又支持大樣本,這個依賴於用戶本身喜歡了apache

import java.util.concurrent.Executors
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import org.apache.spark.storage.StorageLevel
import scala.concurrent.duration.Duration 
import scala.concurrent.{Await, ExecutionContext, Future} 
import scala.util.Random
import org.apache.hadoop.fs._

sealed trait ITree

case class ITreeBranch(left: ITree, right: ITree, split_column: Int, split_value: Double) extends ITree

case class ITreeLeaf(size: Long) extends ITree

/** @param trees      trained trees
  * @param maxSamples The number of samples to train each base tree
  */
case class IForest(trees: Array[ITree], maxSamples: Int) {

  def predict(x: Array[Double]) = {
    if (trees.forall(_ == null)) {
      throw new Exception("Please train before predict!!")
    } else {
      val predictions = trees.map(s => pathLength(x, s, 0)).toList
      math.pow(2, -(predictions.sum / predictions.size) / cost(maxSamples))
    }
  }

  @scala.annotation.tailrec
  final def pathLength(x: Array[Double], tree: ITree, path_length: Int): Double = {
    tree match {
      case ITreeLeaf(size) =>
        path_length + cost(size)

      case ITreeBranch(left, right, split_column, split_value) =>
        val sample_value = x(split_column)
        if (sample_value < split_value)
          pathLength(x, left, path_length + 1)
        else
          pathLength(x, right, path_length + 1)
    }
  }

  private def cost(num_items: Long): Double =
    if (num_items <= 1) 1.0 else 2.0 * (math.log(num_items - 1.0) + 0.577215664901532860606512090082402431) - (2.0 * (num_items - 1.0) / num_items)

}

object IForest {

  /**
    * @param numTrees    The number of base tree in the ensemble
    * @param maxSamples  The number of samples to train each base tree ,should be small!! should be small!! should be small!!
    *                    should be small!! should be small!! should be small!!
    * @param maxFeatures The fraction of features to train each base tree value in (0.0,1.0]
    *                    //    * @param withReplacement whether sampling is done with replacement, do something in future
    * @param nJobs       The number of jobs to run in parallel for fit ,do something in future
    */
  def buildForest(data: RDD[Array[Double]], numTrees: Int = 100, maxSamples: Int = 256, maxFeatures: Double = 1.0, nJobs: Int = 10, distribute: Boolean = false) = {
    val sc = data.sparkContext
    val cacheData = if (sc.getRDDStorageInfo.filter(_.id == data.id).nonEmpty) data else data.persist(StorageLevel.MEMORY_AND_DISK)
    val dataCnt = data.count()
    println(s"AllSmaples =>${dataCnt}")

    val numFeatures = cacheData.take(1)(0).size
    checkData(cacheData, numFeatures)
    val sampleNumSamples = Math.min(maxSamples, dataCnt).toInt
    val sampleNumFeatures = (maxFeatures * numFeatures).toInt
    val maxDepth = Math.ceil((math.log(math.max(sampleNumSamples, 2)) / math.log(2))).toInt

    val sampleRatio = Math.min(sampleNumSamples * 1.0 / dataCnt * 2, 1.0)
    val trees =
      if (distribute) {
        implicit val xc = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(nJobs))
        val tasks = (0 until numTrees).map {
          i =>
            val sampleData = cacheData.sample(false, sampleRatio, System.currentTimeMillis()).zipWithIndex().filter(_._2 <= sampleNumSamples).map(_._1)
            parallizeGrow(sampleData, maxDepth, numFeatures, sampleNumFeatures)
        }
        val results = Await.result(Future.sequence(tasks), Duration.Inf)
        results.toArray
      }
      else
        (0 until numTrees).sliding(nJobs, nJobs).map {
          arr =>
            sc.union(
              arr.map {
                i =>
                  cacheData.sample(false, sampleRatio, System.currentTimeMillis()).zipWithIndex().filter(_._2 <= sampleNumSamples)
                    .map(_._1).repartition(1).mapPartitions {
                    iter =>
                      val delta = iter.toArray
                      val sampleFeatures = if (sampleNumFeatures < numFeatures) Random.shuffle((0 until numFeatures).toList).take(sampleNumFeatures) else (0 until numFeatures).toList
                      Iterator(growTree(delta, maxDepth, sampleFeatures, 0))
                  }
              }
            ).collect()
        }.reduce(_ ++ _)

    new IForest(trees, maxSamples)
  }

  def saveModel(sc: SparkContext, iforest: IForest, path: String) = {
    val hdfs=FileSystem.get(sc.hadoopConfiguration)
    hdfs.delete(new Path(path), true)
    sc.parallelize(Seq(iforest), 1).saveAsObjectFile(path)
  }

  def loadModel(sc: SparkContext, path: String) = {
    sc.objectFile[IForest](path).collect()(0)
  }

  private def growTree(data: Array[Array[Double]], maxDepth: Int, sampleFeatures: Seq[Int], currentDepth: Int): ITree = {
    val numSamples = data.length
    if (currentDepth >= maxDepth || numSamples <= 1 || data.distinct.length == 1) {
      new ITreeLeaf(numSamples)
    } else {
      val splitColumn = sampleFeatures(Random.nextInt(sampleFeatures.length))
      val columnValue = data.map(_.apply(splitColumn))
      val colMin = columnValue.min
      val colMax = columnValue.max
      val splitValue = colMin + Random.nextDouble() * (colMax - colMin)
      val dataLeft = data.filter(_ (splitColumn) < splitValue)
      val dataRight = data.filter(_ (splitColumn) >= splitValue)
      new ITreeBranch(growTree(dataLeft, maxDepth, sampleFeatures, currentDepth + 1),
        growTree(dataRight, maxDepth, sampleFeatures, currentDepth + 1),
        splitColumn, splitValue)
    }
  }

  private def parallizeGrow(data: RDD[Array[Double]], maxDepth: Int, numFeatures: Int, sampleNumFeatures: Int)(implicit xc: ExecutionContext) = Future {
    val sampleFeatures = if (sampleNumFeatures < numFeatures) Random.shuffle((0 until numFeatures).toList).take(sampleNumFeatures) else (0 until numFeatures)
    growTree(data, maxDepth, sampleFeatures, 0)
  }

  private def growTree(data: RDD[Array[Double]], maxDepth: Int, sampleFeatures: Seq[Int], currentDepth: Int): ITree = {
    val sc = data.sparkContext
    val cacheData = if (sc.getRDDStorageInfo.filter(_.id == data.id).length > 0) data else data.persist(StorageLevel.MEMORY_AND_DISK)
    val numSamples = cacheData.count()
    val ret = if (currentDepth >= maxDepth || numSamples <= 1 || cacheData.distinct.count() == 1) {
      new ITreeLeaf(numSamples)
    } else {
      val splitColumn = sampleFeatures(Random.nextInt(sampleFeatures.length))
      val columnValue = cacheData.map(_ (splitColumn))
      val colMin = columnValue.min()
      val colMax = columnValue.max()
      val splitValue = colMin + Random.nextDouble() * (colMax - colMin)
      val dataLeft = cacheData.filter(_ (splitColumn) < splitValue)
      val dataRight = cacheData.filter(_ (splitColumn) >= splitValue)
      new ITreeBranch(growTree(dataLeft, maxDepth, sampleFeatures, currentDepth + 1),
        growTree(dataRight, maxDepth, sampleFeatures, currentDepth + 1),
        splitColumn, splitValue)
    }

    cacheData.unpersist()
    ret
  }

  private def checkData(data: RDD[Array[Double]], numFeatures: Int) = {
    assert(data.filter(arr => !(arr.length == numFeatures)).isEmpty(), "data must in equal column size")
  }

}

 

  代碼說明:api

  1. 代碼主要參考 https://github.com/hsperr/first_steps_in_scala 
  2. 原始代碼中有錯誤,具體在predict 函數中num_samples 參數應該是每棵樹的樣本數量,而不是全部的樣本數量。
  3. 原始代碼中,不是並行的,關鍵在於trees.map(s=>growTree(getRandomSubsample(data, subSampleSize/numSamples.toDouble, seed), maxHeight, numColumns)) 這一行,在spark的driver端進行解析中,是一個個action串行執行的。
  4. 原始代碼中其實漏掉了一個樹的中止分裂的條件,那就是若是剩餘的樣本都相同的話,也中止生長。另外兩個的中止生長的條件是達到樹的最大深度和只剩下小於等於1個樣本。
  5. buildForest函數,參數的具體含義參照註釋,基本是仿照python的參數來實現的,惟一值得解釋的是distribute,默認值是false。當該參數爲true時,代碼會在driver端起njobs個線程,而後每一個線程監控執行一個action算子去生成一棵樹,具體調用的是 growTree(data: RDD[Array[Double]]...)這個函數;參數爲false時,實際上每一個partition裏面的樣本是對原始樣本上的小採樣,而後在小採樣的樣本上進行構建一個棵樹,你會發現裏面的實現和單機是同樣的,惟一區別是在分佈式的大數據上進行的採樣,以及生成大批量的一堆樹,具體實現參照 growTree(data: Array[Array[Double]]...) 函數。
  6. 每顆樹的深度是樣本數目取log2以後算出來的,這個和python的api保持一致。
  7. 至於什麼樣的樣本纔是異常的,這個根據打出來的分數,降序排列。而後能夠根據百分比進行設置閾值,或者根據具體的分數進行設置閾值。惟一抓住的核心是,要看一下分數在整體樣本上的一個分佈,而後根據分佈作決策。

3.總結併發

1.代碼已經測試經過,直接mvn編譯打包,運行環境爲hadoop3.1.0和spark2.3,你們放心使用。app

2.若有疑問歡迎指正,你們相互學習交流。

相關文章
相關標籤/搜索