(轉)Mahout Kmeans Clustering 學習

1、Mahout命令使用

合成控制的數據集 synthetic_control.data 能夠從 此處下載,總共由600行X60列double型的數據組成, 意思是有600個元組,每一個元組是一個時間序列。java

1. 把數據拷到集羣上,放到kmeans/目錄下算法

  
hadoop fs -mv synthetic_control.data kmeans/synthetic_control.data
 

 

2. 輸入以下mahout命令進行KMeans聚類分析apache

 

mahout org.apache.mahout.clustering.syntheticcontrol.kmeans.Job --input kmeans/synthetic_control.data  --numClusters 3 -t1 3 -t2 6 --maxIter 3 --output kmeans/output

 當命令中有這個--numClusters( 表明聚類結果中簇的個數)參數的話,它會採用Kmeans聚類。若是沒有配置這個參數的話,它會先採用Canopy聚類,-t1和-t2是用於Canopy聚類的配置參數。數組

 

2、源碼學習

從Mahout源碼能夠分析出:進行KMeans聚類時,會產生四個步驟。app

  1. 數據預處理,整理規範化數據
  2. 從上述數據中隨機選擇若干個數據看成Cluster的中心
  3. 迭代計算,調整形心
  4. 把數據分給各個Cluster

其中 前倆步就是 KMeans聚類算法的準備工做。dom

主要流程能夠從org.apache.mahout.clustering.syntheticcontrol.kmeans.Job#run()方法裏看出一些端倪。ide

  

  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, int k,
      double convergenceDelta, int maxIterations) throws Exception {
    //1. synthetic_control.data存儲的文本格式,轉換成Key/Value格式,存入到output/data目錄。Key爲保存一個Integer的Text類型, Value爲VectorWritable類型。
    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
    log.info("Preparing Input");
    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
    //2. 隨機產生幾個cluster,存入到output/clusters-0/part-randomSeed文件裏。Key爲Text, Value爲ClusterWritable類型。
    log.info("Running random seed to get initial clusters");
    Path clusters = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
    clusters = RandomSeedGenerator.buildRandom(conf, directoryContainingConvertedInput, clusters, k, measure);
    //3. 進行聚類迭代運算,爲每個簇從新選出cluster centroid中心
    log.info("Running KMeans");
    KMeansDriver.run(conf, directoryContainingConvertedInput, clusters, output, measure, convergenceDelta,
        maxIterations, true, 0.0, false);
    //4. 根據上面選出的中心,把output/data裏面的記錄,都分配給各個cluster。輸出運算結果,把sequencefile格式轉化成textfile格式展現出來
    // run ClusterDumper
    ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
        "clusteredPoints"));
    clusterDumper.printClusters(null);
  }
  1. RandomAccessSparseVector是一個Vector實現,裏面有一個 OpenIntDoubleMap屬性,該OpenIntDoubleMap不是繼承自HashMap,而是本身實現了一套相似的hashMap,數據是經過一個Int數組和Long數組維護着,所以沒法經過Iterator爲遍歷。
  2. RandomSeedGenerator#buildRandom()是在上面的Vector裏面隨機抽樣k個序列簇Kluster,採用的是一種蓄水池抽樣(Reservoir Sampling)的方法:即先把前k個數放入蓄水池,對第k+1,咱們以k/(k+1)機率決定是否要把它換入蓄水池,最終每一個數都是以相同的機率k/n進入蓄水池。它經過強大的MersenneTwister僞隨機生成器來隨機產生,它產生的隨機數長度可達2^19937 - 1,維度可高達623維,同時數值還能夠精確到32位的均勻分佈。

1. 迭代計算準備工做

