MLlib 卡方檢驗

一、卡方檢驗理論

1.一、  簡介

整體的分佈函數徹底未知或只知形式、但不知其參數的狀況,爲了推斷整體的某些未知特性,提出某些關於整體的假設。咱們要根據樣本對所提出的假設做出是接受,仍是拒絕的決策。假設檢驗是做出這一決策的過程。卡方檢驗便是假設檢驗的一種。html

1.二、卡方檢驗基本思想

首先假設H0成立,基於此前提計算出χ2值,它表示觀察值與理論值之間的偏離程度。根據χ2分佈及自由度能夠肯定在H0假設成立的狀況下得到當前統計量及更極端狀況的機率P。若是P值很小,說明觀察值與理論值偏離程度太大,應當拒絕無效假設,表示比較資料之間有顯著差別;不然就不能拒絕無效假設,尚不能認爲樣本所表明的實際狀況和理論假設有差異算法

1.三、數學公式

Pearson χ2 計算公式:express

 卡方公式

其中:k爲分類數,i水平的觀察頻數,n爲總頻數,i水平的指望機率apache

由卡方的計算公式可知,當觀察頻數與指望頻數徹底一致時,χ2值爲0;觀察頻數與指望頻數越接近,二者之間的差別越小,χ2值越小;反之,觀察頻數與指望頻數差異越大,二者之間的差別越大,χ2值越大。換言之,大的χ2值代表觀察頻數遠離指望頻數,即代表遠離假設。小的χ2值代表觀察頻數接近指望頻數,接近假設。所以,χ2是觀察頻數與指望頻數之間距離的一種度量指標,也是假設成立與否的度量指標。若是χ2值「小」,研究者就傾向於不拒絕H0;若是χ2值大,就傾向於拒絕H0。至於χ2在每一個具體研究中究竟要大到什麼程度才能拒絕H0,則要藉助於卡方分佈求出所對應的P值(p-value)來肯定。數組

1.四、卡方檢驗做用

         卡方檢驗主要有如下兩種做用:app

1) 皮爾森獨立性檢驗(Pearson's independence test)less

驗證從兩個變量抽出的配對觀察值組是否互相獨立。例如:例如:每次都從A國和B國各抽一我的,看他們的反應是否與國籍無關。機器學習

2)適度檢驗(Goodness of Fit test)ide

實際執行多項式試驗而獲得的觀察次數,與虛無假設的指望次數相比較,稱爲卡方適度檢驗,即在於檢驗兩者接近的程度,利用樣本數據以檢驗整體分佈是否爲某一特定分佈的統計方法。函數

二、MLlib與卡方檢驗

2.一、MLlib庫簡介

MLlib 是Spark對經常使用的機器學習算法的實現庫,同時包括相關的測試和數據生成器。

2.二、MLlib卡方檢驗

卡方檢驗是在spark mllib 1.1版本新增長的功能,其源碼文件在stat包中。

        

2.2.一、實現

         MLlib中卡方檢驗依照皮爾森卡方(pearson)檢驗實現,能夠用其進行獨立性檢驗及適度檢驗。其對外提供接口有以下三種:

(1)    def chiSquaredMatrix(counts:Matrix,methodName:String=PEARSON.name)

:ChiSqTestResult

(2)    def chiSquaredFeatures(data: RDD[LabeledPoint],methodName: String = PEARSON.name)

:Array[ChiSqTestResult]

(3)     def chiSquared(observed: Vector, expected: Vector = Vectors.dense(Array[Double]()),

          methodName: String = PEARSON.name):ChiSqTestResult

 

其中(1)、(2)用於獨立性檢驗,(3)用於適度檢驗。

方法(1) 中參數是統計後的對象的不一樣特徵在特定取值區間出現的頻數。所以可直接對傳入進行卡方數學計算。因爲統計後數據較小,程序採用串行方式處理。

方法(2) 中參數爲RDD[LabeledPoint]類型的數據。LabeledPoint格式的數據多用於機器學習方面,如迴歸分析等,其數據格式爲label:features。此種類型的數據多爲收集的原始數據或通過維歸約、彙集等手段處理過的數據,須要進行頻數統計纔可進行卡方數學計算。經過分析源碼本方法先將RDD[LabeledPoint]類型的數據轉換成矩陣,而後調用方法(1)。

方法(3) 與方法(1)類似,直接利用參數提供數據,根據卡方定義進行數學計算。

 

