package com.demo.admin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce job that extracts (name, phone, district) triples from
 * '$'-delimited records.
 *
 * <p>Expected record layout (0-based fields), per the sample at the end of
 * this file: name$age$gender$marital$phone$qq$country$province$district.
 * Field 4 is the phone number, field 8 the district.
 *
 * <p>Usage: hadoop jar test.jar &lt;input path&gt; &lt;output path&gt;
 */
public class Test extends Configured implements Tool {

    /** Mapper: parses one record and emits (name, TestWritable(phone, district)). */
    public static class TestMap extends Mapper<LongWritable, Text, Text, TestWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split on the literal '$'; it must be escaped because '$' is a
            // regex end-of-line anchor.
            final String[] fields = value.toString().split("\\$");
            // Field 0 (the name) becomes the output key.
            final Text outKey = new Text(fields[0]);
            // Field 4 = phone, field 8 = district ("shiqu").
            final TestWritable outValue = new TestWritable(fields[4], fields[8]);
            context.write(outKey, outValue);
        }
    }

    /** Reducer: re-emits every (phone, district) value under its name key. */
    public static class TestReduce extends Reducer<Text, TestWritable, Text, TestWritable> {
        @Override
        public void reduce(Text key, Iterable<TestWritable> values, Context context)
                throws IOException, InterruptedException {
            // Hadoop reuses the Writable instance it hands us on each
            // iteration, so copy the fields into a fresh object before
            // writing it out.
            for (TestWritable value : values) {
                context.write(key, new TestWritable(value.phone, value.shiqu));
            }
        }
    }

    /** Entry point; propagates the job's exit code to the shell. */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Test(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Strip generic Hadoop options (-D, -files, ...) and keep only the
        // job's own positional arguments. All later uses must go through
        // argArray, not the raw args, or generic options would be mistaken
        // for paths.
        String[] argArray = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (argArray.length != 2) {
            System.err.println("請提供兩個參數");
            return 1;
        }

        Job job = Job.getInstance(conf, "Test");

        // Remove a pre-existing output directory so the job can rerun;
        // delete(path, true) is the non-deprecated recursive form.
        FileSystem fs = FileSystem.get(new URI(argArray[1]), conf);
        Path outputPath = new Path(argArray[1]);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        job.setJarByClass(Test.class);

        job.setMapperClass(TestMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TestWritable.class);

        job.setReducerClass(TestReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TestWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(argArray[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        // Report failure to the caller instead of unconditionally returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Custom value type carrying a phone number and a district name. */
    static class TestWritable implements Writable {
        String phone;
        String shiqu;

        public TestWritable(String phone, String shiqu) {
            this.phone = phone;
            this.shiqu = shiqu;
        }

        // No-arg constructor is required: Hadoop instantiates Writables
        // reflectively during deserialization.
        public TestWritable() {
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.phone = in.readUTF();
            this.shiqu = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(phone);
            out.writeUTF(shiqu);
        }

        @Override
        public String toString() {
            return phone + "\t" + shiqu + "\t";
        }
    }
}

/*
 * Sample input record:
 *   張三$25$男$未婚$15997444444$409930360$中國$湖北$廣水
 * Expected output line:
 *   張三 15997444444 廣水
 *
 * Shell commands:
 *   /usr/local/hadoop/bin/hadoop fs -put /home/XX/test.txt /test_log/
 *   /usr/local/hadoop/bin/hadoop jar /home/XX/test.jar /test_log/test.txt /test_cleaned/ 1>/dev/null
 */