(一)MR編寫之讀取linux上(指定目錄下的)配置文件

我首先把main函數和調用方式寫一下

/**
 *
 * @param args
 * 一、傳入參數
 * 二、傳出參數
 * 三、業務參數:①stg_log_  ②stg_log_class_perform_
 * 四、配置文件路徑:如xetl.properties
 */
public static void main(String[] args) {
    if (args.length < 2) {
        System.out.println("args must more than 2.");
        System.exit(0);
    }
    // day=(args[0].split("/"))[3];
    Configuration conf = new Configuration();
    FileSystem hdfs = null;
    try {
        int res = ToolRunner.run(conf, new AutoActLogParseMr(), args);
        System.exit(res);
    } catch (Exception e) {
        logger.error("", e);
    }
}

調用方式

hadoop jar( jar包路徑和名字) com.whh.bigdata.xetl.mr.AutoActLogParseMr  ${srcParth} ${log_data}/${com_present_date_Y_m_d} 'stg_log_' ${BASE_PATH}/include/xetl.properties

數據線:hadoop fs -cp /log_data/2017-09-12/1600035/* /log_data/stg_log_1600035/day=2017-06-22/java

下面倒序一下傳入到MR的配置信息:

配置信息最終是要在map中使用的,因此傳到map時是用map中的setup(Context context)方法的context,下面我來說解一下setup方法:mysql

protected void setup(Context context) throws IOException, InterruptedException {
            try {
                String mysqlUser = context.getConfiguration().get("mysqlUser");
                String mysqlUrl = context.getConfiguration().get("mysqlUrl");
                String mysqlPassword = context.getConfiguration().get("mysqlPassword");
                String dbname = context.getConfiguration().get("dbname");



                String string = context.getConfiguration().get("fs.allActs");
                actMap = AutoActLogParseUtil.getHiveStaticConf(string,mysqlUrl,mysqlUser,mysqlPassword,dbname);


            } catch (SQLException e) {
                e.printStackTrace();
            }
            super.setup(context);
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {

        }

***插播:在hadoop的源碼中,基類Mapper類和Reducer類中都是隻包含四個方法:setup方法,cleanup方法,run方法,map方法。

/**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
        setup(context);
       try {
                 while (context.nextKeyValue()) {
                         map(context.getCurrentKey(), context.getCurrentValue(), context);
                 }
       } finally {
               cleanup(context);
       }
  }

能夠看出,在run方法中調用了上面的三個方法:setup方法,map方法,cleanup方法。其中setup方法和cleanup方法默認是不作任何操做,且它們只被執行一次。可是setup方法通常會在map函數以前執行一些準備工做,如做業的一些配置信息等;cleanup方法則是在map方法運行完以後最後執行 的,該方法是完成一些結尾清理的工做,如:資源釋放等。若是須要作一些配置和清理的工做,須要在Mapper/Reducer的子類中進行重寫來實現相應的功能。map方法會在對應的子類中從新實現,就是咱們自定義的map方法。該方法在一個while循環裏面,代表該方法是執行不少次的。run方法就是每一個maptask調用的方法。sql

那參數是怎麼傳到context的呢?

下面來看看run方法:apache

public int run(String[] params) throws Exception {

        Configuration conf = getConf();
        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");

        Integer numReduceTasks = 3;

        FileSystem hdfs = null;
        try {
            // 程序配置
//            conf.set("fs.default.name", "hdfs://Galaxy");
            //config.set("hadoop.job.ugi", "feng,111111");
            //config.set("hadoop.tmp.dir", "/tmp/hadoop-fengClient");
            //config.set("dfs.replication", "1");
            //config.set("mapred.job.tracker", "master:9001");
//            hdfs = FileSystem.get(new URI("hdfs://Galaxy"),
//                    conf, "bigdata");
            Path path = new Path("/log_data/");
            hdfs = path.getFileSystem(conf);
         //   logger.info("path 的值:" + path);
            String flag=params[2];
            acts = getOutPutName(hdfs, path, conf,flag);
            conf.set("fs.allActs", acts);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // acts = Hdfstools.readHDFSFile("/log_data/actId");
      //  logger.info("acts的值爲" + acts);

        //獲取配置文件信息
        Config propertiesConfig = new Config();
        propertiesConfig.init(params[3]);

        String mysqlUrl = propertiesConfig.getValue("mysqlUrl");
        String mysqlUser = propertiesConfig.getValue("mysqlUser");
        String mysqlPassword = propertiesConfig.getValue("mysqlPassword");
        String dbname = propertiesConfig.getValue("dbname");


        conf.set("mysqlUser",mysqlUser);
        conf.set("mysqlUrl",mysqlUrl);
        conf.set("mysqlPassword",mysqlPassword);
        conf.set("dbname",dbname);



        Job job = Job.getInstance(conf);
        job.setJarByClass(AutoActLogParseMr.class);
        job.setMapperClass(AutoActLogParseMr.AutoActLogParseMaper.class);
        job.setReducerClass(AutoActLogParseMr.AutoActLogParseReducer.class);

        //將第一個路徑參數做爲輸入參數
        FileInputFormat.setInputPaths(job, new Path(params[0]));
        //將第二個參數做爲輸出參數
        FileOutputFormat.setOutputPath(job, new Path(params[1]));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setNumReduceTasks(numReduceTasks);

        String dirName[] = acts.split(Constant.MARK_AITE);
        for (String a : dirName) {
            MultipleOutputs.addNamedOutput(job, a, TextOutputFormat.class,
                    NullWritable.class, Text.class);
        }
        logger.info("---excuter---");

        return job.waitForCompletion(true) ? 0 : 1;

    }

import org.apache.hadoop.conf.Configuration; 建立一個conf對象: Configuration conf = getConf();app

//獲取配置文件信息
//Config 是自定義的一個類,實現了從配置文件獲取數據的方法:
        Config propertiesConfig = new Config();
        propertiesConfig.init(params[3]);

        String mysqlUrl = propertiesConfig.getValue("mysqlUrl");
        String mysqlUser = propertiesConfig.getValue("mysqlUser");
        String mysqlPassword = propertiesConfig.getValue("mysqlPassword");
        String dbname = propertiesConfig.getValue("dbname");


        conf.set("mysqlUser",mysqlUser); //再set給Configuration 對象
        conf.set("mysqlUrl",mysqlUrl);
        conf.set("mysqlPassword",mysqlPassword);
        conf.set("dbname",dbname);


        Job job = Job.getInstance(conf);

***插播:Config 是自定義的一個類,實現了從配置文件獲取數據的方法,具體代碼以下:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.*;
import java.util.Properties;


public class Config {

    private static Log logger = LogFactory.getLog(Config.class);
    private static Properties propertie=null;

    private static FileInputStream inputFile=null;

    private static FileOutputStream outputFile=null;
   
    /**
     * 必須首先使用該方法初始化系統配置,不然將會在調用Config.getValue(key,default)時得到默認值,而Config.getValue(key)將返回空
     * @param configPathFile
     */
  public static void init(String configPathFile){
        propertie = new Properties();
        try {
            InputStream input = new FileInputStream(new File(configPathFile));
            if (input != null){
                propertie.load(input);
                input.close();
            }

        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public static void initJarFile(String configPathFile) {
        propertie = new Properties();
        try {
            //讀jar包根目錄下的文件
            InputStream input = Config.class.getClass().getResourceAsStream("/" + configPathFile);
            if (input != null) {
                propertie.load(input);
                input.close();
            }
        } catch (Exception e) {
            logger.error("read properties file error: ", e);
        }
    }

    public static String getValue(String key) {
        if (propertie!=null&&propertie.containsKey(key)) {
            String value = propertie.getProperty(key);
            try {
				value=new String(value.getBytes("iso-8859-1"));
			} catch (UnsupportedEncodingException e) {
			}
            return value;
        } else
            return "";
    }

    public static String getValue(String key, String defaultValue) {
    	String v=null;
       if(key!=null){
    	   v=getValue(key);
       }
       if(v==null||v.trim().length()==0){
    	   v=defaultValue;
       }
       return v;
    }

    public static void clear() {
        propertie.clear();
    }

    public static void setValue(String key, String value) {
        propertie.setProperty(key, value);
    }

    public static void saveFile(String fileName, String description) {
        try {
            outputFile = new FileOutputStream(fileName);
            propertie.store(outputFile, description);
            outputFile.close();
        } catch (Exception e) {
            logger.error(e);
        }
    }

    public static void main(String[] args) {
        Properties pps=System.getProperties();
        //pps.list(System.out);
        Config config = new Config();
        config.initJarFile("mr_hbase.properties");
        String numReduceTasksStr = config.getValue("numReduceTasks");
        System.out.println(numReduceTasksStr);
    }
}

總結:配置文件的信息是經過MR的參數傳入的,在run方法中通過org.apache.hadoop.conf.Configuration的對象的set方法傳給Mapper類context,再通過setup()方法,進行賦值給Mapper類的類對象,供map使用。ide

相關文章
相關標籤/搜索