本文源碼:GitHub·點這裏 || GitEE·點這裏java
Hadoop核心組件之一:分佈式計算的方案MapReduce,是一種編程模型,用於大規模數據集的並行運算,其中Map(映射)和Reduce(歸約)。git
MapReduce既是一個編程模型,也是一個計算組件,處理的過程分爲兩個階段,Map階段:負責把任務分解爲多個小任務,Reduce負責把多個小任務的處理結果進行彙總。其中Map階段主要輸入是一對Key-Value,通過map計算後輸出一對Key-Value值;而後將相同Key合併,造成Key-Value集合;再將這個Key-Value集合轉入Reduce階段,通過計算輸出最終Key-Value結果集。github
MapReduce能夠實現基於上千臺服務器併發工做,提供很強大的數據處理能力,若是其中單臺服務掛掉,計算任務會自動轉義到另外節點執行,保證高容錯性;可是MapReduce不適應於實時計算與流式計算,計算的數據是靜態的。算法
數據文件通常以CSV格式居多,數據行一般以空格分隔,這裏須要考慮數據內容特色;spring
文件通過切片分配在不一樣的MapTask任務中併發執行;apache
MapTask任務執行完畢以後,執行ReduceTask任務,依賴Map階段的數據;編程
ReduceTask任務執行完畢後,輸出文件結果。設計模式
hadoop: # 讀取的文件源 inputPath: hdfs://hop01:9000/hopdir/javaNew.txt # 該路徑必須是程序運行前不存在的 outputPath: /wordOut
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> { Text mapKey = new Text(); IntWritable mapValue = new IntWritable(1); @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 一、讀取行 String line = value.toString(); // 二、行內容切割,根據文件中分隔符 String[] words = line.split(" "); // 三、存儲 for (String word : words) { mapKey.set(word); context.write(mapKey, mapValue); } } }
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> { int sum ; IntWritable value = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { // 一、累加求和統計 sum = 0; for (IntWritable count : values) { sum += count.get(); } // 二、輸出結果 value.set(sum); context.write(key,value); } }
@RestController public class WordWeb { @Resource private MapReduceConfig mapReduceConfig ; @GetMapping("/getWord") public String getWord () throws IOException, ClassNotFoundException, InterruptedException { // 聲明配置 Configuration hadoopConfig = new Configuration(); hadoopConfig.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() ); hadoopConfig.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName() ); Job job = Job.getInstance(hadoopConfig); // Job執行做業 輸入路徑 FileInputFormat.addInputPath(job, new Path(mapReduceConfig.getInputPath())); // Job執行做業 輸出路徑 FileOutputFormat.setOutputPath(job, new Path(mapReduceConfig.getOutputPath())); // 自定義 Mapper和Reducer 兩個階段的任務處理類 job.setMapperClass(WordMapper.class); job.setReducerClass(WordReducer.class); // 設置輸出結果的Key和Value的類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //執行Job直到完成 job.waitForCompletion(true); return "success" ; } }
將應用程序打包放到hop01服務上執行;服務器
java -jar map-reduce-case01.jar
Java數據類型與對應的Hadoop數據序列化類型;網絡
Java類型 | Writable類型 | Java類型 | Writable類型 |
---|---|---|---|
String | Text | float | FloatWritable |
int | IntWritable | long | LongWritable |
boolean | BooleanWritable | double | DoubleWritable |
byte | ByteWritable | array | DoubleWritable |
map | MapWritable |
Mapper模塊:處理輸入的數據,業務邏輯在map()方法中完成,輸出的數據也是KV格式;
Reducer模塊:處理Map程序輸出的KV數據,業務邏輯在reduce()方法中;
Driver模塊:將程序提交到yarn進行調度,提交封裝了運行參數的job對象;
序列化:將內存中對象轉換爲二進制的字節序列,能夠經過輸出流持久化存儲或者網絡傳輸;
反序列化:接收輸入字節流或者讀取磁盤持久化的數據,加載到內存的對象過程;
Hadoop序列化相關接口:Writable實現的序列化機制、Comparable管理Key的排序問題;
案例描述:讀取文件,並對文件相同的行作數據累加計算,輸出計算結果;該案例演示在本地執行,不把Jar包上傳的hadoop服務器,驅動配置一致。
實體對象屬性
public class AddEntity implements Writable { private long addNum01; private long addNum02; private long resNum; // 構造方法 public AddEntity() { super(); } public AddEntity(long addNum01, long addNum02) { super(); this.addNum01 = addNum01; this.addNum02 = addNum02; this.resNum = addNum01 + addNum02; } // 序列化 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(addNum01); dataOutput.writeLong(addNum02); dataOutput.writeLong(resNum); } // 反序列化 @Override public void readFields(DataInput dataInput) throws IOException { // 注意:反序列化順序和寫序列化順序一致 this.addNum01 = dataInput.readLong(); this.addNum02 = dataInput.readLong(); this.resNum = dataInput.readLong(); } // 省略Get和Set方法 }
Mapper機制
public class AddMapper extends Mapper<LongWritable, Text, Text, AddEntity> { Text myKey = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 讀取行 String line = value.toString(); // 行內容切割 String[] lineArr = line.split(","); // 內容格式處理 String lineNum = lineArr[0]; long addNum01 = Long.parseLong(lineArr[1]); long addNum02 = Long.parseLong(lineArr[2]); myKey.set(lineNum); AddEntity myValue = new AddEntity(addNum01,addNum02); // 輸出 context.write(myKey, myValue); } }
Reducer機制
public class AddReducer extends Reducer<Text, AddEntity, Text, AddEntity> { @Override protected void reduce(Text key, Iterable<AddEntity> values, Context context) throws IOException, InterruptedException { long addNum01Sum = 0; long addNum02Sum = 0; // 處理Key相同 for (AddEntity addEntity : values) { addNum01Sum += addEntity.getAddNum01(); addNum02Sum += addEntity.getAddNum02(); } // 最終輸出 AddEntity addRes = new AddEntity(addNum01Sum, addNum02Sum); context.write(key, addRes); } }
案例最終結果:
GitHub·地址 https://github.com/cicadasmile/big-data-parent GitEE·地址 https://gitee.com/cicadasmile/big-data-parent
推薦閱讀:編程體系整理
序號 | 項目名稱 | GitHub地址 | GitEE地址 | 推薦指數 |
---|---|---|---|---|
01 | Java描述設計模式,算法,數據結構 | GitHub·點這裏 | GitEE·點這裏 | ☆☆☆☆☆ |
02 | Java基礎、併發、面向對象、Web開發 | GitHub·點這裏 | GitEE·點這裏 | ☆☆☆☆ |
03 | SpringCloud微服務基礎組件案例詳解 | GitHub·點這裏 | GitEE·點這裏 | ☆☆☆ |
04 | SpringCloud微服務架構實戰綜合案例 | GitHub·點這裏 | GitEE·點這裏 | ☆☆☆☆☆ |
05 | SpringBoot框架基礎應用入門到進階 | GitHub·點這裏 | GitEE·點這裏 | ☆☆☆☆ |
06 | SpringBoot框架整合開發經常使用中間件 | GitHub·點這裏 | GitEE·點這裏 | ☆☆☆☆☆ |
07 | 數據管理、分佈式、架構設計基礎案例 | GitHub·點這裏 | GitEE·點這裏 | ☆☆☆☆☆ |
08 | 大數據系列、存儲、組件、計算等框架 | GitHub·點這裏 | GitEE·點這裏 | ☆☆☆☆☆ |