【大數據分析經常使用算法】4.樸素貝葉斯

簡介

在數據挖掘和機器學習算法中,有不少分類算法。其中樸素貝葉斯(Naive Bayes classiffer,NBC)是最簡單但倒是最有效的算法之一。java

本章節實現一個基於監督學習方法和機率分析器。樸素貝葉斯是一個線性分類器。要理解這個概念,須要瞭解條件機率的概念。算法

處理數值數據時,最好使用聚類技術,例如K均值、K-近鄰算法。不過杜宇名字、符號、電子郵件和文本類型的分類,則最好使用機率方法,如NBC。apache

在某些狀況下,咱們也可使用NBC對數值類型進行分類。api

一、樸素貝葉斯分類方法

NBC是一個基於強(樸素)獨立假設應用貝葉斯定理實現的機率分類器。基本說來,NBC根據輸入的一些屬性(特徵)將輸入分配到K個類別$C_{1},C_{2},...,C_{k}$中的某個類。NBC有不少應用,如垃圾郵件過濾和文檔分類。數據結構

例如,使用NBC的垃圾郵件過濾器把各個電子郵件分配到兩個簇之一:app

垃圾郵件(spammail) 、非垃圾郵件(not spam mail)

前面提到,樸素貝葉斯是一個基於監督學習的算法,這類算法都有一個特點。就是須要兩個階段完成應用:機器學習

  1. 訓練階段: 使用一個優先的數據樣本實例集合中的訓練數據創建一個分類器(分類階段會使用)。這就是說所謂的監督學習方法,即從一個樣本學習(樣本具備目標列信息),而後使用這個信息來完成新數據的分類;
  2. 分類階段:經過訓練階段獲得分類器,使用貝葉斯定律將新數據分類到階段1中明確了的類別中去(所以訓練階段除了獲得一個分類器以外,還能夠得到另一個關鍵信息,那就是這些數據會被分到那些類別$C_{1},...,C_{k}$。

二、Spark實現

第二階段,使用分類器對新數據進行分類,這時候咱們能夠運用公式: $$ C^{predict} = \arg \max P(C=c)\prod_{j=1}^{m}P(X_{j} = u_{j}|C=c) $$ 其中P(C)已經在咱們的第一階段求解得出,即每一類佔數據集的總機率(當數據集足夠大的時候,他已經能夠表明全部現象——理想狀態)。oop

2.一、訓練數據集

1,晴,熱,高,弱,不
2,晴,熱,高,強,不
3,陰,熱,高,弱,是
4,雨,溫暖,高,弱,是
5,雨,冷,正常,弱,是
6,雨,冷,正常,強,不
7,陰,冷,正常,強,是
8,晴,溫暖,高,弱,不
9,晴,冷,正常,弱,是
10,雨,溫暖,正常,弱,是
11,晴,溫暖,正常,強,是
12,陰,溫暖,高,強,是
13,陰,熱,正常,弱,是
14,雨,溫暖,高,強,不

經過該數據集咱們能夠生成一個分類器。post

2.二、階段一:構建分類器(訓練模型)

package com.sunrun.movieshow.autils.nbc;

import edu.umd.cloud9.io.pair.PairOfStrings;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/* 階段1:使用訓練數據創建一個分類器:一組機率表
* 輸入數據(id,天氣,溫度,溼度,風力,是否適合打球)
1,晴,熱,高,弱,不
2,晴,熱,高,強,是
...
*/
public class BuildNBCClassifier implements Serializable {
    /**
     * 1. 獲取Spark 上下文對象
     * @return
     */
    public static JavaSparkContext getSparkContext(String appName){
        SparkConf sparkConf = new SparkConf()
                .setAppName(appName)
                //.setSparkHome(sparkHome)
                .setMaster("local[*]")
                // 串行化器
                .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
                .set("spark.testing.memory", "2147480000");

        return new JavaSparkContext(sparkConf);
    }

    // 2. 將Map(Tuple2,Double) -> List<Tuple2<PairOfString,DoubleWritable>>
    // PairOfStrings是一個實現了Writable接口的類,這樣就能夠支持Hadoop寫入了
    public static List<Tuple2<PairOfStrings, DoubleWritable>> toWritableList(Map<Tuple2<String,String>,Double> PT){
        List<Tuple2<PairOfStrings, DoubleWritable>> list = new ArrayList<>();
        for (Map.Entry<Tuple2<String, String>, Double> entry : PT.entrySet()) {
            list.add(new Tuple2<>(new PairOfStrings(entry.getKey()._1,entry.getKey()._2),new DoubleWritable(entry.getValue())));
        }
        return list;
    }

    public static void main(String[] args) {
        JavaSparkContext sc = getSparkContext("buildBNC");
        JavaRDD<String> training = sc.textFile("data/nbc/ball.txt");
        String dfsUrl = "hdfs://10.21.1.24:9000/ball/";

        // == 1.得到數據集大小,用於機率計算
        long trainingDataSize = training.count();

        // == 2.轉換數據集: line -> ((feature,classification),1)
        JavaPairRDD<Tuple2<String, String>, Integer> pairs = training.flatMapToPair(line -> {
            List<Tuple2<Tuple2<String, String>, Integer>> result = new ArrayList<>();
            String[] tokens = line.split(",");
            // 0->id, 1-(n-1) -> A(feature), n -> T(classification)
            String nowClassification = tokens[tokens.length - 1];
            for (int i = 1; i < tokens.length - 1; i++) {
                // like ((晴,是),1)
                result.add(new Tuple2<>(new Tuple2<>(tokens[i], nowClassification), 1));
            }
            // 最後還要統計每一個類別的總出現次數,所以,須要單獨的對最終class進行統計((class,是),1)
            result.add(new Tuple2<>(new Tuple2<>("class", nowClassification), 1));
            return result.iterator();
        });
        //pairs.saveAsTextFile(dfsUrl + "pair");
        /**
         * ((晴,不),1)
         * ((熱,不),1)
         * ((高,不),1)
         * ((弱,不),1)
         * ((class,不),1)
         *  ...
         */

        // == 3.計算每種特徵出現的次數
        JavaPairRDD<Tuple2<String, String>, Integer> count = pairs.reduceByKey((c1, c2) -> {
            return c1 + c2;
        });
        //count.saveAsTextFile(dfsUrl + "count");
        /**
         * [root@h24 ~]# hadoop fs -cat /ball/count/p*
         * ((強,是),3)
         * ((雨,是),3)
         * ((弱,是),6)
         * ((class,不),5)
         * ((晴,不),3)
         * ((class,是),9)
         * ((冷,是),3)
         * ...
         */

        // == 4.將歸約數據轉換爲map
        Map<Tuple2<String, String>, Integer> countAsMap = count.collectAsMap();

        // == 5.創建分類器數據結構:機率表PT、分類表CT
        HashMap<Tuple2<String, String>, Double> PT = new HashMap<>();
        ArrayList<String> CT = new ArrayList<>();
        for (Map.Entry<Tuple2<String, String>, Integer> entry : countAsMap.entrySet()) {
            // (feature,classification)
            Tuple2<String, String> key = entry.getKey();
            String feature = key._1;
            String classification = key._2;

            // K: new Tuple2<>(feature, classification) V: compute probably
            Tuple2<String, String> K = new Tuple2<>(feature, classification);

            // class type:target feature classification P(C)
            if(feature.equals("class")){
                CT.add(classification);
                // target feature times / total,總類型的機率爲類別出現次數/總記錄數
                PT.put(K, (double)entry.getValue() / trainingDataSize);
            }else{
                // 獲取某個分類出現的總次數。(Yes? No?) P(Ai|C=Ci) = 屬性A值在類別C下出現的次數/類別C的出現次數
                Tuple2<String, String> K2 = new Tuple2<>("class", classification);
                Integer times = countAsMap.get(K2);
                // 該類別沒出現過,則機率設爲0.0(其實不可能爲0)
                if(times == 0){
                    PT.put(K,0.0);
                }else{
                    PT.put(K, (double)entry.getValue() / times);
                }
            }
        }
        // System.out.println(PT);
        // (class,是)=0.6428571428571429, (冷,是)=0.3333333333333333, (晴,不)=0.6, (雨,是)=0.3333333333333333, (高,不)=0.8,
        // System.out.println(CT);
        //  [不, 是]

        // == 6. 保存分類器的數據結構
        // ==== 6.1.轉化爲持久存儲數據類型
        List<Tuple2<PairOfStrings, DoubleWritable>> ptList = toWritableList(PT);
        JavaPairRDD<PairOfStrings, DoubleWritable> ptRDD = sc.parallelizePairs(ptList);
        // ==== 6.2.存儲到Hadoop
        ptRDD.saveAsNewAPIHadoopFile(dfsUrl + "nbc/pt", // 存儲路徑
                PairOfStrings.class,// K
                DoubleWritable.class, // V
                SequenceFileOutputFormat.class// 輸出格式類
        );

        // == 7.保存分類列表
        JavaRDD<String> ctRDD = sc.parallelize(CT);
        ctRDD.saveAsTextFile(dfsUrl + "nbc/ct");
        /**
         * [root@h24 ~]# hadoop fs -cat /ball/nbc/ct/*
         * 不
         * 是
         *
         * [root@h24 ~]# hadoop fs -cat /ball/nbc/pt/*
         * SEQ$edu.umd.cloud9.io.pair.PairOfStrings#org.apache.hadoop.io.DoubleWritableռ
         * ...
         */

        System.out.println("complete training...");
    }
}

2.三、階段二:使用分類器對新數據分類(測試模型)

package com.sunrun.movieshow.autils.nbc;

import com.sunrun.movieshow.autils.common.SparkHelper;
import edu.umd.cloud9.io.pair.PairOfStrings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// 樸素貝葉斯分類器
// 階段1:訓練階段:使用訓練數據創建一個樸素貝葉斯分類器 BuildNBClassifier
// 階段2:測試階段:使用新創建的NBC對新數據進行分類 NBCTester
public class NBCTester {
    public static void main(String[] args) {
        JavaSparkContext sc = SparkHelper.getSparkContext("NBCTester");

        // 實例存儲文件的根目錄
        String dfsUrl = "hdfs://10.21.1.24:9000/ball/";

        // == 1.導入要分類的數據集
        JavaRDD<String> testRdd = sc.textFile("data/nbc/test.txt");
        /**
         * 1,晴,熱,高,弱
         * 2,晴,熱,高,強
         * 3,陰,熱,高,弱
         * 4,雨,溫暖,高,弱
         */

        // == 2.加載分類器模型
        JavaPairRDD<PairOfStrings, DoubleWritable> modelRDD = sc.newAPIHadoopFile(dfsUrl + "nbc/pt",
                SequenceFileInputFormat.class,
                PairOfStrings.class,
                DoubleWritable.class,
                new Configuration()
        );
        // System.out.println(ptRDD.collect());
        /**
         *((高, 不),0.8), ((高, 不),0.8), ((高, 不),0.8),...
         */

        // == 3.使用map複製返回的對象:((高, 不),0.8)
        JavaPairRDD<Tuple2<String, String>, Double> ptRDD = modelRDD.mapToPair(t -> {
            // pairStrings left and right (feature-v,classification)
            Tuple2<String, String> K = new Tuple2<>(t._1.getLeftElement(), t._1.getRightElement());
            // V - the probably
            Double V = new Double(t._2.get());
            return new Tuple2<>(K, V);
        });

        // == 4.廣播分類器
        Broadcast<Map<Tuple2<String, String>, Double>> broadcastPT = sc.broadcast(ptRDD.collectAsMap());

        //  == 5.廣播全部分類類別
        JavaRDD<String> ctRDD = sc.textFile(dfsUrl + "nbc/ct");
        final Broadcast<List<String>> broadcastCT = sc.broadcast(ctRDD.collect());



        // == 6.對新數據進行分類: argMax II(P(C=c) * P(Ai|c))
        JavaPairRDD<String, String> testResult = testRdd.mapToPair(line -> {
            // broadcast value
            Map<Tuple2<String, String>, Double> pt = broadcastPT.getValue();
            List<String> ct = broadcastCT.getValue();

            // 解析新數據的每個特徵值
            String[] featureValues = line.split(",");

            // 選擇類別
            String selectedClasses = "";

            // 當前的最大機率
            double maxPosterior = 0.0;

            // 計算:
            for (String Ci : ct) {
                // P(Ci)
                Double posterior = pt.get(new Tuple2<>("class", Ci));

                for (int i = 0; i < featureValues.length; i++) {
                    // P(Ai|Ci)
                    Double probably = pt.get(new Tuple2<>(featureValues[1], Ci));
                    // 這裏的邏輯有待探討,能夠理解爲,當前類別下,沒有這種特徵值出現,那麼當
                    // 一條數據的特徵值爲此值時,II(P(C=c) * P(Ai|c)) = 0,也就是該類別不可能被選擇。
                    if (probably == null) {
                        posterior = 0.0;
                        break;
                    } else {
                        // P(Ci) * P(Ai|Ci)
                        posterior *= probably.doubleValue();
                    }
                }

                System.out.println(line + "," + Ci + posterior);

                if (selectedClasses == null) {
                    // 計算第1個分類的值
                    selectedClasses = Ci;
                    maxPosterior = posterior;
                } else {
                    if (posterior > maxPosterior) {
                        selectedClasses = Ci;
                        maxPosterior = posterior;
                    }
                }
            }
            return new Tuple2<>(line, selectedClasses);
        });

        testResult.saveAsTextFile(dfsUrl + "test01");
        /***
         * [root@h24 ~]# hadoop fs -cat /ball/ball/test02/p*
         * (1,晴,熱,高,弱,不)
         * (2,晴,熱,高,強,不)
         * (3,陰,熱,高,弱,是)
         * (4,雨,溫暖,高,弱,不)
         */
    }
}

三、使用Spark MLib進行處理

MLib提供了各類各樣的算法提供給咱們使用,這裏也能夠直接偷懶使用MLib提供的算法進行分類器的構建。學習

 

總結

貝葉斯算法比起KNN算法來講,效果高了不少,KNN算法雖然能夠保證結果比較精確,可是其龐大的運算量在許多場景沒法使用,所以,KNN能夠說沒有NBC運用的普遍。

使用樸素貝葉斯算法的思路:

第一階段:訓練分類器階段,該階段須要大量的數據訓練一個分類器,數據量越大,預測結果越準確:

一、獲取整個數據集的記錄總數N

二、計算每一個類別出現的機率,即P(C) $$ P(C_{i}) = \frac{C_{i}}{N} $$ 其中$C_{i}$爲數據集中類別出現的次數。

三、計算每一個特徵值的條件機率(在已知類別以後) $$ P(A_{i}|C_{i}) = \frac{P(A_{i}\cap C_{i})}{P(C_{i})} = \frac{A_{i}\cap C_{i}}{C_{i}} $$ 例如咱們案例中的,求出天氣這個特徵向量在取值爲晴的時候的機率,經過計算獲得((晴,是),4)這個元組,其中的4就爲$(A_{i}\cap C_{i})$出現的次數。

四、經過步驟3的公式,計算出全部特徵值對應的機率,而後將其做爲分類器存儲到Hadoop Support文件系統。同時也從訓練集的目標列中總結出分爲多少類,做爲類別迭代器計算。

第二階段:測試分類器階段。

加載分類器,咱們使用公式 $$ C^{predict} = \arg \max P(C=c)\prod_{j=1}^{m}P(X_{j} = u_{j}|C=c) $$ 代入每一個測試數據,使用類別迭代器,分別測試將測試數據做爲每一個類別以後獲得的條件機率是多少,選取最大的那一個所選擇的類別做爲測試數據的分類結果。

能夠看到:貝葉斯算法的核心思想就是這樣一個常識:當你不能準確知悉一個事物的本質時,你能夠依靠與事物特定本質相關的事件出現的多少去判斷其本質屬性的機率。 用數學語言表達就是:支持某項屬性的事件發生得愈多,則該屬性成立的可能性就愈大。

聯繫到咱們的應用:咱們都是假設給定的數據是類別C的狀況下,他在咱們熟知的知識系統(訓練集)中進行後驗機率的計算以後,其分值(機率)越高,那麼他就最可能就是這個類別。

相關文章
相關標籤/搜索