Let's walk through the kmeans example code that ships with Spark. This one is a bit more involved.
```python
import sys

import numpy as np
from pyspark import SparkContext


# Convert one line of input text into a numpy array of floats
def parseVector(line):
    return np.array([float(x) for x in line.split(' ')])


# Find which center the point p belongs to, returning the center's index
def closestPoint(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex


if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: kmeans <file> <k> <convergeDist>", file=sys.stderr)
        exit(-1)

    print("""WARN: This is a naive implementation of KMeans Clustering and is given
       as an example! Please refer to examples/src/main/python/mllib/kmeans.py for an example on
       how to use MLlib's KMeans implementation.""", file=sys.stderr)

    sc = SparkContext(appName="PythonKMeans")
    lines = sc.textFile(sys.argv[1])
    # Map every input line to a float vector and cache the resulting RDD
    data = lines.map(parseVector).cache()
    # K is the number of cluster centers
    K = int(sys.argv[2])
    # Convergence threshold: stop iterating once the centers move less than this
    convergeDist = float(sys.argv[3])

    # Sample K points from the data set as the initial centers
    kPoints = data.takeSample(False, K, 1)
    # Distance the centers moved after the last adjustment
    tempDist = 1.0

    # Keep iterating while the centers moved more than the threshold
    while tempDist > convergeDist:
        # Map step: each point becomes (index of its closest center, (point, 1))
        closest = data.map(
            lambda p: (closestPoint(p, kPoints), (p, 1)))
        # Reduce step: per center, sum the points and their counts to recompute the center
        pointStats = closest.reduceByKey(
            lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))
        # New centers: the mean of each cluster
        newPoints = pointStats.map(
            lambda st: (st[0], st[1][0] / st[1][1])).collect()

        # Total squared distance between the old and new centers
        tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)

        # Replace the old centers with the new ones
        for (iK, p) in newPoints:
            kPoints[iK] = p

    print("Final centers: " + str(kPoints))

    sc.stop()
```

Note that the version as posted was missing `import sys`, which the argument parsing and the `file=sys.stderr` prints rely on; it is added above.
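Before looking at the Spark-level loop, it may help to see what the two helpers do on plain numpy arrays. Below is a minimal local sketch with made-up sample points (the data values and center coordinates are illustrative, not from the original post); it assumes `parseVector` and `closestPoint` from the script above are in scope:

```python
import numpy as np

# Hypothetical input lines, in the same space-separated format parseVector expects
sample_lines = ["0.0 0.0", "1.0 1.0", "9.0 8.0", "8.0 9.0"]
points = [parseVector(line) for line in sample_lines]

# Two made-up centers; closestPoint returns the index of the nearer one
centers = [np.array([0.5, 0.5]), np.array([8.5, 8.5])]
for p in points:
    print(p, "->", closestPoint(p, centers))
# Expected: the first two points map to center 0, the last two to center 1
```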
That is the whole flow. Combined with numpy, it turns out to be fairly straightforward.
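As the warning inside the script itself points out, real workloads should use MLlib's KMeans rather than this hand-rolled loop. A minimal sketch of what that might look like is below; the input path, k=2, and the training parameters are assumptions for illustration, not taken from the example above:

```python
import numpy as np
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans

sc = SparkContext(appName="MLlibKMeansExample")

# Same input format as before: one point per line, space-separated floats
data = sc.textFile("data.txt").map(
    lambda line: np.array([float(x) for x in line.split(' ')]))

# Train a model with 2 clusters; maxIterations and initializationMode are illustrative choices
model = KMeans.train(data, 2, maxIterations=10, initializationMode="random")

print("Cluster centers: " + str(model.clusterCenters))
sc.stop()
```

The main practical difference is that MLlib handles the iteration, convergence checking, and center updates internally, so the driver code only declares the data and the parameters.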