以下內容是《用Python寫網絡爬蟲》的讀書筆記:
1、串行爬蟲
我們之前使用的爬蟲方式,都是一個頁面接着一個頁面下載,也就是使用串行的方式進行爬取。但是顯然這種方式下載的速度是非常慢的,特別是當我們需要下載大量頁面的時候,這個問題就會變得更加突出。所以本節就學習如何進行多線程和多進程的並行爬蟲。
2、多線程爬蟲
我們在使用多線程進行爬蟲的時候,因爲同時下載的頁面過多,可能出現服務器崩潰的情況,甚至可能出現ip被封的情況。所以需要爲每個ip設置爬取同一個域名下不同頁面的最小間隔時間。
爲了獲取實驗所需的數據,首先我們要到指定的網站下載zip文件,將其解壓,然後獲取保存在其中的域名。以下是具體代碼:
from zipfile import ZipFile
from StringIO import StringIO
import csv
import sys
from crawler import crawler
from Chapter3 import LinkCrawler
from Chapter3 import MongoDb


class AlexaCallback:
    """Scrape callback that turns the Alexa top-sites archive into URLs.

    The instance is called by the crawler with every downloaded page; it
    only reacts to its own ``seed_url`` and ignores everything else.
    """

    def __init__(self, max_length=1000):
        """Store the archive location and the cap on returned URLs.

        :param max_length: maximum number of website URLs to return;
            ``None`` means no cap, zero or a negative value means none.
        """
        self.seed_url = "http://s3.amazonaws.com/alexa-static/top-1m.csv.zip"
        self.max_length = max_length

    def __call__(self, url, html):
        """Extract at most ``max_length`` website URLs from the archive.

        :param url: address of the page that was just downloaded
        :param html: raw body of that page; for ``seed_url`` this is the
            content of the zip archive
        :return: list of ``"http://<website>"`` strings when ``url`` is
            the seed url, otherwise ``None``
        """
        if url != self.seed_url:
            return None

        limit = self.max_length
        urls = []
        # A non-positive cap used to slip past the equality test below and
        # return the whole file; honour it as "nothing wanted" instead.
        if limit is not None and limit <= 0:
            return urls

        with ZipFile(StringIO(html)) as zf:
            # The archive is expected to hold a single CSV file.
            csv_name = zf.namelist()[0]
            # Each row has two columns; only the second (the website) is used.
            for _, website in csv.reader(zf.open(csv_name)):
                urls.append("http://" + website)
                if limit is not None and len(urls) >= limit:
                    break
        return urls
有了獲取域名的函數之後,我們就可以在一個進程中開啓多個線程,實現併發。我們需要創建一個線程池,在沒有達到允許的最大線程數並且需要下載的鏈接不爲空的時候,我們就需要創建一個新的線程,在這個過程中需要去除線程池中已經結束的線程。所有的線程都執行相同的代碼。
from Chapter3 import download
import threading
import time
import urlparse
import AlexaCallback
from Chapter3 import MongoDb

# Seconds the pool-management loop sleeps between two passes.
Sleep_Time = 1


def threadscrawler(url, delay=5, user_agent="wuyanjing", proxies=None, num_tries=2,
                   cache=None, scrape_callback=None, timeout=60, max_threads=10):
    """Crawl from ``url`` with up to ``max_threads`` download threads.

    The pending URLs live in an in-memory list shared by all worker
    threads. The calling thread only manages the pool: it prunes finished
    workers, starts new ones while work is pending, and returns once both
    the pool and the queue are empty.

    :param url: seed url; also the base used to resolve extracted links
    :param delay: forwarded to ``download.Downloader``
    :param user_agent: forwarded to ``download.Downloader``
    :param proxies: forwarded to ``download.Downloader``
    :param num_tries: forwarded to ``download.Downloader``
    :param cache: forwarded to ``download.Downloader``
    :param scrape_callback: optional callable ``(url, html)`` returning an
        iterable of links (or a falsy value) to enqueue
    :param timeout: forwarded to ``download.Downloader``
    :param max_threads: maximum number of worker threads alive at once
    :return: None
    """
    crawl_queue = [url]
    seen = set(crawl_queue)
    d = download.Downloader(cache=cache, delay=delay, user_agent=user_agent,
                            proxies=proxies, num_tries=num_tries, timeout=timeout)

    def process_queue():
        """Worker body: download queued URLs until the queue is empty."""
        while True:
            try:
                current_url = crawl_queue.pop()
            except IndexError:
                # Queue drained; let this worker finish.
                break
            html = d(current_url)
            if not scrape_callback:
                continue
            try:
                links = scrape_callback(current_url, html) or []
            except Exception as e:
                # A failing callback must not kill the worker thread.
                print("error in callback for: {}: {}".format(current_url, e))
            else:
                for link in links:
                    link = normalize(url, link)
                    if link not in seen:
                        seen.add(link)
                        crawl_queue.append(link)

    # the thread pool
    threads = []
    while threads or crawl_queue:
        # Drop finished workers. Rebuilding the list avoids removing items
        # from a list while iterating over it, which skips elements.
        threads = [t for t in threads if t.is_alive()]
        # Top the pool up while there is still work to hand out.
        while len(threads) < max_threads and crawl_queue:
            thread = threading.Thread(target=process_queue)
            # Daemon threads do not keep the interpreter alive on exit.
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        # Sleep so the CPU can focus on the worker threads.
        time.sleep(Sleep_Time)


