不管是ICF基於物品的協同過濾、UCF基於用戶的協同過濾、基於內容的推薦,最基本的環節都是計算類似度。若是樣本特徵維度很高或者<user, item, score>的維度很大,都會致使沒法直接計算。設想一下100w*100w的二維矩陣,計算類似度怎麼算?html
更多內容參考——個人大數據學習之路——xingoo算法
在spark中RowMatrix提供了一種並行計算類似度的思路,下面就來看看其中的奧妙吧!sql
類似度有不少種,每一種適合的場景都不太同樣。好比:apache
在Spark中使用的是夾角餘弦,爲何選這個,道理就在下面!數組
上面兩個向量
\[ \left( { x }_{ 1 },{ y }_{ 1 } \right) \]
和
\[ \left( { x }_{ 2 },{ y }_{ 2 } \right) \]
計算其夾角的餘弦值就是兩個向量方向的類似度。app
公式爲:
\[ cos(\theta )=\frac { a\cdot b }{ ||a||\ast ||b|| } \\ =\quad \frac { { x }_{ 1 }\ast { x }_{ 2 }\quad +\quad { y }_{ 1 }\ast y_{ 2 } }{ \sqrt { { x }_{ 1 }^{ 2 }+{ x }_{ 2 }^{ 2 } } \ast \sqrt { { y }_{ 1 }^{ 2 }+{ y }_{ 2 }^{ 2 } } } \]dom
其中,\(||a||\)表示a的模,即每一項的平方和再開方。ide
那麼若是向量不僅是兩維,而是n維呢?好比有兩個向量:
\[ 第一個向量:({x}_{1}, {x}_{2}, {x}_{3}, ..., {x}_{n})\\ 第二個向量:({y}_{1}, {y}_{2}, {y}_{3}, ..., {y}_{n}) \]
他們的類似度計算方法套用上面的公式爲:
\[ cos(\theta )\quad =\quad \frac { \sum _{ i=1 }^{ n }{ ({ x }_{ i }\ast { y }_{ i }) } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } \\ =\quad \frac { { x }_{ 1 }\ast { y }_{ 1 }+{ x }_{ 2 }\ast { y }_{ 2 }+...+{ x }_{ n }\ast { y }_{ n } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } \\ =\quad \frac { { x }_{ 1 }\ast { y }_{ 1 } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } +\frac { { x }_{ 2 }\ast { y }_{ 2 } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } +...+\frac { { x }_{ n }\ast { y }_{ n } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } \\ =\quad \frac { { x }_{ 1 } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } } \ast \frac { { y }_{ 1 } }{ \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } +\frac { { x }_{ 2 } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } } \ast \frac { { y }_{ 2 } }{ \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } +...+\frac { { x }_{ n } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } } \ast \frac { { y }_{ n } }{ \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } \]學習
經過上面的公式就能夠發現,夾角餘弦能夠拆解成每一項與另外一項對應位置的乘積\({ x }_{ 1 }\ast { y }_{ 1 }\),再除以每一個向量本身的
\[ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \]
就能夠了。大數據
畫個圖看看,首先建立下面的矩陣:
注意,矩陣裏面都是一列表明一個向量....上面是建立矩陣時的三元組,若是在spark中想要建立matrix,能夠這樣:
val df = spark.createDataFrame(Seq( (0, 0, 1.0), (1, 0, 1.0), (2, 0, 1.0), (3, 0, 1.0), (0, 1, 2.0), (1, 1, 2.0), (2, 1, 1.0), (3, 1, 1.0), (0, 2, 3.0), (1, 2, 3.0), (2, 2, 3.0), (0, 3, 1.0), (1, 3, 1.0), (3, 3, 4.0) )) val matrix = new CoordinateMatrix(df.map(row => MatrixEntry(row.getAs[Integer](0).toLong, row.getAs[Integer](1).toLong, row.getAs[Double](2))).toJavaRDD)
而後計算每個向量的normL2,即平方和開根號。
以第一個和第二個向量計算爲例,第一個向量爲(1,1,1,1),第二個向量爲(2,2,1,1),每一項除以對應的normL2,獲得後面的兩個向量:
\[ 0.5*0.63+0.5*0.63+0.5*0.31+0.5*0.31 \approx 0.94 \]
兩個向量最終的類似度爲0.94。
那麼在Spark如何快速並行處理呢?經過上面的例子,能夠看到兩個向量的類似度,須要把每一維度乘積後相加,可是一個向量通常都是跨RDD保存的,因此能夠先計算全部向量的第一維,得出結果
\[ (向量1的第1維,向量2的第1維,value)\\ (向量1的第2維,向量2的第2維,value)\\ ...\\ (向量1的第n維,向量2的第n維,value)\\ (向量1的第1維,向量3的第1維,value)\\ ..\\ (向量1的第n維,向量3的第n維,value)\\ \]
最後對作一次reduceByKey累加結果便可.....
首先建立dataframe造成matrix:
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} import org.apache.spark.sql.SparkSession object MatrixSimTest { def main(args: Array[String]): Unit = { // 建立dataframe,轉換成matrix val spark = SparkSession.builder().master("local[*]").appName("sim").getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val df = spark.createDataFrame(Seq( (0, 0, 1.0), (1, 0, 1.0), (2, 0, 1.0), (3, 0, 1.0), (0, 1, 2.0), (1, 1, 2.0), (2, 1, 1.0), (3, 1, 1.0), (0, 2, 3.0), (1, 2, 3.0), (2, 2, 3.0), (0, 3, 1.0), (1, 3, 1.0), (3, 3, 4.0) )) val matrix = new CoordinateMatrix(df.map(row => MatrixEntry(row.getAs[Integer](0).toLong, row.getAs[Integer](1).toLong, row.getAs[Double](2))).toJavaRDD) // 調用sim方法 val x = matrix.toRowMatrix().columnSimilarities() // 獲得類似度結果 x.entries.collect().foreach(println) } }
獲得的結果爲:
MatrixEntry(0,3,0.7071067811865476) MatrixEntry(0,2,0.8660254037844386) MatrixEntry(2,3,0.2721655269759087) MatrixEntry(0,1,0.9486832980505139) MatrixEntry(1,2,0.9128709291752768) MatrixEntry(1,3,0.596284793999944)
直接進入columnSimilarities方法看看是怎麼個流程吧!
def columnSimilarities(): CoordinateMatrix = { columnSimilarities(0.0) }
內部調用了帶閾值的類似度方法,這裏的閾值是指類似度小於該值時,輸出結果時,會自動過濾掉。
def columnSimilarities(threshold: Double): CoordinateMatrix = { //檢查參數... val gamma = if (threshold < 1e-6) { Double.PositiveInfinity } else { 10 * math.log(numCols()) / threshold } columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma) }
這裏的gamma用於採樣,具體的作法我們來繼續看源碼。而後看一下computeColumnSummaryStatistics().normL2.toArray
這個方法:
def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = { val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)( (aggregator, data) => aggregator.add(data), (aggregator1, aggregator2) => aggregator1.merge(aggregator2)) updateNumRows(summary.count) summary }
以前有介紹這個treeAggregate是一種帶「預reduce」的map-reduce,返回的summary,裏面幫咱們統計了每個向量的不少指標,好比
currMean 爲 每個向量的平均值 currM2 爲 每一個向量的每一維的平方和 currL1 爲 每一個向量的絕對值的和 currMax 爲 每一個向量的最大值 currMin 爲 每一個向量的最小值 nnz 爲 每一個向量的非0個數
這裏咱們只須要currM2,它是每一個向量的平方和。summary調用的normL2方法:
override def normL2: Vector = { require(totalWeightSum > 0, s"Nothing has been added to this summarizer.") val realMagnitude = Array.ofDim[Double](n) var i = 0 val len = currM2.length while (i < len) { realMagnitude(i) = math.sqrt(currM2(i)) i += 1 } Vectors.dense(realMagnitude) }
上面這步就是對平方和開個根號,這樣就求出來了每一個向量的分母部分。
下面就是最關鍵的地方了:
private[mllib] def columnSimilaritiesDIMSUM( colMags: Array[Double], gamma: Double): CoordinateMatrix = { // 一些參數校驗 // 對gamma進行開方 val sg = math.sqrt(gamma) // sqrt(gamma) used many times // 這裏把前面算的平方根的值設置一個默認值,由於若是爲0,除0會報異常,因此設置爲1 val colMagsCorrected = colMags.map(x => if (x == 0) 1.0 else x) // 把抽樣機率數組 和 平方根數組進行廣播 val sc = rows.context val pBV = sc.broadcast(colMagsCorrected.map(c => sg / c)) val qBV = sc.broadcast(colMagsCorrected.map(c => math.min(sg, c))) // 遍歷每一行,計算每一個向量該維的乘積,造成三元組 val sims = rows.mapPartitionsWithIndex { (indx, iter) => val p = pBV.value val q = qBV.value // 得到隨機值 val rand = new XORShiftRandom(indx) val scaled = new Array[Double](p.size) iter.flatMap { row => row match { case SparseVector(size, indices, values) => // 若是是稀疏向量,遍歷向量的每一維,除以平方根 val nnz = indices.size var k = 0 while (k < nnz) { scaled(k) = values(k) / q(indices(k)) k += 1 } // 遍歷向量數組,計算每個數值與其餘數值的伺機。 // 好比向量(1, 2, 0 ,1) // 獲得的結果爲 (0,1,value)(0,3,value)(2,3,value) Iterator.tabulate (nnz) { k => val buf = new ListBuffer[((Int, Int), Double)]() val i = indices(k) val iVal = scaled(k) // 判斷當前列是否符合採樣範圍,若是小於採樣值,就忽略 if (iVal != 0 && rand.nextDouble() < p(i)) { var l = k + 1 while (l < nnz) { val j = indices(l) val jVal = scaled(l) if (jVal != 0 && rand.nextDouble() < p(j)) { // 計算每一維與其餘維的值 buf += (((i, j), iVal * jVal)) } l += 1 } } buf }.flatten case DenseVector(values) => // 跟稀疏同理 val n = values.size var i = 0 while (i < n) { scaled(i) = values(i) / q(i) i += 1 } Iterator.tabulate (n) { i => val buf = new ListBuffer[((Int, Int), Double)]() val iVal = scaled(i) if (iVal != 0 && rand.nextDouble() < p(i)) { var j = i + 1 while (j < n) { val jVal = scaled(j) if (jVal != 0 && rand.nextDouble() < p(j)) { buf += (((i, j), iVal * jVal)) } j += 1 } } buf }.flatten } } // 最後再執行一個reduceBykey,累加全部的值,就是i和j的類似度 }.reduceByKey(_ + _).map { case ((i, j), sim) => MatrixEntry(i.toLong, j.toLong, sim) } new CoordinateMatrix(sims, numCols(), numCols()) }
這樣把全部向量的平方和廣播後,每一行均可以在不一樣的節點並行處理了。
總結來講,Spark提供的這個計算類似度的方法有兩點優點:
不過傑卡德目前並不能使用這種方法來計算,由於傑卡德中間有一項須要對向量求dot,這種方式就不適合了;若是傑卡德想要快速計算,能夠去參考LSH局部敏感哈希算法,這裏就不詳細說明了。