A Collection of Common MapReduce Programming Examples
1. WordCount
(1) Input and Output
Input data:
file1.csv:
hello world
file2.csv:
hello hadoop
Output:
hadoop	1
hello	2
world	1
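Viewed as intermediate <key, value> records, the job roughly proceeds as follows (a simplified trace of this sample input; the exact grouping on a cluster may differ):

map output:      <hello, 1> <world, 1> <hello, 1> <hadoop, 1>
after shuffle:   <hadoop, [1]> <hello, [1, 1]> <world, [1]>
reduce output:   hadoop 1, hello 2, world 1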
(2) Code and Analysis
package com.hadoop.kwang;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    /**
     * Mapper class
     *
     * Object and Text are the input <key, value> types;
     * Text and IntWritable are the output <key, value> types.
     */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Read one line of text and split it into tokens
            StringTokenizer itr = new StringTokenizer(value.toString());

            // Emit every token of the line
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                // Every output pair has the form <"word", 1>
                context.write(word, one);
            }
        }
    }

    /**
     * Reducer class
     */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Count the occurrences of the word
            int sum = 0;
            // values is the list of values belonging to one key, i.e. <key, value-list>, e.g. <hello, <1,1>>
            for (IntWritable val : values) {
                // Accumulate all values
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Configure the input and output paths
        String input = "hdfs://0.0.0.0:xxx/hadoop/wordcount/input/";
        String output = "hdfs://0.0.0.0:xxx/hadoop/wordcount/output/";

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);               // set the Mapper class
        job.setCombinerClass(IntSumReducer.class);               // set the Combiner class
        job.setReducerClass(IntSumReducer.class);                // set the Reducer class
        job.setOutputKeyClass(Text.class);                       // set the output key type
        job.setOutputValueClass(IntWritable.class);              // set the output value type
        FileInputFormat.addInputPath(job, new Path(input));      // set the input path
        FileOutputFormat.setOutputPath(job, new Path(output));   // set the output path
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
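The driver above hard-codes its HDFS paths. A common variant (a sketch, assuming the Mapper/Reducer classes stay unchanged; the jar name is made up) reads the input and output paths from the command line instead, so the same jar can be run against different directories:

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length < 2) {
            System.err.println("Usage: WordCount <input path> <output path>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Paths are taken from the command line, e.g.
        // hadoop jar wordcount.jar com.hadoop.kwang.WordCount <in> <out>
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }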
2. Data Deduplication (DedupClean)
(1) Input and Output
Input data:
file1.csv:
2017-12-09 a
2017-12-10 a
2017-12-11 a
2017-12-12 b
2017-12-13 b
file2.csv:
2017-12-09 b
2017-12-10 b
2017-12-11 b
2017-12-12 b
2017-12-13 b
Output:
2017-12-09 a
2017-12-09 b
2017-12-10 a
2017-12-10 b
2017-12-11 a
2017-12-11 b
2017-12-12 b
2017-12-13 b
(2) Code and Analysis
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DedupClean {

    /*
     * Mapper class
     */
    public static class DedupCleanMapper extends Mapper<LongWritable, Text, Text, Text> {

        private static Text line = new Text();
        private static Text nullString = new Text("");

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Use the whole input line as the key
            line = value;
            // Emit the line with an empty value
            context.write(line, nullString);
        }
    }

    /*
     * Reducer class
     */
    public static class DedupCleanReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Emit each distinct key once with an empty value; duplicate lines collapse into one
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws Exception {
        final String FILE_IN_PATH = "hdfs://0.0.0.0:XXX/hadoop/dedupclean/input/";
        final String FILE_OUT_PATH = "hdfs://0.0.0.0:XXX/hadoop/dedupclean/output/";

        Configuration conf = new Configuration();

        // Delete the output directory if it already exists
        FileSystem fs = FileSystem.get(new URI(FILE_OUT_PATH), conf);
        if (fs.exists(new Path(FILE_OUT_PATH))) {
            fs.delete(new Path(FILE_OUT_PATH), true);
        }

        Job job = Job.getInstance(conf, "DedupClean");
        job.setJarByClass(DedupClean.class);
        job.setMapperClass(DedupCleanMapper.class);
        job.setReducerClass(DedupCleanReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH));
        FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
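Because the reducer ignores its values and simply re-emits each distinct key once, it can also be registered as a combiner so that duplicate lines are already collapsed on the map side before the shuffle. This is an optional optimization, not part of the original driver; a sketch of the relevant lines in main():

    Job job = Job.getInstance(conf, "DedupClean");
    job.setJarByClass(DedupClean.class);
    job.setMapperClass(DedupCleanMapper.class);
    job.setCombinerClass(DedupCleanReducer.class);   // map-side deduplication before the shuffle
    job.setReducerClass(DedupCleanReducer.class);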
3. Inverted Index
(1) Introduction
A document is made up of many words, a given word can occur several times within the same document, and the same word can of course also appear in different documents.
Forward index: looks at words from the document's point of view. For each document (identified by its document ID), it records which words the document contains, how many times each word occurs (term frequency), and where it occurs (offset from the start of the document).
Inverted index: looks at documents from the word's point of view. For each word, it records which documents the word appears in (document IDs), how many times it occurs in each of those documents (term frequency), and where it occurs (offset from the start of that document).
In short:
Forward index: document ——> words
Inverted index: word ——> documents
Typical applications include search engines, large-scale database indexing, document retrieval, and information retrieval in general; the inverted index is one of the most important index structures in the retrieval field.
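To make the two directions concrete before turning to MapReduce, here is a small in-memory sketch (toy code, independent of the job below) that builds both indexes for the two tiny documents used later:

import java.util.*;

public class IndexDemo {
    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("a.txt", "hello you hello");
        docs.put("b.txt", "hello hans");

        // Forward index: document -> (word -> count)
        Map<String, Map<String, Integer>> forward = new LinkedHashMap<>();
        // Inverted index: word -> (document -> count)
        Map<String, Map<String, Integer>> inverted = new TreeMap<>();

        for (Map.Entry<String, String> doc : docs.entrySet()) {
            for (String word : doc.getValue().split("\\s+")) {
                forward.computeIfAbsent(doc.getKey(), k -> new LinkedHashMap<>())
                       .merge(word, 1, Integer::sum);
                inverted.computeIfAbsent(word, k -> new LinkedHashMap<>())
                        .merge(doc.getKey(), 1, Integer::sum);
            }
        }
        System.out.println("forward:  " + forward);   // {a.txt={hello=2, you=1}, b.txt={hello=1, hans=1}}
        System.out.println("inverted: " + inverted);  // {hans={b.txt=1}, hello={a.txt=2, b.txt=1}, you={a.txt=1}}
    }
}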
(2) Input, Output, and Schematic
Input data:
a.txt:
hello you hello
b.txt:
hello hans
Output:
hans	b.txt:1
hello	b.txt:1;a.txt:2
you	a.txt:1
(Schematic figure of the implementation flow omitted.)
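In place of the schematic, here is a rough trace of the intermediate <key, value> records for this sample input, following the formats documented in the code comments below:

mapper output:    <hello:a.txt, 1> <you:a.txt, 1> <hello:a.txt, 1> <hello:b.txt, 1> <hans:b.txt, 1>
combiner output:  <hello, a.txt:2> <you, a.txt:1> <hello, b.txt:1> <hans, b.txt:1>
reducer output:   hans  b.txt:1;   hello  b.txt:1;a.txt:2;   you  a.txt:1;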
(3) Code and Analysis
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndex {

    /*
     * Mapper class
     *
     * Emits <word:filename, 1> pairs, e.g. <hello:a.txt, 1>
     *                                      <hello:a.txt, 1>
     *                                      <hello:b.txt, 1>
     */
    public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Get the file name of the current split.
            // split.getPath() returns the full path, e.g.
            // hdfs://10.20.14.47:8020/hadoop/invertedindex/input/a.txt
            FileSplit split = (FileSplit) context.getInputSplit();
            // fileName: a.txt
            String fileName = StringUtil.getShortPath(split.getPath().toString());

            // Emit <word:filename, 1> so the Combiner can count occurrences of the
            // same word within the same file
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                String word = st.nextToken().toLowerCase();
                word = word + ":" + fileName;
                context.write(new Text(word), new Text("1"));
            }
        }
    }

    /*
     * Combiner class
     *
     * Input:  <word:filename, 1> pairs, e.g. <hello:a.txt, 1>
     *                                        <hello:a.txt, 1>
     *                                        <hello:b.txt, 1>
     *
     * Output: <word, filename:count> pairs, e.g. <hello, a.txt:2>
     *                                            <hello, b.txt:1>
     *
     * Note: the Reducer expects keys in the plain <word> form produced here,
     * so this job depends on the Combiner running.
     */
    public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Count how many times this word occurs in this file
            long sum = 0;
            for (Text val : values) {
                sum += Integer.valueOf(val.toString());
            }

            // Split key (hello:a.txt) into newKey (hello) and fileKey (a.txt)
            String newKey = StringUtil.getSplitByIndex(key.toString(), ":", 0);
            String fileKey = StringUtil.getSplitByIndex(key.toString(), ":", 1);
            context.write(new Text(newKey), new Text(fileKey + ":" + String.valueOf(sum)));
        }
    }

    /*
     * Reducer class
     *
     * Input:  <word, filename:count> pairs, e.g. <hello, a.txt:2>
     *                                            <hello, b.txt:1>
     *
     * Output: <word, filename1:count;filename2:count> pairs, e.g. <hello, a.txt:2;b.txt:1>
     */
    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            // Concatenate the files this word appears in, together with its count in each
            for (Text val : values) {
                sb.append(val.toString() + ";");
            }
            context.write(key, new Text(sb.toString()));
        }
    }

    // Input and output paths
    private static final String FILE_IN_PATH = "hdfs://0.0.0.0:xxx/hadoop/invertedindex/input";
    private static final String FILE_OUT_PATH = "hdfs://0.0.0.0:xxx/hadoop/invertedindex/output";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Delete the output directory if it already exists
        FileSystem fs = FileSystem.get(new URI(FILE_OUT_PATH), conf);
        if (fs.exists(new Path(FILE_OUT_PATH))) {
            fs.delete(new Path(FILE_OUT_PATH), true);
        }

        Job job = Job.getInstance(conf, "InvertedIndex");
        job.setJarByClass(InvertedIndex.class);
        job.setMapperClass(InvertedIndexMapper.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH));
        FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

/*
 * Utility class for path/string handling
 */
class StringUtil {

    /*
     * Return the last path component (the file name) of a path
     */
    public static String getShortPath(String filePath) {
        if (filePath.length() == 0) {
            return filePath;
        }
        return filePath.substring(filePath.lastIndexOf("/") + 1);
    }

    /*
     * Split str on regex and return the piece at position index
     */
    public static String getSplitByIndex(String str, String regex, int index) {
        String[] splits = str.split(regex);
        // Guard against an out-of-range index
        if (splits.length <= index) {
            return "";
        }
        return splits[index];
    }
}
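A quick standalone check of the two helpers (a hypothetical demo class with a made-up path, not part of the job):

public class StringUtilDemo {
    public static void main(String[] args) {
        // getShortPath keeps only the final path component
        System.out.println(StringUtil.getShortPath("hdfs://localhost:8020/hadoop/invertedindex/input/a.txt")); // a.txt
        // getSplitByIndex splits on the separator and returns the requested piece
        System.out.println(StringUtil.getSplitByIndex("hello:a.txt", ":", 0)); // hello
        System.out.println(StringUtil.getSplitByIndex("hello:a.txt", ":", 1)); // a.txt
    }
}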