手寫KMeans算法

KMeans算法是一種無監督學習,它會將類似的對象歸到同一類中。 其基本思想是: 1.隨機計算k個類中心做爲起始點。 2. 將數據點分配到理其最近的類中心。 3.移動類中心。 4.重複2,3直至類中心再也不改變或者達到限定迭代次數。 具體的實現以下:python

from numpy import *
import matplotlib.pyplot as plt
import pandas as pd
# Load dataset
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
dataset = pd.read_csv(url, names=names)
dataset['class'][dataset['class']=='Iris-setosa']=0
dataset['class'][dataset['class']=='Iris-versicolor']=1
dataset['class'][dataset['class']=='Iris-virginica']=2
#對類別進行編碼,3個類別分別賦值0,1,2

#算距離
def distEclud(vecA, vecB):                  #兩個向量間歐式距離
    return sqrt(sum(power(vecA - vecB, 2))) #la.norm(vecA-vecB)

#初始化聚類中心:經過在區間範圍隨機產生的值做爲新的中心點
def randCent(dataSet, k):
    #獲取特徵維度
    n = shape(dataSet)[1]
    #建立聚類中心0矩陣 k x n
    centroids = mat(zeros((k,n)))
    #遍歷n維特徵
    for j in range(n):
        #第j維特徵屬性值min   ,1x1矩陣
        minJ = min(dataSet[:,j])
        #區間值max-min,float數值
        rangeJ = float(max(dataSet[:,j]) - minJ)
        #第j維,每次隨機生成k箇中心
        centroids[:,j] = mat(minJ + rangeJ * random.rand(k,1))
    return centroids

def randChosenCent(dataSet,k):
    # 樣本數
    m=shape(dataSet)[0]
    # 初始化列表
    centroidsIndex=[]
    #生成相似於樣本索引的列表
    dataIndex=list(range(m))
    for i in range(k):
        #生成隨機數
        randIndex=random.randint(0,len(dataIndex))
        #將隨機產生的樣本的索引放入centroidsIndex
        centroidsIndex.append(dataIndex[randIndex])
        #刪除已經被抽中的樣本
        del dataIndex[randIndex]
    #根據索引獲取樣本
    centroids = dataSet.iloc[centroidsIndex]
    return mat(centroids)


def kMeans(dataSet, k):
    # 樣本總數
    m = shape(dataSet)[0]
    # 分配樣本到最近的簇:存[簇序號,距離的平方]
    # m行  2 列
    clusterAssment = mat(zeros((m, 2)))

    # step1:
    # 經過隨機產生的樣本點初始化聚類中心
    centroids = randChosenCent(dataSet, k)
    print('最初的中心=', centroids)

    # 標誌位,若是迭代先後樣本分類發生變化值爲Tree,不然爲False
    clusterChanged = True
    # 查看迭代次數
    iterTime = 0
    # 全部樣本分配結果再也不改變,迭代終止
    while clusterChanged:
        clusterChanged = False
        # step2:分配到最近的聚類中心對應的簇中
        for i in range(m):
            # 初始定義距離爲無窮大
            minDist = inf;
            # 初始化索引值
            minIndex = -1
            # 計算每一個樣本與k箇中心點距離
            for j in range(k):
                # 計算第i個樣本到第j箇中心點的距離
                distJI = distEclud(centroids[j, :], dataSet.values[i, :])
                # 判斷距離是否爲最小
                if distJI < minDist:
                    # 更新獲取到最小距離
                    minDist = distJI
                    # 獲取對應的簇序號
                    minIndex = j
            # 樣本上次分配結果跟本次不同,標誌位clusterChanged置True
            if clusterAssment[i, 0] != minIndex:
                clusterChanged = True
            clusterAssment[i, :] = minIndex, minDist ** 2  # 分配樣本到最近的簇
        iterTime += 1
        sse = sum(clusterAssment[:, 1])
        print('the SSE of %d' % iterTime + 'th iteration is %f' % sse)
        # step3:更新聚類中心
        for cent in range(k):  # 樣本分配結束後,從新計算聚類中心
            # 獲取該簇全部的樣本點
            ptsInClust = dataSet.iloc[nonzero(clusterAssment[:, 0].A == cent)[0]]
            # 更新聚類中心:axis=0沿列方向求均值。
            centroids[cent, :] = mean(ptsInClust, axis=0)
    return centroids, clusterAssment

