The TF-IDF algorithm is commonly used for extracting text features, and Spark MLlib ships an implementation of it. Below is an example of invoking TF-IDF through Spark MLlib:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector

def main(args: Array[String]) {
  val sc: SparkContext = null  // in real code, construct a SparkContext here
  // Load documents (one per line, already tokenized).
  val documents: RDD[Seq[String]] = sc.textFile("...").map(_.split(" ").toSeq)

  val hashingTF = new HashingTF()
  // compute tf
  val tf: RDD[Vector] = hashingTF.transform(documents)
  tf.cache()
  // obtain the IDFModel
  val idf = new IDF().fit(tf)
  // obtain the tf-idf values
  val tfidf: RDD[Vector] = idf.transform(tf)
}
The input data must be one document per line, already tokenized. Spark MLlib does not provide a tokenizer itself, but the documentation suggests tools such as the Stanford NLP Group toolkit and scalanlp/chalk. A minimal whitespace-split stand-in is sketched below.
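As a rough sketch of preparing that input when no real tokenizer is wired up yet (the helper name loadTokenizedDocs is made up for illustration; for serious use, substitute Stanford NLP or scalanlp/chalk for the whitespace split):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// One raw line = one document; split on whitespace so each element
// of the RDD is a pre-tokenized document, as HashingTF expects.
def loadTokenizedDocs(sc: SparkContext, path: String): RDD[Seq[String]] =
  sc.textFile(path)
    .map(_.trim)
    .filter(_.nonEmpty)
    .map(_.split("\\s+").toSeq)  // swap in a real tokenizer here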
1. Reading the TF source code
In the calling code, we find:
val hashingTF = new HashingTF()
// compute tf
val tf: RDD[Vector] = hashingTF.transform(documents)
The TF values are obtained mainly through the transform method of the HashingTF class. Following that method:
/**
 * Transforms the input document to term frequency vectors.
 */
@Since("1.1.0")
def transform[D <: Iterable[_]](dataset: RDD[D]): RDD[Vector] = {
  dataset.map(this.transform)
}
Spark MLlib is built on RDDs, so familiarity with RDDs is required before reading the source. Next, look at the transform method called inside dataset.map(this.transform):
/**
 * Transforms the input document into a sparse term frequency vector.
 */
@Since("1.1.0")
def transform(document: Iterable[_]): Vector = {
  // map that holds the term frequencies
  val termFrequencies = mutable.HashMap.empty[Int, Double]
  // loop over every term in this document
  document.foreach { term =>
    // get the vector position corresponding to this term
    val i = indexOf(term)
    // i now stands for the term; accumulate its count in termFrequencies
    termFrequencies.put(i, termFrequencies.getOrElse(i, 0.0) + 1.0)
  }
  // map the term counts into a very high-dimensional sparse vector;
  // numFeatures is a member of HashingTF, can be passed in at construction,
  // and defaults to 2^20 if not supplied
  Vectors.sparse(numFeatures, termFrequencies.toSeq)
}
The transform method runs once per line (i.e. per document): it counts the frequency of every term in that document and stores the counts in a very high-dimensional sparse vector. The position of a term in that vector is computed by:
@Since("1.1.0") def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures)
term.## is equivalent to hashCode(): it produces the hash value of each term, which is then taken modulo numFeatures, yielding an Int index.
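Utils.nonNegativeMod is a private Spark helper; as a stand-alone sketch of what it amounts to (the function name termIndex and the sample term are made up for illustration):

// Hash the term, then map the possibly negative hash into [0, numFeatures).
def termIndex(term: Any, numFeatures: Int): Int = {
  val rawMod = term.## % numFeatures
  rawMod + (if (rawMod < 0) numFeatures else 0)
}

termIndex("spark", 1 << 20)  // some value in [0, 1048576), the default dimensionality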
At this point the TF computation is finished. The final result is a vector that stores the positions of the terms together with their frequencies, i.e. SparseVector(size, indices, values).
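To make that vector concrete, here is a small local example (the feature-space size of 1 << 10 and the sample words are arbitrary; the exact indices depend on the hash values):

import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.SparseVector

val smallTF = new HashingTF(1 << 10)          // tiny feature space, just for illustration
val doc = Seq("spark", "mllib", "spark", "tfidf")
val v = smallTF.transform(doc).asInstanceOf[SparseVector]
// v.size == 1024; v.indices holds the hashed positions of "spark", "mllib", "tfidf";
// the value stored at smallTF.indexOf("spark") is 2.0, the other two are 1.0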
2. Reading the IDF source code
// obtain the IDFModel; the input tf vectors are of type SparseVector(size, indices, values)
val idf = new IDF().fit(tf)
// obtain the tf-idf values
val tfidf: RDD[Vector] = idf.transform(tf)
IDF is implemented in two steps.
Step 1: val idf = new IDF().fit(tf)
/**
 * Computes the inverse document frequency.
 * @param dataset an RDD of term frequency vectors
 */
@Since("1.1.0")
def fit(dataset: RDD[Vector]): IDFModel = {
  // returns the IDF vector, of type DenseVector(values)
  val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator(
      minDocFreq = minDocFreq))(          // minDocFreq is the minimum document frequency; defaults to 0
    seqOp = (df, v) => df.add(v),         // per-partition accumulation
    combOp = (df1, df2) => df1.merge(df2) // merge across partitions
  ).idf()
  new IDFModel(idf)
}
The signature of the treeAggregate method used above is:

def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U
treeAggregate computes with mapPartitions under the hood, so two operators must be supplied: one to accumulate and one to merge results (a toy example follows the two descriptions below).
seqOp 用來計算分區結果的操做符 (an operator used to accumulate results within a partition)
combOp: an associative operator used to combine results from different partitions
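As a toy illustration of the two operators (unrelated to the IDF source; it reuses the sc from the first example and simply sums squares):

// Sum the squares of an RDD[Double]: seqOp accumulates inside each partition,
// combOp merges the per-partition partial sums.
val nums = sc.parallelize((1 to 100).map(_.toDouble))
val sumOfSquares = nums.treeAggregate(0.0)(
  seqOp  = (acc, x)     => acc + x * x,   // per-partition accumulation
  combOp = (acc1, acc2) => acc1 + acc2,   // merge partition results
  depth  = 2                              // default tree depth
)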
The call aggregates into an IDF.DocumentFrequencyAggregator object, then invokes its idf method to obtain the idf vector, and finally wraps that vector with new IDFModel(idf) to return an IDFModel.
Below are the methods of the DocumentFrequencyAggregator class, namely add (the seqOp) and merge (the combOp):
private object IDF {

  /** Document frequency aggregator. */
  class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {

    /** number of documents (the total document count) */
    private var m = 0L
    /** document frequency vector: for each term, the number of documents it appears in */
    private var df: BDV[Long] = _

    def this() = this(0)  // auxiliary constructor; minDocFreq defaults to 0 if not supplied

    /** Adds a new document. This is the per-partition accumulation; the input is a tf vector. */
    def add(doc: Vector): this.type = {
      if (isEmpty) {
        df = BDV.zeros(doc.size)
      }
      doc match {
        // the tf vector is a SparseVector, so this case is taken
        case SparseVector(size, indices, values) =>
          val nnz = indices.size
          var k = 0
          while (k < nnz) {
            if (values(k) > 0) {
              df(indices(k)) += 1L  // if the term appears in this document, its df grows by 1
            }
            k += 1
          }
        case DenseVector(values) =>
          val n = values.size
          var j = 0
          while (j < n) {
            if (values(j) > 0.0) {
              df(j) += 1L
            }
            j += 1
          }
        case other =>
          throw new UnsupportedOperationException(
            s"Only sparse and dense vectors are supported but got ${other.getClass}.")
      }
      m += 1L
      this
    }

    /** Merges another. This is the merge step across partitions. */
    def merge(other: DocumentFrequencyAggregator): this.type = {
      if (!other.isEmpty) {
        m += other.m          // merge the total document counts
        if (df == null) {
          df = other.df.copy
        } else {
          df += other.df      // merge the df vectors
        }
      }
      this
    }

    private def isEmpty: Boolean = m == 0L

    /** Returns the current IDF vector. This is where the idf vector is computed. */
    def idf(): Vector = {
      if (isEmpty) {
        throw new IllegalStateException("Haven't seen any document yet.")
      }
      val n = df.length
      val inv = new Array[Double](n)
      var j = 0
      while (j < n) {
        /*
         * If the term is not present in the minimum
         * number of documents, set IDF to 0. This
         * will cause multiplication in IDFModel to
         * set TF-IDF to 0.
         *
         * Since arrays are initialized to 0 by default,
         * we just omit changing those entries.
         */
        if (df(j) >= minDocFreq) {  // compute idf only when df reaches minDocFreq; otherwise leave it 0
          inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
        }
        j += 1
      }
      Vectors.dense(inv)  // return the idf vector as a dense vector
    }
  }
}
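To make the formula concrete, a small worked example (the numbers are made up): with m = 4 documents in total and a term that appears in df = 2 of them, idf = log((4 + 1) / (2 + 1)) = log(5/3) ≈ 0.51. A term that appears in every document gets log(5/5) = 0, and a term whose df falls below minDocFreq keeps the default value 0, so its TF-IDF will also be 0 after the multiplication in step 2.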
Step 2: with the idf vector computed above, what remains is to compute tf * idf, which is done by the transform method of the IDFModel class: val tfidf: RDD[Vector] = idf.transform(tf)
private object IDFModel {

  /**
   * Transforms a term frequency (TF) vector to a TF-IDF vector with an IDF vector
   *
   * @param idf an IDF vector
   * @param v a term frequency vector
   * @return a TF-IDF vector
   */
  def transform(idf: Vector, v: Vector): Vector = {
    val n = v.size
    v match {
      // this case is taken for our sparse tf vectors
      case SparseVector(size, indices, values) =>
        val nnz = indices.size
        val newValues = new Array[Double](nnz)
        var k = 0
        while (k < nnz) {
          newValues(k) = values(k) * idf(indices(k))  // compute tf * idf
          k += 1
        }
        Vectors.sparse(n, indices, newValues)  // the TF-IDF vector
      case DenseVector(values) =>
        val newValues = new Array[Double](n)
        var j = 0
        while (j < n) {
          newValues(j) = values(j) * idf(j)
          j += 1
        }
        Vectors.dense(newValues)
      case other =>
        throw new UnsupportedOperationException(
          s"Only sparse and dense vectors are supported but got ${other.getClass}.")
    }
  }
}
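Putting the two steps together, a minimal end-to-end sketch on a tiny in-memory corpus (the corpus contents, the numFeatures of 1 << 10, and the minDocFreq of 1 are made up for illustration; sc is the SparkContext from the first example):

import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

val corpus: RDD[Seq[String]] = sc.parallelize(Seq(
  Seq("spark", "mllib", "tfidf"),
  Seq("spark", "rdd"),
  Seq("hadoop", "mapreduce")
))
val tfLocal: RDD[Vector] = new HashingTF(1 << 10).transform(corpus)
tfLocal.cache()
val model = new IDF(minDocFreq = 1).fit(tfLocal)         // fit computes the idf vector
val tfidfLocal: RDD[Vector] = model.transform(tfLocal)   // multiplies tf by idf, index by index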
That is the complete TF-IDF computation. It relies on Spark MLlib's dense vectors (DenseVector) and sparse vectors (SparseVector) and on RDD aggregation operations.
The three main classes involved are HashingTF, IDF, and IDFModel.
One last point: in the TF-IDF vectors produced by Spark MLlib, the stored position is the term's hash value modulo the vector dimensionality, not the term itself. If downstream work such as classification or text recommendation needs the actual words, some extra mapping from positions back to terms is required, as sketched below.
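If you keep your own vocabulary alongside the model, one way to recover which vector position a word occupies is to hash the vocabulary with the same HashingTF (a sketch; the word list is hypothetical, the HashingTF must use the same numFeatures as when tf was built, and hash collisions make the mapping approximate):

import org.apache.spark.mllib.feature.HashingTF

val hashingTF = new HashingTF()                       // same numFeatures as the tf step
val vocabulary = Seq("spark", "mllib", "hadoop")      // hypothetical word list kept with the model
val indexToTerm: Map[Int, String] =
  vocabulary.map(term => hashingTF.indexOf(term) -> term).toMap
// indexToTerm lets you look up a word from a non-zero position in the TF-IDF vector;
// two words that collide on the same hash index cannot be told apart.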