Hadoop Job的提交,到底提交了什麼?

##一個最基本的Hadoo任務 WordCountMapper:java

<pre class="prettyprint lang-java"> private final static IntWritable one = new IntWritable(1); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString()); while (tokenizer.hasMoreTokens()) { String str = tokenizer.nextToken(); context.write(new Text(StringUtils.trim(str.replaceAll("\\W", ""))), one); } } </pre>apache

WordCountReduce:緩存

<pre class="prettyprint lang-java"> @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } </pre>架構

WordCount:app

<pre class="prettyprint lang-java"> final Configuration conf = new Configuration(); Job job = new Job(conf); job.setJobName("wordcount"); job.setJarByClass(WordCount.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean success = job.waitForCompletion(true); </pre>eclipse

如你所見, 這是個Hadoop基礎的入門例子, 若是你瞭解Hadoop, 你已經對這些代碼熟記於心了.這篇文章我想說明Hadoop提交Job到底提交了那些東西,提交了哪些類,不須要提交那些東西.分佈式

##Hadoop任務提交ide

###傳統的Hadoop任務提交oop

把上面的代碼<code> WordCountMapper, WordCountReduce, WordCount</code>打包成jar,放到hadoop目錄下, 使用<code>hadoop jar wordcount.jar WordCount</code>運行任務. 這樣方式我稱爲傳統的方式,也是《hadoop權威指南》上一向的方法.測試

###Eclipse的hadoop插件的Hadoop任務提交

若是你開發過Hadoop的Job, 那麼對這個應該很熟悉.大多數開發測試都是用這個提交任務的,若是每次都是打包成jar, 再用hadoop jar ** 這還不把人搞瘋.

若是你還細心,你會發現,你選好Hadoop的jobtracker,提交任務的前一刻,Eclipse會彈出一個浮動窗口,上面跳動着顯示不少jar名.爲何會這樣?它作了什麼?

在此輸入圖片描述

###在Eclipse中當作Java Application運行爲何不能夠?

Hadoop的job項目都有main方法,這個是符合JavaApplication運行條件的,那麼咱們是否是可使用Eclipse中直接運行呢?當咱們嘗試運行的時候,程序是能夠運行的,但總當運行一下子(幾秒鐘)後拋出WordCountMapper <code>ClassNotFount</code>的錯誤.

那麼爲何程序不是直接拋出錯誤而是過了一下子才拋出?爲何用Eclipse Hadoop插件運行不會發生這個錯誤.

##背景

寫這篇文章前,咱們已經正在開發一個Hadoop的任務調度的系統. 也就是一個項目中提早寫好不少個Hadoop Job綁定到裏邊,若是想要運行哪一個Job,咱們就從前臺配置好參數,並把這個Job提交到Hadoop集羣. 並從前臺不斷的獲得Job的運行任務信息,取得Job的執行進度. 不關心進度的話直接去喝茶就能夠,完了來看結果.

若是每次都是<code>hadoop jar wordcount.jar WordCount</code>未免太弱智. 若是每次從Eclispe中運行,那專業性太強,也不可取.是否是有更好的方法提交任務?

答案確定是有的.

##用JVisualVM監視Eclipse hadoop插件的Hadoop任務提交

打開JVisualVM準備着,運行一個Job. 運行後當即就能夠看到一個Java進程. 用JVisualVM打開這個進程查看,如圖:

在此輸入圖片描述

在此輸入圖片描述

我打開我電腦上的目錄<code>F:\Eclipse\workspace.metadata.plugins\org.apache.hadoop.eclipse\hadoop-conf-1007657720166395816</code>而且查看了它的上級目錄,頓時一些皆明朗了.Eclipse Hadoop插件居然把個人項目下全部的類,資源文件打包成jar而後運行的.

在此輸入圖片描述

在此輸入圖片描述

##使用Hadoop Api提交Job,完美解決方案

其實我也是從這篇文章(http://luliangy.iteye.com/blog/1401453)中找到靈感的.爲何把項目打包,以Java Application的方式就正常運行了.

<pre class="prettyprint lang-java"> ((JobConf) job.getConfiguration()).setJar("wordcount.jar"); job.setJarByClass(WordCount.class); </pre>

衝着這股勁我看了不少Hadoop API,終於找到爲何.

經過<code>job.setJarByClass(WordCount.class); </code>這條路查看源碼, 你會找到以下的兩個方法.

<pre class="prettyprint lang-java"> private static String findContainingJar(Class my_class) { ClassLoader loader = my_class.getClassLoader(); String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; try { for(Enumeration itr = loader.getResources(class_file); itr.hasMoreElements();) { URL url = (URL) itr.nextElement(); if ("jar".equals(url.getProtocol())) { String toReturn = url.getPath(); if (toReturn.startsWith("file:")) { toReturn = toReturn.substring("file:".length()); } toReturn = URLDecoder.decode(toReturn, "UTF-8"); return toReturn.replaceAll("!.*$", ""); } } } catch (IOException e) { throw new RuntimeException(e); } return null; } public void setJarByClass(Class cls) { String jar = findContainingJar(cls); if (jar != null) { setJar(jar); } } </pre>

我說說<code>findContainingJar</code>有什麼做用? 當使用<code>job.setJarByClass(WordCount.class);</code>設置類的時候, Hadoop Client能從你的classpath中取得<code>WordCount.class</code>所在的jar包的jar File絕對路徑.若是找不到jar, <code>setJar(jar);</code>方法沒有執行,jar確定是個空值.

咱們在Eclipse中直接以Java Application運行的時候,classpath是一個本地文件夾, findContainingJar確定找不到項目的jar.也就是Mapper和Reduce所在的jar. 這樣在提交任務時候Configuration中<code>mapper.jar</code>屬性是一個空值.這也就解釋了爲何在Eclipse中當作Java Application運行時老是過一段時間後才發生ClassNotFound的錯誤緣由.

其實到這裏Hadoop提交了什麼也好解釋了.

Hadoop向集羣中提交了一個xml和一個攜帶Mapper/Reduce的jar. xml就是Configuration對象序列化的結果.

說到這裏也許你已經發現,這是一個開發上的架構問題.既然Hadoop Job須要Map/Reduce的jar.咱們應該把全部的Map/Reduce單獨在一個項目中開發.而後打包放入調度系統項目的ClassPath就行了.而後在調度系統中構造Job,並把<code>job.setJarByClass(class);</code>中的class設置爲該Job的map clas或者reduce class就好了.

##哪些是在Client執行的?哪些是在Hadoop集羣中執行?

一個Hadoop 任務通常都有3個類(Map/Reduce/Job).<code>WordCountMapper, WordCountReduce, WordCount</code>你認爲這三個類都會提交到集羣中執行嗎?

不是! 只有Mapper和Reduce這2個類會提交到Hadoop集羣, MapReduce執行也是這2個類. WordCount只是充當一個配置Job的客戶端,而且提交任務,以後又定時輪詢Job的運行狀態輸出簡單的日誌,直到任務完成,WordCount的這個進程會自動退出.

##Hadoop分佈式緩存

講到這裏你也許能順利的實現一個和我相同思路的系統了.

可是我仍是想說一個常見錯誤. 不是Mapper Class NotFound,而是Mapper中使用的Class NotFound. 而你又不想往hadoop集羣中添加jar包,也不想重啓Hadoop集羣. 你可使用Hadoop提供的一個類:<code>DistributedCache</code>

  1. DistributedCache.addArchiveToClassPath() 添加hdfs上的jar到MapReduce的classpath
  2. DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), job);
  3. DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
  4. DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
  5. DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
  6. DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
  7. DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
相關文章
相關標籤/搜索