K-means算法解析及代碼復現

K-means算法是最經常使用的聚類算法之一,本文將對該算法進行解析和numpy復現代碼。python

K-means解析

定義

K-means基於的一個假設是同類樣本點會在特徵空間造成簇。在K-means算法中,會給定樣本集 X 的 n 個數據點,簇的個數 k。每一個簇都有一個類別中心 c。K-means的優化目標以下式,
image
式子的意思是讓全部數據點離它們所屬的類別中心(最近的中心)的歐式距離之和最小。算法

求解步驟

求解這個方程通常用下面步驟求解:dom

  1. 隨機選取 k 個類別中心 C = {c1, c2,···, ck}。
  2. 把每一個數據點歸到其最近的類別中心的簇,即給每一個點打上假標籤。獲得 k 個簇集。
  3. 經過步驟2獲得的 k 個簇集,從新計算類別中心,計算方式爲,
    image
  4. 重複步驟2和步驟3直到類別中心再也不更新爲止。

能夠看出K-means的求解十分簡單,其關鍵在於類別中心的初始化。最簡單的初始化是隨機選取 k 個點看成類別中心,但可能會遇到下圖狀況。下圖四個簇對應四個類,當初始點(星)以下圖所示時,類別中心沒法收斂到正確的位置上。
image函數

k-means++算法 [1] 就是爲解決這個問題所提出的。
K-means++選取初始類別中心步驟爲:優化

  1. 隨機選取第一個類別中心。
  2. 計算全部樣本點與其最近的類別中心的距離 D(x),如下式機率
    image
    從樣本集 X 中選取下一個類別中心。
  3. 重複步驟2,直到選取到k個類別中心。

關鍵點在步驟2,其實質是當一個點屬於已選取的類別中心的簇的機率越大,它被選取的機率越小。其目的是使得算法儘量不在同一簇裏不選取兩個類別中心。不過算法以機率的形式選取,也沒法保證不出現上圖的狀況。所以,通常K-means算法會運行屢次,選取目標函數最小的類別中心。spa

算法代碼

初始化形心

使用的是K-means++的方式:code

def ini_centers(self,x):
    cs = np.array([x[np.random.randint(0, len(x), size = 1).item()]])
    for j in range(self.class_num - 1):
        for i, c in enumerate(cs):
            d = np.sqrt(np.sum((x - c) ** 2, 1).reshape(-1, 1))
            if i == 0:
                dist = d
            else:
                dist = np.concatenate((dist, d), 1)
                # n, class_num
    dist = dist.min(1)
    dist = dist**2/sum(dist**2)
    index = np.random.choice(np.arange(len(x)), p=dist.ravel())
    new_c = x[index]
    cs = np.concatenate((cs,[new_c]), 0)
    return cs

打標籤和從新計算類別中心

cnt = 0
flag = True
while flag and cnt < self.max_iter:
    # predict
    label, score = self.predict(x,cs)
    # update
    new_cs = np.array([x[label==i].mean(0) for i in range(self.class_num)])
        if (cs == new_cs).all(): flag = False
    cs = new_cs
    cnt+=1

預測函數

def predict(self,x,cs):
    for i,c in enumerate(cs):
        d = np.sqrt(np.sum((x - c)**2,1).reshape(-1,1))
        if i == 0:
            dist = d
        else:
            dist = np.concatenate((dist,d),1)
    label = dist.argmin(1)
    score = dist.min(1).sum()
    return label, score

屢次計算

def fit(self,x):
    sc = float("inf")
    for t in range(self.n_init):
    cs = self.ini_centers(x)
        # initial
    cnt = 0
    flag = True
    while flag and cnt < self.max_iter:
        # predict
        label, score = self.predict(x,cs)
        # update
        new_cs = np.array([x[label==i].mean(0) for i in range(self.class_num)])
            if (cs == new_cs).all(): flag = False
        cs = new_cs
        cnt+=1
    if score < sc:
        sc = score
        self.cluster_centers_ = cs
    return self.cluster_centers_

整體函數

class my_Kmeans():
    def __init__(self, class_num, max_iter=300, n_init=10):
        self.class_num = class_num
        self.cluster_centers_ = None
        self.max_iter = max_iter
        self.n_init = n_init
    def ini_centers(self,x):
        cs = np.array([x[np.random.randint(0, len(x), size = 1).item()]])
        for j in range(self.class_num - 1):
            for i, c in enumerate(cs):
                d = np.sqrt(np.sum((x - c) ** 2, 1).reshape(-1, 1))
                if i == 0:
                    dist = d
                else:
                    dist = np.concatenate((dist, d), 1)
                    # n, class_num
        dist = dist.min(1)
        dist = dist**2/sum(dist**2)
        index = np.random.choice(np.arange(len(x)), p=dist.ravel())
        new_c = x[index]
        cs = np.concatenate((cs,[new_c]), 0)
        return cs

    def fit(self,x):
        sc = float("inf")
        for t in range(self.n_init):
        cs = self.ini_centers(x)
            # initial
        cnt = 0
        flag = True
        while flag and cnt < self.max_iter:
            # predict
            label, score = self.predict(x,cs)
            # update
            new_cs = np.array([x[label==i].mean(0) for i in range(self.class_num)])
                if (cs == new_cs).all(): flag = False
            cs = new_cs
            cnt+=1
        if score < sc:
            sc = score
            self.cluster_centers_ = cs
        return self.cluster_centers_

    def predict(self,x,cs):
        for i,c in enumerate(cs):
            d = np.sqrt(np.sum((x - c)**2,1).reshape(-1,1))
            if i == 0:
                dist = d
            else:
                dist = np.concatenate((dist,d),1)
        label = dist.argmin(1)
        score = dist.min(1).sum()
        return label, score

    def fit_predict(self,x):
        self.fit(x)
        label, score = self.predict(x,self.cluster_centers_)
        return label

上述代碼經試驗基本功能完備,可是效果跟效率要差於sklearn庫。有能夠改進的地方歡迎跟我交流。get

[1] Arthur, David and Sergei Vassilvitskii. 「k-means++: the advantages of careful seeding.」 SODA '07 (2007).it

相關文章
相關標籤/搜索