使用hadoop mapreduce分析mongodb數據:(2)

在上一篇《使用hadoop mapreduce分析mongodb數據:(1)》中,介紹了如何使用Hadoop MapReduce連接MongoDB數據庫以及如何處理數據庫中的數據。本文結合一個案例,進一步說明Hadoop MapReduce處理MongoDB數據的細節。

  • 原始數據
  • > db.stackin.find({})
    { "_id" : ObjectId("575ce909aa02c3b21f1be0bb"), "summary" : "good good day",  "url" : "url_1" }
    { "_id" : ObjectId("575ce909aa02c3b21f1be0bc"), "summary" : "hello world good world", "url" : "url_2" }
    { "_id" : ObjectId("575ce909aa02c3b21f1be0bd"), "summary" : "hello world good hello good", "url" : "url_3" }
    { "_id" : ObjectId("575ce909aa02c3b21f1be0be"), "summary" : "hello world hello",  "url" : "url_4" }

    每條記錄表示一個網頁:summary對應的值是網頁的文章內容,url對應的值是該文章的鏈接。

  • 目標結果
  • > db.stackout.find({})
    { "_id" : "world", "data" : [  {  "url_2" : 2 },  {  "url_3" : 1 },  {  "url_4" : 1 } ], "index" : 0, "maxindex" : 3 }
    { "_id" : "good", "data" : [  {  "url_1" : 2 },  {  "url_2" : 1 },  {  "url_3" : 2 } ], "index" : 0, "maxindex" : 3 }
    { "_id" : "day", "data" : [  {  "url_1" : 1 } ], "index" : 0, "maxindex" : 1 }
    { "_id" : "hello", "data" : [  {  "url_2" : 1 },  {  "url_3" : 2 },  {  "url_4" : 2 } ], "index" : 0, "maxindex" : 3 }

    我們需要統計每個單詞在每個網頁中分別出現的次數。例如從結果可知,單詞world在url_2中出現了2次,在url_3和url_4中各出現了1次。

  • 設計代碼
     1 import java.util.*; 
     2 import java.io.*;
     3 
     4 import org.bson.*;
     5 
     6 import com.mongodb.hadoop.MongoInputFormat;
     7 import com.mongodb.hadoop.MongoOutputFormat;
     8 import com.mongodb.hadoop.io.BSONWritable;
     9 
    10 import org.apache.hadoop.conf.Configuration;
    11 import org.apache.hadoop.io.*;
    12 import org.apache.hadoop.mapreduce.*;
    13 
    14 
    15 public class WordCount {
    16 
    17     public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, BSONWritable> {
    18         //private final static 
    19         private Text word = new Text();
    20 
    21         public void map(Object key, BSONObject value, Context context ) 
    22                 throws IOException, InterruptedException {
    23             String url = value.get("url").toString();
    24             StringTokenizer itr = new StringTokenizer(value.get("summary").toString().
    25                     replaceAll("\\p{Punct}|\\d","").replaceAll("\r\n", " ").replace("\r", " ").
    26                     replace("\n", " ").toLowerCase());
    27             while (itr.hasMoreTokens()) {
    28                 word.set(itr.nextToken());
    29                 BasicBSONObject urlCounts = new BasicBSONObject();
    30                 urlCounts.put(url, 1);
    31                 context.write(word, new BSONWritable(urlCounts));
    32             }
    33         }
    34     }
    35 
    36     public static class IntSumReducer extends Reducer<Text, BSONWritable, Text, BSONWritable> {
    37         //private BasicBSONObject result = new BasicBSONObject();
    38         
    39         public void reduce(Text key, Iterable<BSONWritable> values, Context context)
    40             throws IOException, InterruptedException {
    41             HashMap<String, Integer> mymap = new HashMap<String, Integer>();
    42             BasicBSONObject result = new BasicBSONObject();
    43             BasicBSONObject urlcount = new BasicBSONObject();
    44             for (BSONWritable val : values) {
    45                 @SuppressWarnings("unchecked")
    46                 BSONObject temp2 = val.getDoc();
    47                 @SuppressWarnings("unchecked")
    48                 HashMap<String, Integer> temp = (HashMap<String, Integer>) val.getDoc().toMap();
    49                 for (Map.Entry<String, Integer> entry :  temp.entrySet()) {
    50                     if (mymap.containsKey(entry.getKey())) {
    51                         mymap.put(entry.getKey(), entry.getValue()+1);
    52                     }
    53                     else {
    54                         mymap.put(entry.getKey(), 1);
    55                     }
    56                 }
    57             }
    58             result.putAll(mymap);
    59             context.write(key, new BSONWritable(result));
    60         }
    61     }
    62 
    63     public static void main(String[] args) throws Exception {
    64         Configuration conf = new Configuration();
    65         conf.set( "mongo.input.uri" , "mongodb://localhost/stackoverflow.stackin" );
    66         conf.set( "mongo.output.uri" , "mongodb://localhost/stackoverflow.stackout" );
    67         @SuppressWarnings("deprecation")
    68         Job job = new Job(conf, "word count");
    69         job.setJarByClass(WordCount.class);
    70         job.setMapperClass(TokenizerMapper.class);
    71         //job.setCombinerClass(IntSumReducer.class);
    72         job.setReducerClass(IntSumReducer.class);
    73         job.setMapOutputKeyClass(Text.class);
    74         job.setMapOutputValueClass(BSONWritable.class);
    75         job.setOutputKeyClass(Text.class);
    76         job.setOutputValueClass(BSONWritable.class);
    77         job.setInputFormatClass( MongoInputFormat.class );
    78         job.setOutputFormatClass( MongoOutputFormat.class );
    79         System.exit(job.waitForCompletion(true) ? 0 : 1);
    80     }
    81 }

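    設計的思路是:在map部分,對summary中的每個單詞輸出該word以及一個鍵為url、值為1的BSON對象,寫入context中;相應地,在reduce部分把同一單詞傳入的各url計數累加起來。注意:按mongo-hadoop的MongoOutputFormat,上述代碼寫出的文檔應形如 { "_id" : "world", "url_2" : 2, "url_3" : 1, "url_4" : 1 },若需要上面目標結果中的data/index/maxindex格式,還需在reduce中另行組裝。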

 

總結:本案例很簡單,但是需要明白Hadoop MapReduce的原理以及mongo-hadoop API中對象的使用。如果有疑問,可以在評論區提出~

相關文章
相關標籤/搜索