Since MapReduce (MR) is just one computing framework, there are of course other computing frameworks as well.
From: [Distributed ML] Yi WANG's talk
A "Combine" step can be inserted between Mapping and Shuffling, but it is not always applicable; computing an average is a typical case where it is not, since averaging partial averages of unequal-sized groups gives a wrong result unless each partial result also carries a count.
Ref: Java總結篇系列:Java泛型
Ref: Word Count MapReduce Program in Hadoop
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Map function
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on whitespace
            String[] stringArr = value.toString().split("\\s+");
            for (String str : stringArr) {
                word.set(str);
                // Each word occurs once; emit it as an intermediate result
                context.write(word, new IntWritable(1));
            }
        }
    }

    // Reduce function
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            // Emit the final result
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "WC");

        job.setJarByClass(WordCount.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
This is essentially about learning and understanding Yarn.
Yarn can run not only MapReduce programs but also Spark programs and others.
Further reading: MapReduce執行過程
Original link: https://blog.csdn.net/qq_36951116/article/details/92435687
1. Start RunJar
2. Start MRAppMaster
3. Start the mapper's YarnChild (runs map or reduce)
4. Destroy the mapper's YarnChild
5. Start the reducer's YarnChild (runs map or reduce)
6. Destroy the reducer's YarnChild
7. Destroy RunJar
8. Destroy MRAppMaster
Ref: 實戰案例玩轉Hadoop系列11--運行Map Reduce程序
In a real production environment, a MapReduce program should be submitted to a Yarn cluster and run in a distributed fashion; only then does MapReduce deliver the benefit of distributed parallel computation.
The process of submitting a MapReduce program to Yarn for execution is as follows:
1. In the client code, set up everything the MapReduce program needs at runtime: the Mapper class, the Reducer class, the path of the program's Jar, the Job name, the Job's input split information, the parameters set in Configuration, and so on, and submit these resources together to the Job resource staging path on HDFS designated by Yarn;
2. The client asks the Resource Manager in Yarn for a resource container (Container) in which to run the MRAppMaster process from the Jar;
3. Yarn assigns the task of providing the Container to some Node Manager node with free resources; the Node Manager accepts the task and creates the resource container (the so-called Container);
4. The client sends the shell command that launches the MRAppMaster process to the Node Manager that created the container, starting MRAppMaster;
5. Once started, MRAppMaster reads the Job configuration and program resources, asks the Resource Manager for N resource containers in which to launch a number of Map Task and Reduce Task processes, and monitors the running state of those Map Task and Reduce Task processes;
6. When all of the Job's Map Task and Reduce Task processes have finished, all of the Job's processes deregister, and Yarn destroys the Containers and reclaims the computing resources.
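As a rough client-side sketch of step 1, the settings below are what make the driver package and submit the job to Yarn instead of running it locally; the hostnames and port are placeholders, not values from the original post:

// Same imports as the WordCount driver above.
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://namenode:9000");             // HDFS where the job resources are staged (placeholder address)
conf.set("mapreduce.framework.name", "yarn");                  // submit to Yarn instead of the local job runner
conf.set("yarn.resourcemanager.hostname", "resourcemanager");  // Resource Manager host (placeholder)

Job job = Job.getInstance(conf, "wc-on-yarn");
job.setJarByClass(WordCount.class);
// ... set Mapper/Reducer, key/value classes and input/output paths as in the WordCount driver ...
System.exit(job.waitForCompletion(true) ? 0 : 1);

In practice these values usually come from core-site.xml, mapred-site.xml and yarn-site.xml on the classpath rather than being set in code.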
The execution flow is illustrated in the diagram below:
Ref: Hadoop詳解(四)——Shuffle原理,Partitioner分區原理,Combiner編程,常見的MR算法
The Partitioner is part of the shuffle.
Default rule: Hadoop has a default partitioning rule.
Custom rule: Partitioner is the base class of all partitioners, so a custom partitioner must extend it. HashPartitioner is MapReduce's default partitioner. It picks the target reducer for each record with the following calculation:
which reducer = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
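This is the rule implemented by the built-in HashPartitioner; a minimal sketch of an equivalent partitioner (the class name here is illustrative):

import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of the default partitioning rule (equivalent to Hadoop's HashPartitioner).
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {

    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the result is non-negative,
        // then spread keys over the reducers by hash code.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

The sample log lines below and the DataBean / DataCountPartition classes that follow replace this default with a custom partitioner that routes records by phone-number prefix, i.e. by carrier, so that each carrier ends up in its own reducer and output file.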
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
package liuxun.hadoop.mr.dc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataBean implements Writable {

    private String tel;
    private long upPayLoad;
    private long downPayLoad;
    private long totalPayLoad;

    public DataBean() {
    }

    public DataBean(String tel, long upPayLoad, long downPayLoad) {
        this.tel = tel;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
        this.totalPayLoad = upPayLoad + downPayLoad;
    }

    @Override
    public String toString() {
        return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
    }

    public void readFields(DataInput in) throws IOException {
        this.tel = in.readUTF();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
        this.totalPayLoad = in.readLong();
    }

    // Note two things: the order of the writes and the types written
    // (readFields must read them back in exactly the same order and types)
    public void write(DataOutput out) throws IOException {
        out.writeUTF(tel);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
        out.writeLong(totalPayLoad);
    }

    public String getTel() { return tel; }

    public void setTel(String tel) { this.tel = tel; }

    public long getUpPayLoad() { return upPayLoad; }

    public void setUpPayLoad(long upPayLoad) { this.upPayLoad = upPayLoad; }

    public long getDownPayLoad() { return downPayLoad; }

    public void setDownPayLoad(long downPayLoad) { this.downPayLoad = downPayLoad; }

    public long getTotalPayLoad() { return totalPayLoad; }

    public void setTotalPayLoad(long totalPayLoad) { this.totalPayLoad = totalPayLoad; }
}
package liuxun.hadoop.mr.dc;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataCountPartition {

    public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // accept
            String line = value.toString();

            // split
            String[] fields = line.split("\t");
            String tel = fields[1];
            long up = Long.parseLong(fields[8]);
            long down = Long.parseLong(fields[9]);
            DataBean bean = new DataBean(tel, up, down);

            // send
            context.write(new Text(tel), bean);
        }
    }

    public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean> {

        @Override
        protected void reduce(Text key, Iterable<DataBean> values, Context context)
                throws IOException, InterruptedException {
            long up_sum = 0;
            long down_sum = 0;
            for (DataBean bean : values) {
                up_sum += bean.getUpPayLoad();
                down_sum += bean.getDownPayLoad();
            }
            DataBean bean = new DataBean("", up_sum, down_sum);
            context.write(key, bean);
        }
    }

    public static class ProviderPartitioner extends Partitioner<Text, DataBean> {

        private static Map<String, Integer> providerMap = new HashMap<String, Integer>();

        static {
            // In real development this mapping is loaded from a database
            // 1: China Mobile  2: China Unicom  3: China Telecom
            providerMap.put("135", 1);
            providerMap.put("136", 1);
            providerMap.put("137", 1);
            providerMap.put("150", 2);
            providerMap.put("159", 2);
            providerMap.put("182", 3);
            providerMap.put("183", 3);
        }

        // The return value of this method is the partition number
        // key:   the key emitted by the mapper, here the phone number
        // value: the value emitted by the mapper, here a DataBean
        // numPartitions: number of partitions, determined by the number of Reducers
        //                (as many partitions as Reducers started)
        @Override
        public int getPartition(Text key, DataBean value, int numPartitions) {
            // Derive the carrier from the phone number. Here we partition by key;
            // in real development you can also partition by value.
            String account = key.toString();
            String sub_acc = account.substring(0, 3);
            Integer code = providerMap.get(sub_acc);
            if (code == null) {
                code = 0;
            }
            return code;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DataCountPartition.class);

        job.setMapperClass(DCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(DCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setPartitionerClass(ProviderPartitioner.class);
        // Set the number of Reducers to start
        job.setNumReduceTasks(Integer.parseInt(args[2]));

        job.waitForCompletion(true);
    }
}
In <k, v>, the value v can be a class (as above); can the key k? Of course it can.
Using a class as the key supports more complex operations, such as the "secondary sort" here.
To do that, define a custom BeanInfo class that implements the WritableComparable interface and overrides the compareTo and toString methods.
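A minimal sketch of such a key class; the fields and sort order here are illustrative assumptions (phone number ascending, then total traffic descending), not taken from the referenced post:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class BeanInfo implements WritableComparable<BeanInfo> {

    private String tel;
    private long totalPayLoad;

    public BeanInfo() {
    }

    public BeanInfo(String tel, long totalPayLoad) {
        this.tel = tel;
        this.totalPayLoad = totalPayLoad;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(tel);
        out.writeLong(totalPayLoad);
    }

    public void readFields(DataInput in) throws IOException {
        tel = in.readUTF();
        totalPayLoad = in.readLong();
    }

    // compareTo defines the sort order of the keys: primary order on tel,
    // secondary order on total traffic, largest first.
    @Override
    public int compareTo(BeanInfo other) {
        int cmp = tel.compareTo(other.tel);
        if (cmp != 0) {
            return cmp;
        }
        return Long.compare(other.totalPayLoad, totalPayLoad);
    }

    @Override
    public String toString() {
        return tel + "\t" + totalPayLoad;
    }
}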
Each map may produce a large amount of output. The combiner's job is to merge that output on the map side first, reducing the amount of data transferred to the reducers.
See the last example at this link: https://blog.csdn.net/u013087513/article/details/77799686
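For the WordCount job shown earlier, the reducer itself can double as the combiner, because summing partial counts is associative and commutative (unlike averaging, as noted at the beginning). A sketch of the one extra line in the driver:

// In WordCount.main(), after setMapperClass/setReducerClass:
job.setCombinerClass(MyReducer.class); // pre-aggregate counts on the map side before the shuffle

Because the framework may run the combiner zero, one, or several times per map task, the combine function must not change the final result however many times it is applied.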
This part could be covered together with the Yarn chapter.
Github: mrjob: the Python MapReduce library
Document version: mrjob Documentation Release 0.7.0.dev0
Web version: mrjob v0.7.0.dev0 documentation
/* implement */