本節解析與處理器有關的內容.
與處理器有關的主要在如下幾個類:Processor(處理器類),ProcessorChain(處理器類),ProcessorChainList(處理器鏈列表).它們之間的關係以下:
下面將解析該圖.
(1)Processor
表明一個處理器.
Code
package org.archive.crawler.framework;
public class Processor extends ModuleType {
//默認的下一個處理器
private Processor defaultNextProcessor = null;
/**
* Perform processing on the given CrawlURI.
* 處理一個連接
* @param curi
* @throws InterruptedException
*/
public final void process(CrawlURI curi) throws InterruptedException {
// by default, arrange for curi to proceed to next processor
//設置當前處理器的下一個處理器
curi.setNextProcessor(getDefaultNextProcessor(curi));
// Check if this processor is enabled before processing
try {
if (!((Boolean) getAttribute(ATTR_ENABLED, curi)).booleanValue()) {
return;
}
} catch (AttributeNotFoundException e) {
logger.severe(e.getMessage());
}
if(rulesAccept(curi)) {
innerProcess(curi); //留給子類實現
} else {
innerRejectProcess(curi);
}
}
Code
(2)ProcessorChain
該類實際上實現一個隊列的功能,它表明一個由許多處理器鏈接的處理器鏈.
Code
package org.archive.crawler.framework;
public class ProcessorChain {
//存放當前處理鏈中全部的處理器
private final MapType processorMap;
//下一個處理器鏈
private ProcessorChain nextChain;
//處理器鏈的第一個處理器
private Processor firstProcessor;
/** Construct a new processor chain.
* 把該處理鏈的全部的處理器鏈接起來
* @param processorMap a map of the processors belonging to this chain.
*/
public ProcessorChain(MapType processorMap) {
this.processorMap = processorMap;
Processor previous = null;
for (Iterator it = processorMap.iterator(null); it.hasNext();) {
Processor p = (Processor) it.next();
if (previous == null) {
firstProcessor = p;
} else {
//設置前一個處理器的下一個處理器爲當前處理器
previous.setDefaultNextProcessor(p);
}
logger.info(
"Processor: " + p.getName() + " --> " + p.getClass().getName());
//當前處理器設置爲前一個處理器
previous = p;
}
}
/** Set the processor chain that the URI should be working through after
* finishing this one.
* 設置下一個處理器
* @param nextProcessorChain the chain that should be processed after this
* one.
*/
public void setNextChain(ProcessorChain nextProcessorChain) {
this.nextChain = nextProcessorChain;
}
/** Get the processor chain that the URI should be working through after
* finishing this one.
*
* @return the next processor chain.
*/
public ProcessorChain getNextProcessorChain() {
return nextChain;
}
/** Get the first processor in the chain.
* 獲取第一個處理器
* @return the first processor in the chain.
*/
public Processor getFirstProcessor() {
return firstProcessor;
}
Code
(3)ProcessorChainList
該類是保存一次抓取任務的全部的處理器鏈(ProcessorChain).
Code
package org.archive.crawler.framework;
public class ProcessorChainList {
//處理器鏈列表,保存全部的處理器鏈
private List<ProcessorChain> chainList = new ArrayList<ProcessorChain>();
//全部的處理器
private Map<String,ProcessorChain> chainMap
= new HashMap<String,ProcessorChain>();
/** Add a new chain of processors to the chain list.
* 將全部的處理器鏈添加到Map中
* This method takes a map of processors and wraps it in a ProcessorChain
* object and adds it to the list of chains.
*
* @param processorMap the processor map to be added.
*/
public void addProcessorMap(String name, MapType processorMap) {
//由MapType生成一個處理器鏈
ProcessorChain processorChain = new ProcessorChain(processorMap);
ProcessorChain previousChain = getLastChain();
if (previousChain != null) {
//設置下一個處理器鏈
previousChain.setNextChain(processorChain);
}
chainList.add(processorChain);
chainMap.put(name, processorChain);
}
/** Get the first processor chain.
* 獲取第一個處理鏈
* @return the first processor chain.
*/
public ProcessorChain getFirstChain() {
return (ProcessorChain) chainList.get(0);
}
(4)ToeThread
爲了高效抓取網頁,Heritrix採用了線程池的設計.每個線程將調用全部的處理器來處理連接.
Code
private void processCrawlUri() throws InterruptedException {
currentCuri.setThreadNumber(this.serialNumber);
//獲取第一個處理器鏈
currentCuri.setNextProcessorChain(controller.getFirstProcessorChain());
lastStartTime = System.currentTimeMillis();
// System.out.println(currentCuri);
try {
while (currentCuri.nextProcessorChain() != null) {
setStep(STEP_ABOUT_TO_BEGIN_CHAIN);
// Starting on a new processor chain.
//設置下一個處理器
currentCuri.setNextProcessor(currentCuri.nextProcessorChain().getFirstProcessor());
currentCuri.setNextProcessorChain(currentCuri.nextProcessorChain().getNextProcessorChain());
while (currentCuri.nextProcessor() != null) {
setStep(STEP_ABOUT_TO_BEGIN_PROCESSOR);
Processor currentProcessor = getProcessor(currentCuri.nextProcessor());
currentProcessorName = currentProcessor.getName();
continueCheck();
// long memBefore = (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory())/1024;
//調用處理器處理連接
currentProcessor.process(currentCuri);
// long memAfter = (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory())/1024;
// System.out.println((memAfter-memBefore)+"K in "+currentProcessorName);
}
}
setStep(STEP_DONE_WITH_PROCESSORS);
currentProcessorName = "";
} catch (RuntimeExceptionWrapper e) {
// Workaround to get cause from BDB
if(e.getCause() == null) {
e.initCause(e.getCause());
}
recoverableProblem(e);
} catch (AssertionError ae) {
// This risks leaving crawl in fatally inconsistent state,
// but is often reasonable for per-Processor assertion problems
recoverableProblem(ae);
} catch (RuntimeException e) {
recoverableProblem(e);
} catch (StackOverflowError err) {
recoverableProblem(err);
} catch (Error err) {
// OutOfMemory and any others
seriousError(err);
}
Code
(5)處理器鏈的初始化
全部的處理器鏈都是在CrawlController的initialize中初始化的.
Code
public void initialize(SettingsHandler sH)
{
//初始化了Scope、Frontier以及ProcessorChain
setupCrawlModules();
Code
Code
Code
Code
private void setupCrawlModules(){
//設置處理鏈
// Setup processors
if (processorChains == null) {
processorChains = new ProcessorChainList(order);
}
原始地址: