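Pearson's χ² statistic:

$$\chi^2 = \sum_{i=1}^{n} \frac{(O_i - E_i)^2}{E_i}$$

where $O_i$ is the observed frequency and $E_i$ the expected (theoretical) frequency of category $i$.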
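As a dependency-free illustration, the statistic is a sum of one term per category. The same per-term function appears as `PEARSON` in the source below. This sketch assumes nothing beyond the formula, and the `PearsonFormula` object is hypothetical, not MLlib code:

```scala
// Hypothetical helper object (not part of MLlib): Pearson's chi-squared statistic
object PearsonFormula {
  // One category's contribution: (O - E)^2 / E
  def term(observed: Double, expected: Double): Double = {
    val dev = observed - expected
    dev * dev / expected
  }

  // chi^2 = sum over all categories of (O_i - E_i)^2 / E_i
  def statistic(observed: Seq[Double], expected: Seq[Double]): Double = {
    require(observed.size == expected.size, "observed and expected must have the same size")
    observed.zip(expected).map { case (o, e) => term(o, e) }.sum
  }
}
```

For example, `PearsonFormula.statistic(Seq(10.0, 20.0), Seq(15.0, 15.0))` gives 25/15 + 25/15 = 10/3.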
Two kinds of chi-squared test are covered:
1) Pearson's independence test
2) Goodness-of-fit test
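For reference, the two tests differ only in how the expected counts $E$ and the degrees of freedom are obtained. These are the standard textbook definitions, written with the symbols of the formula above. They should match what `chiSquared` and `chiSquaredMatrix` compute in the source below:

```latex
% Goodness of fit: k categories, observed counts O_i, total N = \sum_i O_i
\chi^2 = \sum_{i=1}^{k} \frac{(O_i - E_i)^2}{E_i}, \quad
E_i = \frac{N}{k} \ \text{(uniform default)}, \quad df = k - 1

% Independence: r \times c contingency table, row sums R_i, column sums C_j, total N
\chi^2 = \sum_{i=1}^{r} \sum_{j=1}^{c} \frac{(O_{ij} - E_{ij})^2}{E_{ij}}, \quad
E_{ij} = \frac{R_i \, C_j}{N}, \quad df = (r - 1)(c - 1)
```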
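MLlib is Spark's library of implementations of commonly used machine learning algorithms. It also includes related tests and data generators.
The chi-squared test was added in Spark MLlib 1.1. Its source file is in the stat package (org.apache.spark.mllib.stat.test).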
The source file defines three test methods:
(1) def chiSquaredMatrix(counts: Matrix, methodName: String = PEARSON.name): ChiSqTestResult
(2) def chiSquaredFeatures(data: RDD[LabeledPoint], methodName: String = PEARSON.name): Array[ChiSqTestResult]
(3) def chiSquared(observed: Vector, expected: Vector = Vectors.dense(Array[Double]()), methodName: String = PEARSON.name): ChiSqTestResult
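In method (1), the argument is already a table of counts: how often each feature value of the objects falls into each category. The chi-squared computation can therefore be applied to the input directly. Because the counted data is small, the program processes it serially.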
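The serial computation of method (1) can be sketched in plain Scala on a column-major array, the same layout `Matrix.toArray` returns. The sketch follows the two passes of `chiSquaredMatrix` shown in the source below. `IndependenceSketch` and its signature are illustrative assumptions, not MLlib API:

```scala
// Hypothetical sketch mirroring the two passes of chiSquaredMatrix (no Spark needed)
object IndependenceSketch {
  // counts: numRows x numCols contingency table stored column-major.
  // Returns (statistic, degrees of freedom).
  def test(counts: Array[Double], numRows: Int, numCols: Int): (Double, Int) = {
    require(counts.length == numRows * numCols, "size mismatch")
    require(counts.forall(_ >= 0.0), "Contingency table cannot contain negative entries.")
    val colSums = new Array[Double](numCols)
    val rowSums = new Array[Double](numRows)
    // First pass: entry i lies in column i / numRows and row i % numRows
    for (i <- counts.indices) {
      colSums(i / numRows) += counts(i)
      rowSums(i % numRows) += counts(i)
    }
    val total = colSums.sum
    // Second pass: E_ij = rowSum * colSum / total, accumulate (O - E)^2 / E
    var statistic = 0.0
    for (j <- counts.indices) {
      val expected = rowSums(j % numRows) * colSums(j / numRows) / total
      require(expected > 0.0, s"zero row or column sum at index $j")
      val dev = counts(j) - expected
      statistic += dev * dev / expected
    }
    (statistic, (numRows - 1) * (numCols - 1))
  }
}
```

For the 2 x 2 table with rows (10, 20) and (20, 10), every expected count is 15. The statistic is 4 × 25/15 = 20/3 with 1 degree of freedom.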
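In method (2), the argument is an RDD[LabeledPoint]. LabeledPoint data is mostly used in machine learning, for example regression analysis, and has the form label:features. Such data is usually raw collected data, or data already processed by dimensionality reduction, aggregation and similar steps. Its frequencies must therefore be counted before the chi-squared computation can run. Reading the source shows that this method first converts the RDD[LabeledPoint] into contingency matrices (one per feature) and then calls method (1) on each of them.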
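Method (3) is similar to method (1). It computes the chi-squared statistic directly from the supplied vectors (observed and, optionally, expected), following the definition.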
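A plain-Scala sketch of the goodness-of-fit path of method (3) follows. It covers the uniform default and the rescaling of `expected` to the observed total, both taken from `chiSquared` in the source below. The object name is hypothetical. Cells with a zero expected count are deliberately not handled here:

```scala
// Hypothetical sketch of the goodness-of-fit path of chiSquared (no Spark needed)
object GoodnessOfFitSketch {
  // Returns (statistic, degrees of freedom). An empty `expected` means "uniform".
  def test(observed: Array[Double], expected: Array[Double] = Array.empty[Double]): (Double, Int) = {
    val size = observed.length
    require(expected.isEmpty || expected.length == size,
      "observed and expected must be of the same size.")
    require(observed.forall(_ >= 0.0), "Negative entries disallowed in the observed vector.")
    require(expected.forall(_ >= 0.0), "Negative entries disallowed in the expected vector.")
    // Uniform distribution when no expected values are given
    val exp = if (expected.isEmpty) Array.fill(size)(1.0 / size) else expected
    // Rescale expected so its total matches the observed total
    // (this is what lets callers pass relative frequencies instead of counts)
    val obsSum = observed.sum
    val expSum = exp.sum
    val scale = if (math.abs(obsSum - expSum) < 1e-7) 1.0 else obsSum / expSum
    val statistic = observed.zip(exp).map { case (o, e) =>
      val dev = o - e * scale
      dev * dev / (e * scale)
    }.sum
    (statistic, size - 1)
  }
}
```

With observed counts (10, 20, 30) and no expected vector, each expected count becomes 20 and the statistic is (100 + 0 + 100) / 20 = 10 with 2 degrees of freedom.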
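| Name | Gender | Age | Education | Number of marriages | … |
|---|---|---|---|---|---|
| Zhang San | Male | 22 | College | 1 | |
| Li Si | Female | 20 | No college | 1 | |
| Wang Wu | Male | 29 | No college | 3 | |
| Zhao Liu | Male | 27 | College | 2 | |
| Sun Qi | Female | 21 | No college | 2 | |
| Qian Ba | Male | 23 | College | 1 | |
| Jiang Jiu | Female | 37 | No college | 3 | |
| … | … | … | … | … | |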
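Now suppose we want to use this data to test whether the number of marriages is related to education level. After transformation by dimensionality reduction, aggregation and similar steps, the result is the following table: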
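| Education | Number of marriages |
|---|---|
| College | Once |
| No college | Once |
| No college | More than once |
| College | More than once |
| No college | More than once |
| College | Once |
| No college | More than once |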
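The reduction from the first table to the second keeps only education and the marriage count, and buckets the count into "once" or "more than once". In plain Scala it might look like the sketch below. The `Person` case class, the `ReduceSketch` object and the bucket names are hypothetical. The source does not show how this step was actually done:

```scala
// Hypothetical record type for one row of the raw table
case class Person(name: String, gender: String, age: Int, education: String, marriages: Int)

object ReduceSketch {
  // Keep only (education, marriage bucket) and drop every other column
  def reduce(people: Seq[Person]): Seq[(String, String)] =
    people.map { p =>
      require(p.marriages >= 1, s"${p.name}: these buckets assume at least one marriage")
      val bucket = if (p.marriages == 1) "once" else "more than once"
      (p.education, bucket)
    }
}
```

Applied to the seven listed rows in order, this yields exactly the seven (education, marriages) rows of the reduced table.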
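1> Map the RDD[LabeledPoint] to RDD[((col, feature, label), 1)]. Here col is the index of the feature within features, feature is the feature value, label is the object's label, and 1 is the initial count. For the table above (label = education, feature = number of marriages), sample records are: (0, once, College): 1; (0, more than once, College): 1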
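2> Merge RDD[((col, feature, label), 1)] by the key (col, feature, label) into a Map[(col, feature, label), n], where n is the number of times (col, feature, label) occurs. For the table above, sample entries are: (0, once, College): 2; (0, more than once, College): 1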
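3> Group Map[(col, feature, label), n] by the col of each triple. Within each group, the distinct feature values become the rows of a matrix and the distinct labels its columns. Each cell holds the count n of its (col, feature, label). Because the sample data has a single feature column, there is only one group, so the result of 2> already contains the whole table.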
4> Run the chi-squared computation on each matrix to reach a conclusion (covered in detail in the processing stage below).
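Steps 1> to 3> can be mimicked on an in-memory collection. In this sketch, `groupBy` plus `size` stands in for the distributed counting Spark performs on the RDD. A `(label, features)` tuple is a simplified stand-in for `LabeledPoint`, with categories encoded as doubles. Both choices, and the `ContingencySketch` name, are assumptions of the sketch. Rows and columns are also sorted here for determinism, which MLlib does not do:

```scala
// Hypothetical sketch of steps 1> to 3> on a local collection (no Spark needed)
object ContingencySketch {
  type Key = (Int, Double, Double) // (col, feature, label)

  // Steps 1> and 2>: emit one (col, feature, label) key per feature value and count each key
  def pairCounts(data: Seq[(Double, Array[Double])]): Map[Key, Int] =
    data.flatMap { case (label, features) =>
      features.zipWithIndex.map { case (feature, col) => (col, feature, label) }.toSeq
    }.groupBy(identity).map { case (key, occurrences) => key -> occurrences.size }

  // Step 3>: group by col; per column, rows = distinct feature values, columns = distinct labels
  def contingencies(counts: Map[Key, Int]): Map[Int, Array[Array[Double]]] = {
    val labels = counts.keys.map(_._3).toSeq.distinct.sorted.zipWithIndex.toMap
    counts.keys.groupBy(_._1).map { case (col, keys) =>
      val features = keys.map(_._2).toSeq.distinct.sorted.zipWithIndex.toMap
      val table = Array.ofDim[Double](features.size, labels.size)
      keys.foreach { case key @ (_, feature, label) =>
        table(features(feature))(labels(label)) += counts(key)
      }
      col -> table
    }
  }
}
```

Encoding College = 0.0, No college = 1.0, once = 0.0 and more than once = 1.0, the reduced table gives the matrix with rows (2, 1) and (1, 3). By hand from E_ij = R_i C_j / N, its Pearson statistic is 175/144 ≈ 1.215 with 1 degree of freedom.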
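2) In this program, the preprocessing stage that turns RDD[LabeledPoint] into Map[(col, feature, label), num] runs in parallel. Building the matrices from that Map and computing chi-squared both run serially on the driver. The source shows that the RDD-based chi-squared interface tests the independence of each feature from the label. Different features do not affect each other, so they could be processed in parallel.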
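Because each feature's test is independent, the per-column work could in principle run concurrently. The sketch below is hypothetical and is not how MLlib 1.1 works: the source builds and tests the per-column matrices serially on the driver. It simply runs each column's Pearson computation in its own standard-library `Future`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical sketch: test each column's contingency table concurrently
object ParallelColumnsSketch {
  // Pearson statistic of an r x c table (rows = feature values, columns = labels)
  def pearson(table: Array[Array[Double]]): Double = {
    val rowSums = table.map(_.sum)
    val colSums = table.transpose.map(_.sum)
    val total = rowSums.sum
    (for {
      i <- table.indices
      j <- table(i).indices
    } yield {
      val expected = rowSums(i) * colSums(j) / total
      val dev = table(i)(j) - expected
      dev * dev / expected
    }).sum
  }

  // One Future per column; results are gathered into a map keyed by column index
  def testAll(tables: Map[Int, Array[Array[Double]]]): Map[Int, Double] = {
    val futures = tables.toSeq.map { case (col, table) => Future(col -> pearson(table)) }
    Await.result(Future.sequence(futures), 30.seconds).toMap
  }
}
```

In Spark itself, the analogous change would keep the per-column work distributed instead of collecting every count to the driver. That is a design choice outside what this source does.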
```scala
package org.apache.spark.mllib.stat.test

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.commons.math3.distribution.ChiSquaredDistribution

import org.apache.spark.{SparkException, Logging}
import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

import scala.collection.mutable

/**
 * Conduct the chi-squared test for the input RDDs using the specified method.
 * Goodness-of-fit test is conducted on two `Vectors`, whereas test of independence is conducted
 * on an input of type `Matrix` in which independence between columns is assessed.
 * We also provide a method for computing the chi-squared statistic between each feature and the
 * label for an input `RDD[LabeledPoint]`, return an `Array[ChiSquaredTestResult]` of size =
 * number of features in the input RDD.
 *
 * Supported methods for goodness of fit: `pearson` (default)
 * Supported methods for independence: `pearson` (default)
 *
 * More information on Chi-squared test: http://en.wikipedia.org/wiki/Chi-squared_test
 */
private[stat] object ChiSqTest extends Logging {

  /**
   * A case class, so that instances can be used in pattern matching.
   * @param name String name for the method.
   * @param chiSqFunc Function for computing the statistic given the observed and expected counts.
   */
  case class Method(name: String, chiSqFunc: (Double, Double) => Double)

  // Pearson's chi-squared test: http://en.wikipedia.org/wiki/Pearson%27s_chi-squared_test
  /**
   * The Method instance for Pearson's test: each cell contributes
   * (observed - expected)^2 / expected to the chi-squared statistic.
   */
  val PEARSON = new Method("pearson", (observed: Double, expected: Double) => {
    val dev = observed - expected
    dev * dev / expected
  })

  // Null hypothesis for the two different types of chi-squared tests to be included in the result.
  // The null hypothesis is the assumption fixed before the test; when it holds, the test
  // statistic follows a known probability distribution.
  object NullHypothesis extends Enumeration {
    type NullHypothesis = Value
    val goodnessOfFit = Value("observed follows the same distribution as expected.")
    val independence = Value("the occurrence of the outcomes is independent.")
  }

  // Method identification based on input methodName string
  /**
   * Look up the chi-squared Method instance for methodName. Currently only Pearson is provided.
   * @param methodName name of the chi-squared test method
   */
  private def methodFromString(methodName: String): Method = {
    methodName match { // pattern match on the method name
      case PEARSON.name => PEARSON
      case _ => throw new IllegalArgumentException("Unrecognized method for Chi squared test.")
    }
  }
```
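`case PEARSON.name =>` works because `PEARSON.name` is a stable identifier path. The pattern therefore compares the input with `==` instead of binding a fresh variable, which would match every string. Below is a stripped-down, Spark-free sketch of the same lookup. The `MethodLookupSketch` object is hypothetical, and the `Method` shape is copied from the source above:

```scala
// Hypothetical, Spark-free copy of the Method / PEARSON / methodFromString trio
object MethodLookupSketch {
  case class Method(name: String, chiSqFunc: (Double, Double) => Double)

  val PEARSON = Method("pearson", (observed: Double, expected: Double) => {
    val dev = observed - expected
    dev * dev / expected
  })

  // PEARSON.name is a stable identifier, so this case is an equality test
  def methodFromString(methodName: String): Method = methodName match {
    case PEARSON.name => PEARSON
    case _ => throw new IllegalArgumentException("Unrecognized method for Chi squared test.")
  }
}
```

Adding another method would mean one more `Method` value plus one more `case` line in the match.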
```scala
  /**
   * Conduct Pearson's independence test for each feature against the label across the input RDD.
   * The contingency table is constructed from the raw (feature, label) pairs and used to conduct
   * the independence test.
   * Returns an array containing the ChiSquaredTestResult for every feature against the label.
   *
   * Pearson's independence test checks whether paired observations drawn from two variables are
   * independent of each other (for example, repeatedly pick one person from country A and one
   * from country B and check whether their responses are unrelated to nationality).
   * @param data data to test, as RDD[LabeledPoint]
   * @param methodName test method to use
   * @return Array[ChiSqTestResult], one result per feature
   */
  def chiSquaredFeatures(data: RDD[LabeledPoint],
      methodName: String = PEARSON.name): Array[ChiSqTestResult] = {
    val maxCategories = 10000 // max distinct values allowed for the label and for each feature
    val numCols = data.first().features.size // number of feature columns (from the first record)
    val results = new Array[ChiSqTestResult](numCols) // one test result per feature column
    var labels: Map[Double, Int] = null
    // at most 1000 columns at a time
    val batchSize = 1000 // columns per batch
    var batch = 0 // current batch index
    while (batch * batchSize < numCols) {
      // The following block of code can be cleaned up and made public as
      // chiSquared(data: RDD[(V1, V2)])
      val startCol = batch * batchSize // first column of this batch
      val endCol = startCol + math.min(batchSize, numCols - startCol) // end column (exclusive)
      val pairCounts = data.mapPartitions { iter => // runs once per partition
        val distinctLabels = mutable.HashSet.empty[Double] // distinct labels seen in this partition
        val allDistinctFeatures: Map[Int, mutable.HashSet[Double]] =
          Map((startCol until endCol).map(col => (col, mutable.HashSet.empty[Double])): _*)
        var i = 1 // record counter, so the category checks run only every 1000 records
        // flatMap each LabeledPoint(label, features) into (col, feature, label) triples
        iter.flatMap { case LabeledPoint(label, features) =>
          if (i % 1000 == 0) {
            if (distinctLabels.size > maxCategories) {
              throw new SparkException(s"Chi-square test expect factors (categorical values) but "
                + s"found more than $maxCategories distinct label values.")
            }
            allDistinctFeatures.foreach { case (col, distinctFeatures) =>
              if (distinctFeatures.size > maxCategories) {
                throw new SparkException(s"Chi-square test expect factors (categorical values) but "
                  + s"found more than $maxCategories distinct values in column $col.")
              }
            }
          }
          i += 1
          distinctLabels += label
          // pair each value with its column index, keep this batch's columns, record the value
          // in allDistinctFeatures and emit a (col, feature, label) triple
          features.toArray.view.zipWithIndex.slice(startCol, endCol).map { case (feature, col) =>
            allDistinctFeatures(col) += feature
            (col, feature, label)
          }
        }
      }.countByValue() // Map[(col, feature, label), count], returned to the driver

      if (labels == null) {
        // Do this only once for the first column since labels are invariant across features.
        // Every row yields exactly one triple with col == startCol, so the labels of those
        // triples cover all distinct labels.
        labels =
          pairCounts.keys.filter(_._1 == startCol).map(_._3).toArray.distinct.zipWithIndex.toMap
      }
      val numLabels = labels.size // labels index the columns of each contingency matrix
      pairCounts.keys.groupBy(_._1).map { case (col, keys) =>
        val features = keys.map(_._2).toArray.distinct.zipWithIndex.toMap
        val numRows = features.size // distinct feature values index the matrix rows
        // create an empty numRows x numLabels matrix
        val contingency = new BDM(numRows, numLabels, new Array[Double](numRows * numLabels))
        keys.foreach { case (_, feature, label) =>
          val i = features(feature)
          val j = labels(label)
          contingency(i, j) += pairCounts((col, feature, label))
        }
        // convert to an MLlib matrix and run the independence test on it
        results(col) = chiSquaredMatrix(Matrices.fromBreeze(contingency), methodName)
      }
      batch += 1
    }
    results
  }
```
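The batching loop walks the feature columns 1000 at a time, and `endCol` is exclusive. The small sketch below uses the same arithmetic as the `while` loop above to list the `(startCol, endCol)` pairs it visits. The `BatchSketch` helper is hypothetical:

```scala
// Hypothetical helper reproducing the batch arithmetic of chiSquaredFeatures
object BatchSketch {
  def batches(numCols: Int, batchSize: Int = 1000): Seq[(Int, Int)] = {
    val out = scala.collection.mutable.ArrayBuffer.empty[(Int, Int)]
    var batch = 0
    while (batch * batchSize < numCols) {
      val startCol = batch * batchSize
      val endCol = startCol + math.min(batchSize, numCols - startCol) // exclusive
      out += ((startCol, endCol))
      batch += 1
    }
    out.toSeq
  }
}
```

Each batch runs its own `mapPartitions(...).countByValue()` job. A dataset with 2500 features is therefore scanned three times: batches (0, 1000), (1000, 2000) and (2000, 2500).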
```scala
  /**
   * Pearson's goodness of fit test on the input observed and expected counts/relative frequencies.
   * Uniform distribution is assumed when `expected` is not passed in.
   *
   * The goodness-of-fit test compares the counts observed in a multinomial experiment with the
   * counts expected under the null hypothesis, i.e. it measures how close the two are. It uses
   * sample data to test whether the population follows a specific distribution.
   * @param observed observed values
   * @param expected expected (theoretical) values; may be left empty for a uniform distribution
   * @param methodName test method name (defaults to, and currently only supports, Pearson)
   */
  def chiSquared(observed: Vector,
      expected: Vector = Vectors.dense(Array[Double]()),
      methodName: String = PEARSON.name): ChiSqTestResult = {

    // Validate input arguments
    val method = methodFromString(methodName) // look up the test method
    if (expected.size != 0 && observed.size != expected.size) {
      throw new IllegalArgumentException("observed and expected must be of the same size.")
    }
    val size = observed.size
    if (size > 1000) {
      logWarning("Chi-squared approximation may not be accurate due to low expected frequencies "
        + s" as a result of a large number of categories: $size.")
    }
    val obsArr = observed.toArray
    // if no expected values were passed, use a uniform 1.0 / size for every entry
    val expArr = if (expected.size == 0) Array.tabulate(size)(_ => 1.0 / size) else expected.toArray
    if (!obsArr.forall(_ >= 0.0)) { // reject negative observed entries
      throw new IllegalArgumentException("Negative entries disallowed in the observed vector.")
    }
    if (expected.size != 0 && ! expArr.forall(_ >= 0.0)) {
      throw new IllegalArgumentException("Negative entries disallowed in the expected vector.")
    }

    // Determine the scaling factor for expected
    val obsSum = obsArr.sum // total observed count
    val expSum = if (expected.size == 0.0) 1.0 else expArr.sum // total expected
    // If the totals differ (e.g. expected was given as relative frequencies), rescale expected
    // so that it sums to the observed total
    val scale = if (math.abs(obsSum - expSum) < 1e-7) 1.0 else obsSum / expSum

    // compute chi-squared statistic
    // zip into (obs, exp) pairs, then foldLeft adds up each pair's contribution,
    // starting from an accumulated sum of 0.0
    val statistic = obsArr.zip(expArr).foldLeft(0.0) { case (stat, (obs, exp)) =>
      if (exp == 0.0) {
        if (obs == 0.0) {
          throw new IllegalArgumentException("Chi-squared statistic undefined for input vectors due"
            + " to 0.0 values in both observed and expected.")
        } else {
          return new ChiSqTestResult(0.0, size - 1, Double.PositiveInfinity, PEARSON.name,
            NullHypothesis.goodnessOfFit.toString)
        }
      }
      if (scale == 1.0) {
        stat + method.chiSqFunc(obs, exp)
      } else {
        stat + method.chiSqFunc(obs, exp * scale)
      }
    }
    val df = size - 1
    val pValue = 1.0 - new ChiSquaredDistribution(df).cumulativeProbability(statistic)
    new ChiSqTestResult(pValue, df, statistic, PEARSON.name, NullHypothesis.goodnessOfFit.toString)
  }
```
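The `return` inside the `foldLeft` closure is a non-local return. It exits `chiSquared` itself as soon as a category has expected count 0 but a positive observed count, producing an infinite statistic with p-value 0. The sketch below is a hypothetical restructuring that decides these cases before summing, so no non-local return is needed. As in the original fold, the first cell with a zero expected count decides the outcome:

```scala
// Hypothetical restructuring of the zero-expected handling in chiSquared
object ZeroExpectedSketch {
  sealed trait Outcome
  case object Undefined extends Outcome                // 0 observed and 0 expected: error case
  case object Infinite extends Outcome                 // positive observed with 0 expected
  final case class Finite(statistic: Double) extends Outcome

  def statistic(obs: Array[Double], exp: Array[Double]): Outcome = {
    val pairs = obs.zip(exp)
    // The first pair with a zero expected count decides, mirroring the throw/return in the fold
    pairs.collectFirst {
      case (o, e) if e == 0.0 => if (o == 0.0) Undefined else Infinite
    }.getOrElse(Finite(pairs.map { case (o, e) => (o - e) * (o - e) / e }.sum))
  }
}
```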
```scala
  /**
   * Pearson's independence test on the input contingency matrix.
   * TODO: optimize for SparseMatrix when it becomes supported.
   *
   * @param counts data to test (a contingency matrix)
   * @param methodName test method to use (default: Pearson)
   */
  def chiSquaredMatrix(counts: Matrix, methodName:String = PEARSON.name): ChiSqTestResult = {
    val method = methodFromString(methodName) // look up the Method (case class)
    val numRows = counts.numRows // number of matrix rows
    val numCols = counts.numCols // number of matrix columns

    // get row and column sums
    val colSums = new Array[Double](numCols) // sum of each column
    val rowSums = new Array[Double](numRows) // sum of each row
    val colMajorArr = counts.toArray // all entries, in column-major order
    var i = 0
    while (i < colMajorArr.size) {
      val elem = colMajorArr(i)
      if (elem < 0.0) {
        throw new IllegalArgumentException("Contingency table cannot contain negative entries.")
      }
      colSums(i / numRows) += elem // entry i lies in column i / numRows
      rowSums(i % numRows) += elem // and in row i % numRows
      i += 1
    }
    val total = colSums.sum // total sample count

    // second pass to collect statistic
    var statistic = 0.0
    var j = 0
    while (j < colMajorArr.size) {
      val col = j / numRows
      val colSum = colSums(col)
      if (colSum == 0.0) {
        throw new IllegalArgumentException("Chi-squared statistic undefined for input matrix due to"
          + s" 0 sum in column [$col].")
      }
      val row = j % numRows
      val rowSum = rowSums(row)
      if (rowSum == 0.0) {
        throw new IllegalArgumentException("Chi-squared statistic undefined for input matrix due to"
          + s" 0 sum in row [$row].")
      }
      val expected = colSum * rowSum / total // expected count for this cell
      statistic += method.chiSqFunc(colMajorArr(j), expected) // add this cell's contribution
      j += 1
    }
    val df = (numCols - 1) * (numRows - 1) // degrees of freedom
    if (df == 0) {
      // 1 column or 1 row. Constant distribution is independent of anything.
      // pValue = 1.0 and statistic = 0.0 in this case.
      new ChiSqTestResult(1.0, 0, 0.0, methodName, NullHypothesis.independence.toString)
    } else {
      /*
       * cumulativeProbability() relies on the following result
       * (http://mathworld.wolfram.com/Chi-SquaredDistribution.html):
       * the chi-squared distribution with n degrees of freedom is the gamma distribution with
       * shape a = n/2 and rate λ = 1/2.
       * The method computes P{X <= x}. For a test at significance level a, the critical value x
       * is chosen so that P{X > x} = a when H0 is true; that upper tail is the complement of the
       * value computed above. Evaluated at the observed statistic, the upper tail is the p-value:
       * the smallest significance level at which H0 would be rejected.
       */
      val pValue = 1.0 - new ChiSquaredDistribution(df).cumulativeProbability(statistic)
      new ChiSqTestResult(pValue, df, statistic, methodName, NullHypothesis.independence.toString)
    }
  }
}
```
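`1.0 - cumulativeProbability(statistic)` is the upper-tail probability P(X > statistic). For an even number of degrees of freedom, that tail has the closed form exp(-x/2) · Σ_{i=0}^{df/2-1} (x/2)^i / i!. This gives a dependency-free way to cross-check the commons-math result. The sketch below is hypothetical and deliberately rejects odd `df`, which has no such elementary form:

```scala
// Hypothetical cross-check: chi-squared upper tail for even degrees of freedom only
object ChiSqTailSketch {
  // P(X > x) for X ~ chi-squared(df), df even:
  // exp(-x/2) * sum_{i=0}^{df/2 - 1} (x/2)^i / i!
  def upperTailEvenDf(x: Double, df: Int): Double = {
    require(df > 0 && df % 2 == 0, "closed form only for positive even df")
    require(x >= 0.0, "statistic must be non-negative")
    val half = x / 2
    var term = 1.0 // (x/2)^0 / 0!
    var sum = 1.0
    for (i <- 1 until df / 2) {
      term *= half / i // now (x/2)^i / i!
      sum += term
    }
    math.exp(-half) * sum
  }
}
```

For df = 2 the tail reduces to exp(-x/2). For example, `upperTailEvenDf(10.0, 2)` equals `exp(-5) ≈ 0.0067`.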