KMeans is an unsupervised learning algorithm that groups similar objects into the same cluster. The basic idea is:

1. Randomly pick k cluster centers as starting points.
2. Assign each data point to its nearest cluster center.
3. Move each cluster center to the mean of the points assigned to it.
4. Repeat steps 2 and 3 until the centers no longer change or a maximum number of iterations is reached.
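Before the full implementation, here is a minimal vectorized sketch of those four steps (a sketch only; `kmeans_sketch` and its arguments are illustrative names, not part of the implementation below):

```python
import numpy as np

def kmeans_sketch(X, k, max_iter=100, seed=0):
    """Minimal k-means on an (m, n) array X; assumes no cluster goes empty."""
    rng = np.random.default_rng(seed)
    # Step 1: pick k distinct samples as the starting centers
    centers = X[rng.choice(len(X), size=k, replace=False)]
    for _ in range(max_iter):
        # Step 2: assign every point to its nearest center
        dists = np.linalg.norm(X[:, None, :] - centers[None, :, :], axis=2)
        labels = dists.argmin(axis=1)
        # Step 3: move each center to the mean of its assigned points
        new_centers = np.array([X[labels == j].mean(axis=0) for j in range(k)])
        # Step 4: stop once the centers no longer change
        if np.allclose(new_centers, centers):
            break
        centers = new_centers
    return centers, labels
```

The full implementation on the Iris dataset is as follows: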
```python
from numpy import *
import matplotlib.pyplot as plt
import pandas as pd

# Load the Iris dataset
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
dataset = pd.read_csv(url, names=names)
# Encode the three class labels as 0, 1, 2
dataset['class'] = dataset['class'].map(
    {'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2})

# Distance: Euclidean distance between two vectors
def distEclud(vecA, vecB):
    return sqrt(sum(power(vecA - vecB, 2)))  # or la.norm(vecA - vecB)

# Initialize centroids with random values drawn within each feature's range
# (expects a numpy matrix, unlike randChosenCent, which takes a DataFrame)
def randCent(dataSet, k):
    n = shape(dataSet)[1]           # number of features
    centroids = mat(zeros((k, n)))  # k x n zero matrix for the centroids
    for j in range(n):
        minJ = min(dataSet[:, j])                  # minimum of feature j
        rangeJ = float(max(dataSet[:, j]) - minJ)  # range max - min of feature j
        # k random values within [minJ, minJ + rangeJ] for feature j
        centroids[:, j] = mat(minJ + rangeJ * random.rand(k, 1))
    return centroids

# Initialize centroids by picking k distinct samples at random
def randChosenCent(dataSet, k):
    m = shape(dataSet)[0]       # number of samples
    centroidsIndex = []
    dataIndex = list(range(m))  # candidate sample indices
    for i in range(k):
        randIndex = random.randint(0, len(dataIndex))
        centroidsIndex.append(dataIndex[randIndex])
        del dataIndex[randIndex]  # remove the chosen sample so it cannot be drawn again
    centroids = dataSet.iloc[centroidsIndex]
    return mat(centroids)

def kMeans(dataSet, k):
    m = shape(dataSet)[0]  # total number of samples
    # Assignment table: one row per sample, storing [cluster index, squared distance]
    clusterAssment = mat(zeros((m, 2)))
    # Step 1: initialize the centroids from randomly chosen samples
    centroids = randChosenCent(dataSet, k)
    print('initial centroids =', centroids)
    # Flag: True if any sample changed cluster in the last iteration
    clusterChanged = True
    iterTime = 0  # iteration counter
    # Stop once no assignment changes
    while clusterChanged:
        clusterChanged = False
        # Step 2: assign each sample to the nearest centroid
        for i in range(m):
            minDist = inf  # start with an infinite best distance
            minIndex = -1
            # Distance from sample i to each of the k centroids
            for j in range(k):
                distJI = distEclud(centroids[j, :], dataSet.values[i, :])
                if distJI < minDist:
                    minDist = distJI
                    minIndex = j
            # If the assignment differs from the previous pass, keep iterating
            if clusterAssment[i, 0] != minIndex:
                clusterChanged = True
            clusterAssment[i, :] = minIndex, minDist ** 2
        iterTime += 1
        sse = sum(clusterAssment[:, 1])
        print('the SSE of %dth iteration is %f' % (iterTime, sse))
        # Step 3: move each centroid to the mean of its assigned samples
        for cent in range(k):
            ptsInClust = dataSet.iloc[nonzero(clusterAssment[:, 0].A == cent)[0]]
            centroids[cent, :] = mean(ptsInClust, axis=0)  # column-wise mean
    return centroids, clusterAssment

# Variant that stops when the SSE no longer improves
def kMeansSSE(dataSet, k, distMeas=distEclud, createCent=randChosenCent):
    m = shape(dataSet)[0]
    clusterAssment = mat(zeros((m, 2)))  # [cluster index, squared distance]
    # Step 1: initialize the centroids
    centroids = createCent(dataSet, k)
    print('initial centroids =', centroids)
    sseOld = 0
    sseNew = inf
    iterTime = 0  # iteration counter
    while abs(sseNew - sseOld) > 0.0001:
        sseOld = sseNew
        # Step 2: assign each sample to the nearest centroid
        for i in range(m):
            minDist = inf
            minIndex = -1
            for j in range(k):
                # Distance from sample i to centroid j
                distJI = distMeas(centroids[j, :], dataSet.values[i, :])
                if distJI < minDist:
                    minDist = distJI
                    minIndex = j
            clusterAssment[i, :] = minIndex, minDist ** 2
        iterTime += 1
        sseNew = sum(clusterAssment[:, 1])
        print('the SSE of %dth iteration is %f' % (iterTime, sseNew))
        # Step 3: update the centroids
        for cent in range(k):
            ptsInClust = dataSet.iloc[nonzero(clusterAssment[:, 0].A == cent)[0]]
            centroids[cent, :] = mean(ptsInClust, axis=0)  # column-wise mean
    return centroids, clusterAssment

# Plot the clustering result for 2-D data
def datashow(dataSet, k, centroids, clusterAssment):
    num, dim = shape(dataSet)  # number of samples, number of features
    if dim != 2:
        print('sorry, the dimension of your dataset is not 2!')
        return 1
    marksamples = ['or', 'ob', 'og', 'ok', '^r', '^b', '<g']  # sample markers
    if k > len(marksamples):
        print('sorry, your k is too large, please extend marksamples!')
        return 1
    # Plot every sample with the marker of its cluster
    for i in range(num):
        markindex = int(clusterAssment[i, 0])  # cluster index of sample i
        plt.plot(dataSet.iat[i, 0], dataSet.iat[i, 1],
                 marksamples[markindex], markersize=6)
    # Plot the centroids
    markcentroids = ['o', '*', '^']  # centroid markers
    label = ['0', '1', '2']
    c = ['yellow', 'pink', 'red']
    for i in range(k):
        plt.plot(centroids[i, 0], centroids[i, 1], markcentroids[i],
                 markersize=15, label=label[i], c=c[i])
    plt.legend(loc='upper left')
    plt.xlabel('sepal length')
    plt.ylabel('sepal width')
    plt.title('k-means cluster result')
    plt.show()

# Plot the samples with their true class labels
def trgartshow(dataSet, k, labels):
    num, dim = shape(dataSet)
    label = ['0', '1', '2']
    marksamples = ['ob', 'or', 'og', 'ok', '^r', '^b', '<g']
    # Draw the grouped scatter plot sample by sample
    for i in range(num):
        plt.plot(dataSet.iat[i, 0], dataSet.iat[i, 1],
                 marksamples[int(labels.iat[i, 0])], markersize=6)
    # Re-plot one sample per class with a legend label
    for i in range(0, num, 50):
        plt.plot(dataSet.iat[i, 0], dataSet.iat[i, 1],
                 marksamples[int(labels.iat[i, 0])], markersize=6,
                 label=label[int(labels.iat[i, 0])])
    plt.legend(loc='upper left')
    plt.xlabel('sepal length')
    plt.ylabel('sepal width')
    plt.title('iris true result')
    plt.show()

# Plot the raw samples before clustering
def originalDatashow(dataSet):
    num, dim = shape(dataSet)  # number of samples and features
    marksamples = ['ob']       # sample marker
    for i in range(num):
        plt.plot(dataSet.iat[i, 0], dataSet.iat[i, 1], marksamples[0], markersize=5)
    plt.title('original dataset')
    plt.xlabel('sepal length')
    plt.ylabel('sepal width')
    plt.show()

if __name__ == '__main__':
    # ===== k-means clustering =====
    # Features used for clustering
    datamat = dataset.loc[:, ['sepal-length', 'sepal-width']]
    # True labels
    labels = dataset.loc[:, ['class']]
    # Show the raw data
    originalDatashow(datamat)
    # Run k-means
    k = 3  # user-defined number of clusters
    mycentroids, clusterAssment = kMeans(datamat, k)
    # mycentroids, clusterAssment = kMeansSSE(datamat, k)
    # Plot the results
    datashow(datamat, k, mycentroids, clusterAssment)
    trgartshow(datamat, 3, labels)
```
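As a quick sanity check — a sketch assuming scikit-learn is installed, not part of the original script — the same two Iris features can also be clustered with `sklearn.cluster.KMeans` and compared against the hand-rolled version:

```python
# Cross-check with scikit-learn; run after `datamat` has been built above.
from sklearn.cluster import KMeans as SKKMeans

sk = SKKMeans(n_clusters=3, n_init=10, random_state=0).fit(datamat.values)
print('sklearn centers:\n', sk.cluster_centers_)
print('sklearn SSE (inertia):', sk.inertia_)  # comparable to the SSE printed above
```

The centers should land close to those found by `kMeans`, though the cluster numbering may be permuted, since k-means cluster indices are arbitrary.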
Below is the same algorithm implemented with TensorFlow:
```python
import tensorflow as tf
import numpy as np
from tensorflow.contrib.factorization import KMeans
import os
os.environ['CUDA_VISIBLE_DEVICES'] = ''  # force CPU

from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('/tmp/data', one_hot=True)
full_data_x = mnist.train.images

# Hyperparameters
num_steps = 50           # total training steps
batch_size = 1024        # mini-batch size
k = 25                   # number of clusters
num_classes = 10         # the ten digit classes
num_features = 28 * 28   # each image is 28x28 pixels

# Input placeholders; labels are only used for evaluation
X = tf.placeholder(tf.float32, [None, num_features])
y = tf.placeholder(tf.float32, [None, num_classes])

kmeans = KMeans(inputs=X, num_clusters=k, distance_metric='cosine',
                use_mini_batch=True)

# Build KMeans graph
(all_scores, cluster_idx, scores, cluster_centers_initialized,
 init_op, training_op) = kmeans.training_graph()
cluster_idx = cluster_idx[0]  # cluster_idx is returned as a tuple
avg_distance = tf.reduce_mean(scores)

# Initialize the variables (i.e. assign their default value)
init_vars = tf.global_variables_initializer()

sess = tf.Session()
sess.run(init_vars, feed_dict={X: full_data_x})
sess.run(init_op, feed_dict={X: full_data_x})

# Training
for i in range(1, num_steps + 1):
    _, d, idx = sess.run([training_op, avg_distance, cluster_idx],
                         feed_dict={X: full_data_x})
    if i % 10 == 0 or i == 1:
        print("Step %i, Avg Distance: %f" % (i, d))

# Count the label distribution of the samples that fell into each cluster
counts = np.zeros(shape=(k, num_classes))
for i in range(len(idx)):
    counts[idx[i]] += mnist.train.labels[i]
# Assign the most frequent label to the centroid
labels_map = [np.argmax(c) for c in counts]
labels_map = tf.convert_to_tensor(labels_map)

# Evaluation ops
# Lookup: centroid_id -> label
cluster_label = tf.nn.embedding_lookup(labels_map, cluster_idx)
# Compute accuracy
correct_prediction = tf.equal(cluster_label, tf.cast(tf.argmax(y, 1), tf.int32))
accuracy_op = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

# Test Model
test_x, test_y = mnist.test.images, mnist.test.labels
print("Test Accuracy:", sess.run(accuracy_op, feed_dict={X: test_x, y: test_y}))
```
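Note that `tf.contrib` was removed in TensorFlow 2.x, so the snippet above requires a 1.x installation. As a rough equivalent without it — a sketch assuming scikit-learn, where `cluster_and_label` and the use of integer (non-one-hot) labels are illustrative assumptions — cosine-distance k-means can be approximated by L2-normalizing the images and running mini-batch k-means on them:

```python
# Sketch of an equivalent pipeline without tf.contrib (assumes scikit-learn).
# For unit vectors, ||a - b||^2 = 2 * (1 - cos(a, b)), so Euclidean k-means on
# L2-normalized rows behaves like cosine k-means.
import numpy as np
from sklearn.cluster import MiniBatchKMeans
from sklearn.preprocessing import normalize

def cluster_and_label(train_x, train_y, k=25, num_classes=10):
    """train_x: (m, 784) images; train_y: integer labels, not one-hot."""
    km = MiniBatchKMeans(n_clusters=k, batch_size=1024, random_state=0)
    idx = km.fit_predict(normalize(train_x))  # cluster the normalized images
    # Map each cluster to the most frequent digit label among its members
    labels_map = np.array(
        [np.bincount(train_y[idx == c], minlength=num_classes).argmax()
         for c in range(k)])
    return km, labels_map
```

Test accuracy can then be estimated as `(labels_map[km.predict(normalize(test_x))] == test_y).mean()`, mirroring the centroid-to-label mapping used in the TensorFlow version.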