package org.apache.spark.mllib.classification import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum} import org.apache.spark.{SparkException, Logging} import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD /** * Model for Naive Bayes Classifiers. * * @param labels list of labels * @param pi log of class priors, whose dimension is C, number of labels * @param theta log of class conditional probabilities, whose dimension is C-by-D, * where D is number of features */ class NaiveBayesModel private[mllib] ( val labels: Array[Double], val pi: Array[Double], val theta: Array[Array[Double]]) extends ClassificationModel with Serializable { private val brzPi = new BDV[Double](pi) private val brzTheta = new BDM[Double](theta.length, theta(0).length) { // Need to put an extra pair of braces to prevent Scala treating `i` as a member. var i = 0 while (i < theta.length) { var j = 0 while (j < theta(i).length) { brzTheta(i, j) = theta(i)(j) j += 1 } i += 1 } } override def predict(testData: RDD[Vector]): RDD[Double] = { val bcModel = testData.context.broadcast(this) testData.mapPartitions { iter => val model = bcModel.value iter.map(model.predict) } } override def predict(testData: Vector): Double = { labels(brzArgmax(brzPi + brzTheta * testData.toBreeze)) } } /** * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. * * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for * document classification. By making every vector a 0-1 vector, it can also be used as * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). The input feature values must be nonnegative. */ class NaiveBayes private (private var lambda: Double) extends Serializable with Logging { def this() = this(1.0) /** Set the smoothing parameter. Default: 1.0. */ def setLambda(lambda: Double): NaiveBayes = { this.lambda = lambda this } /** * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries. * * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]. */ def run(data: RDD[LabeledPoint]) = { val requireNonnegativeValues: Vector => Unit = (v: Vector) => { val values = v match { case sv: SparseVector => sv.values case dv: DenseVector => dv.values } if (!values.forall(_ >= 0.0)) { throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") } } // Aggregates term frequencies per label. // TODO: Calling combineByKey and collect creates two stages, we can implement something // TODO: similar to reduceByKeyLocally to save one stage. val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])]( createCombiner = (v: Vector) => { requireNonnegativeValues(v) (1L, v.toBreeze.toDenseVector) }, mergeValue = (c: (Long, BDV[Double]), v: Vector) => { requireNonnegativeValues(v) (c._1 + 1L, c._2 += v.toBreeze) }, mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) => (c1._1 + c2._1, c1._2 += c2._2) ).collect() val numLabels = aggregated.length var numDocuments = 0L aggregated.foreach { case (_, (n, _)) => numDocuments += n } val numFeatures = aggregated.head match { case (_, (_, v)) => v.size } val labels = new Array[Double](numLabels) val pi = new Array[Double](numLabels) val theta = Array.fill(numLabels)(new Array[Double](numFeatures)) val piLogDenom = math.log(numDocuments + numLabels * lambda) var i = 0 aggregated.foreach { case (label, (n, sumTermFreqs)) => labels(i) = label val thetaLogDenom = math.log(brzSum(sumTermFreqs) + numFeatures * lambda) pi(i) = math.log(n + lambda) - piLogDenom var j = 0 while (j < numFeatures) { theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom j += 1 } i += 1 } new NaiveBayesModel(labels, pi, theta) } } /** * Top-level methods for calling naive Bayes. */ object NaiveBayes { /** * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. * * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for * document classification. By making every vector a 0-1 vector, it can also be used as * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). * * This version of the method uses a default smoothing parameter of 1.0. * * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency * vector or a count vector. */ def train(input: RDD[LabeledPoint]): NaiveBayesModel = { new NaiveBayes().run(input) } /** * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. * * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for * document classification. By making every vector a 0-1 vector, it can also be used as * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). * * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency * vector or a count vector. * @param lambda The smoothing parameter */ def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = { new NaiveBayes(lambda).run(input) } }
package org.apache.spark.mllib.classification import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD /** * :: Experimental :: * Represents a classification model that predicts to which of a set of categories an example * belongs. The categories are represented by double values: 0.0, 1.0, 2.0, etc. */ @Experimental trait ClassificationModel extends Serializable { /** * Predict values for the given data set using the model trained. * * @param testData RDD representing data points to be predicted * @return an RDD[Double] where each entry contains the corresponding prediction */ def predict(testData: RDD[Vector]): RDD[Double] /** * Predict values for a single data point using the model trained. * * @param testData array representing a single data point * @return predicted category from the trained model */ def predict(testData: Vector): Double /** * Predict values for examples stored in a JavaRDD. * @param testData JavaRDD representing data points to be predicted * @return a JavaRDD[java.lang.Double] where each entry contains the corresponding prediction */ def predict(testData: JavaRDD[Vector]): JavaRDD[java.lang.Double] = predict(testData.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Double]] }
何爲分類算法?簡單來講,就是將具備某些特性的物體歸類對應到一個已知的類別集合中的某個類別上。從數學角度來講,能夠作以下定義:html
已知集合: C={y1,y2,..,yn} 和 I={x1,x2,..,xm,..} ,肯定映射規則 y=f(x),使得任意 xi∈I 有且僅有一個 yj∈C 使得 yj=f(xi) 成立。java
其中,C爲類別集合,I爲待分類的物體,f則爲分類器,分類算法的主要任務就是構造分類器f。算法
分類算法的構造一般須要一個已知類別的集合來進行訓練,一般來講訓練出來的分類算法不可能達到100%的準確率。分類器的質量每每與訓練數據、驗證數據、訓練數據樣本大小等因素相關。apache
舉個例子,咱們平常生活中看到一個陌生人,要作的第一件事情就是判斷其性別,判斷性別的過程就是一個分類的過程。根據以往的生活經驗,一般通過頭髮長短、服飾和體型這三個要素就能判斷出來一我的的性別。這裏的「生活經驗」就是一個訓練好的關於性別判斷的模型,其訓練數據是平常生活中遇到的形形色色的人。忽然有一天,一個娘炮走到了你面前,長髮飄飄,穿着緊身的衣褲,但是體型卻很man,因而你就疑惑了,根據以往的經驗——也就是已經訓練好的模型,沒法判斷這我的的性別。因而你學會了經過喉結來判斷其性別,這樣你的模型被訓練的質量更高了。但不能否認的是,永遠會出現一個讓你沒法判斷性別的人。因此模型永遠沒法達到100%的準確,只會隨着訓練數據的不斷增多而無限接近100%的準確。api
貝葉斯公式,或者叫作貝葉斯定理,是貝葉斯分類的基礎。而貝葉斯分類是一類分類算法的統稱,這一類算法的基礎都是貝葉斯公式。目前研究較多的四種貝葉斯分類算法有:Naive Bayes、TAN、BAN和GBN。ide
理工科的學生在大學應該都學過幾率論,其中最重要的幾個公式中就有貝葉斯公式——用來描述兩個條件機率之間的關係,好比P(A|B)和P(B|A)。如何在已知事件A和B分別發生的機率,和事件B發生時事件A發生的機率,來求得事件A發生時事件B發生的機率,這就是貝葉斯公式的做用。其表述以下:ui
P(B|A)=P(A|B)×P(B)P(A)this
樸素貝葉斯分類,Naive Bayes,你也能夠叫它NB算法。其核心思想很是簡單:對於某一預測項,分別計算該預測項爲各個分類的機率,而後選擇機率最大的分類爲其預測分類。就好像你預測一個娘炮是女人的可能性是40%,是男人的可能性是41%,那麼就能夠判斷他是男人。atom
Naive Bayes的數學定義以下:url
如何獲取第四步中的最大值,也就是如何計算第三步中的各個條件機率最爲重要。能夠採用以下作法:
Additive smoothing,又叫Laplacian smoothing或Lidstone smoothing。
當某個類別下某個特徵項劃分沒有出現時, P(ai|yj)=0 ,這樣最後乘出來的結果會讓精確度大大的下降,因此引入Additive smoothing來解決這個問題。其思想是對於這樣等於0的狀況,將其計數值加1,這樣若是訓練樣本集數量充分大時,並不會對結果產生影響,而且解決了上述頻率爲0的尷尬局面。