上一篇文章分析了運營商流量日誌解析加強的實現,至此,mapreduce的組件中除了inputFormat全都自定義寫過了!博主今天將繼續分享自定義inputFormat。java
1、需求web
不管hdfs仍是mapreduce,對於小文件都有損效率,實踐中,又不免面臨處理大量小文件的場景,此時,就須要有相應解決方案。shell
2、分析apache
小文件的優化無非如下幾種方式:
一、在數據採集的時候,就將小文件或小批數據合成大文件再上傳HDFS
二、在業務處理以前,在HDFS上使用mapreduce程序對小文件進行合併
三、在mapreduce處理時,可採用combineInputFormat提升效率centos
3、實現服務器
本節實現的是上述第二種方式,程序的核心機制:自定義一個InputFormat,改寫RecordReader,實現一次讀取一個完整文件封裝爲KV,在輸出時使用SequenceFileOutPutFormat輸出合併文件.app
代碼以下:ide
WholeFileRecordReader(自定義RecordReader)oop
package com.empire.hadoop.mr.combinefile; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; /** * RecordReader的核心工做邏輯: 經過nextKeyValue()方法去讀取數據構造將返回的key value 經過getCurrentKey 和 * getCurrentValue來返回上面構造好的key和value * * @author */ class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> { private FileSplit fileSplit; private Configuration conf; private BytesWritable value = new BytesWritable(); private boolean processed = false; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.fileSplit = (FileSplit) split; this.conf = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!processed) { byte[] contents = new byte[(int) fileSplit.getLength()]; Path file = fileSplit.getPath(); FileSystem fs = file.getFileSystem(conf); FSDataInputStream in = null; try { in = fs.open(file); IOUtils.readFully(in, contents, 0, contents.length); value.set(contents, 0, contents.length); } finally { IOUtils.closeStream(in); } processed = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } /** * 返回當前進度 */ @Override public float getProgress() throws IOException { return processed ? 1.0f : 0.0f; } @Override public void close() throws IOException { // do nothing } }
WholeFileInputFormat(自定義InputFormat)大數據
package com.empire.hadoop.mr.combinefile; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> { //設置每一個小文件不可分片,保證一個小文件生成一個key-value鍵值對 @Override protected boolean isSplitable(JobContext context, Path file) { return false; } @Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WholeFileRecordReader reader = new WholeFileRecordReader(); reader.initialize(split, context); return reader; } }
SmallFilesToSequenceFileConverter(小文件合併主程序)
package com.empire.hadoop.mr.combinefile; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class SmallFilesToSequenceFileConverter extends Configured implements Tool { static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> { private Text filenameKey; @Override protected void setup(Context context) throws IOException, InterruptedException { InputSplit split = context.getInputSplit(); Path path = ((FileSplit) split).getPath(); filenameKey = new Text(path.toString()); } @Override protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { context.write(filenameKey, value); } } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); /* System.setProperty("HADOOP_USER_NAME", "hadoop"); */ String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: combinefiles <in> <out>"); System.exit(2); } Job job = Job.getInstance(conf, "combine small files to sequencefile"); job.setJarByClass(SmallFilesToSequenceFileConverter.class); job.setInputFormatClass(WholeFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); job.setMapperClass(SequenceFileMapper.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args); System.exit(exitCode); } }
4、運行程序
#上傳jar Alt+p lcd d:/ put combinefile.jar #準備hadoop處理的數據文件 cd /home/hadoop/apps/hadoop-2.9.1 hadoop fs -mkdir -p /combinefile/smallinput hdfs dfs -put xxx.txt xxx1.txt xxx2.txt xxx3.txt /combinefile/smallinput #運行combinefile程序 hadoop jar combinefile.jar com.empire.hadoop.mr.combinefile.SmallFilesToSequenceFileConverter /combinefile/smallinput /combinefile/smalloutput
5、運行效果
[hadoop@centos-aaron-h1 ~]$ hadoop jar combinefile.jar com.empire.hadoop.mr.combinefile.SmallFilesToSequenceFileConverter /combinefile/smallinput /combinefile/smalloutput 18/12/30 07:45:25 INFO client.RMProxy: Connecting to ResourceManager at centos-aaron-h1/192.168.29.144:8032 18/12/30 07:45:26 INFO input.FileInputFormat: Total input files to process : 3 18/12/30 07:45:26 INFO mapreduce.JobSubmitter: number of splits:3 18/12/30 07:45:26 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled 18/12/30 07:45:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1546126874346_0001 18/12/30 07:45:27 INFO impl.YarnClientImpl: Submitted application application_1546126874346_0001 18/12/30 07:45:27 INFO mapreduce.Job: The url to track the job: http://centos-aaron-h1:8088/proxy/application_1546126874346_0001/ 18/12/30 07:45:27 INFO mapreduce.Job: Running job: job_1546126874346_0001 18/12/30 07:45:36 INFO mapreduce.Job: Job job_1546126874346_0001 running in uber mode : false 18/12/30 07:45:36 INFO mapreduce.Job: map 0% reduce 0% 18/12/30 07:45:45 INFO mapreduce.Job: map 33% reduce 0% 18/12/30 07:45:57 INFO mapreduce.Job: map 100% reduce 0% 18/12/30 07:45:58 INFO mapreduce.Job: map 100% reduce 100% 18/12/30 07:45:59 INFO mapreduce.Job: Job job_1546126874346_0001 completed successfully 18/12/30 07:45:59 INFO mapreduce.Job: Counters: 50 File System Counters FILE: Number of bytes read=74230 FILE: Number of bytes written=938941 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=74393 HDFS: Number of bytes written=74324 HDFS: Number of read operations=12 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Killed map tasks=1 Launched map tasks=3 Launched reduce tasks=1 Data-local map tasks=3 Total time spent by all maps in occupied slots (ms)=41278 Total time spent by all reduces in occupied slots (ms)=10813 Total time spent by all map tasks (ms)=41278 Total time spent by all reduce tasks (ms)=10813 Total vcore-milliseconds taken by all map tasks=41278 Total vcore-milliseconds taken by all reduce tasks=10813 Total megabyte-milliseconds taken by all map tasks=42268672 Total megabyte-milliseconds taken by all reduce tasks=11072512 Map-Reduce Framework Map input records=3 Map output records=3 Map output bytes=74213 Map output materialized bytes=74242 Input split bytes=371 Combine input records=0 Combine output records=0 Reduce input groups=3 Reduce shuffle bytes=74242 Reduce input records=3 Reduce output records=3 Spilled Records=6 Shuffled Maps =3 Failed Shuffles=0 Merged Map outputs=3 GC time elapsed (ms)=808 CPU time spent (ms)=8390 Physical memory (bytes) snapshot=754876416 Virtual memory (bytes) snapshot=3381936128 Total committed heap usage (bytes)=380628992 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=74022 File Output Format Counters Bytes Written=74324 [hadoop@centos-aaron-h1 ~]$
6、運行結果
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -ls /combinefile/smalloutput Found 2 items -rw-r--r-- 2 hadoop supergroup 0 2018-12-30 07:45 /combinefile/smalloutput/_SUCCESS -rw-r--r-- 2 hadoop supergroup 74324 2018-12-30 07:45 /combinefile/smalloutput/part-r-00000 [hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /combinefile/smalloutput/part-r-00000 bbs ttx tttt yyyy hhahh shame jime apple kelly chelly Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. hellow hello hellowx hellowbbb mail shell bananer vival value bbs ttx tttt yyyy hhahh shame jime apple kelly chelly Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. hellow hello hellowx hellowbbb mail shell bananer vival value bbs ttx tttt yyyy hhahh shame jime apple kelly chelly Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services.Dow Translation provides instant and free Chinese, English, Japanese, Korean, French, German, Russian, Spanish, Portuguese, Vietnamese, Indonesian full-text translation, web translation and document translation services. [hadoop@centos-aaron-h1 ~]$
7、總結
經過mapreduce程序運行結果,能夠看到三個文件最後處理合併成了一個文件。在以上主程序中實現了extends Configured implements Tool ,可使用這種方式來執行咱們的mr程序。
最後寄語,以上是博主本次文章的所有內容,若是你們以爲博主的文章還不錯,請點贊;若是您對博主其它服務器大數據技術或者博主本人感興趣,請關注博主博客,而且歡迎隨時跟博主溝通交流。