前一篇:Spark數據挖掘-基於 LSA 隱層語義分析理解APP描述信息(1)apache
Spark 經過調用 RowMatrix 的 computeSVD 方法會獲得三個重要的矩陣 U、S、V , 並且:原始矩陣 近似等於 U * S * V
它們含義分別以下:微信
經過這個文檔,首先想到的是文檔中最重要的概念是什麼?概念每每對應話題,這樣基本就能肯定文檔的主題了,而後每一個主題經過V矩陣能夠獲得重要的詞,這樣就能夠給文檔添加標籤了,可是其實能夠走的更遠,本文將重點研究如何使用這兩個矩陣,這裏的用途很容易推廣到LDA模型,LDA 模型獲得 phi(詞與topic關係矩陣) 和 theta(文檔與topic的關係矩陣) 兩個矩陣以後也能夠幹這些事。接下來主要嘗試回答下面三個問題:機器學習
其實從最原始的詞文檔矩陣能夠獲得上面這些問題粗淺的答案:好比詞與詞的重要程度能夠計算詞文檔矩陣中對應列之間的餘弦類似性。餘弦類似度計算的是高維文檔空間中兩個點之間向量的夾角,越相同方向的點認爲類似度越高。餘弦類似度計算的就是兩個向量的點積除以兩個向量長度的乘積。通用的計算行之間的餘弦距離就獲得了文檔與文檔的類似度。而詞與文檔的重要程度就對應該矩陣中這兩個對象交叉的位置。
然而這些重要性得分都是很粗淺的,由於這些都是來自於對文檔中詞簡單計數統計,根本沒有考慮詞之間的語義關係等。分佈式
LSA 提供更深刻的理解語料庫的得分矩陣。基於這些矩陣能夠獲得更加有深度的結論。例如:一些文件只出現「新聞」,可是不出現「資訊」,而另一下文章恰好相反,可是它們可能會經過「閱讀」這個詞聯繫在一塊兒。
LSA 這種表示的方式從效率的角度來看也更有優點。LSA 將重要的信息壓縮到低維空間來代替原始的詞文檔矩陣。這樣使得不少計算更加快。由於計算原始詞文檔矩陣中詞與其餘詞的重要程度須要的時間複雜度正比於單詞的數量乘以文檔的數量。LSA 能夠經過將概念空間表示映射到詞空間達到通用的效果,時間複雜度正比於單詞的數量乘以概念的個數 K。數據之間的相關性經過這種低秩近似從新編碼從而使得不須要訪問整個語料庫。學習
LSA 是如何理解詞與詞之間的距離的呢?其實經過將 SVD 奇異值分解獲得的三個矩陣相乘就會獲得原始矩陣的一個近似,那麼這個近似矩陣列之間的餘弦距離就是原始的詞距離的一個近似,只不過如今有下面的三點優化:大數據
這樣就會使得詞之間的距離更加合理。幸運的是不須要從新將三個因子矩陣相乘再去計算詞之間的類似性,線性代數已經證實:相乘以後的矩陣列與列之間的餘弦就等於 St(V):(t 表示轉置)這個矩陣對應列之間的餘弦。
考慮給定一個詞計算最相關的特定詞這個任務:因爲S是對角矩陣,那麼S的轉置就等於S,那麼S*t(V)的列就變爲了VS的行。經過對VS的每一行長度歸一化,而後將VS乘以給定詞對應的列轉變的列向量就獲得了這個詞與每一個詞之間的餘弦。具體代碼以下:優化
import breeze.linalg.{DenseVector => BDenseVector} import breeze.linalg.{DenseMatrix => BDenseMatrix} def topTermsForTerm( normalizedVS: BDenseMatrix[Double], termId: Int): Seq[(Double, Int)] = { //獲得termId對應的行 val rowVec = new BDenseVector[Double]( row(normalizedVS, termId).toArray) //將VS歸一化的矩陣乘上面對應的行,獲得該term與每一個單詞的餘弦距離 val termScores = (normalizedVS * rowVec).toArray.zipWithIndex termScores.sortBy(-_._1).take(10) } //計算 VS val VS = multiplyByDiagonalMatrix(svd.V, svd.s) //將 VS 的行歸一化 val normalizedVS = rowsNormalized(VS) def printRelevantTerms(term: String) { val id = idTerms(term) printIdWeights(topTermsForTerm(normalizedVS, id, termIds) }
利用上面的代碼查詢了和「銀行」相關的詞,結果以下:編碼
銀行:1.0000000000000007 農商:0.5731472845623417 浦發:0.5582996267582955 靈犀:0.5546113928156614 烏海:0.5181220508630512 郵政:0.49403542009285284 花旗:0.4767076670441433 渣打:0.4646580481689233 通暢:0.46282711600593196 縫隙:0.4500830196782121
語料庫不足是致使效果通常的最大問題。spa
文檔與文檔的相關性與詞與詞之間的相關性思路徹底同樣,只不過此次用的矩陣是U,U是分佈式存儲的,因此代碼有點不一樣:.net
import org.apache.spark.mllib.linalg.Matrices def topDocsForDoc(normalizedUS: RowMatrix, docId: Long) : Seq[(Double, Long)] = { val docRowArr = row(normalizedUS, docId) val docRowVec = Matrices.dense(docRowArr.length, 1, docRowArr) val docScores = normalizedUS.multiply(docRowVec) val allDocWeights = docScores.rows.map(_.toArray(0)). zipWithUniqueId() allDocWeights.filter(!_._1.isNaN).top(10) } val US = multiplyByDiagonalMatrix(svd.U, svd.s) val normalizedUS = rowsNormalized(US) def printRelevantDocs(doc: String) { val id = idDocs(doc) printIdWeights(topDocsForDoc(normalizedUS, id, docIds) }
一樣的道理,詞文檔之間的相關性也是經過 USV 這個矩陣中的每一個位置的元素去近似的,好比詞 t 與文檔 d 的關係就是: U(d) * S * V(t),根據線性代數的基本理論,能夠很容易獲得詞 t 與全部文檔的關係爲:U * S * V(t) 或者文檔 d 與全部詞的關係爲:U(d) * S * V。這樣就很容易知道與某個文檔最相關的前幾個詞,以及與某個詞最相關的文檔。具體的應用代碼以下:
def topDocsForTerm(US: RowMatrix, V: Matrix, termId: Int) : Seq[(Double, Long)] = { //獲得詞對應的行 val rowArr = row(V, termId).toArray //將改行轉爲列向量 val rowVec = Matrices.dense(termRowArr.length, 1, termRowArr) //計算 US 乘以列向量 val docScores = US.multiply(termRowVec) //獲得全部文檔與詞之間的關係的得分 val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId() //選擇最重要的10篇文檔 allDocWeights.top(10) } //打印結果 def printRelevantDocs(term: String) { val id = idTerms(term) printIdWeights(topDocsForTerm(normalizedUS, svd.V, id, docIds) }
查詢一個詞,至關於上面的詞與文檔的關係,其實就是將 V 轉置以後乘以一個長度爲詞向量長並且只有一個元素爲 1 的列向量,值爲 1 的位置對應的就是該詞的位置,而多個詞,那麼就經過乘以一個長度爲詞向量長並且對應查詢詞位置都是該詞的idf權重其餘位置爲0的列向量便可。具體代碼以下:
import breeze.linalg.{SparseVector => BSparseVector} //獲得查詢詞對應位置爲idf權重,其餘位置爲0的向量 def termsToQueryVector( terms: Seq[String], idTerms: Map[String, Int], idfs: Map[String, Double]): BSparseVector[Double] = { //先獲得查詢詞在整個詞向量的下標索引位置 val indices = terms.map(idTerms(_)).toArray //將對應位置的idf權重找出來 val values = terms.map(idfs(_)).toArray //轉爲向量 長度爲詞向量長度,不少位置的值爲零,查詢詞位置值爲 idf 權重 new BSparseVector[Double](indices, values, idTerms.size) } //獲得 US*t(t(V)*上面方法獲得的向量) def topDocsForTermQuery( US: RowMatrix, V: Matrix, query: BSparseVector[Double]): Seq[(Double, Long)] = { val breezeV = new BDenseMatrix[Double](V.numRows, V.numCols, V.toArray) //計算 t(V)*上面方法獲得的向量 val termRowArr = (breezeV.t * query).toArray //獲得 t(t(V)*上面方法獲得的向量) val termRowVec = Matrices.dense(termRowArr.length, 1, termRowArr) //計算 US*t(t(V)*上面方法獲得的向量) val docScores = US.multiply(termRowVec) val allDocWeights = docScores.rows.map(_.toArray(0)). zipWithUniqueId() allDocWeights.top(10) } def printRelevantDocs(terms: Seq[String]) { val queryVec = termsToQueryVector(terms, idTerms, idfs) printIdWeights(topDocsForTermQuery(US, svd.V, queryVec), docIds) }
上面代碼中用到一些輔助的方法,由於比較簡單就不詳細分析,這裏簡單作一個彙總:
def row(normalizedVS: DenseMatrix[Double], termId: Int) = { (0 until normalizedVS.cols).map(i => normalizedVS(termId, i)) } def multiplyByDiagonalMatrix(mat: Matrix, s: Vector) = { val sArr = s.toArray new BDenseMatrix[Double](mat.numRows, mat.numCols, mat.toArray) .mapPairs{case ((r, c), v) => v * sArr(c)} } def rowsNormalized(bm: BDenseMatrix[Double]) = { val newMat = new BDenseMatrix[Double](bm.rows, bm.cols) for (r <- 0 until bm.rows) { val len = math.sqrt((0 until bm.cols).map{c => math.pow(bm(r, c), 2)}.sum) (0 until bm.cols).foreach{c => newMat.update(r, c, bm(r, c)/len)} } newMat }
歡迎關注本人微信公衆號,會定時發送關於大數據、機器學習、Java、Linux 等技術的學習文章,並且是一個系列一個系列的發佈,無任何廣告,純屬我的興趣。