In the previous post I covered how MapReduce parallelizes work. This post continues with the flow a YARN client follows when submitting a job, and with writing a custom Partitioner.
1. The YARN client job-submission flow
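In outline, submission works like this: the client asks the ResourceManager for a new application ID, computes the input splits, and copies the job resources (the job jar, the configuration, and the split information) to a staging directory on HDFS; it then submits the application, the ResourceManager allocates a container in which a NodeManager launches the MRAppMaster, and the MRAppMaster requests further containers to run the map and reduce tasks as YarnChild processes, collecting their progress and reporting it back to the client.

On the client side, two properties route the submission to YARN instead of the local runner. A minimal sketch (the same two properties appear, commented out, in the FlowCount driver below; the hostname mini1 is just the example value used there):

    // run the job on YARN rather than the local job runner
    Configuration conf = new Configuration();
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.hostname", "mini1");   // example hostname
    Job job = Job.getInstance(conf);
    // ... configure mapper, reducer, input and output paths as in FlowCount below ...
    boolean ok = job.waitForCompletion(true);             // blocks and prints progress until the job finishes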
2. Writing a custom Partitioner
FlowBean (the output value class)
package com.empire.hadoop.mr.provinceflow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private long upFlow;
    private long dFlow;
    private long sumFlow;

    // deserialization instantiates the bean via reflection, so an explicit no-arg constructor is required
    public FlowBean() {
    }

    public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getdFlow() {
        return dFlow;
    }

    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Serialization
     */
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization. Note: fields must be read back in exactly the order they were written
     */
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dFlow = in.readLong();
        sumFlow = in.readLong();
    }

    public String toString() {
        return upFlow + "\t" + dFlow + "\t" + sumFlow;
    }
}
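To see the Writable contract in action (readFields() must consume fields in exactly the order write() produced them), here is a small round-trip sketch; the class name FlowBeanRoundTrip is made up for illustration and is not part of the project:

package com.empire.hadoop.mr.provinceflow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Illustrative only: serialize a FlowBean, then read it back through the Writable methods.
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new FlowBean(1024, 2048).write(new DataOutputStream(bos));      // serialize: upFlow, dFlow, sumFlow

        FlowBean copy = new FlowBean();                                  // no-arg constructor, as the framework uses via reflection
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bos.toByteArray())));           // deserialize in the same field order

        System.out.println(copy);                                        // prints: 1024  2048  3072
    }
}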
ProvincePartitioner (the custom partitioner class)
package com.empire.hadoop.mr.provinceflow;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * K2, V2 correspond to the map output key/value types
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    public static HashMap<String, Integer> proviceDict = new HashMap<String, Integer>();
    static {
        proviceDict.put("136", 0);
        proviceDict.put("137", 1);
        proviceDict.put("138", 2);
        proviceDict.put("139", 3);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        String prefix = key.toString().substring(0, 3);
        Integer provinceId = proviceDict.get(prefix);
        return provinceId == null ? 4 : provinceId;
    }
}
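The mapping can be spot-checked without a cluster by calling getPartition() directly; ProvincePartitionerCheck below is a hypothetical helper, and the phone numbers are taken from the test data:

package com.empire.hadoop.mr.provinceflow;

import org.apache.hadoop.io.Text;

// Hypothetical local check of the prefix-to-partition mapping.
public class ProvincePartitionerCheck {
    public static void main(String[] args) {
        ProvincePartitioner p = new ProvincePartitioner();
        FlowBean dummy = new FlowBean(0, 0);                                        // the value is ignored by getPartition()
        System.out.println(p.getPartition(new Text("13602846565"), dummy, 5));      // 0: prefix 136
        System.out.println(p.getPartition(new Text("13726230503"), dummy, 5));      // 1: prefix 137
        System.out.println(p.getPartition(new Text("18211575961"), dummy, 5));      // 4: prefix not in the dictionary
    }
}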
FlowCount (the MapReduce driver class)
package com.empire.hadoop.mr.provinceflow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();                          // turn the line into a String
            String[] fields = line.split("\t");                      // split into fields
            String phoneNbr = fields[1];                             // take the phone number
            long upFlow = Long.parseLong(fields[fields.length - 3]); // take the upstream and downstream traffic
            long dFlow = Long.parseLong(fields[fields.length - 2]);

            context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        // <183323,bean1><183323,bean2><183323,bean3><183323,bean4>.......
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long sum_upFlow = 0;
            long sum_dFlow = 0;

            // iterate over all beans, accumulating upstream and downstream traffic separately
            for (FlowBean bean : values) {
                sum_upFlow += bean.getUpFlow();
                sum_dFlow += bean.getdFlow();
            }

            FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
            context.write(key, resultBean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /*
         * conf.set("mapreduce.framework.name", "yarn");
         * conf.set("yarn.resourcemanager.hostname", "mini1");
         */
        Job job = Job.getInstance(conf);

        /* job.setJar("/home/hadoop/wc.jar"); */
        // specify the local path of the jar containing this program
        job.setJarByClass(FlowCount.class);

        // specify the mapper/reducer business classes this job uses
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // specify our custom partitioner
        job.setPartitionerClass(ProvincePartitioner.class);
        // and set the number of reduce tasks to match the number of partitions
        job.setNumReduceTasks(5);

        // specify the kv types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // specify the kv types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // specify the directory containing the job's input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // specify the directory for the job's output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // hand the job's configuration and the jar containing its classes to YARN to run
        /* job.submit(); */
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
3. Run the jar and check the results
# submit to the hadoop cluster
hadoop jar flowcount_patitioner_aaron.jar com.empire.hadoop.mr.provinceflow.FlowCount /user/hadoop/flowcount /flowcountpatitioner

# list the output directory
hdfs dfs -ls /flowcountpatitioner

# view an output file
hdfs dfs -cat /flowcountpatitioner/part-r-00000
Run log:
18/11/29 07:26:20 INFO client.RMProxy: Connecting to ResourceManager at centos-aaron-h1/192.168.29.144:8032
18/11/29 07:26:21 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/11/29 07:26:22 INFO input.FileInputFormat: Total input files to process : 5
18/11/29 07:26:22 INFO mapreduce.JobSubmitter: number of splits:5
18/11/29 07:26:22 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/11/29 07:26:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1543447570289_0001
18/11/29 07:26:24 INFO impl.YarnClientImpl: Submitted application application_1543447570289_0001
18/11/29 07:26:24 INFO mapreduce.Job: The url to track the job: http://centos-aaron-h1:8088/proxy/application_1543447570289_0001/
18/11/29 07:26:24 INFO mapreduce.Job: Running job: job_1543447570289_0001
18/11/29 07:26:36 INFO mapreduce.Job: Job job_1543447570289_0001 running in uber mode : false
18/11/29 07:26:36 INFO mapreduce.Job:  map 0% reduce 0%
18/11/29 07:26:45 INFO mapreduce.Job:  map 20% reduce 0%
18/11/29 07:27:02 INFO mapreduce.Job:  map 40% reduce 1%
18/11/29 07:27:04 INFO mapreduce.Job:  map 100% reduce 1%
18/11/29 07:27:05 INFO mapreduce.Job:  map 100% reduce 8%
18/11/29 07:27:06 INFO mapreduce.Job:  map 100% reduce 60%
18/11/29 07:27:07 INFO mapreduce.Job:  map 100% reduce 100%
18/11/29 07:27:07 INFO mapreduce.Job: Job job_1543447570289_0001 completed successfully
18/11/29 07:27:08 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=4195
		FILE: Number of bytes written=1986755
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=11574
		HDFS: Number of bytes written=594
		HDFS: Number of read operations=30
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=10
	Job Counters
		Killed map tasks=1
		Launched map tasks=6
		Launched reduce tasks=5
		Data-local map tasks=6
		Total time spent by all maps in occupied slots (ms)=111307
		Total time spent by all reduces in occupied slots (ms)=93581
		Total time spent by all map tasks (ms)=111307
		Total time spent by all reduce tasks (ms)=93581
		Total vcore-milliseconds taken by all map tasks=111307
		Total vcore-milliseconds taken by all reduce tasks=93581
		Total megabyte-milliseconds taken by all map tasks=113978368
		Total megabyte-milliseconds taken by all reduce tasks=95826944
	Map-Reduce Framework
		Map input records=110
		Map output records=110
		Map output bytes=3945
		Map output materialized bytes=4315
		Input split bytes=624
		Combine input records=0
		Combine output records=0
		Reduce input groups=21
		Reduce shuffle bytes=4315
		Reduce input records=110
		Reduce output records=21
		Spilled Records=220
		Shuffled Maps =25
		Failed Shuffles=0
		Merged Map outputs=25
		GC time elapsed (ms)=3300
		CPU time spent (ms)=5980
		Physical memory (bytes) snapshot=1349332992
		Virtual memory (bytes) snapshot=8470929408
		Total committed heap usage (bytes)=689782784
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=10950
	File Output Format Counters
		Bytes Written=594
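The JobResourceUploader warning near the top of the log is harmless here, but it disappears if the driver implements Tool and is started through ToolRunner, which also gives you -D style generic options on the command line. A sketch of how the same job could be wired up that way (FlowCountTool is not part of the original code):

package com.empire.hadoop.mr.provinceflow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch: the same job as FlowCount, but generic options are parsed by ToolRunner before run() is called.
public class FlowCountTool extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(FlowCountTool.class);
        job.setMapperClass(FlowCount.FlowCountMapper.class);
        job.setReducerClass(FlowCount.FlowCountReducer.class);
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(5);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new FlowCountTool(), args));
    }
}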
Output data:
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -ls /flowcountpatitioner
Found 6 items
-rw-r--r--   2 hadoop supergroup          0 2018-11-29 07:27 /flowcountpatitioner/_SUCCESS
-rw-r--r--   2 hadoop supergroup         58 2018-11-29 07:27 /flowcountpatitioner/part-r-00000
-rw-r--r--   2 hadoop supergroup        113 2018-11-29 07:27 /flowcountpatitioner/part-r-00001
-rw-r--r--   2 hadoop supergroup         24 2018-11-29 07:27 /flowcountpatitioner/part-r-00002
-rw-r--r--   2 hadoop supergroup        112 2018-11-29 07:27 /flowcountpatitioner/part-r-00003
-rw-r--r--   2 hadoop supergroup        287 2018-11-29 07:27 /flowcountpatitioner/part-r-00004
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00000
13602846565	9690	14550	24240
13660577991	34800	3450	38250
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00001
13719199419	1200	0	1200
13726230503	12405	123405	135810
13726238888	12405	123405	135810
13760778710	600	600	1200
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00002
13826544101	1320	0	1320
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00003
13922314466	15040	18600	33640
13925057413	55290	241215	296505
13926251106	1200	0	1200
13926435656	660	7560	8220
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowcountpatitioner/part-r-00004
13480253104	900	900	1800
13502468823	36675	551745	588420
13560436666	5580	4770	10350
13560439658	10170	29460	39630
15013685858	18295	17690	35985
15920133257	15780	14680	30460
15989002119	9690	900	10590
18211575961	7635	10530	18165
18320173382	47655	12060	59715
84138413	20580	7160	27740
4. Closing notes
(1) The default partitioner is HashPartitioner: a record goes to partition (key.hashCode() mod the number of reduce tasks started), so the total number of partitions equals numReduceTasks.
/**
 * Partition keys by their {@link Object#hashCode()}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
(2) Once a job is submitted its splits are fixed, but the number of map tasks actually launched is not necessarily equal to the number of splits: when a map task runs noticeably slowly, the cluster suspects something is wrong with it and launches another attempt on the same split; the two run side by side, and whichever finishes first has its output used. This is speculative execution.
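If the duplicate attempts are unwanted, speculative execution can be switched off per job with the standard MRv2 properties; a sketch of the relevant lines in the driver, before Job.getInstance(conf):

    // sketch: disable speculative execution for this job
    Configuration conf = new Configuration();
    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    Job job = Job.getInstance(conf);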
(3) When you provide a custom partitioner, you should normally also set the number of reduce tasks in the driver to match the number of partitions. If you do not set it at all, one reduce task is started by default and the job still runs, but everything lands in a single output file. If you set it to a value greater than 1 but smaller than the number of partitions, the job fails, because some records are assigned a partition that no reduce task owns. If you set it to a value larger than the number of partitions, the job runs fine; the extra reduce tasks simply receive no data and produce empty output files. (Readers with a cluster at hand are encouraged to verify these conclusions.)
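Expressed against the driver code above, the cases look like this (a sketch; the comments restate the observations from this point):

    job.setPartitionerClass(ProvincePartitioner.class);   // getPartition() returns 0-4, i.e. 5 partitions

    job.setNumReduceTasks(5);      // matches the partition count: one part-r-0000x file per group, as in the output above
    // job.setNumReduceTasks(1);   // also runs: a single reducer receives everything and writes one file
    // job.setNumReduceTasks(3);   // fails: partitions 3 and 4 have no reduce task to receive their records
    // job.setNumReduceTasks(8);   // runs: reduce tasks 5-7 receive no data and write empty output files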
To wrap up: that is everything for this post. If you found it worthwhile, please give it a like; and if you are interested in the author's other posts on servers and big data, or in the author himself, follow the blog and feel free to get in touch any time.