機器學習-KMeans算法原理 && Spark實現

不懂算法的數據開發者不是一個好的算法工程師,還記得研究生時候,導師講過的一些數據挖掘算法,很有興趣,可是無奈工做後接觸少了,數據工程師的鄙視鏈,模型>實時>離線數倉>ETL工程師>BI工程師(不喜勿噴哈),如今作的工做主要是離線數倉,固然前期也作過一些ETL的工做,爲了職業的長遠發展,拓寬本身的技術邊界,有必要逐步深刻實時和模型,因此從本篇文章開始,也是列個FLAG,深刻學習實時和模型部分。git

改變本身,從提高本身不擅長領域的事情開始。github

1. KMeans - 算法簡介

K-Means算法是無監督的聚類算法,它實現起來比較簡單,聚類效果也不錯,所以應用很普遍,算法

  • K-means算法,也稱爲K-平均或者K-均值,通常做爲掌握聚類算法的第一個算法。sql

  • 這裏的K爲常數,需事先設定,通俗地說該算法是將沒有標註的 M 個樣本經過迭代的方式彙集成K個簇。apache

  • 在對樣本進行彙集的過程每每是以樣本之間的距離做爲指標來劃分。app

file
核心:K-means聚類算法是一種迭代求解的聚類分析算法,其步驟是隨機選取K個對象做爲初始的聚類中心,而後計算每一個對象與各個種子聚類中心之間的距離,把每一個對象分配給距離它最近的聚類中心。聚類中心以及分配給它們的對象就表明一個聚類。每分配一個樣本,聚類的聚類中心會根據聚類中現有的對象被從新計算。這個過程將不斷重複直到知足某個終止條件。終止條件能夠是沒有(或最小數目)對象被從新分配給不一樣的聚類,沒有(或最小數目)聚類中心再發生變化,偏差平方和局部最小dom

2.KMeans 算法流程

2.1 讀取文件,準備數據,對數據進行預處理

2.2 隨機找K個點,做爲初始的中心點

2.3 遍歷數據集,計算每個點到3箇中心的距離,距離那個中心點最近就屬於哪一個中心點

2.4 根據新的分類計算新的中心點

2.5 使用新的中心點開始下一次循環(繼續循環步驟2.3)

退出循環的條件函數

1.指定循環次數學習

2.全部的中心點幾乎再也不移動(即中心點移動的距離總和小於咱們給定的一個常熟,好比0.00001)大數據

3. KMeans算法優缺點

K值的選擇: k 值對最終結果的影響相當重要,而它卻必需要預先給定。給定合適的 k 值,須要先驗知識,憑空估計很困難,或者可能致使效果不好。

異常點的存在:K-means算法在迭代的過程當中使用全部點的均值做爲新的質點(中心點),若是簇中存在異常點,將致使均值誤差比較嚴重。 好比一個簇中有二、四、六、八、100五個數據,那麼新的質點爲24,顯然這個質點離絕大多數點都比較遠;在當前狀況下,使用中位數6可能比使用均值的想法更好,使用中位數的聚類方式叫作K-Mediods聚類(K中值聚類)

初值敏感:K-means算法是初值敏感的,選擇不一樣的初始值可能致使不一樣的簇劃分規則。爲了不這種敏感性致使的最終結果異常性,能夠採用初始化多套初始節點構造不一樣的分類規則,而後選擇最優的構造規則。針對這點後面所以衍生了:二分K-Means算法、K-Means++算法、K-Means||算法、Canopy算法等

實現簡單、移動、伸縮性良好等優勢使得它成爲聚類中最經常使用的算法之一。

4.KMeans算法Spark實現

4.1 數據下載和說明

連接:https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ提取碼:hell
複製這段內容後打開百度網盤手機App,操做更方便哦

鳶尾花數據集,數據集包含3類共150調數據,每類含50個數據,每條記錄含4個特徵:花萼長度、花萼寬度、花瓣長度、花瓣寬度

過這4個 特徵,將花聚類,假設將K取值爲3,看看與實際結果的差異