真正在作KMeans聚類的代碼是:
 
  public static Path buildClusters(Configuration conf, Path input, Path clustersIn, Path output,
      DistanceMeasure measure, int maxIterations, String delta, boolean runSequential) throws IOException,
      InterruptedException, ClassNotFoundException {
    
    double convergenceDelta = Double.parseDouble(delta);
    //從output/clusters-0/part-randomSeed文件裏讀出Cluster數據,放入到clusters變量中。
    List<Cluster> clusters = Lists.newArrayList();
    KMeansUtil.configureWithClusterInfo(conf, clustersIn, clusters);
    
    if (clusters.isEmpty()) {
      throw new IllegalStateException("No input clusters found in " + clustersIn + ". Check your -c argument.");
    }
    //把聚類策略(控制收斂程度)寫進output/clusters-0/_policy文件中
    //同時,每一個簇cluster在output/clusters-0/下對應生成part-000xx文件
    Path priorClustersPath = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
    ClusteringPolicy policy = new KMeansClusteringPolicy(convergenceDelta);
    ClusterClassifier prior = new ClusterClassifier(clusters, policy);
    prior.writeToSeqFiles(priorClustersPath);
    //開始迭代maxIterations次執行Map/Reduce
    if (runSequential) {
      ClusterIterator.iterateSeq(conf, input, priorClustersPath, output, maxIterations);
    } else {
      ClusterIterator.iterateMR(conf, input, priorClustersPath, output, maxIterations);
    }
    return output;
  }
  

2. 迭代計算

調整cluster中心的Job的代碼以下:oop

 
  public static void iterateMR(Configuration conf, Path inPath, Path priorPath, Path outPath, int numIterations)
    throws IOException, InterruptedException, ClassNotFoundException {
    ClusteringPolicy policy = ClusterClassifier.readPolicy(priorPath);
    Path clustersOut = null;
    int iteration = 1;
    while (iteration <= numIterations) {
      conf.set(PRIOR_PATH_KEY, priorPath.toString());
      
      String jobName = "Cluster Iterator running iteration " + iteration + " over priorPath: " + priorPath;
      Job job = new Job(conf, jobName);
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(ClusterWritable.class);
      job.setOutputKeyClass(IntWritable.class);
      job.setOutputValueClass(ClusterWritable.class);
      
      job.setInputFormatClass(SequenceFileInputFormat.class);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      //核心算法就在這個CIMapper和CIReducer裏面
      job.setMapperClass(CIMapper.class);
      job.setReducerClass(CIReducer.class);
      
      FileInputFormat.addInputPath(job, inPath);
      clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
      priorPath = clustersOut;
      FileOutputFormat.setOutputPath(job, clustersOut);
      
      job.setJarByClass(ClusterIterator.class);
      if (!job.waitForCompletion(true)) {
        throw new InterruptedException("Cluster Iteration " + iteration + " failed processing " + priorPath);
      }
      ClusterClassifier.writePolicy(policy, clustersOut);
      FileSystem fs = FileSystem.get(outPath.toUri(), conf);
      iteration++;
      if (isConverged(clustersOut, conf, fs)) {
        break;
      }
    }
    //把最後一次迭代的結果目錄重命名,加一個final
    Path finalClustersIn = new Path(outPath, Cluster.CLUSTERS_DIR + (iteration - 1) + Cluster.FINAL_ITERATION_SUFFIX);
    FileSystem.get(clustersOut.toUri(), conf).rename(clustersOut, finalClustersIn);
  }

  

2.1. Map階段

CIMapper代碼以下:學習

 

 
 @Override
  protected void map(WritableComparable<?> key, VectorWritable value, Context context) throws IOException,
      InterruptedException {
    Vector probabilities = classifier.classify(value.get());
    Vector selections = policy.select(probabilities);
    for (Iterator<Element> it = selections.iterateNonZero(); it.hasNext();) {
      Element el = it.next();
      classifier.train(el.index(), value.get(), el.get());
    }
  }
 

 

 

在這裏面須要釐清ui

org.apache.mahout.clustering.iterator.KMeansClusteringPolicy

org.apache.mahout.clustering.classify.ClusterClassifier

這兩個類。

前者是聚類的策略,能夠說它提供聚類的核心算法。

後者是聚類的分類器,它的功能是基於聚類策略把數據進行分類。

 

2.1.1. ClusterClassifier 求點到Cluster形心的距離

 ClusterClassifier.classify()求得某點到全部cluster中心的距離,獲得的是一個數組。

 

 
@Override
  public Vector classify(Vector data, ClusterClassifier prior) {
    List<Cluster> models = prior.getModels();
    int i = 0;
    Vector pdfs = new DenseVector(models.size());
    for (Cluster model : models) {
      pdfs.set(i++, model.pdf(new VectorWritable(data)));
    }
    return pdfs.assign(new TimesFunction(), 1.0 / pdfs.zSum());
  }
 

 

