求和:1+3+5+8+2+7+3+4+9+...+Integer.MAX_VALUE。java
這是一個簡單的加法,若是這道題單臺機器線性執行的話,能夠想一想這個時間的消耗有多大,若是咱們換一種思惟來進行計算那麼這個時間就能夠減小不少,將整個加法分紅若干個段進行相加,最後將這些結果段再進行相加。這樣就能夠實行分佈式的計算。apache
上述的方法的思想就是:分而治之,而後彙總。windows
MapReduce是一種分佈式計算模型,由Google提出,主要用於搜索領域,解決海量數據的計算問題。bash
Apache對其作了開源實現,整合在hadoop中實現通用分佈式數據計算。併發
MR由兩個階段組成:Map和Reduce,用戶只須要實現map()和reduce()兩個函數,便可實現分佈式計算,很是簡單。大大簡化了分佈式併發處理程序的開發。app
Map階段就是進行分段處理。框架
Reduce階段就是進行彙總處理。彙總以後還能夠進行數據的一系列美化操做,而後再輸出。eclipse
MapReduce原理圖: 此圖借鑑的網上的。具體出處如圖上的地址。分佈式
①WcMapperide
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable k1, Text v1, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { // 1.獲得行 String line = v1.toString(); // 2.切行爲單詞 String[] wds = line.split(" "); // 3.輸出單詞和數量,即k二、v2 for (String w : wds) { context.write(new Text(w), new IntWritable(1)); } } }
②WcReduce
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text k3, Iterable<IntWritable> v3s, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { // 1.獲取單詞 String word = k3.toString(); // 2.遍歷v3s,累計數量 int count = 0; Iterator<IntWritable> it = v3s.iterator(); while (it.hasNext()) { count += it.next().get(); } // 3.輸出結果 context.write(new Text(word), new IntWritable(count)); } }
③WcDerver
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WcDriver { public static void main(String[] args) throws Exception { // 1.聲明一個做業 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2.聲明做業的入口 job.setJarByClass(WcDriver.class); // 3.聲明Mapper job.setMapperClass(WcMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 4.聲明Reducer job.setReducerClass(WcReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 5.聲明輸入位置 FileInputFormat.setInputPaths(job, new Path("hdfs://yun01:9000/wcdata/words.txt")); // 6.聲明輸出位置 FileOutputFormat.setOutputPath(job, new Path("hdfs://yun01:9000/wcresult")); // 7.啓動做業 job.waitForCompletion(true); } }
將程序成打成jar,提交到集羣中運行。
集羣搭建能夠參見:僞分佈式集羣搭建點我、徹底分佈式集羣搭建點我
如下的介紹中,我將使用k1代替mapper第一次輸入的數據key,v1表明mapper第一次輸入的數據的value值,k2表明mapper輸出數據的key,v2表明mapper輸出數據的value;k3表明reducer接收數據的key,v3表明reducer接收數據的value;
導出jar包有下面四個頁面:
右鍵項目-export:搜索jar-java-JAR file-next。
選擇要打包的項目-去掉.classpath和.project的勾選-JAR file:輸出路徑及jar包名字-next。
next。
main class:選擇主類-Finish。
hadoop jar xxx.jar
在eclipse中使用hadoop插件開發mapreduce可能遇到的問題及解決方案:
①空指針異常
本地hadoop缺乏支持包,將winutils和hadoop.dll(及其餘)放置到eclips關聯的hadoop/bin下,並將hadoop/bin配置到PATH環境變量中。若是還不行,就再放一份到c:/windows/system32下。
②不打印日誌
在mr程序下放置一個log4j.properties文件。
③null\bin\winutils.exe
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
解決方法1:
配置HADOOP_HOME環境變量,可能須要重啓電腦。
解決方法2:
若是不想配置環境變量,能夠在代碼中寫上。
System.setProperty("hadoop.home.dir", "本機hadoop地址");
④ExitCodeException
本地的Hadoop程序中hadoop-2.7.1\share\hadoop\common\hadoop-common-2.7.1.jar中NativeIO這個類加載不出來,須要將這個類從新打包。
文件邏輯切片,每個切片對應一個Mapper。
Mapper讀取輸入切片內容,每行解析成一個k一、v1對(默認狀況)。每個鍵值對調用一次map函數。執行map中的邏輯,對輸入的k一、v1處理,轉換成新的k二、v2輸出。
分配map輸出的數據到reduce其中會將k二、v2轉換爲k三、v3。
中間包括buffer、split、partition、combiner、grouping、sort、combiner等操做。
輸入shuffle獲得的k三、v3執行reduce處理獲得k四、v4,把k四、v4寫出到目的地。
MR能夠單獨運行,也能夠經由YARN分配資源運行。這裏先簡單說一下Yarn,後面會有具體講Yarn的文章更新。
YARN框架的組成:
1.0版本:JobTracker、2.0版本:ResourceManager。
1.0版本:TaskTracker 、2.0版本:NodeManager。
Mapper、Reducer。
客戶端提交一個mr的jar包給JobClient。
提交方式爲執行Hadoop的提交命令:
hadoop jar [jar包名]
JobClient經過RPC和ResourceManager進行通訊,代表要發起一個做業,ResourceManager返回一個存放jar包的地址(HDFS)和jobId。
client將jar包和相關配置信息寫入到HDFS指定的位置。
path=hdfs上的地址+jobId。
看成業資源上傳完畢以後,Client聯繫ResourceManager提交做業任務。此處提交的做業任務,只是任務的描述信息,不是jar包。
任務描述包括:jobid,jar存放的位置,配置信息等等。
ResourceManager獲得Client提交的做業任務信息,會根據信息進行做業初始化,建立做業對象。
建立好做業對象以後,ResourceManager讀取HDFS上的要處理的文件,開始計算輸入分片split,規劃出Mapper和Reducer的數量,規劃分配任務方案,一般採用本地化策略將任務分配給NodeManager。ResourceManager不會主動聯繫NodeManager,而是等待NodeManager心跳報告。
本地化任務策略:數據在那個節點上存儲,就將任務交給那個節點。
NodeManager經過心跳機制領取任務。這裏領取的只是任務的描述信息(即數據的元數據)。經過任務描述信息,NodeManager訪問hdfs獲取所需的jar,配置文件等。準備進行任務工做。
當準備任務完成以後,NodeManager會啓動一個單獨的java child子進程:worker進程,讓worker進程來執行具體的任務。Worker中運行指定的Mapper或Reducer,最終將結果寫入到HDFS當中。
這裏另外啓動一個進程來執行具體的任務,其實能夠算是NodeManager的一個自保機制,由於Mapper和Reducer的代碼是工程師編寫的,這裏面避免不了會存在致使線程崩潰的代碼,或者意外狀況,致使線程中斷。這樣作能夠保護NodeManager一直處於正常工做狀態,不會由於執行Mapper和Reducer代碼致使NodeManager死亡。NodeManager還有重啓任務的機制,保證在乎外狀況下致使Mapper和Reducer執行中斷,能夠完成任務。
整個過程傳遞的是代碼,而不是數據。即數據在哪裏,就讓運算髮生在哪裏,減小對數據的移動,提升效率。
因爲集羣工做過程當中須要用到RPC操做,因此想要MR處理的對象的類必須能夠進行序列化/反序列化操做。
Hadoop並無使用Java原生的序列化,它的底層實際上是經過AVRO實現序列化/反序列化,而且在其基礎上提供了便捷API。
以前用到的Text、LongWritable、IntWritable……其實都是在原有類型上包裝了一下,增長了AVRO序列化、反序列化的能力。
咱們也可使用本身定義的類型來做爲MR的kv使用,要求是必須也去實現AVRO序列化反序列化。
用於整個做業的管理。
①重要方法
1)getInstance(Configuration conf,String Jobname);
獲取job對象。
2)setJarByClass(class<?> cal);
設置程序入口。
3)setMapperClass(class<?> cal);
設置Mapper類。
4)setMapOutputKeyClass(class<?> cal);
設置Mapper類輸出的key值的類型。
5)setMapOutputValueClass(class<?> cal);
設置Mapper類輸出的value值類型。
6)setReducerClass(class<?> cal);
設置Reducer類。
7)setOutputKeyClass(class<?> cal);
設置Reducer類輸出的key值類型,若是Mapper類和Reducer類的輸出key值同樣,能夠只設置這一個。
8)setOutputValueClass(class<?> cal);
設置Reducer類輸出的value值類型,若是Mapper類和Reducer類的輸出value值類型型同樣,能夠只設置這一個。
9)waitForCompletion(boolean fg);
開啓job任務。true開啓,false關閉。
序列化標識接口,須要實現裏面的write()和readFileds()兩個方法。
①重要方法
1)write(DataOutput out);
此方法用於序列化,屬性的序列化順序要和反序列化順序一致。
2)readFields(DataInput in);
此方法是用於反序列化的方法,屬性的反序列化順序要和序列化順序一致。
此接口用於序列化和排序的標識接口。WritableComparable = Writable + Comparable。
①重要方法
1)write(DataOutput out);
2)readFields(DataInput in);
3)compareTo();
此方法用來實現排序比較的,java基礎有講過。返回負數代表調用此方法的對象小,返回0代表兩個對象相等,返回整數代表調用此方法的對象大。
若是對象只是用做k一、k4或value則只實現Writable接口便可。
若是對象用做k二、k3則類除了實現Writable接口外還要實現Comparable接口,也能夠直接實現WritableComparable效果是相同的。
統計流量(文件:flow.txt)自定義對象做爲keyvalue。
文件樣例:
13877779999 bj zs 2145 13766668888 sh ls 1028 13766668888 sh ls 9987 13877779999 bj zs 5678 13544445555 sz ww 10577 13877779999 sh zs 2145 13766668888 sh ls 9987
寫一個Bean實現Writable接口,實現其中的write和readFields方法,注意這兩個方法中屬性處理的順序和類型。
public class FlowBean implements Writable { private String phone; private String addr; private String name; private long flow; public FlowBean() {} public FlowBean(String phone, String addr, String name, long flow) { super(); this.phone = phone; this.addr = addr; this.name = name; this.flow = flow; } //對應的get/set方法,這裏省略 //對應的toString() @Override public void write(DataOutput out) throws IOException { out.writeUTF(phone); out.writeUTF(addr); out.writeUTF(name); out.writeLong(flow); } @Override public void readFields(DataInput in) throws IOException { this.phone = in.readUTF(); this.addr = in.readUTF(); this.name = in.readUTF(); this.flow = in.readLong(); } }
編寫完成以後,這個類的對象就能夠用於MR了。
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1.獲取行,按照空格切分 String line = value.toString(); String attr[] = line.split(" "); // 2.獲取其中的手機號,做爲k2 String phone = attr[0]; // 3.封裝其餘信息爲FlowBean,做爲v2 FlowBean fb = new FlowBean(attr[0], attr[1], attr[2], Long.parseLong(attr[3])); // 4.發送數據。 context.write(new Text(phone), fb); } }
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> { public void reduce(Text k3, Iterable<FlowBean> v3s, Context context) throws IOException, InterruptedException { // 1.經過k3獲取手機號 String phone = k3.toString(); // 2.遍歷v3s累計流量 FlowBean fb = new FlowBean(); Iterator<FlowBean> it = v3s.iterator(); while (it.hasNext()) { FlowBean nfb = it.next(); fb.setAddr(nfb.getAddr()); fb.setName(nfb.getName()); fb.setPhone(nfb.getPhone()); fb.setFlow(fb.getFlow() + nfb.getFlow()); } // 3.輸出結果 context.write(new Text(phone), fb); } }
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowDriver { public static void main(String[] args) throws Exception { // 1.建立做業對象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "JobName"); // 2.設置入口類 job.setJarByClass(FlowDriver.class); // 3.設置mapper類 job.setMapperClass(FlowMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); // 4.設置Reducer類 job.setReducerClass(cn.tedu.flow.FlowReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 5.設置輸入位置 FileInputFormat.setInputPaths(job, new Path("hdfs://yun01:9000/flowdata")); // 6.設置輸出位置 FileOutputFormat.setOutputPath(job, new Path("hdfs://yun01:9000/flowresult")); // 7.啓動做業 if (!job.waitForCompletion(true)) return; } }
Map執行事後,在數據進入reduce操做以前,數據將會按照K3進行排序,利用這個特性能夠實現大數據場景下排序的需求。
計算利潤,進行排序(文件:profit.txt)。
數據樣例:
1 ls 2850 100 2 ls 3566 200 3 ls 4555 323 1 zs 19000 2000 2 zs 28599 3900 3 zs 34567 5000 1 ww 355 10 2 ww 555 222 3 ww 667 192
此案例,須要兩個MR操做,合併數據、進行排序。
在真實開發場景中 對於複雜的業務場景,常常須要連續運行多個MR來進行處理。
①ProfitMapper
public class ProfitMapper extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1.獲取行,按照空格切分 String line=value.toString(); String attr[]=line.split(" "); // 2.獲取人名做爲k2 String name=attr[1]; // 3.獲取當月收入和支出計算利潤 int sum=Integer.parseInt(attr[2])-Integer.parseInt(attr[3]); // 4.輸出數據 context.write(new Text(name), new IntWritable(sum)); } }
②ProfitReducer
public class ProfitReducer extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text k3, Iterable<IntWritable> v3s, Context context) throws IOException, InterruptedException { // 1.經過k3獲取人名 String name = k3.toString(); // 2.遍歷v3累計利潤 Iterator<IntWritable> it = v3s.iterator(); int cprofit = 0; while (it.hasNext()) { cprofit += it.next().get(); } // 3.輸出數據 context.write(new Text(name), new IntWritable(cprofit)); } }
③ProfitDriver
public class ProfitDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "profit_job"); job.setJarByClass(ProfitDriver.class); job.setMapperClass(ProfitMapper.class); job.setReducerClass(ProfitReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path("hdfs://yun01:9000/pdata")); FileOutputFormat.setOutputPath(job, new Path("hdfs://yun01:9000/presult")); if (!job.waitForCompletion(true)) return; } }
④ProfitBean
建立Bean對象實現WritableComparable接口實現其中的write readFields compareTo方法,在Map操做時,將Bean對象做爲Key輸出,從而在Reduce接受到數據時已經通過排序,而Reduce操做時,只需原樣輸出數據便可。
public class ProfitBean implements WritableComparable<ProfitBean> { private String name; private int profit; public ProfitBean() { } public ProfitBean(String name, int profit) { this.name = name; this.profit = profit; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getProfit() { return profit; } public void setProfit(int profit) { this.profit = profit; } @Override public String toString() { return "ProfitBean [name=" + name + ", profit=" + profit + "]"; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(name); out.writeInt(profit); } @Override public void readFields(DataInput in) throws IOException { this.name=in.readUTF(); this.profit=in.readInt(); } @Override public int compareTo(ProfitBean profit) { return this.profit-profit.getProfit()<=0?1:-1; } }
⑤ProfitSortMapper
public class ProfitSortMapper extends Mapper<LongWritable, Text, ProfitBean, NullWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line=value.toString(); String attr[]=line.split("\t"); ProfitBean pb=new ProfitBean(attr[0],Integer.parseInt(attr[1])); context.write(pb, NullWritable.get()); } }
⑥ProfitSortReducer
public class ProfitSortReducer extends Reducer<ProfitBean, NullWritable, Text, IntWritable> { public void reduce(ProfitBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { String name=key.getName(); int profit=key.getProfit(); context.write(new Text(name), new IntWritable(profit)); } }
此案例中也能夠沒有Reducer,MapReduce中能夠只有Map沒有Reducer,若是不配置Reduce,hadoop會自動增長一個默認Reducer,功能是原樣輸出數據。
⑦ProfitSortDriver
public class ProfitSortDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "pro_sort_job"); job.setJarByClass(ProfitSortDriver.class); job.setMapperClass(ProfitSortMapper.class); job.setMapOutputKeyClass(ProfitBean.class); job.setMapOutputValueClass(NullWritable.class); job.setReducerClass(ProfitSortReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path("hdfs://yun01:9000/presult")); FileOutputFormat.setOutputPath(job, new Path("hdfs://yun01:9000/psresult")); if (!job.waitForCompletion(true)) return; } }
分區操做是shuffle操做中的一個重要過程,做用就是將map的結果按照規則分發到不一樣reduce中進行處理,從而按照分區獲得多個輸出結果。
Partitioner是partitioner的基類,若是須要定製partitioner也須要繼承該類。
HashPartitioner是mapreduce的默認partitioner。計算方法是:
which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
注:默認狀況下,reduceTask數量爲1。
不少時候MR自帶的分區規則並不能知足咱們需求,爲了實現特定的效果,能夠須要本身來定義分區規則。
改造如上統計流量案例,根據不一樣地區分區存放數據。
開發Partitioner代碼,寫一個類實現Partitioner接口,在其中描述分區規則。
①FlowBean
public class FlowBean implements Writable{ private String phone; private String addr; private String name; private long flow; //……無參、有參構造…… //……get/set…… //……toString()…… //……read/write…… }
②FlowMapper
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException { String line=value.toString(); String attr[]=line.split(" "); String phone=attr[0]; FlowBean fb=new FlowBean(attr[0], attr[1], attr[2], Integer.parseInt(attr[3])); context.write(new Text(phone), fb); } }
③FlowReducer
public class FlowReducer extends Reducer<Text, FlowBean, Text, NullWritable> { @Override protected void reduce(Text k3, Iterable<FlowBean> v3s, Reducer<Text, FlowBean, Text, NullWritable>.Context context) throws IOException, InterruptedException { Iterator<FlowBean> it=v3s.iterator(); FlowBean fb=new FlowBean(); while(it.hasNext()){ FlowBean nextFb=it.next(); fb.setAddr(nextFb.getAddr()); fb.setName(nextFb.getName()); fb.setPhone(nextFb.getPhone()); fb.setFlow(fb.getFlow()+nextFb.getFlow()); } Text t=new Text(fb.getName()+" "+fb.getPhone()+" "+fb.getAddr()+" "+fb.getFlow()); context.write(t, NullWritable.get()); } }
④FlowCityPartitioner
public class FlowCityPartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text k2, FlowBean v2, int num) { // 1.獲取流量所屬地區信息 String addr = v2.getAddr(); // 2.根據地區返回不一樣分區編號 實現 不一樣Reducer處理不一樣 地區數據的效果 switch (addr) { case "bj": return 0; case "sh": return 1; case "sz": return 2; default: return 3; } } }
⑤FlowDriver
public class FlowDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Flow_Addr_Job"); job.setJarByClass(cn.tedu.flow2.FlowDriver.class); job.setMapperClass(cn.tedu.flow2.FlowMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setReducerClass(cn.tedu.flow2.FlowReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //--設置Reducer的數量 默認爲1 job.setNumReduceTasks(4); //--設置當前 job的Partitioner實現根據城市分配數據 job.setPartitionerClass(FlowCityPartitioner.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/f2data")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/f2result")); if (!job.waitForCompletion(true)) return; } }
Partitioner將會將數據發往不一樣reducer,這就要求reducer的數量應該大於等於Partitioner可能的結果的數量,若是少於則在執行的過程當中會報錯。
每個MapperTask可能會產生大量的輸出,combiner的做用就是在MapperTask端對輸出先作一次合併,以減小傳輸到reducerTask的數據量。
combiner是實如今Mapper端進行key的歸併,combiner具備相似本地的reduce功能。
若是不用combiner,那麼,全部的結果都是reduce完成,效率會相對低下。使用combiner,先完成在Mapper的本地聚合,從而提高速度。
job.setCombinerClass(WCReducer.class);
改造WordCount案例,增長Combiner,從而提升效率。
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); String attr[] =line.split(" "); for(String w:attr){ context.write(new Text(w), new IntWritable(1)); } } }
public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text k3, Iterable<IntWritable> v3s, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { Iterator<IntWritable> it=v3s.iterator(); int count=0; while(it.hasNext()){ count+=it.next().get(); } context.write(k3, new IntWritable(count)); } }
public class WcDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Wc_addr_Job"); job.setJarByClass(cn.tedu.wc2.WcDriver.class); job.setMapperClass(cn.tedu.wc2.WcMapper.class); job.setReducerClass(cn.tedu.wc2.WcReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //爲當前job設置Combiner job.setCombinerClass(WcReducer.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/wdata")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/wresult")); if (!job.waitForCompletion(true)) return; } }
MapReduce的重點樹shuffle的過程,這個我會單獨出一篇文章進行講解。