def normalize(url, link):
    """Strip the fragment from ``link`` and resolve it against ``url``."""
    link, _ = urlparse.urldefrag(link)
    return urlparse.urljoin(url, link)


if __name__ == "__main__":
    scape_callback = AlexaCallback.AlexaCallback()
    cache = MongoDb.MongoDb()
    threadscrawler(scape_callback.seed_url, scrape_callback=scape_callback,
                   cache=cache, max_threads=5, timeout=10)
3、多進程爬蟲
多線程爬蟲只能由一個進程進行處理,如果我們的電腦有多個cpu,爲了使下載的速度更快,我們可以使用多進程爬蟲。要使用多進程爬蟲,爬蟲隊列就不能放在內存中,因爲這樣的話,其他進程就沒有辦法訪問了。我們需要將爬蟲隊列放到MongoDB中,這樣不同進程就能實現同步。
首先我們要創建MongdbQueue。這個隊列能夠實現不同進程之間url下載的協調、同步。
from pymongo import errors, MongoClient
from datetime import datetime, timedelta


class MongdbQueue:
    """URL queue stored in the ``cache.crawl_queue`` MongoDB collection.

    Every document uses the URL as ``_id`` and carries a ``status`` field
    holding one of the three states below.

    NOTE(review): ``insert``, ``find_and_modify`` and ``update`` are the
    legacy PyMongo collection methods; they are kept so the class keeps
    working with the driver version this file was written against.
    """

    # Possible states of a queued URL.
    Outstanding, Proceeding, Complete = range(3)

    def __init__(self, client=None, timeout=300):
        """Connect to MongoDB.

        :param client: existing ``MongoClient``; a default one is created
            when omitted
        :param timeout: age, in seconds, after which a job that is still
            being processed is considered stalled
        """
        self.client = MongoClient() if client is None else client
        self.db = self.client.cache
        self.timeout = timeout

    def __nonzero__(self):
        """Return True while any URL has not reached the complete state."""
        record = self.db.crawl_queue.find_one({'status': {'$ne': self.Complete}})
        return True if record else False

    # Python 3 consults __bool__ instead of __nonzero__.
    __bool__ = __nonzero__

    def push(self, url):
        """Insert ``url`` as outstanding; do nothing if it already exists.

        :param url: url to enqueue
        """
        try:
            self.db.crawl_queue.insert({'_id': url, 'status': self.Outstanding})
        except errors.DuplicateKeyError:
            # Already queued (in any state): nothing to do.
            pass

    def pop(self):
        """Claim one outstanding URL and mark it as proceeding.

        :return: the claimed url
        :raises KeyError: when no outstanding URL is available; stalled
            jobs are released first so a later call may succeed
        """
        record = self.db.crawl_queue.find_and_modify(
            query={'status': self.Outstanding},
            update={'$set': {'status': self.Proceeding,
                             'timestamp': datetime.now()}})
        if record:
            return record['_id']
        self.repair()
        raise KeyError()

    def repair(self):
        """Release one stalled job so that it can be claimed again.

        A job is stalled when it is still proceeding although it was
        claimed more than ``timeout`` seconds ago.
        """
        # timedelta's first positional argument is days, so the unit has
        # to be named explicitly.
        deadline = datetime.now() - timedelta(seconds=self.timeout)
        record = self.db.crawl_queue.find_and_modify(
            query={'timestamp': {'$lt': deadline},
                   'status': self.Proceeding},
            update={'$set': {'status': self.Outstanding}})
        if record:
            print("release:  %s" % record['_id'])

    def complete(self, url):
        """Mark ``url`` as completely processed.

        :param url: url whose processing has finished
        """
        self.db.crawl_queue.update({'_id': url},
                                   {'$set': {'status': self.Complete}})

    def clear(self):
        """Drop the whole queue collection."""
        self.db.crawl_queue.drop()

    def peek(self):
        """Return one outstanding URL without claiming it, else None."""
        record = self.db.crawl_queue.find_one({'status': self.Outstanding})
        if record:
            return record['_id']