2.2.二、具體流程

         經過2.2.1分析可知,上述方法(2)過程最爲複雜,其包含數據預處理階段(頻數統計並轉換成矩陣)及處理階段(卡方數學計算)兩個過程,下面以方法(2)程序邏輯爲基礎就這兩個過程分別進行討論。

2.2.2.一、預處理階段

        

         預處理階段要作的主要任務爲:將RDD[LabeledPoint]類型的數據轉換爲三元組,而後分別對轉換後的三元組進行頻數統計,最後根據統計信息分別將每一個特徵所對應三元組轉換成矩陣並進行卡方計算。

         下面經過舉例,詳細說明預處理過程,假如採集到以下數據:

        

姓名

性別

年齡

文化程度

結婚次數

……

張三

22

大學

1

 

李四

20

未讀大學

1

 

王五

29

未讀大學

3

 

趙六

27

大學

2

 

孫七

21

未讀大學

2

 

錢八

23

大學

1

 

蔣九

37

未讀大學

3

 

……

……

……

……

……

 

 

 

 

 

 

 

 

         如今要用這批數據檢驗結婚次數是否與文化程度有關, 經過降維、彙集等手段進行轉換,結果以下圖所示:

文化程度

結婚次數

大學

1次

未讀大學

1次

未讀大學

超過1次

大學

超過1次

未讀大學

超過1次

大學

1次

未讀大學

超過1次

                 

         而後用上述處理過的數據建立RDD[LabeledPoint],其中label爲「大學|未讀大學」,features爲「{1次,超過1次}」,而後交由2.2.1中所述MLlib卡方提供的接口(2)處理。處理過程以下:

1>     將RDD[LabeledPoint] 映射成RDD[(col,feature,label),1],其中col爲feature在features中所在的列的下標,feature爲特徵,label爲某對象,1爲初始頻數。對應上表來講數據樣例爲:(0,一次,大學): 1 ;(0,一次以上,大學): 1

2>     將RDD[(col,feature,label),1] 按(col,feature,label)進行合併,生成Map[(col, feature,

label), n],其中n表明(col, feature,label)一共出現n次。對應上表數據樣例爲:(0,一次,大學): 2 ;(0,一次以上,大學): 1

3>     將Map[(col,feature,label),n]以三元組(col,feature,label)中col進行分組。而後統計每一個組中無重複feature值的個數及根據惟一label個數建立矩陣,矩陣單元格的值爲(col,feature,label)的個數n。因樣例數據feature爲1維因此2>中結果即爲最終結果。

4>     對矩陣進行數學計算得出結論(此部分將在下文處理階段詳細說明)。

其中1>,2>步爲並行處理部分,3>,4>步串行處理部分。上述程序流程圖爲:

 數據轉換流程

其中countByKey操做中RDD轉換過程以下圖:

 countByKey

三元組轉換爲矩陣的具體流程爲:

                                                 三元組轉矩陣                                                                                         

 

        

2.2.2.二、處理階段

         利用預處理階段產生的矩陣,計算數學指望,自由度,p值,經過p值決定接受仍是拒絕原假設。此階段爲翻譯卡方檢驗數學公式再也不詳述。

2.2.三、不足

         1)本程序僅根據皮爾森卡方進行實現,當某特徵出現理論(指望)頻數小於5時,會使「近似於卡方分配」的假設不可信,本程序的處理方法是直接拋出異常。能否經過修正(葉氏連續修正),使存在指望頻數小於5時仍得出可信的結果。    

   2)本程序預處理階段將RDD[LabeledPoint]轉換成Map[(col,feature,label),num]過程是並行的,從Map[(col,feature,label),num]轉換成矩陣過程及計算卡方的過程爲串行處理。由源碼分析知RDD 作爲參數的卡方檢驗接口是檢測每個feature與label的獨立性。不一樣feature這間無影響,能夠進行並行處理。

 

 

附源碼:

 

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.mllib.stat.test

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.commons.math3.distribution.ChiSquaredDistribution

import org.apache.spark.{SparkException, Logging}
import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

import scala.collection.mutable

/**
 * Conduct the chi-squared test for the input RDDs using the specified method.
 * Goodness-of-fit test is conducted on two `Vectors`, whereas test of independence is conducted
 * on an input of type `Matrix` in which independence between columns is assessed.
 * We also provide a method for computing the chi-squared statistic between each feature and the
 * label for an input `RDD[LabeledPoint]`, return an `Array[ChiSquaredTestResult]` of size =
 * number of features in the input RDD.
 *
 * Supported methods for goodness of fit: `pearson` (default)
 * Supported methods for independence: `pearson` (default)
 *
 * More information on Chi-squared test: http://en.wikipedia.org/wiki/Chi-squared_test
 */
