The Gaussian Mixture Model (GMM) is a popular clustering algorithm. It is similar to K-Means, but where K-Means outputs the single cluster each data point belongs to, GMM outputs the probability of each point belonging to each cluster. Compared with GMM, K-Means has some drawbacks; for example, its clustering results are easily distorted by extreme points in the sample. Moreover, because GMM outputs a probability, the result carries more information than a single hard assignment: for events occurring at 49% versus 51%, splitting them into two classes using a bare 50% threshold is very risky.
As the name suggests, the Gaussian Mixture Model assumes the data follow a mixture-of-Gaussians distribution, i.e. that they were generated from several Gaussian distributions. A GMM is composed of K Gaussian distributions; each one is called a "Component", and the linear combination of these Components gives the GMM's probability density function:

p(x) = Σ_{k=1}^{K} π_k N(x | μ_k, Σ_k),  with π_k ≥ 0 and Σ_{k=1}^{K} π_k = 1
To cluster with a GMM, we first sample the data and plot its distribution (using R or a similar tool) to check whether it looks like a Gaussian mixture, or we simply assume that it does. We then infer the GMM's probability distribution from the data; each Gaussian component corresponds to one cluster. Because the form of the probability density is known (or assumed), fitting the model is a parameter-estimation problem: we must estimate each mixture component's parameters (mean vector mu, covariance matrix sigma, and weight). The GMM is trained by maximum likelihood estimation, maximising the following log-likelihood:
L(θ) = Σ_{i=1}^{N} log Σ_{k=1}^{K} π_k N(x_i | μ_k, Σ_k)
This expression has no closed-form solution, so it is solved with the Expectation-Maximization (EM) method. The steps are as follows:
1. For the given value of K, initialise K multivariate Gaussian distributions and their weights;
2. Using Bayes' theorem, estimate the posterior probability that each sample was generated by each component (the E-step of EM);
3. Using the definitions of mean and covariance together with the posteriors from step 2, update each component's mean vector, covariance matrix and weight (the M-step of EM). Repeat steps 2–3 until the increase in the log-likelihood falls below a convergence threshold, or the maximum number of iterations is reached.
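The loop above can be sketched in a few lines of plain Scala. This is a minimal, self-contained illustration for a 1-D, two-component mixture; the names (`Gmm1D`, `Component`, `fit`) are made up for this sketch and are not part of any library:

```scala
object Gmm1D {
  case class Component(weight: Double, mean: Double, variance: Double)

  // Gaussian density N(x | mean, variance)
  def pdf(x: Double, mean: Double, variance: Double): Double =
    math.exp(-(x - mean) * (x - mean) / (2 * variance)) /
      math.sqrt(2 * math.Pi * variance)

  def fit(xs: Array[Double], init: Array[Component], iters: Int): Array[Component] = {
    var comps = init
    for (_ <- 0 until iters) {
      // E-step: posterior responsibility of each component for each point (Bayes' theorem)
      val resp: Array[Array[Double]] = xs.map { x =>
        val unnorm = comps.map(c => c.weight * pdf(x, c.mean, c.variance))
        val z = unnorm.sum
        unnorm.map(_ / z)
      }
      // M-step: re-estimate weight, mean and variance from the responsibilities
      comps = comps.indices.map { k =>
        val rk = resp.map(_(k))
        val nk = rk.sum
        val mean = rk.zip(xs).map { case (r, x) => r * x }.sum / nk
        val variance = rk.zip(xs).map { case (r, x) => r * (x - mean) * (x - mean) }.sum / nk
        Component(nk / xs.length, mean, variance)
      }.toArray
    }
    comps
  }
}
```

On well-separated data the loop converges in a handful of iterations; a real implementation would also track the log-likelihood to decide when to stop, as step 3 describes.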
Next we train and analyse the model, using the GMM implementation provided by the Spark MLlib package. The code is as follows:
package com.xj.da.gmm

import breeze.linalg.DenseVector
import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * author : kongcong
 * number : 27
 * date   : 2017/7/19
 */
object GMMWithMultivariate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      //.setMaster("local")
      .setAppName("GMMWithMultivariate")
    val sc = new SparkContext(conf)

    val rawData: RDD[String] = sc.textFile("hdfs://master:8020/home/kongc/data/query_result.csv")
    //val rawData: RDD[String] = sc.textFile("data/query_result.csv")
    println("count: " + rawData.count())

    // col1, col2, status
    val data: RDD[linalg.Vector] = rawData.map { line =>
      val raw: Array[String] = line.split(",")
      Vectors.dense(raw(0).toDouble, raw(1).toDouble, raw(4).toDouble)
    }

    // col1, col2 — the two features used for training
    val trainData: RDD[linalg.Vector] = rawData.map { line =>
      val raw: Array[String] = line.split(",")
      Vectors.dense(raw(0).toDouble, raw(1).toDouble)
    }

    // Specify the initial model: split the labelled points by status
    val filter0: RDD[linalg.Vector] = data.filter(_.toArray(2) == 0)
    println(filter0.count())  // 23195
    val filter1: RDD[linalg.Vector] = data.filter(_.toArray(2) == 1)
    println(filter1.count())  // 14602

    // Component weights must sum to 1, so normalise by the labelled total
    val labelled: Double = (filter0.count() + filter1.count()).toDouble
    val w1: Double = filter0.count() / labelled
    val w2: Double = filter1.count() / labelled
    println(s"w1 = $w1")

    // Per-component means
    val m0x: Double = filter0.map(_.toArray(0)).mean()
    val m0y: Double = filter0.map(_.toArray(1)).mean()
    val m1x: Double = filter1.map(_.toArray(0)).mean()
    val m1y: Double = filter1.map(_.toArray(1)).mean()

    // Per-component variances
    val vx0: Double = filter0.map(_.toArray(0)).variance()
    val vy0: Double = filter0.map(_.toArray(1)).variance()
    val vx1: Double = filter1.map(_.toArray(0)).variance()
    val vy1: Double = filter1.map(_.toArray(1)).variance()

    // Mean vectors
    val mu1: linalg.Vector = Vectors.dense(Array(m0x, m0y))
    val mu2: linalg.Vector = Vectors.dense(Array(m1x, m1y))
    println(s"mu1 : $mu1")
    println(s"mu2 : $mu2")

    // Covariance of x and y for each component: cov(x, y) = E[xy] - E[x]E[y]
    val x0: Array[Double] = filter0.map(_.toArray(0)).collect()
    val y0: Array[Double] = filter0.map(_.toArray(1)).collect()
    val x1: Array[Double] = filter1.map(_.toArray(0)).collect()
    val y1: Array[Double] = filter1.map(_.toArray(1)).collect()
    val c0yx: Double = (DenseVector(x0) dot DenseVector(y0)) / x0.length - m0x * m0y
    val c1yx: Double = (DenseVector(x1) dot DenseVector(y1)) / x1.length - m1x * m1y

    val sigma1 = Matrices.dense(2, 2, Array(vx0, c0yx, c0yx, vy0))
    val sigma2 = Matrices.dense(2, 2, Array(vx1, c1yx, c1yx, vy1))

    val gmm1 = new MultivariateGaussian(mu1, sigma1)
    val gmm2 = new MultivariateGaussian(mu2, sigma2)
    val gaussians = Array(gmm1, gmm2)

    // Building a GaussianMixtureModel takes two arguments:
    // the weight array and the component Gaussian distributions
    val initModel = new GaussianMixtureModel(Array(w1, w2), gaussians)
    for (i <- 0 until initModel.k) {
      println("weight=%f\nmu=%s\nsigma=\n%s\n" format
        (initModel.weights(i), initModel.gaussians(i).mu, initModel.gaussians(i).sigma))
    }

    val mixtureModel = new GaussianMixture()
      .setInitialModel(initModel)
      .setK(2)
      .setConvergenceTol(0.0001)
      .run(trainData)

    val predict: RDD[Int] = mixtureModel.predict(trainData)
    rawData.zip(predict).saveAsTextFile("hdfs://master:8020/home/kongc/data/out/gmm/predict2")

    for (i <- 0 until mixtureModel.k) {
      println("weight=%f\nmu=%s\nsigma=\n%s\n" format
        (mixtureModel.weights(i), mixtureModel.gaussians(i).mu, mixtureModel.gaussians(i).sigma))
    }
  }
}
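The soft probabilities mentioned at the start come straight out of this model: besides the hard `predict` used above, MLlib's `GaussianMixtureModel` also exposes `predictSoft(points)`, which returns each point's posterior membership probability for every component. That posterior is just Bayes' theorem applied to the fitted components. A minimal 1-D sketch of the computation, without Spark (the names `SoftAssign` and `posterior` are illustrative, not library API):

```scala
object SoftAssign {
  // Gaussian density N(x | mean, variance)
  def pdf(x: Double, mean: Double, variance: Double): Double =
    math.exp(-(x - mean) * (x - mean) / (2 * variance)) /
      math.sqrt(2 * math.Pi * variance)

  // Posterior membership probabilities via Bayes' theorem:
  // γ_k(x) = w_k N(x | μ_k, σ_k²) / Σ_j w_j N(x | μ_j, σ_j²)
  def posterior(x: Double, weights: Array[Double],
                means: Array[Double], variances: Array[Double]): Array[Double] = {
    val unnorm = weights.indices.map(k => weights(k) * pdf(x, means(k), variances(k)))
    val z = unnorm.sum
    unnorm.map(_ / z).toArray
  }
}
```

The hard `predict` is simply the argmax of this array; keeping the full array is what lets you treat a 51%/49% split differently from a 99%/1% one.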
References:
http://blog.pluskid.org/?p=39
http://dblab.xmu.edu.cn/blog/1456/