機器學習 | 算法筆記- K均值(K-Means)

前言

本系列爲機器學習算法的總結和概括,目的爲了清晰闡述算法原理,同時附帶上手代碼實例,便於理解。

目錄

   決策樹
   線性迴歸
  組合算法(Ensemble Method)
   K-Means
  機器學習算法總結
 
本章主要介紹無監督學習中的k-means,以及簡單代碼實現。

1、算法簡介

k-Means算法是一種聚類算法,它是一種無監督學習算法,目的是將類似的對象歸到同一個蔟中。蔟內的對象越類似,聚類的效果就越好。聚類和分類最大的不一樣在於,分類的目標事先已知,而聚類則不同。其產生的結果和分類相同,而只是類別沒有預先定義。

1.1 算法原理

設計的目的: 使各個樣本與所在簇的質心的均值的偏差平方和達到最小(這也是評價K-means算法最後聚類效果的評價標準)。

1.2 算法特色

· 優勢:容易實現
· 缺點:可能收斂到局部最小值,在大規模數據上收斂較慢
適合數據類型:數值型數據

1.3 聚類過程

1)建立k個點做爲k個簇的起始質心(常常隨機選擇)。
2)分別計算剩下的元素到k個簇中心的相異度(距離),將這些元素分別劃歸到相異度最低的簇。
3)根據聚類結果,從新計算k個簇各自的中心,計算方法是取簇中全部元素各自維度的算術平均值。
4)將D中所有元素按照新的中心從新聚類。
5)重複第4步,直到聚類結果再也不變化。
6)最後,輸出聚類結果。

2、算法實現

2.1 僞代碼

1 建立k個點做爲K個簇的起始質心(常常隨機選擇)
2 當任意一個點的蔟分配結果發生變化時(初始化爲True)
3     對數據集中的每一個數據點,從新分配質心
4         對每一個質心
5             計算質心到數據點之間的距離
6         將數據點分配到距其最近的蔟
7     對每一個蔟,計算蔟中全部點的均值並將均值做爲新的質心

2.2 手寫實現

代碼主要包括兩部分,一個是Kmeans分類器的構建,裏面包含兩個算法,一個是Kmeans,一個是二分K-means。另外一個是一個測試文件代碼,用於執行測試test.txt數據文件html

# -*- coding: utf-8 -*-
import numpy as np

class KMeansClassifier():
    
    def __init__(self, k=3, initCent='random', max_iter=500 ):
        
        self._k = k
        self._initCent = initCent
        self._max_iter = max_iter
        self._clusterAssment = None
        self._labels = None
        self._sse = None
        
    def _calEDist(self, arrA, arrB):
        """
        功能:歐拉距離距離計算
        輸入:兩個一維數組
        """
        return np.math.sqrt(sum(np.power(arrA-arrB, 2)))
    
    def _calMDist(self, arrA, arrB):
        """
        功能:曼哈頓距離距離計算
        輸入:兩個一維數組
        """
        return sum(np.abs(arrA-arrB))


    def _randCent(self, data_X, k):
        """
        功能:隨機選取k個質心
        輸出:centroids #返回一個m*n的質心矩陣
        """
        n = data_X.shape[1] #獲取特徵的維數
        centroids = np.empty((k,n))  #使用numpy生成一個k*n的矩陣,用於存儲質心
        for j in range(n):
            minJ = min(data_X[:, j])
            rangeJ  = float(max(data_X[:, j] - minJ))
            #使用flatten拉平嵌套列表(nested list)
            centroids[:, j] = (minJ + rangeJ * np.random.rand(k, 1)).flatten()
        return centroids 
    
    def fit(self, data_X):
        """
        輸入:一個m*n維的矩陣
        """
        if not isinstance(data_X, np.ndarray) or \
               isinstance(data_X, np.matrixlib.defmatrix.matrix):
            try:
                data_X = np.asarray(data_X)
            except:
                raise TypeError("numpy.ndarray resuired for data_X")
                
        m = data_X.shape[0]  #獲取樣本的個數
        #一個m*2的二維矩陣,矩陣第一列存儲樣本點所屬的族的索引值,
        #第二列存儲該點與所屬族的質心的平方偏差
        self._clusterAssment = np.zeros((m,2)) 
        
        if self._initCent == 'random':
            self._centroids = self._randCent(data_X, self._k)
            
        clusterChanged = True
        for _ in range(self._max_iter): #使用"_"主要是由於後面沒有用到這個值
            clusterChanged = False
            for i in range(m):   #將每一個樣本點分配到離它最近的質心所屬的族
                minDist = np.inf #首先將minDist置爲一個無窮大的數
                minIndex = -1    #將最近質心的下標置爲-1
                for j in range(self._k): #次迭代用於尋找最近的質心
                    arrA = self._centroids[j,:]
                    arrB = data_X[i,:]
                    distJI = self._calEDist(arrA, arrB) #計算偏差值
                    if distJI < minDist:
                        minDist = distJI
                        minIndex = j
                if self._clusterAssment[i, 0] != minIndex or self._clusterAssment[i, 1] > minDist**2:
                    clusterChanged = True
                    self._clusterAssment[i,:] = minIndex, minDist**2
            if not clusterChanged:#若全部樣本點所屬的族都不改變,則已收斂,結束迭代
                break
            for i in range(self._k):#更新質心,將每一個族中的點的均值做爲質心
                index_all = self._clusterAssment[:,0] #取出樣本所屬簇的索引值
                value = np.nonzero(index_all==i) #取出全部屬於第i個簇的索引值
                ptsInClust = data_X[value[0]]    #取出屬於第i個簇的全部樣本點
                self._centroids[i,:] = np.mean(ptsInClust, axis=0) #計算均值
        
        self._labels = self._clusterAssment[:,0]
        self._sse = sum(self._clusterAssment[:,1])
    
    def predict(self, X):#根據聚類結果,預測新輸入數據所屬的族
        #類型檢查
        if not isinstance(X,np.ndarray):
            try:
                X = np.asarray(X)
            except:
                raise TypeError("numpy.ndarray required for X")
        
        m = X.shape[0]#m表明樣本數量
        preds = np.empty((m,))
        for i in range(m):#將每一個樣本點分配到離它最近的質心所屬的族
            minDist = np.inf
            for j in range(self._k):
                distJI = self._calEDist(self._centroids[j,:], X[i,:])
                if distJI < minDist:
                    minDist = distJI
                    preds[i] = j
        return preds

        
