Let's start with a simple example. Say three people are playing Dou Dizhu and want to check whether the deck is complete. The simplest approach is to have one person count all the cards and see whether there are 54 (traditional single-machine computing). Alternatively, each of the three players takes one pile and counts it separately (the Map phase), and then the three subtotals are added together (the Reduce phase).
So the core idea of MapReduce is "divide and conquer" + "aggregate". When the data volume is too large for one machine to handle, multiple machines process it together as a distributed cluster.
Many articles translate Map and Reduce literally as "mapping" and "reduction". In practice, the idea of Map is simply that each worker handles its own slice of the data (divide and conquer), and the idea of Reduce is to aggregate those partial results. Why translate the terms so stiffly (is it to make big data sound mysterious on purpose?)
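To make the analogy concrete, here is a minimal plain-Java sketch (not Hadoop; the class name and the three 18-card piles are illustrative assumptions) of the same divide-then-aggregate flow: each player counts only their own pile (Map), and the partial counts are summed into one total (Reduce).

import java.util.Arrays;
import java.util.List;

public class CardCountSketch {
    public static void main(String[] args) {
        // Three piles of cards, one per player (the "divide" side of the analogy).
        List<int[]> piles = Arrays.asList(new int[18], new int[18], new int[18]);

        // Map phase: each player counts only their own pile.
        int[] partialCounts = piles.stream()
                .mapToInt(pile -> pile.length)
                .toArray();

        // Reduce phase: the partial counts are summed into one total.
        int total = Arrays.stream(partialCounts).sum();

        System.out.println("Total cards: " + total); // prints 54 if the deck is complete
    }
}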
It is still a fairly simple pattern, made up of 8 steps: (1) an InputFormat reads and parses the input file; (2) the Map phase; (3)-(6) the shuffle phase: partitioning, sorting, combining, and grouping; (7) the Reduce phase; (8) an OutputFormat writes the result. These numbers match the comments in the Job class further below.
Let's take the simplest word count as our example (billed as the "Hello World" of big data), and first get it running to see what happens before digging deeper.
Following the MapReduce idea, there are two main pieces to write, the Mapper and the Reducer; Hadoop has already implemented the rest for us. Get hands-on first, then look at the principles.
MapReduce has two run modes: 1) cluster mode (production); 2) local mode (for experimenting and learning).
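The difference between the two modes is mostly configuration and packaging. As a rough sketch (the class name and the "node01" hostname below are placeholders, not from this post): local mode runs everything in a single JVM, while cluster mode submits the packaged jar to YARN.

import org.apache.hadoop.conf.Configuration;

public class RunModeConfig {
    public static Configuration localMode() {
        Configuration conf = new Configuration();
        // Local mode: map and reduce tasks run inside one local JVM.
        conf.set("mapreduce.framework.name", "local");
        return conf;
    }

    public static Configuration clusterMode() {
        Configuration conf = new Configuration();
        // Cluster mode: the job is submitted to YARN; the code must be packaged as a jar
        // and job.setJarByClass(...) must be set so the worker nodes can load the classes.
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "node01"); // placeholder hostname
        return conf;
    }
}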
Prerequisites:
1. Download a Hadoop distribution, unpack it locally, and add it to your environment variables;
2. Download hadoop.dll and put it into Hadoop's bin directory (on Windows, winutils.exe is typically needed in the same directory as well).
Create a Maven project and add the following dependencies:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.10.1</version>
</dependency>
Input data file D:\Source\data\data_in\xx.txt:
hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop
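To preview how this data flows through the job: the Map phase turns each line into (word, 1) pairs, so the first line becomes (hello,1), (world,1), (hadoop,1); the shuffle then groups identical keys together, so the Reduce phase receives, for example, hello -> [1, 1] and emits (hello, 2).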
Now let's write the code.
Step 1: create the Mapper class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Input: <line offset, line text>; output: <word, 1>
public class BaseMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each line of the input file is a list of comma-separated words
        String[] words = value.toString().split(",");
        Text keyout = new Text();
        LongWritable valueout = new LongWritable(1);
        for (String word : words) {
            // Emit (word, 1) for every word on the line
            keyout.set(word);
            context.write(keyout, valueout);
        }
    }
}
Step 2: create the Reducer class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Input: <word, [1, 1, ...]> grouped by the shuffle; output: <word, total count>
public class BaseReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all the 1s emitted by the Mapper for this word
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
Step 3: create the Job driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MainJob extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), MainJob.class.getName());
        // When running on a cluster, the job must be packaged as a jar
        job.setJarByClass(MainJob.class);

        // 1. Input format class that reads and parses the input file
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("D:\\Source\\data\\data_in"));

        // 2. Mapper class and its output key/value types
        job.setMapperClass(BaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 3-6. Shuffle phase: partitioning, sorting, combining, grouping (defaults used here)

        // 7. Reducer class and its output key/value types
        job.setReducerClass(BaseReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 8. Output format class and the output directory
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("D:\\Source\\data\\demo_result1"));

        // Launch the MapReduce job and wait for it to finish
        boolean completion = job.waitForCompletion(true);
        return completion ? 0 : 1;
    }

    public static void main(String[] args) {
        MainJob mainJob = new MainJob();
        try {
            Configuration configuration = new Configuration();
            // Local mode: run everything in a single local JVM
            configuration.set("mapreduce.framework.name", "local");
            configuration.set("yarn.resourcemanager.hostname", "local");
            int run = ToolRunner.run(configuration, mainJob, args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
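If the job finishes successfully, the output directory D:\Source\data\demo_result1 should contain a part-r-00000 file (TextOutputFormat's default file name). For the sample data above, its contents should look roughly like this (word and count separated by a tab):

flume	1
hadoop	2
hello	2
hive	1
jerry	1
kitty	1
sqoop	1
tom	1
world	2

Note that the output directory must not already exist when the job starts, otherwise Hadoop will refuse to run the job.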