Spark Mlib TFIDF源碼詳讀 筆記

在提取文本特徵時,常常用到TF-IDF算法。Spark Mlib實現了該算法。下面是Spark Mlib中,TF_IDF算法調用的一個實例:html

def main(args:Array[String]){
        val sc: SparkContext = null                         
      // Load documents (one per line).
        val documents: RDD[Seq[String]] = sc.textFile("...").map(_.split(" ").toSeq)
       val hashingTF = new HashingTF()
      //計算tf 
        val tf: RDD[Vector] = hashingTF.transform(documents)
        tf.cache()
      //獲得idfModel對象 
        val idf = new IDF().fit(tf)
      //獲得tf-idf值
        val tfidf: RDD[Vector] = idf.transform(tf)

要求輸入數據  必須是一行一篇文章(切過詞的),Spark Mlib中沒有提供切詞的工具,但給出了建議使用的切詞工具 Stanford NLP Group and scalanlp/chalkgit

一、TF源碼詳讀github

在調用的代碼中,咱們找到算法

val hashingTF = new HashingTF()
//計算tf 
val tf: RDD[Vector] = hashingTF.transform(documents)

  獲取TF,主要是經過HashingTF類的 transform方法,跟蹤該方法apache

 

  /**
   * Transforms the input document to term frequency vectors.
   */
  @Since("1.1.0")
  def transform[D <: Iterable[_]](dataset: RDD[D]): RDD[Vector] = {
    dataset.map(this.transform)
  }

 

SparkMlib是基於RDD的,因此在看源碼前,必需要對RDD熟悉。再看 dataset.map(this.transform)中的transform方法:api

 

 /**
   * Transforms the input document into a sparse term frequency vector.
   */
  @Since("1.1.0")
  def transform(document: Iterable[_]): Vector = {
    //定義詞頻的map
    val termFrequencies = mutable.HashMap.empty[Int, Double]
    //循環每篇文章裏的每一個詞
    document.foreach { term =>
    //獲取詞項term對應的向量位置
      val i = indexOf(term)
    //i即表明這個詞,統計次數放入termFrequencies
      termFrequencies.put(i, termFrequencies.getOrElse(i, 0.0) + 1.0)
    }
    //將詞特徵映射到一個很大維度的向量中去 稀疏向量 numFeatures是類HashingTF的成員變量 能夠在調用HashingTF傳入,若是沒有傳入,默認爲2的20次方
    Vectors.sparse(numFeatures, termFrequencies.toSeq)
  }

 

transform方法對每一行(即每篇文章)都會執行一次,主要是計算每篇文章裏的詞的詞頻,轉存入一個維度很大的稀疏向量中,每一個詞在該向量中對應的位置就是:工具

 @Since("1.1.0")
  def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures)

term.##至關於hashcode(),獲得每一個詞的hash值,而後對numFeatures 取模,是個Int型的值this

到此爲止,TF就計算完了,最終的結果是一個存放詞的位置,以及該詞對應詞頻的 向量,即SparseVector(size, indices, values)spa

二、IDF源碼詳讀     scala

      //獲得idfModel對象 輸入的tf類型是SparseVector(size, indices, values)
        val idf = new IDF().fit(tf)
      //獲得tf-idf值
        val tfidf: RDD[Vector] = idf.transform(tf)

IDF實現主要經過兩步:

第一步: val idf = new IDF().fit(tf)

 /**
   * Computes the inverse document frequency.
   * @param dataset an RDD of term frequency vectors
   */
  @Since("1.1.0")
  def fit(dataset: RDD[Vector]): IDFModel = {
    //返回 IDF向量 類型是DenseVector(values)
    val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator(
          minDocFreq = minDocFreq))(///minDocFreq是詞最小出現頻率,不填是默認0
      seqOp = (df,v) => df.add(v),//計算
      combOp = (df1, df2) => df1.merge(df2)//合併
    ).idf()
    new IDFModel(idf)
  }

上面treeAggregate方法原型是def treeAggregate[U: ClassTag](zeroValue: U)( seqOp: (U, T) => U, combOp: (U, U) =>U, depth: Int = 2): U    

treeAggregate是使用mapPartition進行計算的,需定義兩個操做符,一個用來計算,一個用來合併結果

 seqOp 用來計算分區結果的操做符 (an operator used to accumulate results within a partition)

combOp 用來組合來自不一樣分區結果的關聯操做符( an associative operator used to combine results from different partitions)

