這學期「體系結構」的大做業是讓寫一個搜索引擎。因此前一段時間就簡單看了下Nutch。這篇文章不會涉及到搜索引擎的細節,而是側重於Hadoop在Nutch中的應用。java
Hadoop在Nutch中是充當了數據的存儲和MapReduce任務執行的角色。數據庫
咱們首先關注下頁面爬取的過程。在Nutch中有個叫作「CrawlDb「的數據庫,它是用來存儲全部網絡爬蟲已知的URL地址,以及該地址的相應狀態信息的的。其中狀態信息包括爬取狀態,上次爬取的時間,爬取間隔等。在Nutch中,該數據庫使用Hadoop的Map文件形式存儲。具體格式爲<Text, CrawlDatum>。其中的key表示該URL的地址,而value則是Nutch本身定義的一個類型「CrawlDatum」,該類型實現了"Writable"接口,用來表示頁面的一些屬性。apache
「CrawlDatum"的類的具體形式以下:網絡
package org.apache.nutch.crawl; public class CrawlDatum implements WritableComparable<CrawlDatum>, Cloneable { private byte status; private long fetchTime = System.currentTimeMillis(); private byte retries; private int fetchInterval; private float score = 1.0f; private byte[] signature = null; private long modifiedTime; private org.apache.hadoop.io.MapWritable metaData; }
其中包括了頁面的狀態,抓取時間,抓取間隔等屬性。app
搜索引擎最初開始工做時,「CrawlDb」的內容是從一個簡單的文本文件中獲取的。該文本文件包含了一系列初始的URL地址。這個過程叫作「Injector」。若是"CrawlDb"已經存在,這時再調用「Injector"會把"CrawlDb"中原始的數據和提供的文本文件中的URL數據合併。dom
在Nutch中」Injector「實現分爲4步,其中前兩步爲兩個MapReduce任務。函數
第一個M/R任務是用來把文本文件中的URL進行過濾,規範化,包裝成「CrawlDatum」,並最終存成臨時的Sequence類型文件。該臨時文件的格式爲<Key, CrawlDatum>。oop
第二個M/R任務是用來把排序後的結果和原「CrawlDb"中的數據(若是存在)進行合併,生成新的CrawlDb文件。fetch
第三步,用新生成的CrawlDb來替換原有的文件。搜索引擎
第四步,刪除以前產生的臨時文件。
下面咱們來看一下它的實現過程的代碼。
public void inject(Path crawlDb, Path urlDir) throws IOException {
// 首先生成一個臨時路徑,來存放中間結果。 Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + "/inject-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
// 建立排序任務 JobConf sortJob = new NutchJob(getConf()); // 設置相關屬性 FileInputFormat.addInputPath(sortJob, urlDir); sortJob.setMapperClass(InjectMapper.class); FileOutputFormat.setOutputPath(sortJob, tempDir); sortJob.setOutputFormat(SequenceFileOutputFormat.class); sortJob.setOutputKeyClass(Text.class); sortJob.setOutputValueClass(CrawlDatum.class); // 運行任務 JobClient.runJob(sortJob);
// 建立合併任務 JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb); // 設置屬性 FileInputFormat.addInputPath(mergeJob, tempDir); mergeJob.setReducerClass(InjectReducer.class); // 運行任務 JobClient.runJob(mergeJob);
// 替換 CrawlDb.install(mergeJob, crawlDb);
// 刪除臨時文件 FileSystem fs = FileSystem.get(getConf()); fs.delete(tempDir, true); }
其中第一個任務的Mapper類爲「InjectMapper」。咱們來看一下它的具體實現。
public void map(WritableComparable key, Text value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
// 得到一行URL地址 String url = value.toString();
try { // 規範化和過濾URL url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT); url = filters.filter(url); } catch (Exception e) { url = null; }
// 保存結果 if (url != null) { value.set(url); CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, customInterval); datum.setFetchTime(curTime); output.collect(value, datum); } }
該M/R任務執行後,以前的文本文件會被轉化成一個臨時的Sequence文件,其格式爲<Text, CrawlDatum>。
接下來就是合併任務的執行。該合併任務是經過調用「CrawlDb.createJob()」方法來建立的,因此咱們首先來看一下該方法。
public static JobConf createJob(Configuration config, Path crawlDb) throws IOException {
// 爲即將生成的新"CrawlDb"產生一個臨時的隨機路徑 Path newCrawlDb = new Path(crawlDb, Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
// 建立任務 JobConf job = new NutchJob(config); // 得到已有的「CrawlDb"的路徑 Path current = new Path(crawlDb, CURRENT_NAME); // 若是該文件存在,則把它加入該任務的輸入路徑中 if (FileSystem.get(job).exists(current)) { FileInputFormat.addInputPath(job, current); }
// 設置任務參數 job.setInputFormat(SequenceFileInputFormat.class); job.setMapperClass(CrawlDbFilter.class); FileOutputFormat.setOutputPath(job, newCrawlDb); job.setOutputFormat(MapFileOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(CrawlDatum.class);
// 返回任務 return job; }
能夠看出,該合併任務的輸出是個Map類型的文件,和以前的「CrawlDb"(若是存在)類型是相同的。該任務的Mapper類爲「CrawlDbFilter」。其實現和以前的「InjectMapper」類似,這裏略去不提。該任務的Reducer類爲「InjectReducer」,它是合併的具體實施者。咱們來看一下它的實現。
public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
boolean oldSet = false; while (values.hasNext()) { CrawlDatum val = values.next(); if (val.getStatus() == CrawlDatum.STATUS_INJECTED) { // 新URL injected.set(val); injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); } else { // 原「CrawlDb"中已有的URL old.set(val); oldSet = true; } }
CrawlDatum res = null; if (oldSet) { // 不覆蓋已有的URL res = old; } else { res = injected; }
// 保存結果 output.collect(key, res); }這兩個M/R任務執行後,系統中存在一新一舊兩個"CrawlDb"文件(若是以前已經存在"CrawlDb")。接下來咱們須要把兩個文件互換。相應的函數爲CrawlDb.install(mergeJob, crawlDb)。咱們來看一下它的實現:
public static void install(JobConf job, Path crawlDb) throws IOException {
// 得到新"CrawlDb"路徑 Path newCrawlDb = FileOutputFormat.getOutputPath(job); FileSystem fs = new JobClient(job).getFs(); Path old = new Path(crawlDb, "old"); // 得到當前(舊)「CrawlDb"路徑 Path current = new Path(crawlDb, CURRENT_NAME);
// 若是當前已經存在「CrawlDb",則把其重命名爲」old"。 if (fs.exists(current)) { fs.rename(current, old); }
// 把新的"CrawlDb"重命名爲當前「CrawDb" fs.rename(newCrawlDb, current);
// 刪除"old" if (fs.exists(old)) { fs.delete(old, true); } }在替換以後,還剩下最後一步,即把以前的臨時文件刪除。至此,「Injection」的使命就完成了。