Nutch中Hadoop的應用之Injector

這學期「體系結構」的大做業是讓寫一個搜索引擎。因此前一段時間就簡單看了下Nutch。這篇文章不會涉及到搜索引擎的細節,而是側重於Hadoop在Nutch中的應用。java

Hadoop在Nutch中是充當了數據的存儲和MapReduce任務執行的角色。數據庫

咱們首先關注下頁面爬取的過程。在Nutch中有個叫作「CrawlDb「的數據庫,它是用來存儲全部網絡爬蟲已知的URL地址,以及該地址的相應狀態信息的的。其中狀態信息包括爬取狀態,上次爬取的時間,爬取間隔等。在Nutch中,該數據庫使用Hadoop的Map文件形式存儲。具體格式爲<Text, CrawlDatum>。其中的key表示該URL的地址,而value則是Nutch本身定義的一個類型「CrawlDatum」,該類型實現了"Writable"接口,用來表示頁面的一些屬性。apache

「CrawlDatum"的類的具體形式以下:網絡

package org.apache.nutch.crawl;

public class CrawlDatum implements WritableComparable<CrawlDatum>, Cloneable {
    private byte status;
    private long fetchTime = System.currentTimeMillis();
    private byte retries;
    private int fetchInterval;
    private float score = 1.0f;
    private byte[] signature = null;
    private long modifiedTime;
    private org.apache.hadoop.io.MapWritable metaData;
}

其中包括了頁面的狀態,抓取時間,抓取間隔等屬性。app

搜索引擎最初開始工做時,「CrawlDb」的內容是從一個簡單的文本文件中獲取的。該文本文件包含了一系列初始的URL地址。這個過程叫作「Injector」。若是"CrawlDb"已經存在,這時再調用「Injector"會把"CrawlDb"中原始的數據和提供的文本文件中的URL數據合併。dom

在Nutch中」Injector「實現分爲4步,其中前兩步爲兩個MapReduce任務。函數

第一個M/R任務是用來把文本文件中的URL進行過濾,規範化,包裝成「CrawlDatum」,並最終存成臨時的Sequence類型文件。該臨時文件的格式爲<Key, CrawlDatum>。oop

第二個M/R任務是用來把排序後的結果和原「CrawlDb"中的數據(若是存在)進行合併,生成新的CrawlDb文件。fetch

第三步,用新生成的CrawlDb來替換原有的文件。搜索引擎

第四步,刪除以前產生的臨時文件。

下面咱們來看一下它的實現過程的代碼。

public void inject(Path crawlDb, Path urlDir) throws IOException {
// 首先生成一個臨時路徑,來存放中間結果。
        Path tempDir =
                new Path(getConf().get("mapred.temp.dir", ".")
                + "/inject-temp-"
                + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
// 建立排序任務
        JobConf sortJob = new NutchJob(getConf());
        // 設置相關屬性
        FileInputFormat.addInputPath(sortJob, urlDir);
        sortJob.setMapperClass(InjectMapper.class);
        FileOutputFormat.setOutputPath(sortJob, tempDir);
        sortJob.setOutputFormat(SequenceFileOutputFormat.class);
        sortJob.setOutputKeyClass(Text.class);
        sortJob.setOutputValueClass(CrawlDatum.class);
        // 運行任務
        JobClient.runJob(sortJob);
// 建立合併任務
        JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
        // 設置屬性
        FileInputFormat.addInputPath(mergeJob, tempDir);
        mergeJob.setReducerClass(InjectReducer.class);
        // 運行任務
        JobClient.runJob(mergeJob);
// 替換
        CrawlDb.install(mergeJob, crawlDb);
// 刪除臨時文件
        FileSystem fs = FileSystem.get(getConf());
        fs.delete(tempDir, true);
    }

 其中第一個任務的Mapper類爲「InjectMapper」。咱們來看一下它的具體實現。

public void map(WritableComparable key, Text value,
                OutputCollector<Text, CrawlDatum> output, Reporter reporter)
                throws IOException {
// 得到一行URL地址 
            String url = value.toString();   
try {
                // 規範化和過濾URL
                url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
                url = filters.filter(url);
            } catch (Exception e) {
                url = null;
            }
// 保存結果
            if (url != null) {                         
                value.set(url);
                CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, customInterval);
                datum.setFetchTime(curTime);
                output.collect(value, datum);
            }
    }

