MapReduce程式調用第三方包:我在使用過程當中須要用到hbase的jar包,若要使用,常規是添加到每臺機器的classpath中,可是 經過DistributeCache,在初始化前加入就ok了。要不就要將這些jar包打成一個新jar,經過hadoop jar XXX.jar運行,可是不利於代碼更新和維護。
html
咱們知道,在Hadoop中有一個叫作DistributedCache的東東,它是用來分發應用特定的只讀文件和一個jar包的,以供Map- Reduce框架在啓動任務和運行的時候使用這些緩衝的文件或者是把第三方jar包添加到其classpath路徑中去,要注意的是 DistributedCache的使用是有一個前提的,就它會認爲這些經過urls來表示的文件已經在hdfs文件系統裏面,因此這裏在使用的時候第一 步就是要把這些文件上傳到HDFS中。
而後Hadoop框架會把這些應用所須要的文件複製到每一個準備啓動的節點上去,它會把這些複製到mapred.temp.dir配置的目錄中去,以供相應的Task節點使用。
這裏要注意的DistriubtedCache分發的文件分紅公有與私有文件,公有文件能夠給HDFS中的全部用戶使用,而私有文件只能被特定的用戶所使用,用戶能夠配置上傳文件的訪問權限來達到這種效果。
public boolean run(Configuration conf, String inputPath, String outPath,String category)
throws Exception {
Job job = new Job(conf, "DIP_DIPLOGFILTER-"+category);
DistributedCache.addFileToClassPath(new Path("/libs/hbase-0.92.1-cdh4.0.0-security.jar"), job.getConfiguration());
job.setJarByClass(AnalysisLoader.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(AnalysisMapper.class);
job.setMapOutputKeyClass(ComplexKey.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(ComplexKeyPartitioner.class);
// job.setCombinerClass(AnalysisReducer.class);
job.setReducerClass(AnalysisReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(LogConfig.reduceCount);
String hdfs = ServerConfig.getHDFS();
String[] inputPaths =inputPath.split(",");
for (String p : inputPaths) {
if (!p.startsWith(hdfs)) {
p = hdfs + p;
}
MultipleInputs.addInputPath(job, new Path(p),TextInputFormat.class, AnalysisMapper.class);
}
FileOutputFormat.setOutputPath(job, new Path(outPath));
return(job.waitForCompletion(true));
}
DistributeCache的使用通常分紅三步:
1. 配置應用程序的cache,把須要使用的文件上傳到DFS中去
app
[html] view plaincopy框架
$ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat oop
$ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip url
$ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar spa
$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar .net
$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz orm
$ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz htm
2. 配置JobConf
對象
[html] view plaincopy
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),job); // 這裏的lookup.dat加了一個符號鏈接
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); // 這裏是把相應的jar包加到Task的啓動路徑上去
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
3. 在Mapper或者Reducer任務中使用這些文件
[html] view plaincopy
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {
private Path[] localArchives;
private Path[] localFiles;
public void configure(JobConf job) {
// Get the cached archives/files
localArchives = DistributedCache.getLocalCacheArchives(job); // 獲得本地打包的文件,通常是數據文件,如字典文件
localFiles = DistributedCache.getLocalCacheFiles(job); // 獲得本地緩衝的文件,通常是配置文件等
}
public void map(K key, V value,
OutputCollector<K, V> output, Reporter reporter)
throws IOException {
// Use data from the cached archives/files here
// ...
// ...
output.collect(k, v);
}
}
1. 咱們知道,新的MP接口使用了Job這個類來對MP任務進行配置,這裏使用的時候要注意一點 Configuration conf = new Configuration(); // 對conf加入配置信息 - 正確方法 Job job = new Job(conf,"word count"); // 對conf加入配置信息 - 這是有問題的,這些配置不會生效,由於這裏生成Job的時候它會對conf進行復制,這個看一下Job的源代碼就知道。 // 這裏能夠用job.getConfiguration()來獲得其內部的conf對象,這樣就不會有問題。2. 若是你在啓動MP任務以前調用了第三方jar包的類,那這就會有問題,會在啓動任務的時候找不到這個類。這個問題我尚未找到好的解決辦法,一個辦法就是 把這些類想辦法移到MP任務中,若是有朋友知道更加好的辦法,請告訴我一下,多謝了。我感受Nutch中也會有一樣的問題,何時研究一下Nutch的 代碼,說不定會有不少關於Hadoop方面的收穫。