5. Determining which file the map-side data comes from

1. Requirement

Determine which file the data processed on the map side comes from.

2. Explanation

The key point is that the map-side setup method receives a Context argument; from it you can get this map task's InputSplit, which for file-based input formats is a FileSplit recording which file the split comes from.
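In code, the whole technique reduces to two calls inside setup(); the full listings follow below, and a caveat for non-file input formats comes after them:

      // Core of the technique (a fragment of the setup method below):
      // ask the framework for this task's split, then read the
      // backing file's name off the FileSplit.
      FileSplit inputSplit = (FileSplit) context.getInputSplit();
      String sourceFile = inputSplit.getPath().getName();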

3. Code

  • 1. GetSourceMapper.class

    package com.bigdata.surfilter.mapgetsource;
    
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
      import java.io.IOException;
    
      /**
       * @Author liufu
       * @CreateTime 2016/7/26  16:44
       */
      public class GetSourceMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
          // A map task processes a single split (one block by default), so all of its data necessarily comes from the same file
          String sourceFile = null;
          Text outPutText = null;
          @Override
          protected void setup(Context context) throws IOException, InterruptedException {
              FileSplit inputSplit = (FileSplit)context.getInputSplit();
              Path path = inputSplit.getPath();
              sourceFile = path.getName();
              outPutText = new Text();
          }
    
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              outPutText.set(value.toString() + " received from " + sourceFile);
              context.write(outPutText, NullWritable.get());
          }
    
          @Override
          protected void cleanup(Context context) throws IOException, InterruptedException {
              outPutText = null;
          }
      }
  • 2. ApplicationRun.class

    package com.bigdata.surfilter;
    
      import com.bigdata.surfilter.mapgetsource.GetSourceMapper;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
      import java.io.IOException;
    
      /**
       * @Author liufu
       * @CreateTime 2016/7/25  15:55
       */
      public class ApplicationRun {
          public static void main(String[] args) throws IOException {
              Configuration conf = new Configuration();
    
              // When the job is launched on the cluster with the `hadoop jar` command,
              // the two lines below are unnecessary: the configuration files under
              // /etc/hadoop in the Hadoop installation directory already contain these settings.
      //        conf.set("fs.default.name", "hdfs://192.168.0.186:9000");
              // Impersonate the root user
      //        System.setProperty("HADOOP_USER_NAME", "root");
    
              Job job = Job.getInstance(conf, "getsourcemapper");
    
              // The main class lets Hadoop locate the jar on the classpath
              job.setJarByClass(ApplicationRun.class);
    
              // The job's map-side code (this job has no reduce side)
              job.setMapperClass(GetSourceMapper.class);
    
              // Declare the map output types so the framework can reflectively instantiate them
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(NullWritable.class);
    
              // How the job reads its input and writes its output
              job.setInputFormatClass(TextInputFormat.class);
              job.setOutputFormatClass(TextOutputFormat.class);
    
              // Where the job's data comes from; bind the input directories with setInputPaths (addInputPaths also works)
              FileInputFormat.setInputPaths(job, new Path("/mapgetsource/input1/"),new Path("/mapgetsource/input2/"));
    
              // Where the output goes
              FileOutputFormat.setOutputPath(job, new Path("/mapgetsource/output/"));
    
              // No reduce phase is needed
              job.setNumReduceTasks(0);
    
              try {
                  boolean b = job.waitForCompletion(true);
                  System.exit(b ? 0 : 1);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } catch (ClassNotFoundException e) {
                  e.printStackTrace();
              }
          }
      }
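Because the job is map-only (setNumReduceTasks(0) and no reducer class), each mapper writes its records straight to part-m-NNNNN files under /mapgetsource/output/. For a hypothetical input file a.txt under /mapgetsource/input1/ containing the single line hello, each output record would look like:

      hello received from a.txt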
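One caveat about GetSourceMapper: the cast to FileSplit only holds for plain file-based input formats such as TextInputFormat. With CombineTextInputFormat, for instance, getInputSplit() returns a CombineFileSplit, which does not extend FileSplit, so the cast throws a ClassCastException. A minimal defensive variant of setup (a sketch, not part of the original code; it additionally needs import org.apache.hadoop.mapreduce.InputSplit):

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
          InputSplit split = context.getInputSplit();
          // Only file-backed splits carry a path; fall back to a placeholder otherwise.
          if (split instanceof FileSplit) {
              sourceFile = ((FileSplit) split).getPath().getName();
          } else {
              sourceFile = "unknown-source";
          }
          outPutText = new Text();
      }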