Spark 貝葉斯分類算法

  1、貝葉斯定理數學基礎算法

  咱們都知道條件機率的數學公式形式爲apache

  即B發生的條件下A發生的機率等於A和B同時發生的機率除以B發生的機率。dom

  根據此公式變換,獲得貝葉斯公式:  即貝葉斯定律是關於隨機事件A和B的條件機率(或邊緣機率)的一則定律。一般,事件A在事件B發生的條件溪的機率,與事件B在事件A的條件下的機率是不同的,而貝葉斯定律就是描述兩者之間的關係的。
測試

  更進一步將貝葉斯公式進行推廣,假設事件A發生的機率是由一系列的因素(A1,A2,A3,...An)決定的,則事件A的全機率公式爲:this

  

   2、樸素貝葉斯分類spa

  樸素貝葉斯分類是一種十分簡單的分類算法,其思想基礎是:對於給定的待分類項,求解在此項出現的條件下各個類別出現的機率,哪一個最大,就認爲此待分類項就屬於哪一個類別。scala

  假設V=(v1,v2,v3....vn)是一個待分項,而vn爲V的每一個特徵向量;code

         B=(b1,b2,b3...bn)是一個分類集合,bn爲每一個具體的分類;對象

    若是須要測試某個Vn歸屬於B集合中的哪一個具體分類,則須要計算P(bn|V),即在V發生的條件下,歸屬於b1,b2,b3,....bn中哪一個可能性最大。即:blog

    

    所以,這個問題轉換成求每一個待分項分配到集合中具體分類的機率是多少。而這個·具體機率的求法可使用貝葉斯定律。

    

    通過變換得出:

    

   3、MLlib對應的API

  一、貝葉斯分類伴生對象NativeBayes,原型:

object NaiveBayes extends scala.AnyRef with scala.Serializable {
  def train(input : org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]) : org.apache.spark.mllib.classification.NaiveBayesModel = { /* compiled code */ }
  def train(input : org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint], lambda : scala.Double) : org.apache.spark.mllib.classification.NaiveBayesModel = { /* compiled code */ }
}

  其主要定義了訓練貝葉斯分類模型的train方法,其中input爲訓練樣本,lambda爲平滑因子參數。

  二、train方法,其是NativeBayes對象的靜態方法,根據設置的樸素貝葉斯分類參數新建樸素貝葉斯分類類,並執行run方法進行訓練。

  三、樸素貝葉斯分類類NaiveBayes,原型:

class NaiveBayes private (private var lambda : scala.Double) extends scala.AnyRef with scala.Serializable with org.apache.spark.Logging {
  def this() = { /* compiled code */ }
  def setLambda(lambda : scala.Double) : org.apache.spark.mllib.classification.NaiveBayes = { /* compiled code */ }
  def run(data : org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]) : org.apache.spark.mllib.classification.NaiveBayesModel = { /* compiled code */ }
}

 

  四、run方法,該方法主要計算先驗機率和條件機率。首先對全部樣本數據進行聚合,以label爲key,聚合同一個label的特徵features,獲得全部label的統計(label,features之和),而後根據label統計數據,再計算p(i),和theta(i)(j),最後,根據類別標籤列表、類別先驗機率、各種別下的每一個特徵的條件機率生成貝葉斯模型。

  先驗機率並取對數p(i)=log(p(yi))=log((i類別的次數+平滑因子)/(總次數+類別數*平滑因子)))

  各個特徵屬性的條件機率,並取對數

  theta(i)(j)=log(p(ai|yi))=log(sumTermFreqs(j)+平滑因子)-thetaLogDenom

  其中,theta(i)(j)是類別i下特徵j的機率,sumTermFreqs(j)是特徵j出現的次數,thetaLogDenom通常分2種狀況,以下:

    1.多項式模型

      thetaLogDenom=log(sumTermFreqs.values.sum+ numFeatures* lambda)

      其中,sumTermFreqs.values.sum類別i的總數,numFeatures特徵數量,lambda平滑因子

    2.伯努利模型

      thetaLogDenom=log(n+2.0*lambda)

 

  五、aggregated:對全部樣本進行聚合統計,統計沒個類別下的每一個特徵值之和及次數。

  六、pi表示各種別·的·先驗機率取天然對數的值

  七、theta表示各個特徵在各個類別中的條件機率值

  八、predict:根據模型的先驗機率、條件機率,計算樣本屬於每一個類別的機率,取最大項做爲樣本的類別

  九、貝葉斯分類模型NaiveBayesModel包含參數:類別標籤列表(labels)、類別先驗機率(pi)、各個特徵在各個類別中的條件機率(theta)。

 

  4、使用示例

  一、樣本數據:

0,1 0 0
0,2 0 0
1,0 1 0
1,0 2 0
2,0 0 1
2,0 0 2

  

import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.{SparkConf, SparkContext}

object Bayes {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("BayesDemo").setMaster("local")
    val sc=new SparkContext(conf)
    //讀取樣本數據,此處使用自帶的處理數據方式·
    val data=MLUtils.loadLabeledPoints(sc,"d://bayes.txt")
    //訓練貝葉斯模型
    val model=NaiveBayes.train(data,1.0)
    //model.labels.foreach(println)
    //model.pi.foreach(println)
    val test=Vectors.dense(0,0,100)
    val res=model.predict(test)
    println(res)//輸出結果爲2.0
  }
}

  

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.{SparkConf, SparkContext}

object Bayes {
  def main(args: Array[String]): Unit = {
    //建立spark對象
    val conf=new SparkConf().setAppName("BayesDemo").setMaster("local")
    val sc=new SparkContext(conf)
    Logger.getRootLogger.setLevel(Level.WARN)
    //讀取樣本數據
    val data=sc.textFile("d://bayes.txt")//讀取數據
    val demo=data.map{ line=>//處理數據
      val parts=line.split(',')//分割數據·
      LabeledPoint(parts(0).toDouble,//標籤數據轉換
        Vectors.dense(parts(1).split(' ').map(_.toDouble)))//向量數據轉換
    }
    //將樣本數據分爲訓練樣本和測試樣本
    val sp=demo.randomSplit(Array(0.6,0.4),seed = 11L)//對數據進行分配
    val train=sp(0)//訓練數據
    val testing=sp(1)//測試數據
    //創建貝葉斯分類模型,並進行訓練
    val model=NaiveBayes.train(train,lambda = 1.0)

    //對測試樣本進行測試
    val pre=testing.map(p=>(model.predict(p.features),p.label))//驗證模型
    val prin=pre.take(20)
    println("prediction"+"\t"+"label")
    for(i<- 0 to prin.length-1){
      println(prin(i)._1+"\t"+prin(i)._2)
    }
    
val accuracy=1.0 *pre.filter(x=>x._1==x._2).count()//計算準確度

println(accuracy)
} }
相關文章
相關標籤/搜索