該方法的調用返回new IDF.DocumentFrequencyAggregator對象,接着又調用DocumentFrequencyAggregator的idf方法,返回idf向量,而後又經過new IDFModel(idf)返回IDFModel對象

下面是 DocumentFrequencyAggregator 類的方法,即一個add(seqOp)一個merge(combOp

 

private object IDF {

  /** Document frequency aggregator. */
  class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {

    /** number of documents 文檔總數量*/ 
    private var m = 0L
    /** document frequency vector df向量,詞在出現過的文檔個數*/
    private var df: BDV[Long] = _


    def this() = this(0) //構造方法,若是minDocFreq沒有傳入的話,默認值爲0

    /** Adds a new document. 這個地方就是執行的每一個分區裏的計算操做 ,輸入是tf向量*/
    def add(doc: Vector): this.type = {
      if (isEmpty) {
        df = BDV.zeros(doc.size)
      }
      doc match {
      //tf向量是 SparseVector 因此會走這個case
        case SparseVector(size, indices, values) =>
          val nnz = indices.size
          var k = 0
          while (k < nnz) {
            if (values(k) > 0) {
              df(indices(k)) += 1L //若是詞在文章中出的頻率大於0,則該詞的df+1
            }
            k += 1
          }
        case DenseVector(values) =>
          val n = values.size
          var j = 0
          while (j < n) {
            if (values(j) > 0.0) {
              df(j) += 1L
            }
            j += 1
          }
        case other =>
          throw new UnsupportedOperationException(
            s"Only sparse and dense vectors are supported but got ${other.getClass}.")
      }
      m += 1L
      this
    }

    /** Merges another. 這個地方就是執行全部分區的合併操做*/
    def merge(other: DocumentFrequencyAggregator): this.type = {
      if (!other.isEmpty) {
        m += other.m //總文檔數合併
        if (df == null) {
          df = other.df.copy
        } else {
          df += other.df //df向量合併
        }
      }
      this
    }

    private def isEmpty: Boolean = m == 0L

    /** Returns the current IDF vector. 計算idf向量的方法 */
    def idf(): Vector = {
      if (isEmpty) {
        throw new IllegalStateException("Haven't seen any document yet.")
      }
      val n = df.length
      val inv = new Array[Double](n)
      var j = 0
      while (j < n) {
        /*
         * If the term is not present in the minimum
         * number of documents, set IDF to 0. This
         * will cause multiplication in IDFModel to
         * set TF-IDF to 0.
         *
         * Since arrays are initialized to 0 by default,
         * we just omit changing those entries.
         */
        if (df(j) >= minDocFreq) { //若是df大於設定的值,就計算idf的值,若是不大於的話,就直接設置爲0
          inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
        }
        j += 1
      }
      Vectors.dense(inv) //返回idf 密集向量
    }
  }
}

第二步:經過上面的計算獲得idf向量,剩下的工做就是計算 tf*idf了,會用到IDFMode類中的transform方法 val tfidf: RDD[Vector] = idf.transform(tf)

private object IDFModel {

  /**
   * Transforms a term frequency (TF) vector to a TF-IDF vector with a IDF vector
   *
   * @param idf an IDF vector
   * @param v a term frequence vector
   * @return a TF-IDF vector
   */
  def transform(idf: Vector, v: Vector): Vector = {
    val n = v.size
    v match {
    //會進入這個case
      case SparseVector(size, indices, values) =>
        val nnz = indices.size
        val newValues = new Array[Double](nnz)
        var k = 0
        while (k < nnz) {
          newValues(k) = values(k) * idf(indices(k)) //計算tf*idf
          k += 1
        }
        Vectors.sparse(n, indices, newValues) //TFIDF向量
      case DenseVector(values) =>
        val newValues = new Array[Double](n)
        var j = 0
        while (j < n) {
          newValues(j) = values(j) * idf(j)
          j += 1
        }
        Vectors.dense(newValues)
      case other =>
        throw new UnsupportedOperationException(
          s"Only sparse and dense vectors are supported but got ${other.getClass}.")
    }
  }
}

以上就是整個TFIDF的計算過程,用到Spark Mlib 的密集向量(DenseVector)和稀疏向量(SparseVector) 、RDD的聚合操做

主要相關的類有三個:HashingTF 、IDF、IDFModel 

還有就是利用spark Mlib 的TFIDF生成的TFIDF向量,位置信息存是詞hash後和向量維度取模後的值,而不是該詞,在後面作一些分類,或者文本推薦的時候,若是須要用到詞自己,還須要作調整

 

相關文章
相關標籤/搜索