To run a MapReduce program you first need to install Hadoop; for installation, refer to the Hadoop installation notes.
Start HDFS and YARN:
start-dfs.cmd
start-yarn.cmd
Create the directories for the input data:
hadoop fs -mkdir /wc
hadoop fs -mkdir /wc/in
# list the directories
hadoop fs -ls -R /
Upload the input data files:
hadoop fs -put file:///G:\note\bigdata\hadoop\wordcount\word1.txt /wc/in
hadoop fs -put file:///G:\note\bigdata\hadoop\wordcount\word2.txt /wc/in
The contents of the data files are as follows:
word1.txt
hello world
hello hadoop
word2.txt
hadoop world
hadoop learn
The complete WordCount program is as follows:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("[map-key]:" + key + " [map-value]:" + value);
            StringTokenizer stringTokenizer = new StringTokenizer(value.toString());
            while (stringTokenizer.hasMoreTokens()) {
                word.set(stringTokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            StringBuffer sb = new StringBuffer();
            for (IntWritable num : values) {
                sum += num.get();
                sb.append(num);
                sb.append("、");
            }
            result.set(sum);
            context.write(key, result);
            System.out.println("[reduce-key]:" + key + " [reduce-values]:" + sb.substring(0, sb.length() - 1));
        }
    }

    // YARN job UI: http://localhost:8088/
    // HDFS UI: http://localhost:9870/
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        configuration.set("fs.default.name", "hdfs://localhost:9000");
        Job job = Job.getInstance(configuration, "WC");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
//        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        Path inPath = new Path("/wc/in/");
        Path outPath = new Path("/wc/out/");
        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
If the output directory already exists, it can be removed with the following command:
hadoop fs -rm -r /wc/out
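Alternatively, the output directory can be deleted from the driver code before the job is submitted. Below is a minimal sketch using the HDFS FileSystem API; the class name CleanOutputDir is just for illustration, and the configuration mirrors the program above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CleanOutputDir {
    public static void main(String[] args) throws Exception {
        // Same HDFS address as in the WordCount driver above.
        Configuration configuration = new Configuration();
        configuration.set("fs.default.name", "hdfs://localhost:9000");
        Path outPath = new Path("/wc/out/");
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true); // true = delete recursively
        }
    }
}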
Let's first look at the program's output:
[map-key]:0 [map-value]:hadoop world
[map-key]:14 [map-value]:hadoop learn
[map-key]:0 [map-value]:hello world
[map-key]:13 [map-value]:hello hadoop
[reduce-key]:hadoop [reduce-values]:1、1、1
[reduce-key]:hello [reduce-values]:1、1
[reduce-key]:learn [reduce-values]:1
[reduce-key]:world [reduce-values]:1、1
From this output we can infer how Hadoop's map phase works: Hadoop splits the input file into lines and calls the map function once per line; the key passed to map is the byte offset at which the line starts, and the value is the content of that line.
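To see where those keys come from, the small sketch below recomputes the line-start offsets of word1.txt. The 2-byte CRLF line terminator is an assumption based on the observed offsets (13 = length of "hello world" plus 2), and OffsetDemo is only an illustration, not part of the job.

public class OffsetDemo {
    public static void main(String[] args) {
        String[] lines = {"hello world", "hello hadoop"};
        long offset = 0;
        for (String line : lines) {
            // With the default TextInputFormat, this offset is the map key.
            System.out.println("[map-key]:" + offset + " [map-value]:" + line);
            offset += line.getBytes().length + 2; // +2 for the assumed \r\n terminator
        }
    }
}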
After the map phase, the intermediate results are written out as key-value pairs.
The reduce function processes these intermediate results: its key parameter is a key written by map, and its values parameter is the collection of values that were grouped under that key after the map phase.
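The sketch below (plain Java, not the Hadoop API) imitates what the shuffle/grouping step conceptually produces for this example, so the arguments each reduce() call receives are easier to picture; ShuffleDemo and its hard-coded map output are illustrative only.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleDemo {
    public static void main(String[] args) {
        // The (word, 1) pairs emitted by the two map tasks in this example.
        String[][] mapOutput = {
            {"hadoop", "1"}, {"world", "1"}, {"hadoop", "1"}, {"learn", "1"},
            {"hello", "1"}, {"world", "1"}, {"hello", "1"}, {"hadoop", "1"}
        };
        // Group values by key, as the shuffle does before reduce.
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] kv : mapOutput) {
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        // Each entry corresponds to one reduce() call: (key, [values...]).
        grouped.forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}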
A Combine step can also be inserted between Map and Reduce to pre-process the intermediate results and cut down the amount of data transferred over the network:
Map -> Combine -> Reduce
If we uncomment job.setCombinerClass(IntSumReducer.class); in the program above, we get the output with a Combiner:
[map-key]:0 [map-value]:hadoop world
[map-key]:14 [map-value]:hadoop learn
[reduce-key]:hadoop [reduce-values]:1、1
[reduce-key]:learn [reduce-values]:1
[reduce-key]:world [reduce-values]:1
[map-key]:0 [map-value]:hello world
[map-key]:13 [map-value]:hello hadoop
[reduce-key]:hadoop [reduce-values]:1
[reduce-key]:hello [reduce-values]:1、1
[reduce-key]:world [reduce-values]:1
[reduce-key]:hadoop [reduce-values]:2、1
[reduce-key]:hello [reduce-values]:2
[reduce-key]:learn [reduce-values]:1
[reduce-key]:world [reduce-values]:1、1
From the output above we can see a reduce-style output right after each map task; that is actually the combine operation. The difference between a combiner and a reducer is that the combiner runs locally within a single node, on each map task's output, in order to shrink the intermediate data.
Note: the combine operation must be associative (and commutative), so that applying it to partial results does not change the final answer. For example:
A mean cannot simply reuse the reducer as a combiner: averaging partial averages is not the same as averaging all the values, e.g. (a + b + c)/3 is clearly not equivalent to ((a + b)/2 + c)/2.
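A common workaround is to have the combiner emit partial (sum, count) pairs instead of partial averages, so that only the final reducer divides. The sketch below is a plain-Java illustration of that idea (not the Hadoop API, and not part of the original program).

public class AverageCombineDemo {
    // A partial aggregate {sum, count}: combining two of them is element-wise addition,
    // which is associative and commutative, so it is safe to use as a combine step.
    static long[] combine(long[] a, long[] b) {
        return new long[]{a[0] + b[0], a[1] + b[1]};
    }

    public static void main(String[] args) {
        long[] part1 = {1 + 2, 2}; // one map task saw the values 1 and 2: sum=3, count=2
        long[] part2 = {3, 1};     // another map task saw the value 3: sum=3, count=1
        long[] total = combine(part1, part2);
        System.out.println("correct mean = " + (double) total[0] / total[1]);       // 2.0
        System.out.println("mean of partial means = " + ((1 + 2) / 2.0 + 3) / 2.0); // 2.25, wrong
    }
}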
The Maven pom.xml for the project is as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.curitis</groupId>
    <artifactId>hadoop-learn</artifactId>
    <version>1.0.0</version>

    <properties>
        <spring.version>5.1.3.RELEASE</spring.version>
        <junit.version>4.11</junit.version>
        <hadoop.version>3.0.2</hadoop.version>
        <parquet.version>1.10.1</parquet.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- parquet -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-column</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-common</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-encoding</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        <!-- test -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.25.Final</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>