class biKMeansClassifier():
    "this is a binary k-means classifier"
    
    def __init__(self, k=3):
        
        self._k = k
        self._centroids = None
        self._clusterAssment = None
        self._labels = None
        self._sse = None
        
    
    def _calEDist(self, arrA, arrB):
        """
        功能:歐拉距離距離計算
        輸入:兩個一維數組
        """
        return np.math.sqrt(sum(np.power(arrA-arrB, 2)))
        
    def fit(self, X):
        m = X.shape[0]
        self._clusterAssment = np.zeros((m,2))
        centroid0 = np.mean(X, axis=0).tolist()
        centList =[centroid0]
        for j in range(m):#計算每一個樣本點與質心之間初始的平方偏差
            self._clusterAssment[j,1] = self._calEDist(np.asarray(centroid0), \
                                        X[j,:])**2
        
        while (len(centList) < self._k):
            lowestSSE = np.inf
            #嘗試劃分每一族,選取使得偏差最小的那個族進行劃分
            for i in range(len(centList)):
                index_all = self._clusterAssment[:,0] #取出樣本所屬簇的索引值
                value = np.nonzero(index_all==i) #取出全部屬於第i個簇的索引值
                ptsInCurrCluster = X[value[0],:] #取出屬於第i個簇的全部樣本點
                clf = KMeansClassifier(k=2)
                clf.fit(ptsInCurrCluster)
                #劃分該族後,所獲得的質心、分配結果及偏差矩陣
                centroidMat, splitClustAss = clf._centroids, clf._clusterAssment
                sseSplit = sum(splitClustAss[:,1])
                index_all = self._clusterAssment[:,0] 
                value = np.nonzero(index_all==i)
                sseNotSplit = sum(self._clusterAssment[value[0],1])
                if (sseSplit + sseNotSplit) < lowestSSE:
                    bestCentToSplit = i
                    bestNewCents = centroidMat
                    bestClustAss = splitClustAss.copy()
                    lowestSSE = sseSplit + sseNotSplit
            #該族被劃分紅兩個子族後,其中一個子族的索引變爲原族的索引
            #另外一個子族的索引變爲len(centList),而後存入centList
            bestClustAss[np.nonzero(bestClustAss[:,0]==1)[0],0]=len(centList)
            bestClustAss[np.nonzero(bestClustAss[:,0]==0)[0],0]=bestCentToSplit
            centList[bestCentToSplit] = bestNewCents[0,:].tolist()
            centList.append(bestNewCents[1,:].tolist())
            self._clusterAssment[np.nonzero(self._clusterAssment[:,0] == \
                                        bestCentToSplit)[0],:]= bestClustAss 
                   
        self._labels = self._clusterAssment[:,0] 
        self._sse = sum(self._clusterAssment[:,1])
        self._centroids = np.asarray(centList)
                                
    def predict(self, X):#根據聚類結果,預測新輸入數據所屬的族
        #類型檢查
        if not isinstance(X,np.ndarray):
            try:
                X = np.asarray(X)
            except:
                raise TypeError("numpy.ndarray required for X")
        
        m = X.shape[0]#m表明樣本數量
        preds = np.empty((m,))
        for i in range(m):#將每一個樣本點分配到離它最近的質心所屬的族
            minDist = np.inf
            for j in range(self._k):
                distJI = self._calEDist(self._centroids[j,:],X[i,:])
                if distJI < minDist:
                    minDist = distJI
                    preds[i] = j
        return preds