然後就要用新建的MongdbQueue來重寫threadscrawler
from Chapter3 import download
import threading
import time
import urlparse
import AlexaCallback
from Chapter3 import MongoDb
from MongdbQueue import MongdbQueue

# Seconds the pool-management loop sleeps between two passes.
Sleep_Time = 1


def threadscrawler(url, delay=5, user_agent="wuyanjing", proxies=None, num_tries=2,
                   cache=None, scrape_callback=None, timeout=60, max_threads=10):
    """Crawl from ``url`` with worker threads sharing a ``MongdbQueue``.

    The queue is cleared and seeded with ``url`` on every call. The
    calling thread only manages the pool: it prunes finished workers,
    starts new ones while outstanding URLs exist, and returns once the
    pool is empty and the queue reports no unfinished URL.

    :param url: seed url; also the base used to resolve extracted links
    :param delay: forwarded to ``download.Downloader``
    :param user_agent: forwarded to ``download.Downloader``
    :param proxies: forwarded to ``download.Downloader``
    :param num_tries: forwarded to ``download.Downloader``
    :param cache: forwarded to ``download.Downloader``
    :param scrape_callback: optional callable ``(url, html)`` returning an
        iterable of links (or a falsy value) to enqueue
    :param timeout: forwarded to ``download.Downloader``
    :param max_threads: maximum number of worker threads alive at once
    :return: None
    """
    crawl_queue = MongdbQueue()
    crawl_queue.clear()
    crawl_queue.push(url)
    d = download.Downloader(cache=cache, delay=delay, user_agent=user_agent,
                            proxies=proxies, num_tries=num_tries, timeout=timeout)

    def process_queue():
        """Worker body: download queued URLs until none is outstanding."""
        while True:
            try:
                current_url = crawl_queue.pop()
            except KeyError:
                # MongdbQueue.pop raises KeyError (not IndexError) when no
                # outstanding URL is left; let this worker finish.
                break
            html = d(current_url)
            if scrape_callback:
                try:
                    links = scrape_callback(current_url, html) or []
                except Exception as e:
                    # A failing callback must not kill the worker thread.
                    print("error in callback for: {}: {}".format(current_url, e))
                else:
                    for link in links:
                        # push() ignores URLs that are already queued.
                        crawl_queue.push(normalize(url, link))
            # Mark the URL finished whether or not a callback ran.
            crawl_queue.complete(current_url)

    # the thread pool
    threads = []
    while threads or crawl_queue:
        # Drop finished workers. Rebuilding the list avoids removing items
        # from a list while iterating over it, which skips elements.
        threads = [t for t in threads if t.is_alive()]
        # Top the pool up while outstanding URLs exist.
        while len(threads) < max_threads and crawl_queue.peek():
            thread = threading.Thread(target=process_queue)
            # Daemon threads do not keep the interpreter alive on exit.
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        # Sleep so the CPU can focus on the worker threads.
        time.sleep(Sleep_Time)


def normalize(url, link):
    """Strip the fragment from ``link`` and resolve it against ``url``."""
    link, _ = urlparse.urldefrag(link)
    return urlparse.urljoin(url, link)


# if __name__ == "__main__":
#     scape_callback = AlexaCallback.AlexaCallback()
#     cache = MongoDb.MongoDb()
#     threadscrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10)
最後是多進程爬蟲,我寫的這個函數報錯了,但是不知道是哪裏出了問題,有待考證。
import multiprocessing
from ThreadsCrawler import threadscrawler
import AlexaCallback
from Chapter3 import MongoDb


