This pattern converts row-based data into a hierarchical format such as JSON. It can be applied, for example, to the relationship between posts and their comments (a sample of the input and output is sketched after the code below).
```java
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

/**
 * @Author bluesnail95
 * @Date 2019/7/21 8:50
 * @Description Joins posts with their comments and emits one hierarchical JSON record per post.
 */
public class PostCommentMain {

    // Tags each post record with type "P" and emits it keyed by its id.
    public static class PostMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) {
            JSONObject textJson = JSONObject.parseObject(value.toString());
            String id = textJson.getString("id");
            if (StringUtils.isNoneBlank(id)) {
                try {
                    textJson.put("type", "P");
                    context.write(new Text(id), new Text(textJson.toString()));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Tags each comment record with type "C" and emits it keyed by the post it belongs to.
    public static class CommentMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) {
            JSONObject textJson = JSONObject.parseObject(value.toString());
            String id = textJson.getString("postId");
            if (StringUtils.isNoneBlank(id)) {
                try {
                    textJson.put("type", "C");
                    context.write(new Text(id), new Text(textJson.toString()));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Rebuilds the hierarchy: nests all comments of a post under the post's JSON object.
    public static class PostCommentReducer extends Reducer<Text, Text, Text, NullWritable> {
        private JSONObject postJson = null;
        private JSONArray commentArray = new JSONArray();

        public void reduce(Text key, Iterable<Text> values, Context context) {
            postJson = null;
            commentArray.clear();
            for (Text value : values) {
                JSONObject valueJson = JSONObject.parseObject(value.toString());
                String type = valueJson.getString("type");
                if ("P".equals(type)) {
                    postJson = valueJson;
                } else if ("C".equals(type)) {
                    commentArray.add(valueJson);
                }
            }
            // Skip keys that have comments but no matching post, otherwise postJson would be null.
            if (postJson == null) {
                return;
            }
            postJson.put("comment", commentArray);
            try {
                context.write(new Text(postJson.toString()), NullWritable.get());
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "PostComment");
            // Must match the class that contains the job.
            job.setJarByClass(PostCommentMain.class);
            // The output key/value classes must be consistent with what the mappers emit.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // Reducer that nests comments under their post.
            job.setReducerClass(PostCommentReducer.class);
            // One input path (and mapper) for posts, another for comments.
            MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, PostMapper.class);
            MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CommentMapper.class);
            // Clear the output directory before running, then set input and output paths.
            FileUtil.fullyDelete(new File(args[2]));
            FileOutputFormat.setOutputPath(job, new Path(args[2]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
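For illustration only, a minimal sketch of the data flow, assuming one JSON record per line. `id` and `postId` are the fields the mappers actually read; the other fields and the exact field order in the fastjson output are placeholders:

```
posts (args[0]):     {"id":"100","title":"..."}
comments (args[1]):  {"postId":"100","text":"..."}
                     {"postId":"100","text":"..."}
output (args[2]):    {"id":"100","title":"...","type":"P","comment":[{"postId":"100","text":"...","type":"C"},{"postId":"100","text":"...","type":"C"}]}
```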
The same structured-to-hierarchical idea applied to a single input of questions and answers: records carrying a parentId are answers and get nested under the question whose id they reference.

```java
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

/**
 * @Author bluesnail95
 * @Date 2019/7/21 10:01
 * @Description Self-join on a single input: nests answers under their question.
 */
public class QuestionAnswerMain {

    // Keys answers by parentId and questions by their own id, so they meet in the same reduce call.
    public static class QuestionAnswerMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) {
            JSONObject valueJson = JSONObject.parseObject(value.toString());
            String parentId = valueJson.getString("parentId");
            try {
                if (StringUtils.isNotBlank(parentId)) {
                    // Answer: group it under the question it replies to.
                    context.write(new Text(parentId), value);
                } else {
                    // Question: group it under its own id.
                    String id = valueJson.getString("id");
                    context.write(new Text(id), value);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Attaches the collected answers to the question as a nested "answer" array.
    public static class QuestionAnswerReducer extends Reducer<Text, Text, Text, NullWritable> {
        private JSONObject questionJson = null;
        private JSONArray answerArray = new JSONArray();

        public void reduce(Text key, Iterable<Text> values, Context context) {
            questionJson = null;
            answerArray.clear();
            for (Text value : values) {
                JSONObject valueJson = JSONObject.parseObject(value.toString());
                String parentId = valueJson.getString("parentId");
                if (StringUtils.isNotBlank(parentId)) {
                    answerArray.add(valueJson);
                } else {
                    questionJson = valueJson;
                }
            }
            // Skip keys that have answers but no question record, otherwise questionJson would be null.
            if (questionJson == null) {
                return;
            }
            questionJson.put("answer", answerArray);
            try {
                context.write(new Text(questionJson.toString()), NullWritable.get());
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "question and answer");
            // Must match the class that contains the job.
            job.setJarByClass(QuestionAnswerMain.class);
            // Mapper and reducer defined above.
            job.setMapperClass(QuestionAnswerMapper.class);
            job.setReducerClass(QuestionAnswerReducer.class);
            // The output key/value classes must be consistent with what the mapper emits.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // Input and output paths.
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
Partitioning: records are routed to a reducer, and therefore to an output file, by the year of their lastAccessDate.

```java
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Calendar;

/**
 * @Author bluesnail95
 * @Date 2019/7/22 6:27
 * @Description Partitions records by the year of their lastAccessDate, one reducer (and output file) per year.
 */
public class PartitionMain {

    // Extracts the year from lastAccessDate and uses it as the map output key.
    public static class PartitionMapper extends Mapper<Object, Text, IntWritable, Text> {
        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key, Text value, Context context) {
            try {
                JSONObject valueJson = JSONObject.parseObject(value.toString());
                String strDate = valueJson.getString("lastAccessDate");
                Calendar calendar = Calendar.getInstance();
                calendar.setTime(frmt.parse(strDate));
                int year = calendar.get(Calendar.YEAR);
                context.write(new IntWritable(year), value);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Routes each year to its own partition: partition = year - minLastAccessDateYear.
    // The year range must fit the number of reduce tasks (here 2010-2019 with 10 reducers).
    public static class LastAccessDatePartitioner extends Partitioner<IntWritable, Text> implements Configurable {
        private static final String MIN_LAST_ACCESS_DATE_YEAR = "min.last.access.date.year";
        private Configuration conf = null;
        private int minLastAccessDateYear = 0;

        public void setConf(Configuration conf) {
            this.conf = conf;
            minLastAccessDateYear = conf.getInt(MIN_LAST_ACCESS_DATE_YEAR, 0);
        }

        public Configuration getConf() {
            return conf;
        }

        public int getPartition(IntWritable key, Text value, int numPartitions) {
            return key.get() - minLastAccessDateYear;
        }

        public static void setMinLastAccessDate(Job job, int minLastAccessDateYear) {
            job.getConfiguration().setInt(MIN_LAST_ACCESS_DATE_YEAR, minLastAccessDateYear);
        }
    }

    // Writes the records of each partition back out, dropping the year key.
    public static class PartitionReducer extends Reducer<IntWritable, Text, Text, NullWritable> {
        public void reduce(IntWritable key, Iterable<Text> values, Context context) {
            for (Text value : values) {
                try {
                    context.write(value, NullWritable.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Partition");
            // Must match the class that contains the job.
            job.setJarByClass(PartitionMain.class);
            // Mapper, custom partitioner and reducer defined above.
            job.setMapperClass(PartitionMapper.class);
            job.setPartitionerClass(LastAccessDatePartitioner.class);
            LastAccessDatePartitioner.setMinLastAccessDate(job, 2010);
            // One reduce task per expected year (2010-2019).
            job.setNumReduceTasks(10);
            job.setReducerClass(PartitionReducer.class);
            // Map output is (year, record); the final output is the record text only.
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // Input and output paths.
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
Binning: a map-only job that uses MultipleOutputs to write each record into a per-tag file (hadoop, hive, hbase, pig).

```java
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/22 20:46
 * @Description Bins records into separate output files by tag using MultipleOutputs (map-only job).
 */
public class BinningMain {

    public static class BinningMapper extends Mapper<Object, Text, Text, NullWritable> {
        private MultipleOutputs<Text, NullWritable> output = null;

        public void setup(Context context) {
            output = new MultipleOutputs<Text, NullWritable>(context);
        }

        public void map(Object key, Text value, Context context) {
            try {
                JSONObject valueJson = JSONObject.parseObject(value.toString());
                String tag = valueJson.getString("tag");
                if (StringUtils.isBlank(tag)) {
                    return;
                }
                // One named output ("bins"), fanned out into per-tag files by the base path argument.
                if (tag.equalsIgnoreCase("hadoop")) {
                    output.write("bins", value, NullWritable.get(), "hadoop-tag");
                }
                if (tag.equalsIgnoreCase("hive")) {
                    output.write("bins", value, NullWritable.get(), "hive-tag");
                }
                if (tag.equalsIgnoreCase("hbase")) {
                    output.write("bins", value, NullWritable.get(), "hbase-tag");
                }
                if (tag.equalsIgnoreCase("pig")) {
                    output.write("bins", value, NullWritable.get(), "pig-tag");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // MultipleOutputs must be closed, otherwise the bin files are not flushed.
        protected void cleanup(Context context) {
            try {
                output.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Binning");
            // Must match the class that contains the job.
            job.setJarByClass(BinningMain.class);
            // Mapper defined above; there is no reducer.
            job.setMapperClass(BinningMapper.class);
            // The output key/value classes must be consistent with what the mapper emits.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // Input and output paths.
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Register the named output used by MultipleOutputs.write(...).
            MultipleOutputs.addNamedOutput(job, "bins", TextOutputFormat.class, Text.class, NullWritable.class);
            MultipleOutputs.setCountersEnabled(job, true);
            // Map-only job.
            job.setNumReduceTasks(0);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
Total order sorting: a staging job extracts lastAccessDate as the sort key into a SequenceFile; a second job samples those keys to build a partition file and uses TotalOrderPartitioner, so the reducer outputs are globally sorted across all output files.

```java
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/23 6:25
 * @Description Total order sort on lastAccessDate: a stage job extracts the sort key, an order job sorts globally.
 */
public class TotalOrderSortMain {

    // Stage 1 (map only): emit (lastAccessDate, full record) pairs into a SequenceFile.
    public static class LastAccessDateMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) {
            JSONObject valueJson = JSONObject.parseObject(value.toString());
            String lastAccessDate = valueJson.getString("lastAccessDate");
            try {
                context.write(new Text(lastAccessDate), value);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Stage 2: keys arrive globally sorted; write the records back out without the key.
    public static class ValueReducer extends Reducer<Text, Text, Text, NullWritable> {
        public void reduce(Text key, Iterable<Text> values, Context context) {
            for (Text value : values) {
                try {
                    context.write(value, NullWritable.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        Path inputPath = new Path(args[0]);
        Path partitionFile = new Path(args[1] + "_partitions.lst");
        Path outputStage = new Path(args[1] + "_staging");
        Path outputOrder = new Path(args[1]);
        try {
            // Stage job: map only, writes (sort key, record) pairs to an intermediate SequenceFile.
            Job sampleJob = Job.getInstance(configuration, "TotalOrderSortingStage");
            sampleJob.setJarByClass(TotalOrderSortMain.class);
            sampleJob.setMapperClass(LastAccessDateMapper.class);
            sampleJob.setNumReduceTasks(0);
            sampleJob.setOutputKeyClass(Text.class);
            sampleJob.setOutputValueClass(Text.class);
            TextInputFormat.setInputPaths(sampleJob, inputPath);
            sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class);
            FileUtil.fullyDelete(new File(args[1] + "_staging"));
            SequenceFileOutputFormat.setOutputPath(sampleJob, outputStage);
            int code = sampleJob.waitForCompletion(true) ? 0 : 1;

            if (code == 0) {
                // Order job: identity mapper plus TotalOrderPartitioner with a sampled partition file.
                Job orderJob = Job.getInstance(configuration, "TotalOrderSortingOrder");
                orderJob.setJarByClass(TotalOrderSortMain.class);
                orderJob.setMapperClass(Mapper.class);
                orderJob.setReducerClass(ValueReducer.class);
                orderJob.setNumReduceTasks(10);
                orderJob.setPartitionerClass(TotalOrderPartitioner.class);
                // The partition file must be registered in the job's own configuration.
                TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(), partitionFile);
                orderJob.setOutputKeyClass(Text.class);
                orderJob.setOutputValueClass(Text.class);
                orderJob.setInputFormatClass(SequenceFileInputFormat.class);
                SequenceFileInputFormat.setInputPaths(orderJob, outputStage);
                FileUtil.fullyDelete(new File(args[1]));
                TextOutputFormat.setOutputPath(orderJob, outputOrder);
                // Only the record text is wanted in the final output, so use an empty separator.
                orderJob.getConfiguration().set("mapred.textoutputformat.separator", "");
                // Sample the staged keys to build the partition boundaries.
                InputSampler.writePartitionFile(orderJob, new InputSampler.RandomSampler<Text, Text>(1, 20));
                code = orderJob.waitForCompletion(true) ? 0 : 2;
            }
            // Clean up the intermediate data and exit with the combined job status.
            FileSystem.get(new Configuration()).delete(partitionFile, false);
            FileSystem.get(new Configuration()).delete(outputStage, true);
            System.exit(code);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
Reference: 《MapReduce設計模式》 (MapReduce Design Patterns)