Notes on Integrating Nutch with 起點R3 (Part 4)

        Following the steps in Parts 1 through 3 of this series, we can build a search engine for an industry intranet. But one of the most important capabilities of a search engine is finding the newest content on the network. This requires Nutch to fetch the latest pages in time, and the newly fetched content to be pushed into the Solr index. This part therefore covers how to update the index and how to deduplicate it.

       We can run Nutch's crawl command at any time to crawl a site. For example, crawl the Sina site http://www.sina.com.cn at 8:00 a.m., index it into Solr with Nutch's solrindex command, then crawl the site again at 10:00 a.m. and index it again. This keeps reasonably fresh content searchable. On the Solr side this works without trouble, because Solr writes documents with an update operation: the field named id is the unique key, so as long as id is unique, the write into the index succeeds, and a later add with the same id simply replaces the earlier document.
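
To make this update-by-id behavior concrete, here is a minimal SolrJ sketch of my own (not part of Nutch); the Solr URL, field values and class name are assumptions for demonstration only:

import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class SolrUpdateById {
  public static void main(String[] args) throws Exception {
    // Assumed local Solr URL, for demonstration only.
    SolrServer solr = new CommonsHttpSolrServer("http://localhost:8983/solr");

    // First crawl at 08:00: index the page under its id (the unique key).
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", "20679dc38f64730579a1b2538727f76f");
    doc.addField("title", "sina home page (08:00 crawl)");
    solr.add(doc);
    solr.commit();

    // Second crawl at 10:00: adding a document with the same id
    // overwrites the earlier one, so the index keeps only the latest copy.
    SolrInputDocument newer = new SolrInputDocument();
    newer.addField("id", "20679dc38f64730579a1b2538727f76f");
    newer.addField("title", "sina home page (10:00 crawl)");
    solr.add(newer);
    solr.commit();
  }
}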

       Looking back at Part 2 of this series (《nutch與起點R3集成之筆記(二)》), we defined digest as the id rather than adopting the definition in Nutch's conf solrconfig.xml that uses url as the id. The reason is that with url as the id, 起點R3 showed problems such as index data not being visible. After analyzing this for quite a while, my conclusion is that urls contain characters such as /, ? and &, and if these are not escaped, submitting them in Solr's q parameter causes problems.

       Let's look at a Solr query log entry:

[search] webapp=null path=/select params={hl.snippets=3&q=id%3A20679dc38f64730579a1b2538727f76f&hl.simple.pre=%3Cfont+color%3D%27red%27%3E&hl.simple.post=%3C%2Ffont%3E&hl.fl=title&hl.fl=text&hl.usePhraseHighlighter=false&hl=true} hits=1 status=0 QTime=2

   Here q=id%3A20679dc38f64730579a1b2538727f76f is a query for the document whose id is 20679dc38f64730579a1b2538727f76f. If url were used as the id and the url itself contained an & character, everything after the & would be treated as a separate request parameter, so the query would not return the correct result. For this reason, using url as the id when Nutch indexes into Solr is not a good choice.
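
To show the escaping issue concretely, here is a small sketch of my own (not Nutch code) that builds an id query for a url-valued id; both Lucene query-syntax escaping and URL-encoding of the q parameter would be needed, and that is exactly the step that is easy to get wrong:

import java.net.URLEncoder;
import org.apache.solr.client.solrj.util.ClientUtils;

public class UrlAsIdQueryDemo {
  public static void main(String[] args) throws Exception {
    // A url containing '&' and '?', as many real urls do.
    String url = "http://www.sina.com.cn/news?id=1&page=2";

    // Naive query string: the '&' splits the q parameter, so Solr never
    // sees the full id and the document cannot be found.
    String broken = "q=id:" + url;

    // Correct handling needs two layers: escape Lucene special characters
    // in the id value, then URL-encode the whole q parameter.
    String escapedId = ClientUtils.escapeQueryChars(url);
    String safe = "q=" + URLEncoder.encode("id:" + escapedId, "UTF-8");

    System.out.println(broken);
    System.out.println(safe);
  }
}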

       In Nutch, digest is the 32-character MD5 hash of each fetched page's content. If two pages have exactly the same content, their digest values are identical; but if one of them has even a single extra or missing space, the digests differ. So, in my view, using digest as the id is a very good choice.
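
Nutch's default signature implementation computes this digest with MD5 over the page content; the simplified sketch below (the md5Hex helper and sample strings are mine, for illustration) shows how a single extra space changes the 32-character hex value completely:

import java.security.MessageDigest;

public class DigestSketch {
  public static void main(String[] args) throws Exception {
    // Identical content gives identical digests; one extra space gives a
    // completely different 32-character hex string.
    System.out.println(md5Hex("<html>sina home page</html>"));
    System.out.println(md5Hex("<html>sina home page</html>"));
    System.out.println(md5Hex("<html>sina home page </html>"));
  }

  static String md5Hex(String content) throws Exception {
    byte[] hash = MessageDigest.getInstance("MD5").digest(content.getBytes("UTF-8"));
    StringBuilder sb = new StringBuilder();
    for (byte b : hash) {
      sb.append(String.format("%02x", b)); // two hex digits per byte
    }
    return sb.toString();
  }
}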

        If Nutch fetches a given page at two different times, say the Sina home page http://www.sina.com.cn, and the page has not changed between the two fetches, the digests computed for the two fetches are identical. So when Nutch indexes into Solr the second time, the digest (the unique id) already exists in the index, and the add simply overwrites the existing document instead of creating a second one. Duplicate records never accumulate in the index.

       Likewise, using digest as the id prevents duplicate records in the Solr index when an industry intranet contains mirror sites.

       In short, using digest as the id prevents Nutch from writing duplicate records into the Solr index; it gives us deduplication for free.

       However, one problem remains. If Nutch fetches the same page at two different times and the page has changed, the two digest values will differ, so when Nutch updates the Solr index it writes two records for the same url (such as http://www.sina.com.cn). Searching for 新浪 in the search UI then returns two records for the Sina site. By the same logic, if Nutch fetches the same url n times, the Solr index may end up with n similar records. This situation drove me crazy for a while and left me stuck.

        This is really the problem of deduplicating by url. Nutch has a dedup module, org.apache.nutch.indexer.solr.SolrDeleteDuplicates.java, invoked with bin/nutch solrdedup. Reading it shows that it deduplicates records sharing the same digest. Presumably Nutch assumes url is the Solr unique key id, so it never considers deduplicating records with the same url. The code is as follows:

package org.apache.nutch.indexer.solr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.MalformedURLException;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;

/** 
 * Utility class for deleting duplicate documents from a solr index.
 *
 * The algorithm goes like follows:
 * 
 * Preparation:
 * <ol>
 * <li>Query the solr server for the number of documents (say, N)</li>
 * <li>Partition N among M map tasks. For example, if we have two map tasks
 * the first map task will deal with solr documents from 0 - (N / 2 - 1) and
 * the second will deal with documents from (N / 2) to (N - 1).</li>
 * </ol>
 * 
 * MapReduce:
 * <ul>
 * <li>Map: Identity map where keys are digests and values are {@link SolrRecord}
 * instances(which contain id, boost and timestamp)</li>
 * <li>Reduce: After map, {@link SolrRecord}s with the same digest will be
 * grouped together. Now, of these documents with the same digests, delete
 * all of them except the one with the highest score (boost field). If two
 * (or more) documents have the same score, then the document with the latest
 * timestamp is kept. Again, every other is deleted from solr index.
 * </li>
 * </ul>
 * 
 * Note that unlike {@link DeleteDuplicates} we assume that two documents in
 * a solr index will never have the same URL. So this class only deals with
 * documents with <b>different</b> URLs but the same digest. 
 */
public class SolrDeleteDuplicates
implements Reducer<Text, SolrDeleteDuplicates.SolrRecord, Text, SolrDeleteDuplicates.SolrRecord>,
Tool {

  public static final Log LOG = LogFactory.getLog(SolrDeleteDuplicates.class);

  private static final String SOLR_GET_ALL_QUERY = SolrConstants.ID_FIELD + ":[* TO *]";

  private static final int NUM_MAX_DELETE_REQUEST = 1000;

  public static class SolrRecord implements Writable {

    private float boost;
    private long tstamp;
    private String id;

    public SolrRecord() { }
    
    public SolrRecord(SolrRecord old) {
      this.id = old.id;
      this.boost = old.boost;
      this.tstamp = old.tstamp;
    }

    public SolrRecord(String id, float boost, long tstamp) {
      this.id = id;
      this.boost = boost;
      this.tstamp = tstamp;
    }

    public String getId() {
      return id;
    }

    public float getBoost() {
      return boost;
    }

    public long getTstamp() {
      return tstamp;
    }

    public void readSolrDocument(SolrDocument doc) {
      id = (String)doc.getFieldValue(SolrConstants.ID_FIELD);
      boost = (Float)doc.getFieldValue(SolrConstants.BOOST_FIELD);

      Date buffer = (Date)doc.getFieldValue(SolrConstants.TIMESTAMP_FIELD);
      tstamp = buffer.getTime();
    }

    public void readFields(DataInput in) throws IOException {
      id = Text.readString(in);
      boost = in.readFloat();
      tstamp = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
      Text.writeString(out, id);
      out.writeFloat(boost);
      out.writeLong(tstamp);
    } 
  }

  public static class SolrInputSplit implements InputSplit {

    private int docBegin;
    private int numDocs;

    public SolrInputSplit() { }

    public SolrInputSplit(int docBegin, int numDocs) {
      this.docBegin = docBegin;
      this.numDocs = numDocs;
    }

    public int getDocBegin() {
      return docBegin;
    }

    public int getNumDocs() {
      return numDocs;
    }

    public long getLength() throws IOException {
      return numDocs;
    }

    public String[] getLocations() throws IOException {
      return new String[] {} ;
    }

    public void readFields(DataInput in) throws IOException {
      docBegin = in.readInt();
      numDocs = in.readInt();
    }

    public void write(DataOutput out) throws IOException {
      out.writeInt(docBegin);
      out.writeInt(numDocs);
    }
  }

  public static class SolrInputFormat implements InputFormat<Text, SolrRecord> {

    /** Return each index as a split. */
    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
      SolrServer solr = new CommonsHttpSolrServer(job.get(SolrConstants.SERVER_URL));

      final SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);
      solrQuery.setFields(SolrConstants.ID_FIELD);
      solrQuery.setRows(1);

      QueryResponse response;
      try {
        response = solr.query(solrQuery);
      } catch (final SolrServerException e) {
        throw new IOException(e);
      }

      int numResults = (int)response.getResults().getNumFound();
      int numDocsPerSplit = (numResults / numSplits); 
      int currentDoc = 0;
      SolrInputSplit[] splits = new SolrInputSplit[numSplits];
      for (int i = 0; i < numSplits - 1; i++) {
        splits[i] = new SolrInputSplit(currentDoc, numDocsPerSplit);
        currentDoc += numDocsPerSplit;
      }
      splits[splits.length - 1] = new SolrInputSplit(currentDoc, numResults - currentDoc);

      return splits;
    }

    public RecordReader<Text, SolrRecord> getRecordReader(final InputSplit split,
        final JobConf job, 
        Reporter reporter)
        throws IOException {

      SolrServer solr = new CommonsHttpSolrServer(job.get(SolrConstants.SERVER_URL));
      SolrInputSplit solrSplit = (SolrInputSplit) split;
      final int numDocs = solrSplit.getNumDocs();
      
      SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);
      solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD,
                          SolrConstants.TIMESTAMP_FIELD,
                          SolrConstants.DIGEST_FIELD);
      solrQuery.setStart(solrSplit.getDocBegin());
      solrQuery.setRows(numDocs);

      QueryResponse response;
      try {
        response = solr.query(solrQuery);
      } catch (final SolrServerException e) {
        throw new IOException(e);
      }

      final SolrDocumentList solrDocs = response.getResults();

      return new RecordReader<Text, SolrRecord>() {

        private int currentDoc = 0;

        public void close() throws IOException { }

        public Text createKey() {
          return new Text();
        }

        public SolrRecord createValue() {
          return new SolrRecord();
        }

        public long getPos() throws IOException {
          return currentDoc;
        }

        public float getProgress() throws IOException {
          return currentDoc / (float) numDocs;
        }

        public boolean next(Text key, SolrRecord value) throws IOException {
          if (currentDoc >= numDocs) {
            return false;
          }

          SolrDocument doc = solrDocs.get(currentDoc);
          String digest = (String) doc.getFieldValue(SolrConstants.DIGEST_FIELD);
          key.set(digest);
          value.readSolrDocument(doc);

          currentDoc++;
          return true;
        }    
      };
    }
  }

  private Configuration conf;

  private SolrServer solr;

  private int numDeletes = 0;

  private UpdateRequest updateRequest = new UpdateRequest();

  public Configuration getConf() {
    return conf;
  }

  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  public void configure(JobConf job) {
    try {
      solr = new CommonsHttpSolrServer(job.get(SolrConstants.SERVER_URL));
    } catch (MalformedURLException e) {
      throw new RuntimeException(e);
    }
  }


  public void close() throws IOException {
    try {
      if (numDeletes > 0) {
        LOG.info("SolrDeleteDuplicates: deleting " + numDeletes + " duplicates");
        updateRequest.process(solr);
        solr.commit();
      }
    } catch (SolrServerException e) {
      throw new IOException(e);
    }
  }

  public void reduce(Text key, Iterator<SolrRecord> values,
      OutputCollector<Text, SolrRecord> output, Reporter reporter)
  throws IOException {
    SolrRecord recordToKeep = new SolrRecord(values.next());
    while (values.hasNext()) {
      SolrRecord solrRecord = values.next();
      if (solrRecord.getBoost() > recordToKeep.getBoost() ||
          (solrRecord.getBoost() == recordToKeep.getBoost() && 
              solrRecord.getTstamp() > recordToKeep.getTstamp())) {
        updateRequest.deleteById(recordToKeep.id);
        recordToKeep = new SolrRecord(solrRecord);
      } else {
        updateRequest.deleteById(solrRecord.id);
      }
      numDeletes++;
      if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
        try {
          LOG.info("SolrDeleteDuplicates: deleting " + numDeletes + " duplicates");
          updateRequest.process(solr);
        } catch (SolrServerException e) {
          throw new IOException(e);
        }
        updateRequest = new UpdateRequest();
        numDeletes = 0;
      }
    }
  }

  public void dedup(String solrUrl) throws IOException {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    long start = System.currentTimeMillis();
    LOG.info("SolrDeleteDuplicates: starting at " + sdf.format(start));
    LOG.info("SolrDeleteDuplicates: Solr url: " + solrUrl);
    
    JobConf job = new NutchJob(getConf());

    job.set(SolrConstants.SERVER_URL, solrUrl);
    job.setInputFormat(SolrInputFormat.class);
    job.setOutputFormat(NullOutputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(SolrRecord.class);
    job.setMapperClass(IdentityMapper.class);
    job.setReducerClass(SolrDeleteDuplicates.class);

    JobClient.runJob(job);

    long end = System.currentTimeMillis();
    LOG.info("SolrDeleteDuplicates: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
  }

  public int run(String[] args) throws IOException {
    if (args.length != 1) {
      System.err.println("Usage: SolrDeleteDuplicates <solr url>");
      return 1;
    }

    dedup(args[0]);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int result = ToolRunner.run(NutchConfiguration.create(),
        new SolrDeleteDuplicates(), args);
    System.exit(result);
  }

}

 The MapReduce javadoc above describes how deleting documents with the same digest is implemented on the distributed Hadoop MapReduce framework:

1. Group documents with the same digest together, keep only the document with the highest score (the boost field), and delete the other documents in the group (deduplication);

2. If two or more documents share the highest boost, keep the one with the latest timestamp and delete the rest (see the sketch right after this list).
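
Lifted out of the MapReduce plumbing, the keep/delete decision in reduce() above boils down to a single comparison. The class and method names below are mine, for illustration; the boost and tstamp semantics come from the SolrRecord class above:

public class KeepRuleSketch {
  // Returns true when the candidate should replace the currently kept record:
  // a higher boost wins, and a boost tie is broken by the later timestamp.
  // Every record that is not kept gets deleted from the Solr index by id.
  static boolean shouldReplace(float candidateBoost, long candidateTstamp,
                               float keptBoost, long keptTstamp) {
    return candidateBoost > keptBoost
        || (candidateBoost == keptBoost && candidateTstamp > keptTstamp);
  }

  public static void main(String[] args) {
    System.out.println(shouldReplace(2.0f, 100L, 1.0f, 200L)); // true: higher boost
    System.out.println(shouldReplace(1.0f, 300L, 1.0f, 200L)); // true: same boost, newer
    System.out.println(shouldReplace(1.0f, 100L, 1.0f, 200L)); // false: same boost, older
  }
}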

         Reading the code shows that replacing DIGEST_FIELD with URL_FIELD in just two places is enough to deduplicate records that share the same url; the two changed spots are sketched right after this paragraph. After making the change, compile it in Eclipse, locate the main class org.apache.nutch.indexer.solr.SolrDeleteDuplicates, and create a SolrDeleteDuplicates run configuration.
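
After the edit, the two spots inside SolrInputFormat.getRecordReader of the class above read roughly as follows. This is my excerpt of the modified version, not the complete file; only DIGEST_FIELD has been swapped for URL_FIELD, so map keys, and therefore reduce groups, become urls instead of digests:

      // Spot 1: fetch the url field instead of the digest field from Solr.
      SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);
      solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD,
                          SolrConstants.TIMESTAMP_FIELD,
                          SolrConstants.URL_FIELD);

        // Spot 2: key each record by its url, so reduce() groups and
        // deduplicates records that share the same url.
        public boolean next(Text key, SolrRecord value) throws IOException {
          if (currentDoc >= numDocs) {
            return false;
          }

          SolrDocument doc = solrDocs.get(currentDoc);
          String url = (String) doc.getFieldValue(SolrConstants.URL_FIELD);
          key.set(url);
          value.readSolrDocument(doc);

          currentDoc++;
          return true;
        }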

In the run configuration, set the program argument to the Solr server URL (the class's usage string is SolrDeleteDuplicates <solr url>). Click "Run", and the job output appears in the Eclipse console, showing that the url deduplication succeeded.

       To summarize: this article proposes using digest as the id when Nutch builds the Solr index, which avoids the problem of queries returning no results when url is used as the id. By modifying Nutch's dedup module, org.apache.nutch.indexer.solr.SolrDeleteDuplicates.java, we get deduplication and updating by url.
