package org.apache.spark.mllib.classification

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}

import org.apache.spark.{SparkException, Logging}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

/**
 * Model for Naive Bayes Classifiers.
 *
 * @param labels list of labels
 * @param pi log of class priors, whose dimension is C, number of labels
 * @param theta log of class conditional probabilities, whose dimension is C-by-D,
 *              where D is number of features
 */
class NaiveBayesModel private[mllib] (
    val labels: Array[Double],
    val pi: Array[Double],
    val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {

  /** Breeze view of the log class priors. */
  private val brzPi = new BDV[Double](pi)

  /**
   * Breeze copy of `theta` as a C-by-D matrix. The matrix is filled inside a block expression so
   * that the loop counters stay local and do not become members of the model.
   */
  private val brzTheta: BDM[Double] = {
    val matrix = new BDM[Double](theta.length, theta(0).length)
    var i = 0
    while (i < theta.length) {
      var j = 0
      while (j < theta(i).length) {
        matrix(i, j) = theta(i)(j)
        j += 1
      }
      i += 1
    }
    matrix
  }

  /**
   * Predicts a label for every vector of the given RDD.
   *
   * @param testData RDD of feature vectors
   * @return RDD with one predicted label per input vector
   */
  override def predict(testData: RDD[Vector]): RDD[Double] = {
    // Broadcast the model once instead of shipping it with every task closure.
    val bcModel = testData.context.broadcast(this)
    testData.mapPartitions { iter =>
      val model = bcModel.value
      iter.map(v => model.predict(v))
    }
  }

  /**
   * Predicts the label of a single feature vector: the label whose log prior plus summed
   * log conditional probabilities (`pi + theta * x`) is largest.
   *
   * @param testData feature vector of a single data point
   * @return predicted label
   */
  override def predict(testData: Vector): Double = {
    labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
  }
}

/**
 * Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
 *
 * This is the Multinomial NB which can handle all kinds of discrete data. For example, by
 * converting documents into TF-IDF vectors, it can be used for document classification. By
 * making every vector a 0-1 vector, it can also be used as Bernoulli NB. The input feature
 * values must be nonnegative.
 */
class NaiveBayes private (private var lambda: Double) extends Serializable with Logging {

  def this() = this(1.0)

  /** Set the smoothing parameter. Default: 1.0. */
  def setLambda(lambda: Double): NaiveBayes = {
    this.lambda = lambda
    this
  }

  /**
   * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
   *
   * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
   * @return the trained [[NaiveBayesModel]]
   * @throws SparkException if any feature value is negative
   */
  def run(data: RDD[LabeledPoint]): NaiveBayesModel = {
    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
      val values = v match {
        case sv: SparseVector => sv.values
        case dv: DenseVector => dv.values
      }
      if (!values.forall(_ >= 0.0)) {
        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
      }
    }

    // Aggregates term frequencies per label: (label, (number of documents, summed features)).
    // TODO: Calling combineByKey and collect creates two stages, we can implement something
    // TODO: similar to reduceByKeyLocally to save one stage.
    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])](
      createCombiner = (v: Vector) => {
        requireNonnegativeValues(v)
        (1L, v.toBreeze.toDenseVector)
      },
      mergeValue = (c: (Long, BDV[Double]), v: Vector) => {
        requireNonnegativeValues(v)
        (c._1 + 1L, c._2 += v.toBreeze)
      },
      mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) =>
        (c1._1 + c2._1, c1._2 += c2._2)
    ).collect()

    val numLabels = aggregated.length
    var numDocuments = 0L
    aggregated.foreach { case (_, (n, _)) =>
      numDocuments += n
    }
    // NOTE: `head` throws NoSuchElementException when the input RDD is empty.
    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }

    val labels = new Array[Double](numLabels)
    val pi = new Array[Double](numLabels)
    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))

    // Additive smoothing: lambda is added to every count, so the denominators grow by
    // lambda times the number of categories (labels for pi, features for theta).
    val piLogDenom = math.log(numDocuments + numLabels * lambda)
    var i = 0
    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
      labels(i) = label
      val thetaLogDenom = math.log(brzSum(sumTermFreqs) + numFeatures * lambda)
      pi(i) = math.log(n + lambda) - piLogDenom
      var j = 0
      while (j < numFeatures) {
        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
        j += 1
      }
      i += 1
    }

    new NaiveBayesModel(labels, pi, theta)
  }
}

