package com.empire.hadoop.mr.combinefile; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; /** * RecordReader的核心工做邏輯: 經過nextKeyValue()方法去讀取數據構造將返回的key value 經過getCurrentKey 和 * getCurrentValue來返回上面構造好的key和value * * @author */ class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> { private FileSplit fileSplit; private Configuration conf; private BytesWritable value = new BytesWritable(); private boolean processed = false; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.fileSplit = (FileSplit) split; this.conf = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!processed) { byte[] contents = new byte[(int) fileSplit.getLength()]; Path file = fileSplit.getPath(); FileSystem fs = file.getFileSystem(conf); FSDataInputStream in = null; try { in = fs.open(file); IOUtils.readFully(in, contents, 0, contents.length); value.set(contents, 0, contents.length); } finally { IOUtils.closeStream(in); } processed = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } /** * 返回當前進度 */ @Override public float getProgress() throws IOException { return processed ? 1.0f : 0.0f; } @Override public void close() throws IOException { // do nothing } }
package com.empire.hadoop.mr.combinefile; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> { //設置每一個小文件不可分片,保證一個小文件生成一個key-value鍵值對 @Override protected boolean isSplitable(JobContext context, Path file) { return false; } @Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WholeFileRecordReader reader = new WholeFileRecordReader(); reader.initialize(split, context); return reader; } }
package com.empire.hadoop.mr.combinefile; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class SmallFilesToSequenceFileConverter extends Configured implements Tool { static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> { private Text filenameKey; @Override protected void setup(Context context) throws IOException, InterruptedException { InputSplit split = context.getInputSplit(); Path path = ((FileSplit) split).getPath(); filenameKey = new Text(path.toString()); } @Override protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { context.write(filenameKey, value); } } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); /* System.setProperty("HADOOP_USER_NAME", "hadoop"); */ String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: combinefiles <in> <out>"); System.exit(2); } Job job = Job.getInstance(conf, "combine small files to sequencefile"); job.setJarByClass(SmallFilesToSequenceFileConverter.class); job.setInputFormatClass(WholeFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); 
job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); job.setMapperClass(SequenceFileMapper.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args); System.exit(exitCode); } }
#上傳jar
Alt+p
lcd d:/
put combinefile.jar
#準備hadoop處理的數據文件
cd /home/hadoop/apps/hadoop-2.9.1
hadoop fs -mkdir -p /combinefile/smallinput
hdfs dfs -put xxx.txt xxx1.txt xxx2.txt xxx3.txt /combinefile/smallinput
#運行combinefile程序
hadoop jar combinefile.jar com.empire.hadoop.mr.combinefile.SmallFilesToSequenceFileConverter /combinefile/smallinput /combinefile/smalloutput
[hadoop@centos-aaron-h1 ~]$ hadoop jar combinefile.jar com.empire.hadoop.mr.combinefile.SmallFilesToSequenceFileConverter /combinefile/smallinput /combinefile/smalloutput 18/12/30 07:45:25 INFO client.RMProxy: Connecting to ResourceManager at centos-aaron-h1/ 18/12/30 07:45:26 INFO input.FileInputFormat: Total input files to process : 3 18/12/30 07:45:26 INFO mapreduce.JobSubmitter: number of splits:3 18/12/30 07:45:26 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled 18/12/30 07:45:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1546126874346_0001 18/12/30 07:45:27 INFO impl.YarnClientImpl: Submitted application application_1546126874346_0001 18/12/30 07:45:27 INFO mapreduce.Job: The url to track the job: http://centos-aaron-h1:8088/proxy/application_1546126874346_0001/ 18/12/30 07:45:27 INFO mapreduce.Job: Running job: job_1546126874346_0001 18/12/30 07:45:36 INFO mapreduce.Job: Job job_1546126874346_0001 running in uber mode : false 18/12/30 07:45:36 INFO mapreduce.Job: map 0% reduce 0% 18/12/30 07:45:45 INFO mapreduce.Job: map 33% reduce 0% 18/12/30 07:45:57 INFO mapreduce.Job: map 100% reduce 0% 18/12/30 07:45:58 INFO mapreduce.Job: map 100% reduce 100% 18/12/30 07:45:59 INFO mapreduce.Job: Job job_1546126874346_0001 completed successfully 18/12/30 07:45:59 INFO mapreduce.Job: Counters: 50 File System Counters FILE: Number of bytes read=74230 FILE: Number of bytes written=938941 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=74393 HDFS: Number of bytes written=74324 HDFS: Number of read operations=12 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Killed map tasks=1 Launched map tasks=3 Launched reduce tasks=1 Data-local map tasks=3 Total time spent by all maps in occupied slots (ms)=41278 Total time spent by all 
reduces in occupied slots (ms)=10813 Total time spent by all map tasks (ms)=41278 Total time spent by all reduce tasks (ms)=10813 Total vcore-milliseconds taken by all map tasks=41278 Total vcore-milliseconds taken by all reduce tasks=10813 Total megabyte-milliseconds taken by all map tasks=42268672 Total megabyte-milliseconds taken by all reduce tasks=11072512 Map-Reduce Framework Map input records=3 Map output records=3 Map output bytes=74213 Map output materialized bytes=74242 Input split bytes=371 Combine input records=0 Combine output records=0 Reduce input groups=3 Reduce shuffle bytes=74242 Reduce input records=3 Reduce output records=3 Spilled Records=6 Shuffled Maps =3 Failed Shuffles=0 Merged Map outputs=3 GC time elapsed (ms)=808 CPU time spent (ms)=8390 Physical memory (bytes) snapshot=754876416 Virtual memory (bytes) snapshot=3381936128 Total committed heap usage (bytes)=380628992 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=74022 File Output Format Counters Bytes Written=74324 [hadoop@centos-aaron-h1 ~]$
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -ls /combinefile/smalloutput
Found 2 items
-rw-r--r--   2 hadoop supergroup          0 2018-12-30 07:45 /combinefile/smalloutput/_SUCCESS
-rw-r--r--   2 hadoop supergroup      74324 2018-12-30 07:45 /combinefile/smalloutput/part-r-00000
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /combinefile/smalloutput/part-r-00000
bbs ttx tttt yyyy hhahh shame jime apple kelly chelly hellow hello hellowx hellowbbb mail shell bananer vival value
bbs ttx tttt yyyy hhahh shame jime apple kelly chelly hellow hello hellowx hellowbbb mail shell bananer vival value
bbs ttx tttt yyyy hhahh shame jime apple kelly chelly [hadoop@centos-aaron-h1 ~]$
從mapreduce程序的運行結果可以看到,三個文件最終被合併成了一個文件(作業只啟動了一個reduce任務,因此只輸出一個part-r-00000)。以上主程序繼承了Configured並實現了Tool接口(extends Configured implements Tool),因此可以通過ToolRunner這種方式來執行我們的mr程序。