4.2 實現

沒有使用mlb庫,而是使用scala原生實現

package com.hoult.work

import org.apache.commons.lang3.math.NumberUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer
import scala.math.{pow, sqrt}
import scala.util.Random

object KmeansDemo {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()

    val sc = spark.sparkContext
    val dataset = spark.read.textFile("data/lris.csv")
      .rdd.map(_.split(",").filter(NumberUtils.isNumber _).map(_.toDouble))
      .filter(!_.isEmpty).map(_.toSeq)


    val res: RDD[(Seq[Double], Int)] = train(dataset, 3)

    res.sample(false, 0.1, 1234L)
      .map(tp => (tp._1.mkString(","), tp._2))
      .foreach(println)
  }

  // 定義一個方法 傳入的參數是 數據集、K、最大迭代次數、代價函數變化閾值
  // 其中 最大迭代次數和代價函數變化閾值是設定了默認值,能夠根據須要作相應更改
  def train(data: RDD[Seq[Double]], k: Int, maxIter: Int = 40, tol: Double = 1e-4) = {

    val sc: SparkContext = data.sparkContext

    var i = 0 // 迭代次數
    var cost = 0D //初始的代價函數
    var convergence = false   //判斷收斂,即代價函數變化小於閾值tol

    // step1 :隨機選取 k個初始聚類中心
    var initk: Array[(Seq[Double], Int)] = data.takeSample(false, k, Random.nextLong()).zip(Range(0, k))

    var res: RDD[(Seq[Double], Int)] = null

    while (i < maxIter && !convergence) {

      val bcCenters = sc.broadcast(initk)

      val centers: Array[(Seq[Double], Int)] = bcCenters.value

      val clustered: RDD[(Int, (Double, Seq[Double], Int))] = data.mapPartitions(points => {

        val listBuffer = new ListBuffer[(Int, (Double, Seq[Double], Int))]()

        // 計算每一個樣本點到各個聚類中心的距離
        points.foreach { point =>

          // 計算聚類id以及最小距離平方和、樣本點、1
          val cost: (Int, (Double, Seq[Double], Int)) = centers.map(ct => {

            ct._2 -> (getDistance(ct._1.toArray, point.toArray), point, 1)

          }).minBy(_._2._1)  // 將該樣本歸屬到最近的聚類中心
          listBuffer.append(cost)
        }

        listBuffer.toIterator
      })
      //
      val mpartition: Array[(Int, (Double, Seq[Double]))] = clustered
        .reduceByKey((a, b) => {
          val cost = a._1 + b._1   //代價函數
          val count = a._3 + b._3   // 每一個類的樣本數累加
          val newCenters = a._2.zip(b._2).map(tp => tp._1 + tp._2)    // 新的聚類中心點集
          (cost, newCenters, count)
        })
        .map {
          case (clusterId, (costs, point, count)) =>
            clusterId -> (costs, point.map(_ / count))   // 新的聚類中心
        }
        .collect()
      val newCost = mpartition.map(_._2._1).sum   // 代價函數
      convergence =  math.abs(newCost - cost) <= tol    // 判斷收斂,即代價函數變化是否小於小於閾值tol
      // 變換新的代價函數
      cost = newCost
      // 變換初始聚類中心
      initk = mpartition.map(tp => (tp._2._2, tp._1))
      // 聚類結果 返回樣本點以及所屬類的id
      res = clustered.map(tp=>(tp._2._2,tp._1))
      i += 1
    }
    // 返回聚類結果
    res
  }

  def getDistance(x:Array[Double],y:Array[Double]):Double={
    sqrt(x.zip(y).map(z=>pow(z._1-z._2,2)).sum)
  }


}

完整代碼:https://github.com/hulichao/bigdata-spark/blob/master/src/main/scala/com/hoult/work/KmeansDemo.scala

結果截圖:
file
吳邪,小三爺,混跡於後臺,大數據,人工智能領域的小菜鳥。
更多請關注
file

相關文章
相關標籤/搜索