def kMeansSSE(dataSet,k,distMeas=distEclud, createCent=randChosenCent):
    m = shape(dataSet)[0]
    #分配樣本到最近的簇:存[簇序號,距離的平方]
    clusterAssment=mat(zeros((m,2)))
    #step1:#初始化聚類中心
    centroids = createCent(dataSet, k)
    print('initial centroids=',centroids)
    sseOld=0
    sseNew=inf
    iterTime=0 #查看迭代次數
    while(abs(sseNew-sseOld)>0.0001):
        sseOld=sseNew
        #step2:將樣本分配到最近的質心對應的簇中
        for i in range(m):
            minDist=inf;minIndex=-1
            for j in range(k):
                #計算第i個樣本與第j個質心之間的距離
                distJI=distMeas(centroids[j,:],dataSet.values[i,:])
                #獲取到第i樣本最近的質心的距離,及對應簇序號
                if distJI<minDist:
                    minDist=distJI;minIndex=j
            clusterAssment[i,:]=minIndex,minDist**2 #分配樣本到最近的簇
        iterTime+=1
        sseNew=sum(clusterAssment[:,1])
        print('the SSE of %d'%iterTime + 'th iteration is %f'%sseNew)
        #step3:更新聚類中心
        for cent in range(k):
            #樣本分配結束後,從新計算聚類中心
            ptsInClust=dataSet[nonzero(clusterAssment[:,0].A==cent)[0]]
            #按列取平均,mean()對array類型
            centroids[cent,:] = mean(ptsInClust, axis=0)
    return centroids, clusterAssment


# 2維數據聚類效果顯示
def datashow(dataSet, k, centroids, clusterAssment):  # 二維空間顯示聚類結果
    from matplotlib import pyplot as plt
    num, dim = shape(dataSet)  # 樣本數num ,維數dim

    if dim != 2:
        print('sorry,the dimension of your dataset is not 2!')
        return 1
    marksamples = ['or', 'ob', 'og', 'ok', '^r', '^b', '<g']  # 樣本圖形標記
    if k > len(marksamples):
        print('sorry,your k is too large,please add length of the marksample!')
        return 1
        # 繪全部樣本
    for i in range(num):
        markindex = int(clusterAssment[i, 0])  # 矩陣形式轉爲int值, 簇序號
        # 特徵維對應座標軸x,y;樣本圖形標記及大小
        plt.plot(dataSet.iat[i, 0], dataSet.iat[i, 1], marksamples[markindex], markersize=6)

    # 繪中心點
    markcentroids = ['o', '*', '^']  # 聚類中心圖形標記
    label = ['0', '1', '2']
    c = ['yellow', 'pink', 'red']
    for i in range(k):
        plt.plot(centroids[i, 0], centroids[i, 1], markcentroids[i], markersize=15, label=label[i], c=c[i])
        plt.legend(loc='upper left')
    plt.xlabel('sepal length')
    plt.ylabel('sepal width')

    plt.title('k-means cluster result')  # 標題
    plt.show()