/**
 * Top-level methods for calling naive Bayes.
 */
object NaiveBayes {
  /**
   * Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
   *
   * This is the Multinomial NB which can handle all kinds of discrete data. For example, by
   * converting documents into TF-IDF vectors, it can be used for document classification. By
   * making every vector a 0-1 vector, it can also be used as Bernoulli NB.
   *
   * This version of the method uses a default smoothing parameter of 1.0.
   *
   * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
   *              vector or a count vector.
   */
  def train(input: RDD[LabeledPoint]): NaiveBayesModel = {
    new NaiveBayes().run(input)
  }

  /**
   * Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
   *
   * This is the Multinomial NB which can handle all kinds of discrete data. For example, by
   * converting documents into TF-IDF vectors, it can be used for document classification. By
   * making every vector a 0-1 vector, it can also be used as Bernoulli NB.
   *
   * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
   *              vector or a count vector.
   * @param lambda The smoothing parameter
   */
  def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = {
    new NaiveBayes(lambda).run(input)
  }
}
package org.apache.spark.mllib.classification

import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

/**
 * :: Experimental ::
 * Represents a classification model that predicts to which of a set of categories an example
 * belongs. The categories are represented by double values: 0.0, 1.0, 2.0, etc.
 */
@Experimental
trait ClassificationModel extends Serializable {
  /**
   * Predict values for the given data set using the model trained.
   *
   * @param testData RDD representing data points to be predicted
   * @return an RDD[Double] where each entry contains the corresponding prediction
   */
  def predict(testData: RDD[Vector]): RDD[Double]

  /**
   * Predict values for a single data point using the model trained.
   *
   * @param testData array representing a single data point
   * @return predicted category from the trained model
   */
  def predict(testData: Vector): Double

  /**
   * Predict values for examples stored in a JavaRDD.
   *
   * Delegates to the `RDD[Vector]` overload and re-wraps the result for Java callers.
   *
   * @param testData JavaRDD representing data points to be predicted
   * @return a JavaRDD[java.lang.Double] where each entry contains the corresponding prediction
   */
  def predict(testData: JavaRDD[Vector]): JavaRDD[java.lang.Double] =
    predict(testData.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Double]]
}
已知集合 C={y1,y2,…,yn} 和 I={x1,x2,…,xm,…},確定映射規則 y=f(x),使得任意 xi∈I 有且僅有一個 yj∈C 使得 yj=f(xi) 成立。
貝葉斯公式,或者叫作貝葉斯定理,是貝葉斯分類的基礎。而貝葉斯分類是一類分類算法的統稱,這一類算法的基礎都是貝葉斯公式。目前研究較多的四種貝葉斯分類算法有:Naive Bayes、TAN、BAN 和 GBN。
樸素貝葉斯分類(Naive Bayes),你也可以叫它 NB 算法。其核心思想非常簡單:對於某一預測項,分別計算該預測項屬於各個分類的機率,然後選擇機率最大的分類作爲其預測分類。就好像你預測某個人是女人的可能性是 40%,是男人的可能性是 41%,那麼就可以判斷他是男人。
Naive Bayes 的數學定義如下:
Additive smoothing,又叫 Laplace smoothing 或 Lidstone smoothing。
當某個類別下某個特徵項劃分沒有出現時,P(ai|yj)=0,這樣最後乘出來的結果會讓精確度大大降低,所以引入 Additive smoothing 來解決這個問題。其思想是對於這樣等於 0 的情況,將其計數值加 1;這樣如果訓練樣本集數量充分大,並不會對結果產生影響,並且解決了上述頻率爲 0 的尷尬局面。