In the previous post, Using Hadoop MapReduce to Analyze MongoDB Data (1), we covered how to connect Hadoop MapReduce to a MongoDB database and how to process its data. This post works through a concrete example to further illustrate the details of processing MongoDB data with Hadoop MapReduce.
> db.stackin.find({})
{ "_id" : ObjectId("575ce909aa02c3b21f1be0bb"), "summary" : "good good day", "url" : "url_1" }
{ "_id" : ObjectId("575ce909aa02c3b21f1be0bc"), "summary" : "hello world good world", "url" : "url_2" }
{ "_id" : ObjectId("575ce909aa02c3b21f1be0bd"), "summary" : "hello world good hello good", "url" : "url_3" }
{ "_id" : ObjectId("575ce909aa02c3b21f1be0be"), "summary" : "hello world hello", "url" : "url_4" }
Each document represents a web page: the summary field holds the page's text, and the url field holds the link to that page.
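If you want to reproduce this sample input from Java rather than from the mongo shell, a minimal sketch using the legacy MongoDB Java driver (the same driver mongo-hadoop builds on) might look like the following. The class name SeedStackin is just for illustration, and it assumes mongod is running locally on the default port:

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;

public class SeedStackin {
    public static void main(String[] args) {
        // Assumes a local mongod on the default port 27017.
        MongoClient mongo = new MongoClient("localhost", 27017);
        DB db = mongo.getDB("stackoverflow");
        DBCollection stackin = db.getCollection("stackin");
        // Insert the four sample pages shown above.
        stackin.insert(new BasicDBObject("summary", "good good day").append("url", "url_1"));
        stackin.insert(new BasicDBObject("summary", "hello world good world").append("url", "url_2"));
        stackin.insert(new BasicDBObject("summary", "hello world good hello good").append("url", "url_3"));
        stackin.insert(new BasicDBObject("summary", "hello world hello").append("url", "url_4"));
        mongo.close();
    }
}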
> db.stackout.find({})
{ "_id" : "world", "data" : [ { "url_2" : 2 }, { "url_3" : 1 }, { "url_4" : 1 } ], "index" : 0, "maxindex" : 3 }
{ "_id" : "good", "data" : [ { "url_1" : 2 }, { "url_2" : 1 }, { "url_3" : 2 } ], "index" : 0, "maxindex" : 3 }
{ "_id" : "day", "data" : [ { "url_1" : 1 } ], "index" : 0, "maxindex" : 1 }
{ "_id" : "hello", "data" : [ { "url_2" : 1 }, { "url_3" : 2 }, { "url_4" : 2 } ], "index" : 0, "maxindex" : 3 }
Our goal is to count how many times each word appears in each web page. As the expected output above shows, the document for the word world, for example, records how many times that word appears under each url.
import java.util.*;
import java.io.*;

import org.bson.*;

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;


public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, BSONWritable> {
        private Text word = new Text();

        public void map(Object key, BSONObject value, Context context)
                throws IOException, InterruptedException {
            String url = value.get("url").toString();
            // Strip punctuation and digits, normalize line breaks, and lower-case
            // the summary text before tokenizing it into words.
            StringTokenizer itr = new StringTokenizer(value.get("summary").toString()
                    .replaceAll("\\p{Punct}|\\d", "")
                    .replaceAll("\r\n", " ").replace("\r", " ").replace("\n", " ")
                    .toLowerCase());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                // Emit (word, { url : 1 }) for every occurrence of the word.
                BasicBSONObject urlCounts = new BasicBSONObject();
                urlCounts.put(url, 1);
                context.write(word, new BSONWritable(urlCounts));
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, BSONWritable, Text, BSONWritable> {

        public void reduce(Text key, Iterable<BSONWritable> values, Context context)
                throws IOException, InterruptedException {
            HashMap<String, Integer> mymap = new HashMap<String, Integer>();
            BasicBSONObject result = new BasicBSONObject();
            for (BSONWritable val : values) {
                @SuppressWarnings("unchecked")
                HashMap<String, Integer> temp = (HashMap<String, Integer>) val.getDoc().toMap();
                // Accumulate the per-url counts carried by each intermediate document.
                for (Map.Entry<String, Integer> entry : temp.entrySet()) {
                    if (mymap.containsKey(entry.getKey())) {
                        mymap.put(entry.getKey(), mymap.get(entry.getKey()) + entry.getValue());
                    } else {
                        mymap.put(entry.getKey(), entry.getValue());
                    }
                }
            }
            result.putAll(mymap);
            context.write(key, new BSONWritable(result));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mongo.input.uri", "mongodb://localhost/stackoverflow.stackin");
        conf.set("mongo.output.uri", "mongodb://localhost/stackoverflow.stackout");
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        //job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BSONWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BSONWritable.class);
        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
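After the job completes, you can check the results from the mongo shell as shown at the top of this post, or programmatically. Below is a minimal sketch, again assuming the legacy MongoDB Java driver and a local mongod; the class name CheckStackout is just for illustration:

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;

public class CheckStackout {
    public static void main(String[] args) {
        MongoClient mongo = new MongoClient("localhost", 27017);
        // Look up the aggregated per-url counts for a single word.
        DBObject worldCounts = mongo.getDB("stackoverflow")
                .getCollection("stackout")
                .findOne(new BasicDBObject("_id", "world"));
        System.out.println(worldCounts);
        mongo.close();
    }
}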
The design is as follows: in the map phase, for each word we emit the word together with a BSON object whose key is the url and whose value is 1, and write that pair to the context. Correspondingly, the reduce phase tallies the values passed in for each word; the sketch below walks through this merging for the word hello.
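To make the reduce-side merging concrete, here is the same folding logic in isolation, run on the intermediate (url, 1) pairs the mapper would emit for the word hello from the sample data. Plain HashMaps stand in for the BSON documents, and MergeDemo is a hypothetical class name:

import java.util.*;

public class MergeDemo {
    public static void main(String[] args) {
        // Intermediate { url : 1 } documents the mapper emits for "hello".
        List<Map<String, Integer>> values = Arrays.asList(
                Collections.singletonMap("url_2", 1),
                Collections.singletonMap("url_3", 1),
                Collections.singletonMap("url_3", 1),
                Collections.singletonMap("url_4", 1),
                Collections.singletonMap("url_4", 1));
        Map<String, Integer> merged = new HashMap<String, Integer>();
        for (Map<String, Integer> val : values) {
            for (Map.Entry<String, Integer> entry : val.entrySet()) {
                Integer sum = merged.get(entry.getKey());
                merged.put(entry.getKey(), sum == null ? entry.getValue() : sum + entry.getValue());
            }
        }
        // Prints {url_2=1, url_3=2, url_4=2} (key order may vary),
        // matching the stackout document for "hello" shown earlier.
        System.out.println(merged);
    }
}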
Summary: this example is simple, but it does require understanding how Hadoop MapReduce works and how to use the objects in the mongo-hadoop API. If you have questions, feel free to raise them in the comments.