Determining which file a map task's input data comes from
The key is the Context parameter passed to the map-side setup method: through it you can find out which file the split handled by this map task comes from.
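In the new MapReduce API this boils down to two calls inside setup(Context context); the minimal sketch below (variable names are illustrative) previews exactly what the full GetSourceMapper in the next section does.

    // Inside setup(Context context) of a Mapper whose input comes from a FileInputFormat:
    FileSplit split = (FileSplit) context.getInputSplit(); // the split knows its source file
    String sourceFile = split.getPath().getName();         // name of the file this map task reads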
1. GetSourceMapper.class
package com.bigdata.surfilter.mapgetsource;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * @Author liufu
 * @CreateTime 2016/7/26 16:44
 */
public class GetSourceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // By default one map task processes one block, so all of its data necessarily comes from the same file.
    String sourceFile = null;
    Text outPutText = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The InputSplit held by the Context tells us which file this map task reads.
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        Path path = inputSplit.getPath();
        sourceFile = path.getName();
        outPutText = new Text();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        outPutText.set(value.toString() + " received from " + sourceFile);
        context.write(outPutText, NullWritable.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        outPutText = null;
    }
}
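For illustration (the file name and line content below are hypothetical), a line such as

    hello world

read from a file named a.txt would appear in the job output as

    hello world received from a.txt

Because the value is NullWritable, TextOutputFormat writes only the key, so each output line is just the annotated text.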
2. ApplicationRun.class
package com.bigdata.surfilter;

import com.bigdata.surfilter.mapgetsource.GetSourceMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * @Author liufu
 * @CreateTime 2016/7/25 15:55
 */
public class ApplicationRun {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // When the job is launched on the cluster with the hadoop jar command, the two lines below are not needed,
        // because the configuration files under the Hadoop installation's etc/hadoop directory already contain these settings.
        // conf.set("fs.default.name", "hdfs://192.168.0.186:9000");
        // Impersonate the root user.
        // System.setProperty("HADOOP_USER_NAME", "root");

        Job job = new Job(conf, "getsourcemapper");

        // Locate the jar through the main class on the classpath.
        job.setJarByClass(ApplicationRun.class);

        // Map-side code of the job (no reducer is configured).
        job.setMapperClass(GetSourceMapper.class);

        // Declare the map output types so the framework can instantiate them via reflection.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // How the job reads and writes data.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Where the input data comes from; either setInputPaths or addInputPaths can be used.
        FileInputFormat.setInputPaths(job, new Path("/mapgetsource/input1/"), new Path("/mapgetsource/input2/"));
        // Where the output goes.
        FileOutputFormat.setOutputPath(job, new Path("/mapgetsource/output/"));

        // No reduce phase is needed.
        job.setNumReduceTasks(0);

        try {
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
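A practical note on re-running the job: FileOutputFormat fails if the output path already exists. The sketch below is a hypothetical addition (not part of the original driver) that could sit just before FileOutputFormat.setOutputPath in main(); it needs an extra import of org.apache.hadoop.fs.FileSystem and reuses the conf and job variables already defined there.

    // Hypothetical addition: clear a pre-existing output directory so repeated runs do not
    // fail with an "output directory already exists" error.
    Path outputPath = new Path("/mapgetsource/output/");
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true); // recursive delete
    }
    FileOutputFormat.setOutputPath(job, outputPath);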