# 畫出實際圖像
def trgartshow(dataSet, k, labels):
    from matplotlib import pyplot as plt
    num, dim = shape(dataSet)
    label = ['0', '1', '2']
    marksamples = ['ob', 'or', 'og', 'ok', '^r', '^b', '<g']
    # 經過循環的方式,完成分組散點圖的繪製
    for i in range(num):
        plt.plot(datamat.iat[i, 0], datamat.iat[i, 1], marksamples[int(labels.iat[i, 0])], markersize=6)
    for i in range(0, num, 50):
        plt.plot(datamat.iat[i, 0], datamat.iat[i, 1], marksamples[int(labels.iat[i, 0])], markersize=6,
                 label=label[int(labels.iat[i, 0])])
    plt.legend(loc='upper left')
    # 添加軸標籤和標題

    plt.xlabel('sepal length')
    plt.ylabel('sepal width')

    plt.title('iris true result')  # 標題

    # 顯示圖形
    plt.show()
    # label=labels.iat[i,0]

#聚類前,繪製原始的樣本點
def originalDatashow(dataSet):
        #樣本的個數和特徵維數
    num,dim=shape(dataSet)
    marksamples=['ob'] #樣本圖形標記
    for i in range(num):
        plt.plot(datamat.iat[i,0],datamat.iat[i,1],marksamples[0],markersize=5)
    plt.title('original dataset')
    plt.xlabel('sepal length')
    plt.ylabel('sepal width') #標題
    plt.show()


if __name__ == '__main__':
    # =====kmeans聚類
    # # #獲取樣本數據
    datamat = dataset.loc[:, ['sepal-length', 'sepal-width']]
    # 真實的標籤
    labels = dataset.loc[:, ['class']]
    # #原始數據顯示
    originalDatashow(datamat)

    # #*****kmeans聚類
    k = 3  # 用戶定義聚類數
    mycentroids, clusterAssment = kMeans(datamat, k)
    # mycentroids,clusterAssment=kMeansSSE(datamat,k)

    # 繪圖顯示
    datashow(datamat, k, mycentroids, clusterAssment)
    trgartshow(datamat, 3, labels)

下面,使用TensorFlow,實現以下:算法

import tensorflow as tf
import numpy as np
from tensorflow.contrib.factorization import KMeans
import os

os.environ['CUDA_VISIBLE_DEVICES']=''

from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('/tmp/data',one_hot=True)

full_data_x = mnist.train.images

num_steps = 50
batch_size = 1024
k = 25
num_classes = 10
num_features = 28*28

X = tf.placeholder(tf.float32,[None,num_features])
y = tf.placeholder(tf.float32,[None,num_classes])

kmeans = KMeans(inputs=X,num_clusters=k,distance_metric='cosine',use_mini_batch=True)

# Build KMeans graph
all_scores, cluster_idx, scores, cluster_centers_initialized,init_op, training_op = kmeans.training_graph()

cluster_idx = cluster_idx[0]
avg_distance = tf.reduce_mean(scores)

# Initialize the variables (i.e. assign their default value)
init_vars = tf.global_variables_initializer()

sess = tf.Session()
sess.run(init_vars, feed_dict={X: full_data_x})
sess.run(init_op, feed_dict={X: full_data_x})

# Training
for i in range(1, num_steps + 1):
    _, d, idx = sess.run([training_op, avg_distance, cluster_idx],
                         feed_dict={X: full_data_x})
    if i % 10 == 0 or i == 1:
        print("Step %i, Avg Distance: %f" % (i, d))

counts = np.zeros(shape=(k, num_classes))
for i in range(len(idx)):
    counts[idx[i]] += mnist.train.labels[i]
# Assign the most frequent label to the centroid
labels_map = [np.argmax(c) for c in counts]
labels_map = tf.convert_to_tensor(labels_map)

# Evaluation ops
# Lookup: centroid_id -> label
cluster_label = tf.nn.embedding_lookup(labels_map, cluster_idx)
# Compute accuracy
correct_prediction = tf.equal(cluster_label, tf.cast(tf.argmax(y, 1), tf.int32))
accuracy_op = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

# Test Model
test_x, test_y = mnist.test.images, mnist.test.labels
print("Test Accuracy:", sess.run(accuracy_op, feed_dict={X: test_x, y: test_y}))
相關文章
相關標籤/搜索