The synthetic control dataset synthetic_control.data can be downloaded from here. It consists of 600 rows by 60 columns of double values, i.e. 600 tuples, each of which is a time series.
1. Copy the data to the cluster and put it under the kmeans/ directory:
hadoop fs -put synthetic_control.data kmeans/synthetic_control.data
2. Run the following mahout command to perform the KMeans clustering analysis.
If the command includes the --numClusters parameter (the number of clusters in the result), KMeans clustering is used directly. If this parameter is not given, Canopy clustering is run first to produce the initial clusters; -t1 and -t2 are the configuration parameters for that Canopy step.
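For reference, an invocation of this job might look like the sketch below. The concrete parameter values (6 clusters, the t1/t2 thresholds, 10 iterations, a convergence delta of 0.5, Euclidean distance) are illustrative assumptions only, not values taken from this post:

mahout org.apache.mahout.clustering.syntheticcontrol.kmeans.Job \
    --input kmeans/synthetic_control.data \
    --output kmeans/output \
    --numClusters 6 \
    --t1 80 --t2 55 \
    --maxIter 10 \
    --convergenceDelta 0.5 \
    --distanceMeasure org.apache.mahout.common.distance.EuclideanDistanceMeasure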
From the Mahout source code we can see that a KMeans run consists of four steps, of which the first two are preparation for the actual KMeans algorithm. The main flow can be traced in the org.apache.mahout.clustering.syntheticcontrol.kmeans.Job#run() method.
public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure,
    int k, double convergenceDelta, int maxIterations) throws Exception {
  // 1. Convert the text format of synthetic_control.data into Key/Value SequenceFile format
  //    under output/data. The Key is a Text holding an Integer, the Value is a VectorWritable.
  Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
  log.info("Preparing Input");
  InputDriver.runJob(input, directoryContainingConvertedInput,
      "org.apache.mahout.math.RandomAccessSparseVector");
  // 2. Randomly pick k clusters and write them to output/clusters-0/part-randomSeed.
  //    The Key is Text, the Value is ClusterWritable.
  log.info("Running random seed to get initial clusters");
  Path clusters = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
  clusters = RandomSeedGenerator.buildRandom(conf, directoryContainingConvertedInput, clusters, k, measure);
  // 3. Run the clustering iterations, recomputing the centroid of each cluster.
  log.info("Running KMeans");
  KMeansDriver.run(conf, directoryContainingConvertedInput, clusters, output, measure,
      convergenceDelta, maxIterations, true, 0.0, false);
  // 4. Using the final centers, assign every record under output/data to a cluster,
  //    then dump the SequenceFile results as text.
  // run ClusterDumper
  ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"),
      new Path(output, "clusteredPoints"));
  clusterDumper.printClusters(null);
}
The iterations themselves are driven by buildClusters(), which loads the seed clusters, writes out the initial classifier state, and then starts the iterations:

public static Path buildClusters(Configuration conf, Path input, Path clustersIn, Path output,
    DistanceMeasure measure, int maxIterations, String delta, boolean runSequential)
    throws IOException, InterruptedException, ClassNotFoundException {
  double convergenceDelta = Double.parseDouble(delta);
  // Read the Cluster data from output/clusters-0/part-randomSeed into the clusters list.
  List<Cluster> clusters = Lists.newArrayList();
  KMeansUtil.configureWithClusterInfo(conf, clustersIn, clusters);
  if (clusters.isEmpty()) {
    throw new IllegalStateException("No input clusters found in " + clustersIn + ". Check your -c argument.");
  }
  // Write the clustering policy (which controls convergence) to output/clusters-0/_policy;
  // at the same time, each cluster gets its own part-000xx file under output/clusters-0/.
  Path priorClustersPath = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
  ClusteringPolicy policy = new KMeansClusteringPolicy(convergenceDelta);
  ClusterClassifier prior = new ClusterClassifier(clusters, policy);
  prior.writeToSeqFiles(priorClustersPath);
  // Run up to maxIterations Map/Reduce (or sequential) iterations.
  if (runSequential) {
    ClusterIterator.iterateSeq(conf, input, priorClustersPath, output, maxIterations);
  } else {
    ClusterIterator.iterateMR(conf, input, priorClustersPath, output, maxIterations);
  }
  return output;
}
The Job that adjusts the cluster centers is set up as follows:
public static void iterateMR(Configuration conf, Path inPath, Path priorPath, Path outPath, int numIterations)
    throws IOException, InterruptedException, ClassNotFoundException {
  ClusteringPolicy policy = ClusterClassifier.readPolicy(priorPath);
  Path clustersOut = null;
  int iteration = 1;
  while (iteration <= numIterations) {
    conf.set(PRIOR_PATH_KEY, priorPath.toString());
    String jobName = "Cluster Iterator running iteration " + iteration + " over priorPath: " + priorPath;
    Job job = new Job(conf, jobName);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(ClusterWritable.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(ClusterWritable.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    // the core algorithm lives in CIMapper and CIReducer
    job.setMapperClass(CIMapper.class);
    job.setReducerClass(CIReducer.class);
    FileInputFormat.addInputPath(job, inPath);
    clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
    priorPath = clustersOut;
    FileOutputFormat.setOutputPath(job, clustersOut);
    job.setJarByClass(ClusterIterator.class);
    if (!job.waitForCompletion(true)) {
      throw new InterruptedException("Cluster Iteration " + iteration + " failed processing " + priorPath);
    }
    ClusterClassifier.writePolicy(policy, clustersOut);
    FileSystem fs = FileSystem.get(outPath.toUri(), conf);
    iteration++;
    if (isConverged(clustersOut, conf, fs)) {
      break;
    }
  }
  // rename the output directory of the last iteration by appending "-final"
  Path finalClustersIn = new Path(outPath,
      Cluster.CLUSTERS_DIR + (iteration - 1) + Cluster.FINAL_ITERATION_SUFFIX);
  FileSystem.get(clustersOut.toUri(), conf).rename(clustersOut, finalClustersIn);
}
The CIMapper code is as follows:
@Override
protected void map(WritableComparable<?> key, VectorWritable value, Context context)
    throws IOException, InterruptedException {
  // compute the (normalized) pdf of every cluster for this point
  Vector probabilities = classifier.classify(value.get());
  // the policy picks the winning cluster and gives it weight 1.0
  Vector selections = policy.select(probabilities);
  for (Iterator<Element> it = selections.iterateNonZero(); it.hasNext();) {
    Element el = it.next();
    // accumulate the point into the selected cluster's running sums
    classifier.train(el.index(), value.get(), el.get());
  }
}
In this code, two classes need to be clearly distinguished: org.apache.mahout.clustering.iterator.KMeansClusteringPolicy and org.apache.mahout.clustering.classify.ClusterClassifier. The former is the clustering policy and supplies the core of the clustering algorithm; the latter is the cluster classifier, which assigns data points to clusters according to that policy.
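As a minimal sketch of how the two cooperate for a single point, the fragment below chains classify(), select() and train() in the same way CIMapper does. The two seed clusters, their centers and the input point are made-up placeholders; only the method calls themselves come from the Mahout code quoted in this post.

import java.util.Iterator;
import java.util.List;

import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.classify.ClusterClassifier;
import org.apache.mahout.clustering.iterator.ClusteringPolicy;
import org.apache.mahout.clustering.iterator.KMeansClusteringPolicy;
import org.apache.mahout.clustering.kmeans.Kluster;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;

import com.google.common.collect.Lists;

public class PolicyClassifierSketch {
  public static void main(String[] args) {
    EuclideanDistanceMeasure measure = new EuclideanDistanceMeasure();
    // two placeholder seed clusters with ids 0 and 1
    List<Cluster> clusters = Lists.newArrayList();
    clusters.add(new Kluster(new DenseVector(new double[] {0, 0}), 0, measure));
    clusters.add(new Kluster(new DenseVector(new double[] {5, 5}), 1, measure));

    ClusteringPolicy policy = new KMeansClusteringPolicy(0.01);   // convergenceDelta (assumed)
    ClusterClassifier classifier = new ClusterClassifier(clusters, policy);

    Vector point = new DenseVector(new double[] {1, 1});
    Vector probabilities = classifier.classify(point);            // pdf for every cluster
    Vector selection = policy.select(probabilities);              // weight 1.0 on the closest cluster
    for (Iterator<Element> it = selection.iterateNonZero(); it.hasNext();) {
      Element el = it.next();
      classifier.train(el.index(), point, el.get());              // accumulate the point into that cluster
    }
  }
}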
ClusterClassifier.classify() evaluates, for a given point, each cluster's pdf value (derived from the distance between the point and that cluster's center) and returns them all in a single vector:
@Override
public Vector classify(Vector data, ClusterClassifier prior) {
  List<Cluster> models = prior.getModels();
  int i = 0;
  Vector pdfs = new DenseVector(models.size());
  for (Cluster model : models) {
    pdfs.set(i++, model.pdf(new VectorWritable(data)));
  }
  return pdfs.assign(new TimesFunction(), 1.0 / pdfs.zSum());
}
The call to org.apache.mahout.clustering.iterator.DistanceMeasureCluster.pdf(VectorWritable) above turns the distance from the point to the cluster centroid into a similarity score; its implementation is:
@Override
public double pdf(VectorWritable vw) {
  return 1 / (1 + measure.distance(vw.get(), getCenter()));
}
pdfs.zSum() is the sum of all entries of the pdfs vector; dividing by it normalizes pdfs so that its entries sum to 1.
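As a concrete illustration, suppose a point lies at distances 1.0, 3.0 and 9.0 from three cluster centers. The plain-Java sketch below (not the Mahout API) reproduces the arithmetic of pdf() and the normalization; the distances are made up for the example:

public class PdfNormalizationSketch {
  public static void main(String[] args) {
    double[] distances = {1.0, 3.0, 9.0};
    double[] pdfs = new double[distances.length];
    double zSum = 0.0;
    for (int i = 0; i < distances.length; i++) {
      pdfs[i] = 1.0 / (1.0 + distances[i]);   // DistanceMeasureCluster.pdf()
      zSum += pdfs[i];                        // pdfs.zSum()
    }
    for (int i = 0; i < pdfs.length; i++) {
      pdfs[i] /= zSum;                        // pdfs.assign(new TimesFunction(), 1.0 / zSum)
    }
    // prints roughly 0.588, 0.294, 0.118 -> select() would pick index 0, the closest cluster
    for (double p : pdfs) {
      System.out.println(p);
    }
  }
}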
select() then picks the index of the cluster with the greatest similarity and gives it a weight of 1.0, as shown below:
@Override
public Vector select(Vector probabilities) {
  int maxValueIndex = probabilities.maxValueIndex();
  Vector weights = new SequentialAccessSparseVector(probabilities.size());
  weights.set(maxValueIndex, 1.0);
  return weights;
}
Next, to obtain the new centers, org.apache.mahout.clustering.classify.ClusterClassifier.train(int, Vector, double) feeds the point into the selected cluster as training data; the accumulation ultimately happens in AbstractCluster:
public void observe(Vector x, double weight) {
  if (weight == 1.0) {
    observe(x);
  } else {
    // S0 accumulates the total weight (the observation count when weight == 1.0)
    setS0(getS0() + weight);
    // S1 accumulates the weighted sum of the observed vectors
    Vector weightedX = x.times(weight);
    if (getS1() == null) {
      setS1(weightedX);
    } else {
      getS1().assign(weightedX, Functions.PLUS);
    }
    // S2 accumulates the weighted sum of the squared components
    Vector x2 = x.times(x).times(weight);
    if (getS2() == null) {
      setS2(x2);
    } else {
      getS2().assign(x2, Functions.PLUS);
    }
  }
}
In CIReducer, the partial results belonging to the same cluster are merged, and the centroid is recomputed.
@Override
protected void reduce(IntWritable key, Iterable<ClusterWritable> values, Context context)
    throws IOException, InterruptedException {
  Iterator<ClusterWritable> iter = values.iterator();
  Cluster first = iter.next().getValue(); // there must always be at least one
  while (iter.hasNext()) {
    Cluster cluster = iter.next().getValue();
    first.observe(cluster);
  }
  List<Cluster> models = Lists.newArrayList();
  models.add(first);
  classifier = new ClusterClassifier(models, policy);
  classifier.close();
  context.write(key, new ClusterWritable(first));
}
The centroid computation is as follows:
@Override
public void computeParameters() {
  if (getS0() == 0) {
    return;
  }
  setNumObservations((long) getS0());
  setTotalObservations(getTotalObservations() + getNumObservations());
  setCenter(getS1().divide(getS0()));
  // compute the component stds
  if (getS0() > 1) {
    setRadius(getS2().times(getS0()).minus(getS1().times(getS1()))
        .assign(new SquareRootFunction()).divide(getS0()));
  }
  setS0(0);
  setS1(center.like());
  setS2(center.like());
}
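To make the roles of S0, S1 and S2 concrete, here is a small plain-Java sketch (again not the Mahout classes) that accumulates the sums for two assumed points, (1, 2) and (3, 6), and then derives the centroid and per-component radius exactly as computeParameters() does: the centroid comes out as (2, 4) and the radius as (1, 2).

public class CentroidSketch {
  public static void main(String[] args) {
    double[][] points = {{1, 2}, {3, 6}};
    double s0 = 0;                 // total weight (number of observations at weight 1.0)
    double[] s1 = new double[2];   // sum of the observed points
    double[] s2 = new double[2];   // sum of the squared components
    for (double[] x : points) {
      s0 += 1.0;
      for (int i = 0; i < x.length; i++) {
        s1[i] += x[i];
        s2[i] += x[i] * x[i];
      }
    }
    double[] center = new double[2];
    double[] radius = new double[2];
    for (int i = 0; i < 2; i++) {
      center[i] = s1[i] / s0;                                  // S1 / S0          -> (2, 4)
      radius[i] = Math.sqrt(s2[i] * s0 - s1[i] * s1[i]) / s0;  // component std    -> (1, 2)
    }
    System.out.println(java.util.Arrays.toString(center) + " " + java.util.Arrays.toString(radius));
  }
}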
The code that actually assigns the records under output/data to the individual clusters is:
private static void classifyClusterMR(Configuration conf, Path input, Path clustersIn, Path output,
    Double clusterClassificationThreshold, boolean emitMostLikely)
    throws IOException, InterruptedException, ClassNotFoundException {
  conf.setFloat(ClusterClassificationConfigKeys.OUTLIER_REMOVAL_THRESHOLD,
      clusterClassificationThreshold.floatValue());
  conf.setBoolean(ClusterClassificationConfigKeys.EMIT_MOST_LIKELY, emitMostLikely);
  conf.set(ClusterClassificationConfigKeys.CLUSTERS_IN, clustersIn.toUri().toString());
  Job job = new Job(conf, "Cluster Classification Driver running over input: " + input);
  job.setJarByClass(ClusterClassificationDriver.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  // the mapper performs the actual record assignment
  job.setMapperClass(ClusterClassificationMapper.class);
  job.setNumReduceTasks(0);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(WeightedVectorWritable.class);
  FileInputFormat.addInputPath(job, input);
  FileOutputFormat.setOutputPath(job, output);
  if (!job.waitForCompletion(true)) {
    throw new InterruptedException("Cluster Classification Driver Job failed processing " + input);
  }
}