Following the steps in "Notes on Integrating Nutch with 起點R3 (Parts 1, 2, and 3)", we can build a search engine for an industry intranet. But one of the most important requirements for a search engine is that it must be able to find the newest content on the network, which means Nutch has to crawl new pages promptly and push the newly crawled content into the Solr index. This article therefore looks at how to handle index updates and deduplication.
We can run Nutch's crawl command at any time to fetch a site. For example, crawl the Sina site http://www.sina.com.cn at 8:00 a.m., index the result into Solr with Nutch's solrindex command, crawl Sina again at 10:00 a.m., and index it into Solr again. This keeps the content users can search reasonably fresh. On the Solr side this is no problem, because Solr writes to the index with update semantics: the field named id is the primary key, and as long as each id is unique the update succeeds.
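To make the update semantics concrete, here is a minimal SolrJ sketch (my own illustration, not code from this series); the server URL and the title field are placeholders for whatever your deployment actually uses:

import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class SolrUpdateDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder URL; point it at your own Solr instance.
        SolrServer solr = new CommonsHttpSolrServer("http://localhost:8983/solr");

        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", "20679dc38f64730579a1b2538727f76f"); // the unique key
        doc.addField("title", "Sina homepage, crawled at 10:00");

        // Because "id" is the uniqueKey, adding a document whose id already exists
        // replaces the earlier version instead of creating a second record.
        solr.add(doc);
        solr.commit();
    }
}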
Looking back at "Notes on Integrating Nutch with 起點R3 (Part 2)", we defined digest as the id there instead of defining url as the id in the solrconfig.xml under Nutch's conf directory. The reason is that with url as the id, 起點R3 shows problems such as index data not appearing at all. After analyzing this for quite a while, my conclusion is that urls can contain the characters / ? &, and if they are not escaped, submitting them in Solr's q parameter causes trouble.
Let's look at a Solr query log entry:
[search] webapp=null path=/select params={hl.snippets=3&q=id%3A20679dc38f64730579a1b2538727f76f&hl.simple.pre=%3Cfont+color%3D%27red%27%3E&hl.simple.post=%3C%2Ffont%3E&hl.fl=title&hl.fl=text&hl.usePhraseHighlighter=false&hl=true} hits=1 status=0 QTime=2
Here, q=id%3A20679dc38f64730579a1b2538727f76f is a query for the document whose id is 20679dc38f64730579a1b2538727f76f. If url were used as the id and that url itself contained an & character, everything after the & would be treated as an extra request parameter, and the query would not return the correct result. So when Nutch indexes into Solr, url is not a suitable choice for the ID.
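For completeness, if one did want to query a url-valued id through SolrJ, the query-syntax characters would have to be escaped first. A small sketch of my own (ClientUtils.escapeQueryChars handles the Lucene query syntax; URL-encoding the q parameter for the HTTP request is a separate, additional step):

import org.apache.solr.client.solrj.util.ClientUtils;

public class IdQueryEscapeDemo {
    public static void main(String[] args) {
        // A url used directly as an id contains characters that are special both
        // to the Lucene query syntax (:, ?, &, /) and to HTTP GET parameters (&, ?).
        String urlAsId = "http://www.sina.com.cn/news?id=1&page=2";

        // Raw form: misparsed if pasted into the q parameter as-is.
        System.out.println("raw     q = id:" + urlAsId);

        // Escaped for the Lucene query syntax.
        System.out.println("escaped q = id:" + ClientUtils.escapeQueryChars(urlAsId));
    }
}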
In Nutch, digest is a 32-character hash of the content of each crawled page. If two pages have exactly the same content, their digest values are guaranteed to be identical; but if one of them has even a single extra or missing space, the digests differ. So in my view, using digest as the id is a very good choice.
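Nutch computes this digest through its signature classes (an MD5-based signature by default, as far as I know). The standalone sketch below is only meant to illustrate the property just described, using plain java.security.MessageDigest rather than Nutch's own code:

import java.security.MessageDigest;

public class DigestDemo {
    // Hex-encode an MD5 hash, similar in spirit to the 32-character digest Nutch stores.
    static String md5Hex(String content) throws Exception {
        byte[] hash = MessageDigest.getInstance("MD5").digest(content.getBytes("UTF-8"));
        StringBuilder sb = new StringBuilder();
        for (byte b : hash) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        String page = "<html><body>Sina homepage</body></html>";
        String pageWithExtraSpace = "<html><body>Sina homepage </body></html>";

        // Identical content yields an identical digest ...
        System.out.println(md5Hex(page).equals(md5Hex(page)));               // true
        // ... but a single extra space yields a completely different one.
        System.out.println(md5Hex(page).equals(md5Hex(pageWithExtraSpace))); // false
    }
}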
If Nutch fetches the same page at two different times, for example the Sina homepage http://www.sina.com.cn, and the page has not changed between the two fetches, the digests computed for both fetches are identical. When Nutch indexes the second fetch into Solr, the digest (which is the unique id) already exists in the index, so no new record is created. Duplicate records in the index are thereby avoided.
At the same time, using digest as the id also keeps mirror sites on an industry intranet from producing duplicate records in the Solr index.
In short, using digest as the id prevents Nutch from writing duplicate records into the Solr index; it gives us natural deduplication.
One problem remains, however: if Nutch fetches the same page at two different times and the page has changed, the two digest values differ, so when Nutch updates the Solr index it writes two records for the same url (such as http://www.sina.com.cn). Searching for 新浪 (Sina) then returns two entries for the Sina site. By extension, if Nutch crawls the same url n times, the Solr index may end up with n similar records. This situation drove me crazy and left me stuck for a while.
This is really the problem of deduplicating by url. Nutch ships a deduplication module, org.apache.nutch.indexer.solr.SolrDeleteDuplicates.java, run as bin/nutch solrdedup. Reading it shows that it deduplicates records that share the same digest; presumably Nutch assumes url is the Solr primary key id, so deduplicating records with the same url was never considered. The code is as follows:
package org.apache.nutch.indexer.solr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.MalformedURLException;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;

/**
 * Utility class for deleting duplicate documents from a solr index.
 *
 * The algorithm goes like follows:
 *
 * Preparation:
 * <ol>
 * <li>Query the solr server for the number of documents (say, N)</li>
 * <li>Partition N among M map tasks. For example, if we have two map tasks
 * the first map task will deal with solr documents from 0 - (N / 2 - 1) and
 * the second will deal with documents from (N / 2) to (N - 1).</li>
 * </ol>
 *
 * MapReduce:
 * <ul>
 * <li>Map: Identity map where keys are digests and values are {@link SolrRecord}
 * instances(which contain id, boost and timestamp)</li>
 * <li>Reduce: After map, {@link SolrRecord}s with the same digest will be
 * grouped together. Now, of these documents with the same digests, delete
 * all of them except the one with the highest score (boost field). If two
 * (or more) documents have the same score, then the document with the latest
 * timestamp is kept. Again, every other is deleted from solr index.
 * </li>
 * </ul>
 *
 * Note that unlike {@link DeleteDuplicates} we assume that two documents in
 * a solr index will never have the same URL. So this class only deals with
 * documents with <b>different</b> URLs but the same digest.
 */
public class SolrDeleteDuplicates
    implements Reducer<Text, SolrDeleteDuplicates.SolrRecord, Text, SolrDeleteDuplicates.SolrRecord>, Tool {

  public static final Log LOG = LogFactory.getLog(SolrDeleteDuplicates.class);

  private static final String SOLR_GET_ALL_QUERY = SolrConstants.ID_FIELD + ":[* TO *]";

  private static final int NUM_MAX_DELETE_REQUEST = 1000;

  public static class SolrRecord implements Writable {

    private float boost;
    private long tstamp;
    private String id;

    public SolrRecord() { }

    public SolrRecord(SolrRecord old) {
      this.id = old.id;
      this.boost = old.boost;
      this.tstamp = old.tstamp;
    }

    public SolrRecord(String id, float boost, long tstamp) {
      this.id = id;
      this.boost = boost;
      this.tstamp = tstamp;
    }

    public String getId() { return id; }

    public float getBoost() { return boost; }

    public long getTstamp() { return tstamp; }

    public void readSolrDocument(SolrDocument doc) {
      id = (String)doc.getFieldValue(SolrConstants.ID_FIELD);
      boost = (Float)doc.getFieldValue(SolrConstants.BOOST_FIELD);
      Date buffer = (Date)doc.getFieldValue(SolrConstants.TIMESTAMP_FIELD);
      tstamp = buffer.getTime();
    }

    public void readFields(DataInput in) throws IOException {
      id = Text.readString(in);
      boost = in.readFloat();
      tstamp = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
      Text.writeString(out, id);
      out.writeFloat(boost);
      out.writeLong(tstamp);
    }
  }

  public static class SolrInputSplit implements InputSplit {

    private int docBegin;
    private int numDocs;

    public SolrInputSplit() { }

    public SolrInputSplit(int docBegin, int numDocs) {
      this.docBegin = docBegin;
      this.numDocs = numDocs;
    }

    public int getDocBegin() { return docBegin; }

    public int getNumDocs() { return numDocs; }

    public long getLength() throws IOException { return numDocs; }

    public String[] getLocations() throws IOException { return new String[] {}; }

    public void readFields(DataInput in) throws IOException {
      docBegin = in.readInt();
      numDocs = in.readInt();
    }

    public void write(DataOutput out) throws IOException {
      out.writeInt(docBegin);
      out.writeInt(numDocs);
    }
  }

  public static class SolrInputFormat implements InputFormat<Text, SolrRecord> {

    /** Return each index as a split. */
    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
      SolrServer solr = new CommonsHttpSolrServer(job.get(SolrConstants.SERVER_URL));

      final SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);
      solrQuery.setFields(SolrConstants.ID_FIELD);
      solrQuery.setRows(1);

      QueryResponse response;
      try {
        response = solr.query(solrQuery);
      } catch (final SolrServerException e) {
        throw new IOException(e);
      }

      int numResults = (int)response.getResults().getNumFound();
      int numDocsPerSplit = (numResults / numSplits);
      int currentDoc = 0;
      SolrInputSplit[] splits = new SolrInputSplit[numSplits];
      for (int i = 0; i < numSplits - 1; i++) {
        splits[i] = new SolrInputSplit(currentDoc, numDocsPerSplit);
        currentDoc += numDocsPerSplit;
      }
      splits[splits.length - 1] = new SolrInputSplit(currentDoc, numResults - currentDoc);

      return splits;
    }

    public RecordReader<Text, SolrRecord> getRecordReader(final InputSplit split,
        final JobConf job, Reporter reporter) throws IOException {

      SolrServer solr = new CommonsHttpSolrServer(job.get(SolrConstants.SERVER_URL));
      SolrInputSplit solrSplit = (SolrInputSplit) split;
      final int numDocs = solrSplit.getNumDocs();

      SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);
      solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD,
                          SolrConstants.TIMESTAMP_FIELD, SolrConstants.DIGEST_FIELD);
      solrQuery.setStart(solrSplit.getDocBegin());
      solrQuery.setRows(numDocs);

      QueryResponse response;
      try {
        response = solr.query(solrQuery);
      } catch (final SolrServerException e) {
        throw new IOException(e);
      }

      final SolrDocumentList solrDocs = response.getResults();

      return new RecordReader<Text, SolrRecord>() {

        private int currentDoc = 0;

        public void close() throws IOException { }

        public Text createKey() { return new Text(); }

        public SolrRecord createValue() { return new SolrRecord(); }

        public long getPos() throws IOException { return currentDoc; }

        public float getProgress() throws IOException { return currentDoc / (float) numDocs; }

        public boolean next(Text key, SolrRecord value) throws IOException {
          if (currentDoc >= numDocs) {
            return false;
          }

          SolrDocument doc = solrDocs.get(currentDoc);
          String digest = (String) doc.getFieldValue(SolrConstants.DIGEST_FIELD);
          key.set(digest);
          value.readSolrDocument(doc);

          currentDoc++;
          return true;
        }
      };
    }
  }

  private Configuration conf;

  private SolrServer solr;

  private int numDeletes = 0;

  private UpdateRequest updateRequest = new UpdateRequest();

  public Configuration getConf() {
    return conf;
  }

  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  public void configure(JobConf job) {
    try {
      solr = new CommonsHttpSolrServer(job.get(SolrConstants.SERVER_URL));
    } catch (MalformedURLException e) {
      throw new RuntimeException(e);
    }
  }

  public void close() throws IOException {
    try {
      if (numDeletes > 0) {
        LOG.info("SolrDeleteDuplicates: deleting " + numDeletes + " duplicates");
        updateRequest.process(solr);
        solr.commit();
      }
    } catch (SolrServerException e) {
      throw new IOException(e);
    }
  }

  public void reduce(Text key, Iterator<SolrRecord> values,
      OutputCollector<Text, SolrRecord> output, Reporter reporter) throws IOException {
    SolrRecord recordToKeep = new SolrRecord(values.next());
    while (values.hasNext()) {
      SolrRecord solrRecord = values.next();
      if (solrRecord.getBoost() > recordToKeep.getBoost() ||
          (solrRecord.getBoost() == recordToKeep.getBoost() &&
              solrRecord.getTstamp() > recordToKeep.getTstamp())) {
        updateRequest.deleteById(recordToKeep.id);
        recordToKeep = new SolrRecord(solrRecord);
      } else {
        updateRequest.deleteById(solrRecord.id);
      }
      numDeletes++;
      if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
        try {
          LOG.info("SolrDeleteDuplicates: deleting " + numDeletes + " duplicates");
          updateRequest.process(solr);
        } catch (SolrServerException e) {
          throw new IOException(e);
        }
        updateRequest = new UpdateRequest();
        numDeletes = 0;
      }
    }
  }

  public void dedup(String solrUrl) throws IOException {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    long start = System.currentTimeMillis();
    LOG.info("SolrDeleteDuplicates: starting at " + sdf.format(start));
    LOG.info("SolrDeleteDuplicates: Solr url: " + solrUrl);

    JobConf job = new NutchJob(getConf());

    job.set(SolrConstants.SERVER_URL, solrUrl);
    job.setInputFormat(SolrInputFormat.class);
    job.setOutputFormat(NullOutputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(SolrRecord.class);
    job.setMapperClass(IdentityMapper.class);
    job.setReducerClass(SolrDeleteDuplicates.class);

    JobClient.runJob(job);

    long end = System.currentTimeMillis();
    LOG.info("SolrDeleteDuplicates: finished at " + sdf.format(end) +
             ", elapsed: " + TimingUtil.elapsedTime(start, end));
  }

  public int run(String[] args) throws IOException {
    if (args.length != 1) {
      System.err.println("Usage: SolrDeleteDuplicates <solr url>");
      return 1;
    }

    dedup(args[0]);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int result = ToolRunner.run(NutchConfiguration.create(), new SolrDeleteDuplicates(), args);
    System.exit(result);
  }
}
The MapReduce section of the Javadoc above explains how the delete-duplicate-digest algorithm is carried out on Hadoop's distributed MapReduce:
1. Documents with the same digest are grouped together; only the document with the highest score (boost value) is kept, and the other documents with that digest are deleted (deduplication);
2. If two or more documents share the highest score (boost), the document with the latest timestamp is kept and the rest are deleted. A condensed restatement of this rule follows after this list.
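The sketch below restates the reducer's keep/delete decision in isolation (my own simplification, not the original code); the Doc class is just a placeholder for the three fields the reducer compares:

import java.util.Arrays;
import java.util.List;

public class DedupRuleSketch {
    static class Doc {
        String id; float boost; long tstamp;
        Doc(String id, float boost, long tstamp) { this.id = id; this.boost = boost; this.tstamp = tstamp; }
    }

    public static void main(String[] args) {
        // Three documents grouped under the same key (same digest, or same url after our change).
        List<Doc> group = Arrays.asList(
                new Doc("a", 1.0f, 100L),
                new Doc("b", 1.5f, 90L),
                new Doc("c", 1.5f, 120L));

        Doc keep = group.get(0);
        for (Doc d : group.subList(1, group.size())) {
            boolean better = d.boost > keep.boost
                    || (d.boost == keep.boost && d.tstamp > keep.tstamp);
            if (better) {
                System.out.println("delete " + keep.id);  // in the real class: updateRequest.deleteById(...)
                keep = d;
            } else {
                System.out.println("delete " + d.id);
            }
        }
        System.out.println("keep " + keep.id);  // "c": same boost as "b" but newer timestamp
    }
}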
Analyzing the code shows that deduplicating records with the same url only requires replacing DIGEST_FIELD with URL_FIELD in two places; the changed lines are sketched below.
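Both occurrences sit in SolrInputFormat.getRecordReader(); after the edit they would read roughly as follows (a sketch of the change, assuming SolrConstants defines a URL_FIELD constant alongside DIGEST_FIELD):

// 1) fetch the url field instead of the digest field
solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD,
                    SolrConstants.TIMESTAMP_FIELD, SolrConstants.URL_FIELD);

// 2) inside next(Text key, SolrRecord value): use the url as the map output key,
//    so the reducer groups records by url rather than by digest
String url = (String) doc.getFieldValue(SolrConstants.URL_FIELD);
key.set(url);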
After making the change, compile it in Eclipse, locate the main class org.apache.nutch.indexer.solr.SolrDeleteDuplicates, and set up a SolrDeleteDuplicates run configuration, with the corresponding program argument set to:
Click "Run", and the run output appears in the Eclipse console:
This indicates that deduplication by url succeeded.
To sum up: this article proposes using digest as the ID when Nutch builds the Solr index, which avoids the missing-results problem that appears when url is used as the ID. By modifying Nutch's deduplication module org.apache.nutch.indexer.solr.SolrDeleteDuplicates.java, we achieve deduplication and updating by url.