View Code
# -*- coding: utf-8 -*-

import pandas as pd
import numpy as np
from kmeans import KMeansClassifier
import matplotlib.pyplot as plt

#加載數據集,DataFrame格式,最後將返回爲一個matrix格式
def loadDataset(infile):
    df = pd.read_csv(infile, sep='\t', header=0, dtype=str, na_filter=False)
    return np.array(df).astype(np.float)

if __name__=="__main__":
    data_X = loadDataset(r"data/testSet.txt")
    k = 3
    clf = KMeansClassifier(k)
    clf.fit(data_X)
    cents = clf._centroids
    labels = clf._labels
    sse = clf._sse
    colors = ['b','g','r','k','c','m','y','#e24fff','#524C90','#845868']
    for i in range(k):
        index = np.nonzero(labels==i)[0]
        x0 = data_X[index, 0]
        x1 = data_X[index, 1]
        y_i = i
        for j in range(len(x0)):
            plt.text(x0[j], x1[j], str(y_i), color=colors[i], \
                        fontdict={'weight': 'bold', 'size': 6})
        plt.scatter(cents[i,0],cents[i,1],marker='x',color=colors[i],\
                    linewidths=7)
    
    plt.title("SSE={:.2f}".format(sse))
    plt.axis([-7,7,-7,7])
    outname = "./result/k_clusters" + str(k) + ".png"
    plt.savefig(outname)
    plt.show()
    
    
View Code

3、算法總結與討論

雖然K-Means算法原理簡單,可是也有自身的缺陷:
  • 首先,聚類的簇數K值須要事先給定,但在實際中這個 K 值的選定是很是難以估計的,不少時候,事先並不知道給定的數據集應該分紅多少個類別才最合適。
  • Kmeans須要人爲地肯定初始聚類中心,不一樣的初始聚類中心可能致使徹底不一樣的聚類結果,不能保證K-Means算法收斂於全局最優解。
    • 針對此問題,在K-Means的基礎上提出了二分K-means算法。該算法首先將全部點看作是一個簇,而後一分爲二,找到最小SSE的聚類質心。接着選擇其中一個簇繼續一分爲二,此處哪個簇須要根據劃分後的SSE值來判斷。
  • 對離羣點敏感。
  • 結果不穩定 (受輸入順序影響)。
  • 時間複雜度高O(nkt),其中n是對象總數,k是簇數,t是迭代次數。

- K-Means算法K值如何選擇?
《大數據》中提到:給定一個合適的類簇指標,好比平均半徑或直徑,只要咱們假設的類簇的數目等於或者高於真實的類簇的數目時,該指標上升會很緩慢,而一旦試圖獲得少於真實數目的類簇時,該指標會急劇上升。
 
簇的直徑是指簇內任意兩點之間的最大距離。
簇的半徑是指簇內全部點到簇中心距離的最大值。
 
- 如何優化K-Means算法搜索的時間複雜度?
可使用K-D樹來縮短最近鄰的搜索時間(NN算法均可以使用K-D樹來優化時間複雜度)。
 
- 如何肯定K個簇的初始質心?
1) 選擇批次距離儘量遠的K個點
 首先隨機選擇一個點做爲第一個初始類簇中心點,而後選擇距離該點最遠的那個點做爲第二個初始類簇中心點,而後再選擇距離前兩個點的最近距離最大的點做爲第三個初始類簇的中心點,以此類推,直至選出K個初始類簇中心點。
 2) 選用層次聚類或者Canopy算法進行初始聚類,而後利用這些類簇的中心點做爲KMeans算法初始類簇中心點。
 
聚類擴展:密度聚類、層次聚類。詳見:
 

參考:http://www.csuldw.com/2015/06/03/2015-06-03-ml-algorithm-K-means/算法

相關文章
相關標籤/搜索