/**
 * @param args
 *   1. input path
 *   2. output path
 *   3. business parameter: e.g. "stg_log_" or "stg_log_class_perform_"
 *   4. configuration file path, e.g. xetl.properties
 */
public static void main(String[] args) {
    if (args.length < 2) {
        System.out.println("args must be more than 2.");
        System.exit(0);
    }
    // day = (args[0].split("/"))[3];
    Configuration conf = new Configuration();
    FileSystem hdfs = null;
    try {
        int res = ToolRunner.run(conf, new AutoActLogParseMr(), args);
        System.exit(res);
    } catch (Exception e) {
        logger.error("", e);
    }
}
hadoop jar (path and name of the jar) com.whh.bigdata.xetl.mr.AutoActLogParseMr ${srcParth} ${log_data}/${com_present_date_Y_m_d} 'stg_log_' ${BASE_PATH}/include/xetl.properties
Data flow example: hadoop fs -cp /log_data/2017-09-12/1600035/* /log_data/stg_log_1600035/day=2017-06-22/
The configuration values are ultimately needed inside map, so they are passed to the Mapper through the Context object of its setup(Context context) method. Let me first explain the setup method:
protected void setup(Context context) throws IOException, InterruptedException {
    try {
        // read the values that the driver put into the job Configuration
        String mysqlUser = context.getConfiguration().get("mysqlUser");
        String mysqlUrl = context.getConfiguration().get("mysqlUrl");
        String mysqlPassword = context.getConfiguration().get("mysqlPassword");
        String dbname = context.getConfiguration().get("dbname");
        String string = context.getConfiguration().get("fs.allActs");
        // build the static lookup map that map() will use
        actMap = AutoActLogParseUtil.getHiveStaticConf(string, mysqlUrl, mysqlUser, mysqlPassword, dbname);
    } catch (SQLException e) {
        e.printStackTrace();
    }
    super.setup(context);
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
}
/**
 * Expert users can override this method for more complete control over the
 * execution of the Mapper.
 * @param context
 * @throws IOException
 */
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}
As you can see, run calls the three methods above: setup, map, and cleanup. By default, setup and cleanup do nothing, and each is executed only once per task. setup is the place to do preparation work before the map function runs, such as reading job configuration; cleanup runs after map has processed the last record and does the final tidying up, such as releasing resources. If you need such preparation or cleanup, override these methods in your Mapper/Reducer subclass. The map method is re-implemented in the subclass as our custom map method; since it is called inside a while loop, it runs many times, once per input key/value pair. run is the method that every map task invokes. A sketch of such a subclass follows.
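To make the lifecycle concrete, here is a minimal, hypothetical Mapper (it is not the real AutoActLogParseMaper, whose parsing logic is not shown in this article): setup prepares per-task state once, map runs once per record, and cleanup runs once at the end.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative sketch only: shows where one-time preparation and cleanup belong.
public class LifecycleDemoMapper extends Mapper<LongWritable, Text, Text, Text> {

    private long records;   // per-task state prepared in setup()

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // executed once per map task, before the first call to map()
        records = 0;
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // executed once per input record
        records++;
        String line = value.toString();
        if (!line.isEmpty()) {
            context.write(new Text(line.split("\t")[0]), value);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // executed once per map task, after the last call to map()
        System.out.println("processed " + records + " records");
    }
}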
Next, let's look at the driver's run method:
public int run(String[] params) throws Exception {
    Configuration conf = getConf();
    conf.set("mapreduce.output.fileoutputformat.compress.codec",
            "org.apache.hadoop.io.compress.GzipCodec");
    Integer numReduceTasks = 3;
    FileSystem hdfs = null;
    try {
        // job configuration
        // conf.set("fs.default.name", "hdfs://Galaxy");
        // config.set("hadoop.job.ugi", "feng,111111");
        // config.set("hadoop.tmp.dir", "/tmp/hadoop-fengClient");
        // config.set("dfs.replication", "1");
        // config.set("mapred.job.tracker", "master:9001");
        // hdfs = FileSystem.get(new URI("hdfs://Galaxy"), conf, "bigdata");
        Path path = new Path("/log_data/");
        hdfs = path.getFileSystem(conf);
        // logger.info("value of path: " + path);
        String flag = params[2];
        acts = getOutPutName(hdfs, path, conf, flag);
        conf.set("fs.allActs", acts);
    } catch (Exception e) {
        e.printStackTrace();
    }
    // acts = Hdfstools.readHDFSFile("/log_data/actId");
    // logger.info("value of acts: " + acts);

    // read the configuration file
    Config propertiesConfig = new Config();
    propertiesConfig.init(params[3]);
    String mysqlUrl = propertiesConfig.getValue("mysqlUrl");
    String mysqlUser = propertiesConfig.getValue("mysqlUser");
    String mysqlPassword = propertiesConfig.getValue("mysqlPassword");
    String dbname = propertiesConfig.getValue("dbname");
    conf.set("mysqlUser", mysqlUser);
    conf.set("mysqlUrl", mysqlUrl);
    conf.set("mysqlPassword", mysqlPassword);
    conf.set("dbname", dbname);

    Job job = Job.getInstance(conf);
    job.setJarByClass(AutoActLogParseMr.class);
    job.setMapperClass(AutoActLogParseMr.AutoActLogParseMaper.class);
    job.setReducerClass(AutoActLogParseMr.AutoActLogParseReducer.class);
    // use the first path argument as the input path
    FileInputFormat.setInputPaths(job, new Path(params[0]));
    // use the second argument as the output path
    FileOutputFormat.setOutputPath(job, new Path(params[1]));
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(numReduceTasks);
    String[] dirName = acts.split(Constant.MARK_AITE);
    for (String a : dirName) {
        MultipleOutputs.addNamedOutput(job, a, TextOutputFormat.class, NullWritable.class, Text.class);
    }
    logger.info("---excuter---");
    return job.waitForCompletion(true) ? 0 : 1;
}
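One thing the driver relies on but this article does not show is how the reducer writes to the named outputs registered with MultipleOutputs.addNamedOutput. The real AutoActLogParseReducer is not listed here; the following is only a sketch of the usual pattern, assuming the reduce key carries the name of the target output: create a MultipleOutputs instance in setup, write through it in reduce, and close it in cleanup.

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Sketch only: routes each record to the named output that matches its key.
public class NamedOutputReducer extends Reducer<Text, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // the key is assumed to equal a name registered with addNamedOutput
        for (Text value : values) {
            mos.write(key.toString(), NullWritable.get(), value);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();  // flush and close all named output writers
    }
}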
First, a Configuration object is created (import org.apache.hadoop.conf.Configuration):

Configuration conf = getConf();
// read the configuration file
// Config is a custom class that wraps reading values from a properties file
Config propertiesConfig = new Config();
propertiesConfig.init(params[3]);
String mysqlUrl = propertiesConfig.getValue("mysqlUrl");
String mysqlUser = propertiesConfig.getValue("mysqlUser");
String mysqlPassword = propertiesConfig.getValue("mysqlPassword");
String dbname = propertiesConfig.getValue("dbname");
conf.set("mysqlUser", mysqlUser);   // then set the values on the Configuration object
conf.set("mysqlUrl", mysqlUrl);
conf.set("mysqlPassword", mysqlPassword);
conf.set("dbname", dbname);
Job job = Job.getInstance(conf);
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.*;
import java.util.Properties;

public class Config {
    private static Log logger = LogFactory.getLog(Config.class);
    private static Properties propertie = null;
    private static FileInputStream inputFile = null;
    private static FileOutputStream outputFile = null;

    /**
     * This method must be called first to initialize the configuration; otherwise
     * Config.getValue(key, default) returns the default value and Config.getValue(key)
     * returns an empty string.
     * @param configPathFile
     */
    public static void init(String configPathFile) {
        propertie = new Properties();
        try {
            InputStream input = new FileInputStream(new File(configPathFile));
            if (input != null) {
                propertie.load(input);
                input.close();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public static void initJarFile(String configPathFile) {
        propertie = new Properties();
        try {
            // read the file from the root of the jar
            InputStream input = Config.class.getResourceAsStream("/" + configPathFile);
            if (input != null) {
                propertie.load(input);
                input.close();
            }
        } catch (Exception e) {
            logger.error("read properties file error: ", e);
        }
    }

    public static String getValue(String key) {
        if (propertie != null && propertie.containsKey(key)) {
            String value = propertie.getProperty(key);
            try {
                value = new String(value.getBytes("iso-8859-1"));
            } catch (UnsupportedEncodingException e) {
            }
            return value;
        } else {
            return "";
        }
    }

    public static String getValue(String key, String defaultValue) {
        String v = null;
        if (key != null) {
            v = getValue(key);
        }
        if (v == null || v.trim().length() == 0) {
            v = defaultValue;
        }
        return v;
    }

    public static void clear() {
        propertie.clear();
    }

    public static void setValue(String key, String value) {
        propertie.setProperty(key, value);
    }

    public static void saveFile(String fileName, String description) {
        try {
            outputFile = new FileOutputStream(fileName);
            propertie.store(outputFile, description);
            outputFile.close();
        } catch (Exception e) {
            logger.error(e);
        }
    }

    public static void main(String[] args) {
        Properties pps = System.getProperties();
        // pps.list(System.out);
        Config config = new Config();
        config.initJarFile("mr_hbase.properties");
        String numReduceTasksStr = config.getValue("numReduceTasks");
        System.out.println(numReduceTasksStr);
    }
}
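For reference, run() reads four keys from the file passed as the fourth argument (params[3]). A minimal xetl.properties could therefore look like the lines below; every value here is a placeholder, not the real environment:

mysqlUrl=jdbc:mysql://127.0.0.1:3306/xetl
mysqlUser=etl_user
mysqlPassword=changeme
dbname=xetl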
Summary: the configuration file information is passed in through the MR arguments; in the driver's run method it is put into the job via the set method of an org.apache.hadoop.conf.Configuration object, reaches the Mapper through its Context, and in setup() is assigned to fields of the Mapper class for map to use.