走了一遍Inject和Generate,基本瞭解了nutch在執行爬取前的一些前期預熱工做,包括url的過濾、規則化、分值計算以及其與mapreduce的聯繫緊密性等,自我感受nutch的整個流程是很縝密的,起碼從前面兩個過程看是這樣的。
前期回顧:上一期主要是講解了nutch的第二個環節Generate,該環節主要完成獲取將要抓取的url列表,並寫入到segments目錄下,其中一些細節的處理包括每一個job提交前的輸入輸出以及執行的map和reducer類具體作了那些工做均可以參考上一篇。接下來的fetch部分感受應該是nutch的靈魂了,由於之前的nutch定位是搜索引擎,發展至今已演變爲爬蟲工具了。
這幾天在弄一個項目的基礎數據,都沒有好好的用心看nutch,中間試圖再次拜讀fetch這塊的代碼,發現這是一塊難啃的骨頭,網上的一些材料講的側重點也有所不一樣,可是爲了走完nutch,必須跨過這道坎。。。。。。下面開始吧~~~~
1.fetch的入口從Crawl類的fetcher.fetch(segs[0], threads);語句入手,其將segments和爬取的線程數做爲參數傳到fetch函數中,進入到fetch函數中,首先執行的是一個checkConfiguration函數,用於檢查http.agent.name和http.robot.nam是否有值,若是爲空則經過控制檯返回一些報錯信息等。後面就是一些變量的賦值和初始化,好比超時變量、抓取的最大深度、最多的連接個數等這些都是爲了後面抓取工做作準備的。後面但是初始化一個mapreduce的job,設置輸入爲:Generate階段生成的segments目錄下的crawl_generate,輸出爲:segments,要操做的map的類是:job.setMapRunnerClass(Fetcher.class);,經過job.runJob(job)提交job;
2.提交完job後開始執行Fetch的run方法:
public void run(RecordReader<Text, CrawlDatum> input,
OutputCollector<Text, NutchWritable> output,
Reporter reporter) throws IOException {
……
}
從run函數的參數就能夠看出輸入是text和crawlDatum封裝的類RecordReader,輸出爲text和nutchWritable封裝的類OutputCollector,固然了,進來仍是一通的設置各類參數、閾值等等。這裏值得一提的是對於爬取網頁這塊用的一個之前學操做系統中關於任務調度的經典案例——生產者與消費者案例。首先經過一行代碼:feeder = new QueueFeeder(input, fetchQueues, threadCount * queueDepthMuliplier);定義生產者隊列,其中的input就是輸入參數,fetchQueues是經過this.fetchQueues = new FetchItemQueues(getConf());獲得(默認是採起byHost模式,另外還有兩種byIP和byDomain),第三個參數也是讀取配置文件的默認值來的。這裏定義好生產者後,主要負責從Generate出來的crawldatum的信息,並把它們加入到共享隊列中去。(補充一下:關於FetchItemQueues、FetchItemQueue以及FetchItem之間的相互關係能夠經過查找源碼發現:
FetchItemQueues中包含的字段有:
public static final String DEFAULT_ID = "default";
Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
AtomicInteger totalSize = new AtomicInteger(0);
int maxThreads;
long crawlDelay;
long minCrawlDelay;
long timelimit = -1;
int maxExceptionsPerQueue = -1;
Configuration conf;
public static final String QUEUE_MODE_HOST = "byHost";
public static final String QUEUE_MODE_DOMAIN = "byDomain";
public static final String QUEUE_MODE_IP = "byIP";
String queueMode;
因而可知,這裏的map集合queues是將一個字符串與FetchItemQueue封裝後獲得的,那麼FetchItemQueue主要包括的字段有:
List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
Set<FetchItem> inProgress = Collections.synchronizedSet(new HashSet<FetchItem>());
AtomicLong nextFetchTime = new AtomicLong();
AtomicInteger exceptionCounter = new AtomicInteger();
long crawlDelay;
long minCrawlDelay;
int maxThreads;
Configuration conf;
同理,從這裏能夠看出,queue是有對象FetchItem封裝而來,而這裏的FetchItem主要包括如下字段:
int outlinkDepth = 0;
String queueID;
Text url;
URL u;
CrawlDatum datum;
至此,咱們大概清楚了從fetchitem->fetchitemqueue->fetchitemqueues的封裝關係了。
)
既然有了生產者生產產品了,那就應該有消費者來消費了(有需求就有市場,有市場也就有消費者)
3.消費者的產生源自代碼:
for (int i = 0; i < threadCount; i++) { // spawn threads
new FetcherThread(getConf()).start();
}
這樣就根據用戶設置的需求,生成指定個數threadCount個消費者。在這以前還有一些參數的設置好比超時、blocking等,該方法後面就是關於等待每一個線程(消費者)的結束以及每一個線程抓取了多少網頁是否成功抓取網頁的信息,後面再判斷生產者的抓取隊列是否已經被抓取完,若是是則輸出抓取隊列中的信息,另外還有個一判斷機制,判斷抓取的線程是否超時,若是超時則進入等待狀態。
4.這是整個生產者消費者的模型,形象並有效的反映與解決了抓取的隊列和線程之間的關係,下面還要着重看看消費者是如何取到抓取隊列中的url並進行抓取的,這時主要是經過new FetcherThread(getConf()).start(); 代碼進入到FetchThread的run方法。進入後首先就是執行:fit = fetchQueues.getFetchItem();主要是從以前存入抓取隊列中取出數據,緊隨其後就是判斷,取出的數據是否爲空,若是爲空則進一步判斷生產者是否存活或者抓取隊列中是否還有數據,若是有則等待,若是沒有則任務fetchItem已經處理完了,結束該線程(消費者)的爬取。固然,若是取得了fit不爲空,則經過代碼: Text reprUrlWritable =
(Text) fit.datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY); if (reprUrlWritable == null) {
reprUrl = fit.url.toString();
} else {
reprUrl = reprUrlWritable.toString();
}獲得其url,而後還要從該url的數據中分析出協議protocal(注意:該功能的實現是利用nutch的必殺技插件機制實現的,用到的是protocolFactory這個類,具體怎麼回事,有待研究^_^),稍後是判斷該url是否聽從RobotRules,若是不聽從則利用代碼:fetchQueues.finishFetchItem(fit, true);或者如其delayTime大於咱們配置的maxDelayTime,那就不抓取這個網頁將其從fetchQueues抓取隊列中除名。再往下執行比較核心的三行代碼:
ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);//利用協議得到響應的內容
ProtocolStatus status = output.getStatus();//得到狀態
Content content = output.getContent();//得到內容
5.再下面主要是對響應的相應狀態進行相應的處理:
(1):若是狀態爲WOULDBLOCK,執行:
case ProtocolStatus.WOULDBLOCK:
// retry ?
fetchQueues.addFetchItem(fit);
break;
即進行retry,把當前url添加到FetchItemQueues隊列中,進行重試
(2)若是狀態時SUCCESS,表示抓取到了頁面,緊接着就是執行: pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);進入到output這個方法後,咱們能夠看到首先是對於元數據的賦值,包括 datum.setStatus(status);
datum.setFetchTime(System.currentTimeMillis());datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);等,後面就是判斷若是fetch_success標記存在的話即表示抓取成功,則將執行對抓取到的頁面源碼進行解析parseResult = this.parseUtil.parse(content);
再後面就是表示寫文件output.collect(key, new NutchWritable(datum));
output.collect(key, new NutchWritable(content));
output.collect(url, new NutchWritable(new ParseImpl(new ParseText(parse.getText()),parseData, parse.isCanonical())));
以上執行完output方法後咱們能夠經過代碼pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);發現會返回pstatus狀態,該狀態表示從頁面中是否解析出來了url。若是解析出來了則標記爲STATUS_DB_UNFETCHED並初始化分值,代碼以下:
CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
fit.datum.getFetchInterval(), fit.datum.getScore());
// transfer existing metadata to the redir
newDatum.getMetaData().putAll(fit.datum.getMetaData());
scfilters.initialScore(redirUrl, newDatum);隨即還對該redirUrl進行了一系列判斷及操做:
if (reprUrl != null) {
newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
new Text(reprUrl));
}
fit = FetchItem.create(redirUrl, newDatum, queueMode);
if (fit != null) {
FetchItemQueue fiq =
fetchQueues.getFetchItemQueue(fit.queueID);
fiq.addInProgressFetchItem(fit);
} else {
// stop redirecting
redirecting = false;
reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect", 1);
}
以上就是對於返回狀態爲success的url的一系列解決方式;
(3)若是是MOVED或者TEMP_MOVED,表示這個網頁被重定向了。而後對其重定向的內容進行解析並生成相應的文件,執行output(fit.url, fit.datum, content, status, code);以及 Text redirUrl =handleRedirect(fit.url, fit.datum,
urlString, newUrl, temp,Fetcher.PROTOCOL_REDIR);獲得重定向的網址並生成一個新的FetchItem,根據其QueueID放到相應的隊列的inProgress集合中,而後再對這個重定向的網頁進行抓取;
(4)若是狀態是EXCEPTION,對當前url所屬的FetchItemQueue進行檢測,看其異常的網頁數有沒有超過最大異常網頁數,若是大於,那就清空這個隊列,認爲這個隊列中的全部網頁都有問題;
(5)若是狀態是RETRY或者是BLOCKED,那就輸出CrawlDatum,將其狀態設置成STATUS_FETCH_RETRY,在下一輪進行從新抓取;
(6)若是狀態是GONE,NOTFOUND,ACCESS_DENIED,ROBOTS_DENIED,那就輸出CrawlDatum,設置其狀態爲STATUS_FETCH_GONE,可能在下一輪中就不進行抓取了;
(7)若是狀態是NOTMODIFIED,那就認爲這個網頁沒有改變過,那就輸出其CrawlDatum,將其狀態設成成STATUS_FETCH_NOTMODIFIED;
(8)若是全部狀態都沒有找到,那默認輸出其CrawlDatum,將其狀態設置成STATUS_FETCH_RETRY,在下一輪抓取中再重試
最後判斷網頁重定向的次數,若是超過最大重定向次數,就輸出其CrawlDatum,將其狀態設置成STATUS_FETCH_GONE
6.每一個消費者「消費」的過程走完後,還要執行從這個消費隊列中除名,畢竟你來過了,走了以後就要籤個到什麼的,因此在FetchThread的run方法最後執行了finally代碼:
finally {
if (fit != null) fetchQueues.finishFetchItem(fit);
activeThreads.decrementAndGet(); // count threads
LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
}表示當前線程結束,整個線程隊列中減小醫院,其中activeThreads.decrementAndGet(); 這類的用法在nutch的fetch過程當中出現的很頻繁,activeThreads的定義爲:private AtomicInteger activeThreads = new AtomicInteger(0);(補充一下:這裏主要的做用表示不論是decrementAndGet()仍是incrementAndGet()方法都是線程安全的,一個表示減1,一個表示加1)
後面就是其餘的消費中一次重複三、四、五、6的過程,咱們跳出來回到Crawl.java類中的fetcher.fetch(segs[0], threads);方法能夠看出它也是在整個循環:
for (i = 0; i < depth; i++) { // generate new segment
Path[] segs = generator.generate(crawlDb, segments, -1, topN, System
.currentTimeMillis());
if (segs == null) {
LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
break;
}
fetcher.fetch(segs[0], threads); // fetch it segs[0]===[crawl20140727/segments/20140727195735]
if (!Fetcher.isParsing(job)) {
parseSegment.parse(segs[0]); // parse it, if needed
}
crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
}中,也就是說Generate、fetch、parse以及update是在循環執行,當達到用戶設置的採集depth或者系統默認的depth時,採集結束。
看到這裏,咱們大體明白了nutch的採集爬蟲的過程了。
本身感受最難啃的一根骨頭應該是啃完了,儘管不是啃得很乾淨……
整個fetch的脈絡大體以下,首先是進入從Fetch類的fetch函數入口,而後進行了一系列的賦值初始化等過程提交一個job,從代碼job.setMapRunnerClass(Fetcher.class);能夠看出在提交job時,執行到fetch的run函數:public void run(RecordReader<Text, CrawlDatum> input,OutputCollector<Text, NutchWritable> output,Reporter reporter) throws IOException 進入該run函數後,就是鋪墊好要解決的工做並經過生產者-消費者模型來解決這個問題,真正的爬取部分由消費者來解決,經過代碼:new FetcherThread(getConf()).start();看出應該進入到FetcherThread的run函數裏面執行一系列的頁面抓取、解析等操做。
(補充一點,從調試過程能夠看到property即配置文件的信息爲:{job.end.retry.interval=30000, ftp.keep.connection=false, io.bytes.per.checksum=512, mapred.job.tracker.retiredjobs.cache.size=1000, db.fetch.schedule.adaptive.dec_rate=0.2, mapred.task.profile.reduces=0-2, mapreduce.jobtracker.staging.root.dir=hadoop.tmp.dir/mapred/staging,mapred.job.reuse.jvm.num.tasks=1,mapred.reduce.tasks.speculative.execution=true,moreIndexingFilter.indexMimeTypeParts=true,db.ignore.external.links=false,io.seqfile.sorter.recordlimit=1000000,generate.min.score=0,db.update.additions.allowed=true,mapred.task.tracker.http.address=0.0.0.0:50060,fetcher.queue.depth.multiplier=50,fs.ramfs.impl=org.apache.hadoop.fs.InMemoryFileSystem,mapred.system.dir={hadoop.tmp.dir}/mapred/system, mapred.task.tracker.report.address=127.0.0.1:0, mapreduce.reduce.shuffle.connect.timeout=180000, db.fetch.schedule.adaptive.inc_rate=0.4, db.fetch.schedule.adaptive.sync_delta_rate=0.3, mapred.healthChecker.interval=60000, mapreduce.job.complete.cancel.delegation.tokens=true, generate.max.per.host=-1, fetcher.max.exceptions.per.queue=-1, fs.trash.interval=0, mapred.skip.map.auto.incr.proc.count=true, parser.fix.embeddedparams=true,
……
urlnormalizer.order=org.apache.nutch.net.urlnormalizer.basic.BasicURLNormalizer org.apache.nutch.net.urlnormalizer.regex.RegexURLNormalizer, io.compression.codecs=org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec, link.score.updater.clear.score=0.0f, parser.html.impl=neko, io.file.buffer.size=4096, parser.character.encoding.default=windows-1252, ftp.timeout=60000, mapred.map.tasks.speculative.execution=true, fetcher.timelimit.mins=-1, mapreduce.job.split.metainfo.maxsize=10000000, http.agent.name=jack, mapred.map.max.attempts=4, mapred.job.shuffle.merge.percent=0.66, fs.har.impl=org.apache.hadoop.fs.HarFileSystem, hadoop.security.authentication=simple, fs.s3.buffer.dir=${hadoop.tmp.dir}/s3, lang.analyze.max.length=2048, mapred.skip.reduce.auto.incr.proc.count=true, mapred.job.tracker.jobhistory.lru.cache.size=5, fetcher.threads.timeout.divisor=2, db.fetch.schedule.class=org.apache.nutch.crawl.DefaultFetchSchedule, mapred.jobtracker.blacklist.fault-bucket-width=15, mapreduce.job.acl-view-job= , mapred.job.queue.name=default, fetcher.queue.mode=byHost, link.analyze.initial.score=1.0f, mapred.job.tracker.persist.jobstatus.hours=0, db.max.outlinks.per.page=100, fs.file.impl=org.apache.hadoop.fs.LocalFileSystem, db.fetch.schedule.adaptive.sync_delta=true, urlnormalizer.loop.count=1, ipc.client.kill.max=10, mapred.healthChecker.script.timeout=600000, mapred.tasktracker.map.tasks.maximum=2, http.max.delays=100, fetcher.follow.outlinks.depth.divisor=2, mapred.job.tracker.persist.jobstatus.dir=/jobtracker/jobsInfo, lang.identification.only.certain=false, http.useHttp11=false, lang.extraction.policy=detect,identify, mapred.reduce.slowstart.completed.maps=0.05, io.sort.mb=100, ipc.server.listen.queue.size=128, db.fetch.interval.default=2592000, ftp.password=anonymous@example.com, solr.auth=false, io.mapfile.bloom.size=1048576, ftp.follow.talk=false, fs.hsftp.impl=org.apache.hadoop.hdfs.HsftpFileSystem, fetcher.verbose=false, fetcher.throughput.threshold.check.after=5, hadoop.rpc.socket.factory.class.default=org.apache.hadoop.net.StandardSocketFactory, fs.hftp.impl=org.apache.hadoop.hdfs.HftpFileSystem, db.fetch.interval.max=7776000, fs.kfs.impl=org.apache.hadoop.fs.kfs.KosmosFileSystem, mapred.map.tasks=2, mapred.local.dir.minspacekill=0, fs.hdfs.impl=org.apache.hadoop.hdfs.DistributedFileSystem, urlfilter.domain.file=domain-urlfilter.txt, mapred.job.map.memory.mb=-1, mapred.jobtracker.completeuserjobs.maximum=100, plugin.folders=./plugins, indexer.max.content.length=-1, fetcher.throughput.threshold.retries=5, link.analyze.damping.factor=0.85f, urlfilter.regex.file=regex-urlfilter.txt, mapred.min.split.size=0, http.robots.403.allow=true……這樣的信息)
參考博文:http://blog.csdn.net/amuseme_lu/article/details/6725561
友情贊助html
若是你以爲博主的文章對你那麼一點小幫助,恰巧你又有想打賞博主的小衝動,那麼事不宜遲,趕忙掃一掃,小額地贊助下,攢個奶粉錢,也是讓博主有動力繼續努力,寫出更好的文章^^。java
1. 支付寶 2. 微信apache