/**
 * 卡方檢驗
 */
private[stat] object ChiSqTest extends Logging {


  /**
   * @param name String name for the method.
   * @param chiSqFunc Function for computing the statistic given the observed and expected counts.
   */
  /**
   * case 樣例類,進行模式匹配
   * @param name method名稱
   * @param chiSqFunc 用給定的觀察值及指望值 計算分析
   */
  case class Method(name: String, chiSqFunc: (Double, Double) => Double)

  // Pearson's chi-squared test: http://en.wikipedia.org/wiki/Pearson%27s_chi-squared_test
  /**
   * 樣例類Method 實例
   * 皮爾森卡方計算公式
   * 根據(observed-expected)^2 /expected計算 卡方值
   */
  val PEARSON = new Method("pearson", (observed:  Double, expected: Double) => {
    val dev = observed - expected
    dev * dev / expected
  })

  // Null hypothesis for the two different types of chi-squared tests to be included in the result.
  //零假設:又稱原假設,指進行統計檢驗時預先創建的假設, 零假設成立時,有關統計量應服從已知的某種機率分佈.
  object NullHypothesis extends Enumeration {
    type NullHypothesis = Value
    val goodnessOfFit = Value("observed follows the same distribution as expected.")
    val independence = Value("the occurrence of the outcomes is    independent.")
  }

  // Method identification based on input methodName string
  /**
   * 根據 methodName 獲取卡方檢驗Method實例 ,  本方法如今: 只提供皮爾森方法
   * @param methodName  卡方檢驗方法名
   */
  private def methodFromString(methodName: String): Method = {
    methodName match {  //…… 模式匹配
      case PEARSON.name => PEARSON
      case _ => throw new IllegalArgumentException("Unrecognized method for Chi squared test.")
    }
  }

  /**
   * Conduct Pearson's independence test for each feature against the label across the input RDD.
   * The contingency table is constructed from the raw (feature, label) pairs and used to conduct
   * the independence test.
   * Returns an array containing the ChiSquaredTestResult for every feature against the label.
   */
  /**
   * 皮爾森獨立性檢驗:驗證從兩個變量抽出的配對觀察值組是否互相獨立
   * (例如:每次都從A國和B國各抽一我的,看他們的反應是否與國籍無關)
   * @param data  待檢測的數據   RDD[LabeledPoint] 類型
   * @param methodName  使用的檢驗方法
   * @result Array[ChiSqTestResult]  返回值
   */
  def chiSquaredFeatures(data: RDD[LabeledPoint],
      methodName: String = PEARSON.name): Array[ChiSqTestResult] = {
    val maxCategories = 10000  //最大分類
    val numCols = data.first().features.size  //特徵值中有多少種數據
    val results = new Array[ChiSqTestResult](numCols)  //存儲卡方檢驗結果:numCols個元素的卡方檢驗結果數組
    var labels: Map[Double, Int] = null
    // at most 1000 columns at a time
    val batchSize = 1000  //每批數據量
    var batch = 0  //批次
    while (batch * batchSize < numCols) { //處理
      // The following block of code can be cleaned up and made public as
      // chiSquared(data: RDD[(V1, V2)])
      val startCol = batch * batchSize  //開始執行位置
      val endCol = startCol + math.min(batchSize, numCols - startCol) //相對startCol的偏移量
      val pairCounts = data.mapPartitions { iter =>     //對每一個分區執行iter指向操做
        val distinctLabels = mutable.HashSet.empty[Double]    //不一樣的標籤(研究對象分類)
        /*建立一個Map對象,並初始化:key爲列的編號,value用空的HashSet*/
        val allDistinctFeatures: Map[Int, mutable.HashSet[Double]] =
          Map((startCol until endCol).map(col => (col, mutable.HashSet.empty[Double])): _*)
        var i = 1 //計數器,避免頻繁進行判斷
        /*對此分片進行flatMap操做, 將LabeledPoint(label, features) 轉換成三元組*/
        iter.flatMap { case LabeledPoint(label, features) =>
          if (i % 1000 == 0) {
            /*若分類過多,致使理論頻數(即指望計數)小於5的太多,會導至皮爾森方法失效*/
            if (distinctLabels.size > maxCategories) {
              throw new SparkException(s"Chi-square test expect factors (categorical values) but "
                + s"found more than $maxCategories distinct label values.")
            }
            allDistinctFeatures.foreach { case (col, distinctFeatures) =>
              if (distinctFeatures.size > maxCategories) {
                throw new SparkException(s"Chi-square test expect factors (categorical values) but "
                  + s"found more than $maxCategories distinct values in column $col.")
              }
            }
          }
          i += 1
          distinctLabels += label
          /*將features,加上索引,而後切片,再轉將其經過map 操做 賦值到allDistinctFeatures*/
          features.toArray.view.zipWithIndex.slice(startCol, endCol).map { case (feature, col) =>
            allDistinctFeatures(col) += feature
            (col, feature, label)
          }
        }
      }.countByValue()

      if (labels == null) {
        // Do this only once for the first column since labels are invariant across features.
        /*
         *取出col 是startCol(實質每行一個分類,此filter即至關於從原數據每一行中拿出第一個)的數據,
         *將label取出加入labels
         * 方法僅執行一次。
         */
        labels =
          pairCounts.keys.filter(_._1 == startCol).map(_._3).toArray.distinct.zipWithIndex.toMap
      }
      /*標籤,標識矩陣的列, (由於矩陣是列優先存儲的)*/
      val numLabels = labels.size
      pairCounts.keys.groupBy(_._1).map { case (col, keys) =>
        val features = keys.map(_._2).toArray.distinct.zipWithIndex.toMap
        val numRows = features.size  //特徵值個數, 矩陣的行
        val contingency = new BDM(numRows, numLabels, new Array[Double](numRows * numLabels)) //建立(空)矩陣
        /*賦值操做*/
        keys.foreach { case (_, feature, label) =>
          val i = features(feature)
          val j = labels(label)
          contingency(i, j) += pairCounts((col, feature, label))
        }
        /*轉換爲Spark mllib庫中的矩陣,並對矩陣做卡方檢驗*/
        results(col) = chiSquaredMatrix(Matrices.fromBreeze(contingency), methodName)
      }
      batch += 1
    }
    results
  }

  /*
   * Pearson's goodness of fit test on the input observed and expected counts/relative frequencies.
   * Uniform distribution is assumed when `expected` is not passed in.
   */
  /**
   * 適度檢驗(Goodness of Fit test):實際執行多項式試驗而獲得的觀察次數,與虛無假設的指望次數相比較,
   * 稱爲卡方適度檢驗,即在於檢驗兩者接近的程度,利用樣本數據以檢驗整體分佈是否爲某一特定分佈的統計方法
   * @param observed 觀察值
   * @param expected 指望值(理論值),  若爲均勻分佈, 此參數可爲空。
   * @param methodName 檢驗方法名(默認皮爾森, 現階段只實現了皮爾森)
   */
  def chiSquared(observed: Vector,
      expected: Vector = Vectors.dense(Array[Double]()),
      methodName: String = PEARSON.name): ChiSqTestResult = {

    // Validate input arguments
    val method = methodFromString(methodName)  //獲取檢驗方法
    
    /*指望值存在條件下,判斷觀察值與指望值的個數是否相等(也就是是否配對)*/
    if (expected.size != 0 && observed.size != expected.size) {
      throw new IllegalArgumentException("observed and expected must be of the same size.")
    }
    val size = observed.size
    /*若分類過多,致使理論頻數(即指望計數)小於5的太多,會導至皮爾森方法失效*/
    if (size > 1000) {
      logWarning("Chi-squared approximation may not be accurate due to low expected frequencies "
        + s" as a result of a large number of categories: $size.")
    }
    val obsArr = observed.toArray
    /*若傳入的指望值不存在元素 則Array初始化爲 1.0/size,不然傳入參數值 */
    val expArr = if (expected.size == 0) Array.tabulate(size)(_ => 1.0 / size) else expected.toArray
    if (!obsArr.forall(_ >= 0.0)) {  //判斷是否存在< 0.0元素
      throw new IllegalArgumentException("Negative entries disallowed in the observed vector.")
    }
    if (expected.size != 0 && ! expArr.forall(_ >= 0.0)) {
      throw new IllegalArgumentException("Negative entries disallowed in the expected vector.")
    }

    // Determine the scaling factor for expected
    val obsSum = obsArr.sum  //觀察值的頻次和
    val expSum = if (expected.size == 0.0) 1.0 else expArr.sum  //指望值之和
    val scale = if (math.abs(obsSum - expSum) < 1e-7) 1.0 else obsSum / expSum  //???????????
    
    // compute chi-squared statistic
    /*
     * zip拉鍊操做, 組成(obs, exp)對,
     * 而後對經過foldLeft(0.0) 求每一對的卡方值,並累加…  其中參數 0.0 爲累加和的初始值
     */
    val statistic = obsArr.zip(expArr).foldLeft(0.0) { case (stat, (obs, exp)) =>
      if (exp == 0.0) {
        if (obs == 0.0) {
          throw new IllegalArgumentException("Chi-squared statistic undefined for input vectors due"
            + " to 0.0 values in both observed and expected.")
        } else {
          return new ChiSqTestResult(0.0, size - 1, Double.PositiveInfinity, PEARSON.name,
            NullHypothesis.goodnessOfFit.toString)
        }
      }
      if (scale == 1.0) {
        stat + method.chiSqFunc(obs, exp)
      } else {
        stat + method.chiSqFunc(obs, exp * scale)
      }
    }
    val df = size - 1
    val pValue = 1.0 - new ChiSquaredDistribution(df).cumulativeProbability(statistic)
    new ChiSqTestResult(pValue, df, statistic, PEARSON.name, NullHypothesis.goodnessOfFit.toString)
  }

  /*
   * Pearson's independence test on the input contingency matrix.
   * TODO: optimize for SparseMatrix when it becomes supported.
   */
  /**
   * 獨立性檢驗
   * @param counts  要檢測的數據(矩陣)
   * @param methodName  使用的檢測方法(默認:皮爾森)
   */
  def chiSquaredMatrix(counts: Matrix, methodName:String = PEARSON.name): ChiSqTestResult = {
    val method = methodFromString(methodName)  //獲取Method (case類)
    val numRows = counts.numRows  //矩陣行數
    val numCols = counts.numCols  //矩陣列數

    // get row and column sums
    val colSums = new Array[Double](numCols)  //存放每列數據和(用於計算機率)
    val rowSums = new Array[Double](numRows)  //存放每行數據和
    val colMajorArr = counts.toArray  //存在全部數據(用於計算 樣本總數)
    var i = 0
    while (i < colMajorArr.size) {  
      val elem = colMajorArr(i)
      if (elem < 0.0) {
        throw new IllegalArgumentException("Contingency table cannot contain negative entries.")
      }
      colSums(i / numRows) += elem  //賦值
      rowSums(i % numRows) += elem  //賦值
      i += 1
    }
    val total = colSums.sum  //計算樣本總量

    // second pass to collect statistic
    var statistic = 0.0
    var j = 0
    while (j < colMajorArr.size) {
      val col = j / numRows
      val colSum = colSums(col)
      if (colSum == 0.0) {
        throw new IllegalArgumentException("Chi-squared statistic undefined for input matrix due to"
          + s"0 sum in column [$col].")
      }
      val row = j % numRows
      val rowSum = rowSums(row)
      if (rowSum == 0.0) {
        throw new IllegalArgumentException("Chi-squared statistic undefined for input matrix due to"
          + s"0 sum in row [$row].")
      }
      val expected = colSum * rowSum / total  //計算指望值
      statistic += method.chiSqFunc(colMajorArr(j), expected) //計算卡方值、併疊加至statistic中
      j += 1
    }
    val df = (numCols - 1) * (numRows - 1)  //計算自由度
    if (df == 0) {
      // 1 column or 1 row. Constant distribution is independent of anything.
      // pValue = 1.0 and statistic = 0.0 in this case.
      new ChiSqTestResult(1.0, 0, 0.0, methodName, NullHypothesis.independence.toString)
    } else {
      /*
       * cumulativeProbability() 依據下面理論進行實現
       * <a href="http://mathworld.wolfram.com/Chi-SquaredDistribution.html">
       * 自由度爲n的卡方分佈是 a=n/2 和 λ = 1/2 的伽馬分佈
       * 此方法計算的機率是 P{X <= x} ;
       *
       * 當x是某個常數,選擇x是爲了獲得想要的檢驗顯著性水平a, 也就是說x的選擇知足Ho爲真時, 應爲:P(X > x) = a;
       * 與上述方法計算值互補
       * 又pValue = P{X > x} = a;
       * pValue 爲拒絕原假設的最小顯著性水平
       */
      val pValue = 1.0 - new ChiSquaredDistribution(df).cumulativeProbability(statistic)
      //生成結果(根據pValue值進行比較,得出獨立性強弱)
      new ChiSqTestResult(pValue, df, statistic, methodName, NullHypothesis.independence.toString)
    }
  }
}

 

以上皆我的理解,請各位批評指正

http://www.cnblogs.com/barrenlake/p/4354579.html

相關文章
相關標籤/搜索