上述代碼中的org.apache.mahout.clustering.iterator.DistanceMeasureCluster.pdf(VectorWritable)求該點到Cluster形心的距離,其算法代碼以下:

 

Java代碼 複製代碼  收藏代碼  
@Override
  public double pdf(VectorWritable vw) {
    return 1 / (1 + measure.distance(vw.get(), getCenter()));
  }
 
每一次迭代後,就會從新計算一次centroid,經過AbstractCluster.computeParameters來計算的。
 

pdfs.zSum()是pdfs double數組的和。而後再對pdfs進行歸一化處理。

所以最後select()用於選出類似度最大的cluster的下標,而且對其賦予權重1.0。以下所示:

 

 
@Override
  public Vector select(Vector probabilities) {
    int maxValueIndex = probabilities.maxValueIndex();
    Vector weights = new SequentialAccessSparseVector(probabilities.size());
    weights.set(maxValueIndex, 1.0);
    return weights;
  }
 

 

2.1.2. ClusterClassifier 爲求Cluster新形心作準備

 接下來,爲了從新獲得新的中心,經過org.apache.mahout.clustering.classify.ClusterClassifier.train(int, Vector, double)爲訓練數據,即最後在AbstractCluster裏面準備數據。

 
public void observe(Vector x, double weight) {
    if (weight == 1.0) {
      observe(x);
    } else {
      setS0(getS0() + weight);
      Vector weightedX = x.times(weight);
      if (getS1() == null) {
        setS1(weightedX);
      } else {
        getS1().assign(weightedX, Functions.PLUS);
      }
      Vector x2 = x.times(x).times(weight);
      if (getS2() == null) {
        setS2(x2);
      } else {
        getS2().assign(x2, Functions.PLUS);
      }
    }
  }

 

2.2. Reduce階段

在CIReducer裏面,對屬於同一個Cluster裏面的數據進行合併,而且求出centroid形心。

 
@Override
  protected void reduce(IntWritable key, Iterable<ClusterWritable> values, Context context) throws IOException,
      InterruptedException {
    Iterator<ClusterWritable> iter = values.iterator();
    Cluster first = iter.next().getValue(); // there must always be at least one
    while (iter.hasNext()) {
      Cluster cluster = iter.next().getValue();
      first.observe(cluster);
    }
    List<Cluster> models = Lists.newArrayList();
    models.add(first);
    classifier = new ClusterClassifier(models, policy);
    classifier.close();
    context.write(key, new ClusterWritable(first));
  }

 

2.2.1. Reduce中求centroid形心的算法

 求centroid算法代碼以下:

@Override
  public void computeParameters() {
    if (getS0() == 0) {
      return;
    }
    setNumObservations((long) getS0());
    setTotalObservations(getTotalObservations() + getNumObservations());
    setCenter(getS1().divide(getS0()));
    // compute the component stds
    if (getS0() > 1) {
      setRadius(getS2().times(getS0()).minus(getS1().times(getS1())).assign(new SquareRootFunction()).divide(getS0()));
    }
    setS0(0);
    setS1(center.like());
    setS2(center.like());
  }

 

 

3. 聚類數據

 

真正對output/data記錄分配給各個簇的代碼是:

 
 private static void classifyClusterMR(Configuration conf, Path input, Path clustersIn, Path output,
      Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException, InterruptedException,
      ClassNotFoundException {
    
    conf.setFloat(ClusterClassificationConfigKeys.OUTLIER_REMOVAL_THRESHOLD,
                  clusterClassificationThreshold.floatValue());
    conf.setBoolean(ClusterClassificationConfigKeys.EMIT_MOST_LIKELY, emitMostLikely);
    conf.set(ClusterClassificationConfigKeys.CLUSTERS_IN, clustersIn.toUri().toString());
    
    Job job = new Job(conf, "Cluster Classification Driver running over input: " + input);
    job.setJarByClass(ClusterClassificationDriver.class);
    
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    //進行記錄分配
    job.setMapperClass(ClusterClassificationMapper.class);
    job.setNumReduceTasks(0);
    
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(WeightedVectorWritable.class);
    
    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, output);
    if (!job.waitForCompletion(true)) {
      throw new InterruptedException("Cluster Classification Driver Job failed processing " + input);
    }
  }
  

 摘錄地址:http://zcdeng.iteye.com/blog/1859711

相關文章
相關標籤/搜索