def _run_crawler(url, kwargs):
    """Child-process entry point: build the local cache, then crawl.

    :param url: seed url handed to ``threadscrawler``
    :param kwargs: keyword arguments for ``threadscrawler``; an optional
        ``cache_factory`` entry is consumed here
    """
    cache_factory = kwargs.pop("cache_factory", None)
    if cache_factory is not None and kwargs.get("cache") is None:
        # Create the cache inside the child so that no lock-holding
        # object has to be pickled by the parent.
        kwargs["cache"] = cache_factory()
    threadscrawler(url, **kwargs)


def processcrawler(arg, **kwargs):
    """Run one threaded crawler per CPU and wait for all of them.

    Where ``multiprocessing`` has to pickle the arguments of a
    ``Process`` (as on Windows), every value in ``kwargs`` must be
    picklable. An object that holds a thread lock fails there with
    ``TypeError: can't pickle thread.lock objects``; pass
    ``cache_factory`` instead of ``cache`` for such objects.

    :param arg: seed url handed to every ``threadscrawler``
    :param kwargs: keyword arguments for ``threadscrawler``; the extra
        key ``cache_factory`` may name a picklable zero-argument callable
        that each child calls to build its own ``cache``
    :return: None
    """
    num_cpus = multiprocessing.cpu_count()
    print("start process num is  %s" % num_cpus)
    processes = []
    for _ in range(num_cpus):
        p = multiprocessing.Process(target=_run_crawler, args=(arg, kwargs))
        p.start()
        processes.append(p)
    # Wait for every crawler process to finish.
    for p in processes:
        p.join()


if __name__ == "__main__":
    scape_callback = AlexaCallback.AlexaCallback()
    # Hand over the class rather than an instance: each child process
    # then builds its own cache instead of receiving a pickled copy.
    processcrawler(scape_callback.seed_url, scrape_callback=scape_callback,
                   cache_factory=MongoDb.MongoDb, max_threads=5, timeout=10)
以下是我的出錯日誌:
E:\python27\python.exe E:/pycharm/crawling/Chapter4/ProcessCrawler.py start process num is 4 Traceback (most recent call last): File "E:/pycharm/crawling/Chapter4/ProcessCrawler.py", line 20, in <module> processcrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10) File "E:/pycharm/crawling/Chapter4/ProcessCrawler.py", line 12, in processcrawler p.start() File "E:\python27\lib\multiprocessing\process.py", line 130, in start self._popen = Popen(self) File "E:\python27\lib\multiprocessing\forking.py", line 277, in __init__ dump(process_obj, to_child, HIGHEST_PROTOCOL) File "E:\python27\lib\multiprocessing\forking.py", line 199, in dump ForkingPickler(file, protocol).dump(obj) File "E:\python27\lib\pickle.py", line 224, in dump self.save(obj) File "E:\python27\lib\pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "E:\python27\lib\pickle.py", line 425, in save_reduce save(state) File "E:\python27\lib\pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "E:\python27\lib\pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "E:\python27\lib\pickle.py", line 687, in _batch_setitems save(v) File "E:\python27\lib\pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "E:\python27\lib\pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "E:\python27\lib\pickle.py", line 687, in _batch_setitems save(v) File "E:\python27\lib\pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "E:\python27\lib\pickle.py", line 731, in save_inst save(stuff) File "E:\python27\lib\pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "E:\python27\lib\pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "E:\python27\lib\pickle.py", line 687, in _batch_setitems save(v) File "E:\python27\lib\pickle.py", line 331, in save 
self.save_reduce(obj=obj, *rv) File "E:\python27\lib\pickle.py", line 425, in save_reduce save(state) File "E:\python27\lib\pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "E:\python27\lib\pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "E:\python27\lib\pickle.py", line 687, in _batch_setitems save(v) File "E:\python27\lib\pickle.py", line 306, in save rv = reduce(self.proto) TypeError: can't pickle thread.lock objects Traceback (most recent call last): File "<string>", line 1, in <module> File "E:\python27\lib\multiprocessing\forking.py", line 381, in main self = load(from_parent) File "E:\python27\lib\pickle.py", line 1384, in load return Unpickler(file).load() File "E:\python27\lib\pickle.py", line 864, in load dispatch[key](self) File "E:\python27\lib\pickle.py", line 886, in load_eof raise EOFError EOFError Process finished with exit code 1
4、總結
爲了能夠提高下載大量頁面的速度,我們採用了多線程和多進程的方式。在一定的範圍內,提高線程和進程數,能夠明顯地提高我們的下載速度,但是一旦超過某一個度,速度就不會提升反而下降,因爲線程之間的切換會帶來大量的開銷。