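The Frontier is one of the core components of Heritrix, and also one of the most complex. Its main job is to supply URLs to the threads that process links, and to handle the follow-up scheduling once a link has been processed. To improve efficiency, it uses Berkeley DB internally. This section dissects its internal mechanics in detail.
The official Heritrix documentation includes a Frontier example which, although simple, explains the basic principle behind a Frontier implementation. It is not discussed here; interested readers can refer to that documentation. Its three core methods, however, have to be mentioned: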
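(1) next(int timeout): supplies a link to a processing thread. Every Heritrix processing thread (ToeThread) obtains its links by calling this method.
(2) schedule(CandidateURI caURI): schedules a link that is waiting to be processed.
(3) finished(CrawlURI cURI): marks a processed link as finished.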
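To make the contract of these three methods concrete, here is a minimal in-memory sketch. It is not Heritrix code: the class name SimpleFrontierSketch is hypothetical, plain strings stand in for CandidateURI and CrawlURI, and a simple FIFO queue plus a set replaces the Berkeley DB storage described later.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical in-memory frontier; strings stand in for CandidateURI/CrawlURI.
class SimpleFrontierSketch {
    private final Queue<String> pending = new ArrayDeque<>();    // URIs waiting to be fetched
    private final Set<String> alreadyIncluded = new HashSet<>(); // every URI ever scheduled
    private int finishedCount = 0;

    // Queue a URI unless it has been scheduled before.
    synchronized void schedule(String uri) {
        if (alreadyIncluded.add(uri)) {
            pending.add(uri);
            notifyAll(); // wake any worker blocked in next()
        }
    }

    // Hand the next URI to a worker; returns null if nothing arrives in time.
    synchronized String next(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (pending.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null;
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return null;
            }
        }
        return pending.poll();
    }

    // Record that a worker is done with a URI.
    synchronized void finished(String uri) {
        finishedCount++;
    }

    synchronized int finishedCount() {
        return finishedCount;
    }
}
```

A worker thread would loop over next(), fetch the returned URI, call schedule() for each link it discovers, and finally call finished(). The real Frontier does considerably more in each step, as the classes below show.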
The overall structure is as follows:
BdbMultipleWorkQueues:
It is a thin wrapper around Berkeley DB. Internally it holds a single Berkeley database that stores all the links waiting to be processed.
Code
package org.archive.crawler.frontier;

import java.util.logging.Level;

import org.archive.crawler.datamodel.CrawlURI;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.util.RuntimeExceptionWrapper;

// Excerpt: the LOGGER and crawlUriBinding fields and helpers such as
// calculateInsertKey() and tallyAverageEntrySize() are not shown.
public class BdbMultipleWorkQueues
{
    // Database that stores every URL waiting to be processed
    private Database pendingUrisDB = null;

    // Fetch the link that belongs to the given key
    public CrawlURI get(DatabaseEntry headKey)
    throws DatabaseException {
        DatabaseEntry result = new DatabaseEntry();
        // From Linda Lee of sleepycat:
        // "You want to check the status returned from Cursor.getSearchKeyRange
        // to make sure that you have OperationStatus.SUCCESS. In that case,
        // you have found a valid data record, and result.getData()
        // (called by internally by the binding code, in this case) will be
        // non-null. The other possible status return is
        // OperationStatus.NOTFOUND, in which case no data record matched
        // the criteria. "
        // Look up the link for the key
        OperationStatus status = getNextNearestItem(headKey, result);
        CrawlURI retVal = null;
        if (status != OperationStatus.SUCCESS) {
            LOGGER.severe("See '1219854 NPE je-2.0 "
                    + "entryToObject'. OperationStatus "
                    + " was not SUCCESS: "
                    + status
                    + ", headKey "
                    + BdbWorkQueue.getPrefixClassKey(headKey.getData()));
            return null;
        }
        try {
            retVal = (CrawlURI)crawlUriBinding.entryToObject(result);
        } catch (RuntimeExceptionWrapper rw) {
            LOGGER.log(
                Level.SEVERE,
                "expected object missing in queue " +
                BdbWorkQueue.getPrefixClassKey(headKey.getData()),
                rw);
            return null;
        }
        retVal.setHolderKey(headKey);
        return retVal; // return the link
    }

    // Fetch a link from the pending list: the first real item that
    // follows the queue's cap entry at headKey
    protected OperationStatus getNextNearestItem(DatabaseEntry headKey,
            DatabaseEntry result) throws DatabaseException {
        Cursor cursor = null;
        OperationStatus status;
        try {
            // Open a cursor
            cursor = this.pendingUrisDB.openCursor(null, null);
            // get cap; headKey at this point should always point to
            // a queue-beginning cap entry (zero-length value)
            status = cursor.getSearchKey(headKey, result, null);
            if (status != OperationStatus.SUCCESS || result.getData().length > 0) {
                // cap missing
                throw new DatabaseException("bdb queue cap missing");
            }
            // get next item (real first item of queue)
            status = cursor.getNext(headKey, result, null);
        } finally {
            if (cursor != null) {
                cursor.close();
            }
        }
        return status;
    }

    /**
     * Put the given CrawlURI in at the appropriate place,
     * i.e. add the URL to the database.
     *
     * @param curi the CrawlURI to store
     * @param overwriteIfPresent whether an existing entry with the same
     *        key may be overwritten
     * @throws DatabaseException
     */
    public void put(CrawlURI curi, boolean overwriteIfPresent)
    throws DatabaseException {
        DatabaseEntry insertKey = (DatabaseEntry)curi.getHolderKey();
        if (insertKey == null) {
            insertKey = calculateInsertKey(curi);
            curi.setHolderKey(insertKey);
        }
        DatabaseEntry value = new DatabaseEntry();
        crawlUriBinding.objectToEntry(curi, value);
        // Output tally on avg. size if level is FINE or greater.
        if (LOGGER.isLoggable(Level.FINE)) {
            tallyAverageEntrySize(curi, value);
        }
        OperationStatus status;
        if (overwriteIfPresent) {
            // Add, replacing any existing entry
            status = pendingUrisDB.put(null, insertKey, value);
        } else {
            // Add only if the key is not present yet
            status = pendingUrisDB.putNoOverwrite(null, insertKey, value);
        }
        if (status != OperationStatus.SUCCESS) {
            LOGGER.severe("failed; " + status + " " + curi);
        }
    }
}
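The cap-then-next lookup in getNextNearestItem() is easier to follow with an in-memory analogue. The sketch below is only an illustration under stated assumptions: a TreeMap stands in for the sorted Berkeley database, the class SharedQueueStoreSketch and its methods are hypothetical, and the key layout (class key, a zero character, then a zero-padded counter) is a simplification rather than the actual Heritrix key format.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory analogue of one sorted store shared by many queues.
class SharedQueueStoreSketch {
    // Sorted map standing in for the Berkeley DB database.
    private final TreeMap<String, String> pending = new TreeMap<>();
    private long ordinal = 0;

    // Key of the zero-length "cap" entry that marks the start of a queue.
    static String capKey(String classKey) {
        return classKey + '\u0000';
    }

    // Append a URI to the queue identified by classKey.
    void put(String classKey, String uri) {
        pending.putIfAbsent(capKey(classKey), ""); // cap entry has an empty value
        String insertKey = capKey(classKey) + String.format("%019d", ordinal++);
        pending.put(insertKey, uri);
    }

    // Return the first URI of the queue without removing it, or null if empty.
    String peek(String classKey) {
        String headKey = capKey(classKey);
        String cap = pending.get(headKey);
        if (cap == null || !cap.isEmpty()) {
            throw new IllegalStateException("queue cap missing");
        }
        // Counterpart of cursor.getNext(): the entry right after the cap.
        Map.Entry<String, String> next = pending.higherEntry(headKey);
        if (next == null || !next.getKey().startsWith(headKey)) {
            return null; // nothing follows, or the entry belongs to another queue
        }
        return next.getValue();
    }

    // Remove and return the first URI of the queue, or null if empty.
    String poll(String classKey) {
        String headKey = capKey(classKey);
        Map.Entry<String, String> next = pending.higherEntry(headKey);
        if (next == null || !next.getKey().startsWith(headKey)) {
            return null;
        }
        pending.remove(next.getKey());
        return next.getValue();
    }
}
```

Because all keys of one queue share the same prefix, they sit next to each other in sort order, so a single sorted store can hold any number of logical queues. The prefix check in peek() matters: when a queue is empty, the entry after its cap is simply the start of the next queue.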
BdbWorkQueue:
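It represents one link queue; all links in the queue share the same key. It actually obtains a link from the database of pending links by calling the get method of BdbMultipleWorkQueues.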
Code
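package org.archive.crawler.frontier;

import java.io.IOException;
import java.io.Serializable;
import java.util.logging.Level;

import org.archive.crawler.datamodel.CrawlURI;

import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;

// Excerpt: fields such as LOGGER and origin are not shown.
public class BdbWorkQueue extends WorkQueue
implements Comparable, Serializable
{
    // Fetch one URL
    protected CrawlURI peekItem(final WorkQueueFrontier frontier)
    throws IOException {
        /**
         * Key point: obtain pendingUris from the BdbFrontier
         */
        final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
            .getWorkQueues();
        DatabaseEntry key = new DatabaseEntry(origin);
        CrawlURI curi = null;
        int tries = 1;
        while (true) {
            try {
                // Fetch the link
                curi = queues.get(key);
            } catch (DatabaseException e) {
                LOGGER.log(Level.SEVERE, "peekItem failure; retrying", e);
            }
            // Abridged: the retry handling is left out of this excerpt,
            // so the loop returns after the first attempt.
            return curi;
        }
    }
}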
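The excerpt above logs "retrying" and declares a tries counter, but returns after the first attempt. As an illustration of what a bounded retry around such a lookup could look like, here is a small generic sketch. The helper peekWithRetry and its maxTries parameter are hypothetical and are not taken from Heritrix.

```java
import java.util.function.Supplier;

// Hypothetical helper: retry a lookup a limited number of times.
class BoundedRetrySketch {
    // Calls fetch up to maxTries times and returns the first non-null
    // result, or null if every attempt fails or yields nothing.
    static <T> T peekWithRetry(Supplier<T> fetch, int maxTries) {
        for (int tries = 1; tries <= maxTries; tries++) {
            try {
                T item = fetch.get();
                if (item != null) {
                    return item; // success
                }
            } catch (RuntimeException e) {
                // Log and fall through to the next attempt.
                System.err.println("peek failure on try " + tries + ": " + e.getMessage());
            }
        }
        return null; // nothing found where an item was expected
    }
}
```

Capping the number of attempts keeps a persistently broken queue from blocking a worker thread forever, at the cost of occasionally giving up on a lookup that might have succeeded later.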
WorkQueueFrontier:
It implements the three core methods; the excerpt below shows next() and schedule().
Code
public CrawlURI next()
throws InterruptedException, EndedException {
    while (true) { // loop left only by explicit return or exception
        long now = System.currentTimeMillis();
        // Do common checks for pause, terminate, bandwidth-hold
        preNext(now);
        synchronized(readyClassQueues) {
            int activationsNeeded = targetSizeForReadyQueues() - readyClassQueues.size();
            while (activationsNeeded > 0 && !inactiveQueues.isEmpty()) {
                activateInactiveQueue();
                activationsNeeded--;
            }
        }
        WorkQueue readyQ = null;
        Object key = readyClassQueues.poll(DEFAULT_WAIT, TimeUnit.MILLISECONDS);
        if (key != null) {
            readyQ = (WorkQueue)this.allQueues.get(key);
        }
        if (readyQ != null) {
            while (true) { // loop left by explicit return or break on empty
                CrawlURI curi = null;
                synchronized(readyQ) {
                    /** Take one URL; it ultimately comes from the
                     * pendingUris of the subclass BdbFrontier
                     */
                    curi = readyQ.peek(this);
                    if (curi != null) {
                        // check if curi belongs in different queue
                        String currentQueueKey = getClassKey(curi);
                        if (currentQueueKey.equals(curi.getClassKey())) {
                            // curi was in right queue, emit
                            noteAboutToEmit(curi, readyQ);
                            // Add the queue to the in-process queues
                            inProcessQueues.add(readyQ);
                            return curi; // hand the URL to the caller
                        }
                        // URI's assigned queue has changed since it
                        // was queued (eg because its IP has become
                        // known). Requeue to new queue.
                        curi.setClassKey(currentQueueKey);
                        readyQ.dequeue(this); // remove from this queue
                        decrementQueuedCount(1);
                        curi.setHolderKey(null);
                        // curi will be requeued to true queue after lock
                        // on readyQ is released, to prevent deadlock
                    } else {
                        // readyQ is empty and ready: it's exhausted
                        // release held status, allowing any subsequent
                        // enqueues to again put queue in ready
                        readyQ.clearHeld();
                        break;
                    }
                }
                if (curi != null) {
                    // complete the requeuing begun earlier
                    sendToQueue(curi);
                }
            }
        } else {
            // ReadyQ key wasn't in all queues: unexpected
            if (key != null) {
                logger.severe("Key " + key +
                    " in readyClassQueues but not allQueues");
            }
        }
        if (shouldTerminate) {
            // skip subsequent steps if already on last legs
            throw new EndedException("shouldTerminate is true");
        }
        if (inProcessQueues.size() == 0) {
            // Nothing was ready or in progress or imminent to wake; ensure
            // any piled-up pending-scheduled URIs are considered
            this.alreadyIncluded.requestFlush();
        }
    }
}

// Schedule a URL for processing; it first goes through the
// alreadyIncluded duplicate filter
public void schedule(CandidateURI caUri) {
    // Canonicalization may set forceFetch flag. See
    // #canonicalization(CandidateURI) javadoc for circumstance.
    String canon = canonicalize(caUri);
    if (caUri.forceFetch()) {
        alreadyIncluded.addForce(canon, caUri);
    } else {
        alreadyIncluded.add(canon, caUri);
    }
}
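The interplay of readyClassQueues, allQueues and inProcessQueues in next() can be reduced to a small model. The following is a simplified sketch, not the Heritrix implementation: the class QueueRotationSketch is hypothetical, the class key is assumed to be just the host name, and inactive queues, snoozing, requeueing and termination are all left out.

```java
import java.net.URI;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of per-class-key queue rotation.
class QueueRotationSketch {
    private final Map<String, Deque<String>> allQueues = new HashMap<>();
    private final Deque<String> readyClassQueues = new ArrayDeque<>(); // keys ready to emit
    private final Set<String> inProcessQueues = new HashSet<>();       // keys with a URI in flight

    // Assumption for this sketch: the class key is the host of the URI.
    static String classKey(String uri) {
        String host = URI.create(uri).getHost();
        return host == null ? "default" : host;
    }

    synchronized void schedule(String uri) {
        String key = classKey(uri);
        allQueues.computeIfAbsent(key, k -> new ArrayDeque<>()).add(uri);
        // A queue becomes ready only if it is neither ready nor in process.
        if (!inProcessQueues.contains(key) && !readyClassQueues.contains(key)) {
            readyClassQueues.add(key);
        }
    }

    // Emit the head URI of the next ready queue, or null if none is ready.
    synchronized String next() {
        String key;
        while ((key = readyClassQueues.poll()) != null) {
            String uri = allQueues.get(key).peek();
            if (uri != null) {
                inProcessQueues.add(key); // at most one URI per queue in flight
                return uri;
            }
            // Queue was exhausted; a later schedule() makes it ready again.
        }
        return null;
    }

    // Dequeue the finished URI and make its queue ready again if not empty.
    synchronized void finished(String uri) {
        String key = classKey(uri);
        Deque<String> queue = allQueues.get(key);
        if (queue == null) {
            return;
        }
        queue.remove(uri);
        inProcessQueues.remove(key);
        if (!queue.isEmpty()) {
            readyClassQueues.add(key);
        }
    }
}
```

Note that next() only peeks: the URI stays in its queue until finished() is called, and while it is in flight the queue is not in the ready list. In this model that is what keeps two threads from fetching from the same host at once.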
BdbFrontier:
It extends WorkQueueFrontier and is the only Frontier implementation in Heritrix that is of practical use.
Code
package org.archive.crawler.frontier;

import java.io.IOException;
import java.io.Serializable;

import com.sleepycat.je.DatabaseException;

public class BdbFrontier extends WorkQueueFrontier implements Serializable
{
    /** All links waiting to be crawled */
    protected transient BdbMultipleWorkQueues pendingUris;

    // Initialize pendingUris; this method is abstract in the parent class
    protected void initQueue() throws IOException {
        try {
            this.pendingUris = createMultipleWorkQueues();
        } catch (DatabaseException e) {
            throw (IOException)new IOException(e.getMessage()).initCause(e);
        }
    }

    private BdbMultipleWorkQueues createMultipleWorkQueues()
    throws DatabaseException {
        return new BdbMultipleWorkQueues(this.controller.getBdbEnvironment(),
            this.controller.getBdbEnvironment().getClassCatalog(),
            this.controller.isCheckpointRecover());
    }

    protected BdbMultipleWorkQueues getWorkQueues() {
        return pendingUris;
    }
}
BdbUriUniqFilter:
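It is essentially a filter: it checks whether a link that is about to enter the waiting queue has already been seen, so that the same link is not crawled twice.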
Code
// Add a URL
protected boolean setAdd(CharSequence uri) {
    DatabaseEntry key = new DatabaseEntry();
    LongBinding.longToEntry(createKey(uri), key);
    long started = 0;
    OperationStatus status = null;
    try {
        if (logger.isLoggable(Level.INFO)) {
            started = System.currentTimeMillis();
        }
        // Add to the database, unless the key is already there
        status = alreadySeen.putNoOverwrite(null, key, ZERO_LENGTH_ENTRY);
        if (logger.isLoggable(Level.INFO)) {
            aggregatedLookupTime +=
                (System.currentTimeMillis() - started);
        }
    } catch (DatabaseException e) {
        logger.severe(e.getMessage());
    }
    if (status == OperationStatus.SUCCESS) {
        count++;
        if (logger.isLoggable(Level.INFO)) {
            final int logAt = 10000;
            if (count > 0 && ((count % logAt) == 0)) {
                logger.info("Average lookup " +
                    (aggregatedLookupTime / logAt) + "ms.");
                aggregatedLookupTime = 0;
            }
        }
    }
    // If the key already exists, return false
    if (status == OperationStatus.KEYEXIST) {
        return false; // not added
    } else {
        return true;
    }
}
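The core idea of setAdd(), storing only a 64-bit key per URL and relying on "put if absent" to detect duplicates, can be sketched without a database. In the illustration below the class SeenFilterSketch is hypothetical, a HashSet replaces the alreadySeen database, and the 64-bit FNV-1a hash is a stand-in chosen for this sketch. It is not claimed to be the fingerprint function that createKey() uses in Heritrix.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory "already seen" filter keyed by a 64-bit hash.
class SeenFilterSketch {
    private final Set<Long> alreadySeen = new HashSet<>();
    private long count = 0;

    // 64-bit FNV-1a hash of the URI; a stand-in fingerprint for this sketch.
    static long createKey(CharSequence uri) {
        long hash = 0xcbf29ce484222325L; // FNV offset basis
        for (byte b : uri.toString().getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff);
            hash *= 0x100000001b3L;      // FNV prime
        }
        return hash;
    }

    // Returns true if the URI was new and has now been recorded,
    // false if its key was already present.
    boolean setAdd(CharSequence uri) {
        boolean added = alreadySeen.add(createKey(uri));
        if (added) {
            count++;
        }
        return added;
    }

    long count() {
        return count;
    }
}
```

Storing a fixed-size key instead of the full URL keeps each entry small, which matters when hundreds of millions of URLs are tracked. The trade-off is that two different URLs with the same key would be treated as duplicates.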