該M/R任務執行後,以前的文本文件會被轉化成一個臨時的Sequence文件,其格式爲<Text, CrawlDatum>。

接下來就是合併任務的執行。該合併任務是經過調用「CrawlDb.createJob()」方法來建立的,因此咱們首先來看一下該方法。

public static JobConf createJob(Configuration config, Path crawlDb)
            throws IOException {
// 爲即將生成的新"CrawlDb"產生一個臨時的隨機路徑
        Path newCrawlDb =
                new Path(crawlDb,
                Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
// 建立任務
        JobConf job = new NutchJob(config);
        // 得到已有的「CrawlDb"的路徑
        Path current = new Path(crawlDb, CURRENT_NAME);
        // 若是該文件存在,則把它加入該任務的輸入路徑中
        if (FileSystem.get(job).exists(current)) {
            FileInputFormat.addInputPath(job, current);
        }
// 設置任務參數
        job.setInputFormat(SequenceFileInputFormat.class);
        job.setMapperClass(CrawlDbFilter.class);
        FileOutputFormat.setOutputPath(job, newCrawlDb);
        job.setOutputFormat(MapFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CrawlDatum.class);
// 返回任務
        return job;
    }

能夠看出,該合併任務的輸出是個Map類型的文件,和以前的「CrawlDb"(若是存在)類型是相同的。該任務的Mapper類爲「CrawlDbFilter」。其實現和以前的「InjectMapper」類似,這裏略去不提。該任務的Reducer類爲「InjectReducer」,它是合併的具體實施者。咱們來看一下它的實現。

public void reduce(Text key, Iterator<CrawlDatum> values,
                OutputCollector<Text, CrawlDatum> output, Reporter reporter)
                throws IOException {
boolean oldSet = false;
            while (values.hasNext()) {
                CrawlDatum val = values.next();
                if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
                    // 新URL
                    injected.set(val);
                    injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
                } else {
                    // 原「CrawlDb"中已有的URL
                    old.set(val);
                    oldSet = true;
                }
            }
CrawlDatum res = null;
            if (oldSet) {
                // 不覆蓋已有的URL
                res = old;
            } else {
                res = injected;
            }
// 保存結果
            output.collect(key, res);
    }
這兩個M/R任務執行後,系統中存在一新一舊兩個"CrawlDb"文件(若是以前已經存在"CrawlDb")。接下來咱們須要把兩個文件互換。相應的函數爲CrawlDb.install(mergeJob, crawlDb)。咱們來看一下它的實現:

public static void install(JobConf job, Path crawlDb) throws IOException {
// 得到新"CrawlDb"路徑
        Path newCrawlDb = FileOutputFormat.getOutputPath(job);
        FileSystem fs = new JobClient(job).getFs();
        Path old = new Path(crawlDb, "old");
        // 得到當前(舊)「CrawlDb"路徑
        Path current = new Path(crawlDb, CURRENT_NAME);
// 若是當前已經存在「CrawlDb",則把其重命名爲」old"。
        if (fs.exists(current)) {
            fs.rename(current, old);
        }
// 把新的"CrawlDb"重命名爲當前「CrawDb"
        fs.rename(newCrawlDb, current);
// 刪除"old"
        if (fs.exists(old)) {
            fs.delete(old, true);
        }
    }
在替換以後,還剩下最後一步,即把以前的臨時文件刪除。至此,「Injection」的使命就完成了。
相關文章
相關標籤/搜索