Concurrent Downloading in Web Crawlers

The following are my reading notes on Web Scraping with Python (《用Python寫網絡爬蟲》):

1. Sequential crawler

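  The crawlers we have used so far download one page after another, that is, they crawl sequentially. This is clearly very slow, and the problem becomes more pronounced when a large number of pages has to be downloaded. This section therefore looks at how to crawl in parallel using multiple threads and multiple processes.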

2. Multithreaded crawler

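  When crawling with multiple threads, downloading too many pages at once can crash the server or even get our IP banned. We therefore need to enforce, for each IP, a minimum delay between requests to different pages under the same domain.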
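
  The per-domain delay described above can be sketched roughly as follows. This is a minimal illustration written for Python 3 (unlike the Python 2 listings in these notes), not the Downloader from Chapter3. The class name Throttle and the injectable clock and sleep parameters are assumptions made here so the behaviour can be checked without real waiting. The sketch is single-threaded; sharing one instance across threads would additionally need a lock.

```python
import time
from urllib.parse import urlparse


class Throttle:
    """Enforce a minimum delay between requests to the same domain."""

    def __init__(self, delay, clock=time.time, sleep=time.sleep):
        self.delay = delay      # minimum seconds between hits on one domain
        self.clock = clock      # injectable for testing (assumption)
        self.sleep = sleep      # injectable for testing (assumption)
        self.last_seen = {}     # domain -> timestamp of the last request

    def wait(self, url):
        """Sleep if the domain of `url` was accessed less than `delay` seconds ago."""
        domain = urlparse(url).netloc
        last = self.last_seen.get(domain)
        if self.delay > 0 and last is not None:
            remaining = self.delay - (self.clock() - last)
            if remaining > 0:
                self.sleep(remaining)
        # record the access time after any sleeping is done
        self.last_seen[domain] = self.clock()
```

  A downloader would call wait(url) immediately before each request, so pages from different domains are not delayed by one another.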

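  To obtain the data for the experiment, we first download a zip file from the specified website, unzip it, and read the domain names stored inside. The code is as follows: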

from zipfile import ZipFile
from StringIO import StringIO
import csv
import sys
from crawler import crawler
from Chapter3 import LinkCrawler
from Chapter3 import MongoDb

class AlexaCallback:
    def __init__(self, max_length=1000):
        '''
        init the seed_url and max_length
        :param max_length: we can get the max_length website at most
        '''
        self.seed_url = "http://s3.amazonaws.com/alexa-static/top-1m.csv.zip"
        self.max_length = max_length

    def __call__(self, url, html):
        '''
        get at most max_length website, and return their urls
        :param url:
        :param html:
        :return: urls which we want
        '''
        if url == self.seed_url:
            urls = []
            with ZipFile(StringIO(html)) as zf:
                csv_name = zf.namelist()[0]
                for _, website in csv.reader(zf.open(csv_name)):
                    urls.append("http://" + website)
                    if len(urls) == self.max_length:
                        break
            return urls
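
  The same unzip-and-read step can be tried without downloading anything. The sketch below is written for Python 3, where the archive has to be wrapped in io.BytesIO and the CSV member decoded to text before csv.reader can consume it. The function name extract_urls and the three sample domains are made up for illustration; the real file is assumed to hold one "rank,domain" pair per row, as the listing above implies.

```python
import csv
import io
from zipfile import ZipFile


def extract_urls(zip_bytes, max_length=1000):
    """Return up to max_length URLs from a zipped 'rank,domain' CSV."""
    urls = []
    with ZipFile(io.BytesIO(zip_bytes)) as zf:
        csv_name = zf.namelist()[0]
        with zf.open(csv_name) as raw:
            # zf.open yields bytes, while csv.reader expects text lines
            reader = csv.reader(io.TextIOWrapper(raw, encoding="utf-8"))
            for _, website in reader:
                urls.append("http://" + website)
                if len(urls) == max_length:
                    break
    return urls


# Build a tiny archive in memory instead of fetching the real list.
buf = io.BytesIO()
with ZipFile(buf, "w") as zf:
    zf.writestr("top.csv", "1,example.com\n2,example.org\n3,example.net\n")
sample = buf.getvalue()

print(extract_urls(sample, max_length=2))
# → ['http://example.com', 'http://example.org']
```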

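   With a function for obtaining the domain names in place, we can start multiple threads within one process to achieve concurrency. We need a thread pool: whenever the number of threads is below the allowed maximum and there are still links to download, we create a new thread, removing dead threads from the pool along the way. All threads execute the same code.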

from Chapter3 import download
import threading
import time
import urlparse
import AlexaCallback
from Chapter3 import MongoDb

Sleep_Time = 1
def threadscrawler(url, delay=5, user_agent="wuyanjing", proxies=None, num_tries=2,
                cache=None, scrape_callback=None, timeout=60, max_threads=10):
    '''
    create max_threads threads to download html to realize parallel
    :param url:
    :param delay:
    :param user_agent:
    :param proxies:
    :param num_tries:
    :param cache:
    :param scrape_callback:
    :param timeout:
    :param max_threads:
    :return:
    '''
    crawl_queue = [url]
    seen = set(crawl_queue)
    d = download.Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies,
                            num_tries=num_tries, timeout=timeout)

    def process_queue():
        '''
        every thread executes this code to perform downloads
        :return:
        '''
        while True:
            try:
                current_url = crawl_queue.pop()
            except IndexError:
                break
            else:
                html = d(current_url)
                if scrape_callback:
                    try:
                        links = scrape_callback(current_url, html) or []
                    except Exception as e:
                        print "error in callback for: {}: {}".format(current_url, e)
                    else:
                        for link in links:
                            link = normalize(url, link)
                            if link not in seen:
                                seen.add(link)
                                crawl_queue.append(link)
    # the thread pool
    threads = []
    while threads or crawl_queue:
        # remove the dead threads (iterate over a copy, because removing
        # from a list while iterating over it skips elements)
        for thread in threads[:]:
            if not thread.is_alive():
                threads.remove(thread)
        # start a new thread
        while len(threads) < max_threads and crawl_queue:
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        # all threads have been processed
        # sleep temporarily so cpu can focus execution elsewhere
        time.sleep(Sleep_Time)

def normalize(url, link):
    link, _ = urlparse.urldefrag(link)
    return urlparse.urljoin(url, link)

if __name__ == "__main__":
    scrape_callback = AlexaCallback.AlexaCallback()
    cache = MongoDb.MongoDb()
    threadscrawler(scrape_callback.seed_url, scrape_callback=scrape_callback, cache=cache, max_threads=5, timeout=10)
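
  The thread-pool loop can be tried in isolation with a stand-in for the downloader. The sketch below is written for Python 3; run_pool, fetch and the SITE link graph are hypothetical names used only for this illustration, and fetch simply looks links up in a dictionary instead of touching the network. A lock guards the check-then-add on the seen set, which is an addition made here and is not part of the listing above.

```python
import threading
import time


def run_pool(seed_urls, fetch, max_threads=3, poll_interval=0.01):
    """Drain a shared crawl queue with at most max_threads worker threads."""
    crawl_queue = list(seed_urls)
    seen = set(crawl_queue)
    results = []
    lock = threading.Lock()  # guards results and the check-then-add on seen

    def process_queue():
        while True:
            try:
                url = crawl_queue.pop()
            except IndexError:
                break  # queue is empty, so this thread can finish
            links = fetch(url)
            with lock:
                results.append(url)
                for link in links:
                    if link not in seen:
                        seen.add(link)
                        crawl_queue.append(link)

    threads = []
    while threads or crawl_queue:
        # keep only live threads; the list is rebuilt rather than mutated in place
        threads = [t for t in threads if t.is_alive()]
        while len(threads) < max_threads and crawl_queue:
            t = threading.Thread(target=process_queue, daemon=True)
            t.start()
            threads.append(t)
        time.sleep(poll_interval)  # yield the CPU while workers run
    return results


# A made-up link graph standing in for real pages.
SITE = {"a": ["b", "c"], "b": ["c", "d"], "c": [], "d": ["a"]}
visited = run_pool(["a"], lambda url: SITE[url])
print(sorted(visited))
# → ['a', 'b', 'c', 'd']
```

  The outer loop keeps running while any thread is alive, so links that a worker appends after the queue was momentarily empty are still picked up.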

3. Multiprocess crawler

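  A multithreaded crawler runs within a single process. If the machine has multiple CPUs, we can use a multiprocess crawler to download even faster. For that, the crawl queue can no longer live in memory, because other processes would have no way to access it. We need to move the crawl queue into MongoDB so that the different processes can stay in sync.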

  First we create MongdbQueue. This queue coordinates and synchronizes URL downloads across the different processes.

from pymongo import errors, MongoClient
from datetime import datetime, timedelta
class MongdbQueue:
    # init the three state
    Outstanding, Proceeding, Complete = range(3)

    def __init__(self, client=None, timeout=300):
        self.client = MongoClient() if client is None else client
        self.db = self.client.cache
        self.timeout = timeout

    def __nonzero__(self):
        '''
        if there are more objects return true
        :return:
        '''
        record = self.db.crawl_queue.find_one({'status': {'$ne': self.Complete}})
        return True if record else False

    def push(self, url):
        '''
        insert url if it not exist
        :param url:
        :return:
        '''
        try:
            self.db.crawl_queue.insert({'_id': url, 'status': self.Outstanding})
        except errors.DuplicateKeyError as e:
            pass

    def pop(self):
        '''
        atomically change one outstanding record to proceeding and return its url;
        if no such record is found, raise KeyError
        :return:
        '''
        record = self.db.crawl_queue.find_and_modify(query={'status': self.Outstanding},
                                                     update={'$set': {'status': self.Proceeding,
                                                                      'timestamp': datetime.now()}})
        if record:
            return record['_id']
        else:
            self.repair()
            raise KeyError()

    def repair(self):
        '''
        release stalled jobs: a record that was claimed more than `timeout`
        seconds ago but never completed is reset to outstanding
        :return:
        '''
        record = self.db.crawl_queue.find_and_modify(
            query={'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
                   'status': {'$ne': self.Complete}},
            update={'$set': {'status': self.Outstanding}})
        if record:
            print "release: ", record['_id']

    def complete(self, url):
        '''
        change the status to complete if the process has finished
        :return:
        '''
        self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.Complete}})

    def clear(self):
        self.db.crawl_queue.drop()

    def peek(self):
        record = self.db.crawl_queue.find_one({'status': self.Outstanding})
        if record:
            return record['_id']
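
  The state machine behind this queue (outstanding, then proceeding, then complete, with stalled jobs released after a timeout) can be exercised without a MongoDB server. The sketch below is written for Python 3 and keeps the records in a plain dictionary. InMemoryQueue and the optional now parameter are assumptions introduced here so the timeout can be tested with fixed timestamps. It does not provide the cross-process sharing that is the whole point of the MongoDB version.

```python
from datetime import datetime, timedelta


class InMemoryQueue:
    """Dictionary-backed stand-in that mirrors the queue's status transitions."""
    OUTSTANDING, PROCEEDING, COMPLETE = range(3)

    def __init__(self, timeout=300):
        self.timeout = timeout  # seconds before a claimed job counts as stalled
        self.records = {}       # url -> {'status': ..., 'timestamp': ...}

    def __bool__(self):
        # true while any record is not yet complete
        return any(r['status'] != self.COMPLETE for r in self.records.values())

    def push(self, url):
        # a duplicate url keeps its existing record
        self.records.setdefault(url, {'status': self.OUTSTANDING})

    def pop(self, now=None):
        now = now or datetime.now()
        for url, rec in self.records.items():
            if rec['status'] == self.OUTSTANDING:
                rec.update(status=self.PROCEEDING, timestamp=now)
                return url
        self.repair(now)
        raise KeyError()

    def repair(self, now=None):
        """Reset one stalled, unfinished job to outstanding and return its url."""
        now = now or datetime.now()
        cutoff = now - timedelta(seconds=self.timeout)
        for url, rec in self.records.items():
            if rec['status'] != self.COMPLETE and rec.get('timestamp', now) < cutoff:
                rec['status'] = self.OUTSTANDING
                return url

    def complete(self, url):
        self.records[url]['status'] = self.COMPLETE


q = InMemoryQueue(timeout=300)
q.push("http://example.com")
q.push("http://example.com")        # duplicate is ignored
t0 = datetime(2020, 1, 1, 12, 0, 0)
url = q.pop(now=t0)                 # claimed, status is now PROCEEDING
# Suppose the worker dies without calling complete(); ten minutes later:
released = q.repair(now=t0 + timedelta(minutes=10))
```

  After the repair call the URL is outstanding again, so another worker can claim it with pop.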

  Next, we rewrite threadscrawler to use the newly created MongdbQueue:

from Chapter3 import download
import threading
import time
import urlparse
import AlexaCallback
from Chapter3 import MongoDb
from MongdbQueue import MongdbQueue

Sleep_Time = 1
def threadscrawler(url, delay=5, user_agent="wuyanjing", proxies=None, num_tries=2,
                cache=None, scrape_callback=None, timeout=60, max_threads=10):
    '''
    create max_threads threads to download html to realize parallel
    :param url:
    :param delay:
    :param user_agent:
    :param proxies:
    :param num_tries:
    :param cache:
    :param scrape_callback:
    :param timeout:
    :param max_threads:
    :return:
    '''
    crawl_queue = MongdbQueue()
    crawl_queue.clear()
    crawl_queue.push(url)
    d = download.Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies,
                            num_tries=num_tries, timeout=timeout)

    def process_queue():
        '''
        every thread executes this code to perform downloads
        :return:
        '''
        while True:
            try:
                current_url = crawl_queue.pop()
            except KeyError:
                # MongdbQueue.pop raises KeyError when no url is outstanding
                break
            else:
                html = d(current_url)
                if scrape_callback:
                    try:
                        links = scrape_callback(current_url, html) or []
                    except Exception as e:
                        print "error in callback for: {}: {}".format(current_url, e)
                    else:
                        for link in links:
                            link = normalize(url, link)
                            crawl_queue.push(link)
                crawl_queue.complete(current_url)
    # the thread pool
    threads = []
    while threads or crawl_queue:
        # remove the dead threads (iterate over a copy, because removing
        # from a list while iterating over it skips elements)
        for thread in threads[:]:
            if not thread.is_alive():
                threads.remove(thread)
        # start a new thread
        while len(threads) < max_threads and crawl_queue.peek():
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        # all threads have been processed
        # sleep temporarily so cpu can focus execution elsewhere
        time.sleep(Sleep_Time)

def normalize(url, link):
    link, _ = urlparse.urldefrag(link)
    return urlparse.urljoin(url, link)
# if __name__ == "__main__":
#     scape_callback = AlexaCallback.AlexaCallback()
#     cache = MongoDb.MongoDb()
#     threadscrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10)

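  Finally, the multiprocess crawler. The function I wrote raises an error, and I have not yet worked out where the problem lies; this still needs investigation.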

import multiprocessing
from ThreadsCrawler import threadscrawler
import AlexaCallback
from Chapter3 import MongoDb

def processcrawler(arg, **kwargs):
    # start one crawler process per cpu
    num_cpus = multiprocessing.cpu_count()
    print "start process num is ", num_cpus
    process = []
    for i in range(num_cpus):
        p = multiprocessing.Process(target=threadscrawler, args=(arg, ), kwargs=kwargs)
        p.start()
        process.append(p)
    # wait for all processes to finish
    for p in process:
        p.join()

if __name__ == "__main__":
    scape_callback = AlexaCallback.AlexaCallback()
    cache = MongoDb.MongoDb()
    processcrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10)

  Here is my error log:

E:\python27\python.exe E:/pycharm/crawling/Chapter4/ProcessCrawler.py
start process num is  4
Traceback (most recent call last):
  File "E:/pycharm/crawling/Chapter4/ProcessCrawler.py", line 20, in <module>
    processcrawler(scape_callback.seed_url, scrape_callback=scape_callback, cache=cache, max_threads=5, timeout=10)
  File "E:/pycharm/crawling/Chapter4/ProcessCrawler.py", line 12, in processcrawler
    p.start()
  File "E:\python27\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "E:\python27\lib\multiprocessing\forking.py", line 277, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "E:\python27\lib\multiprocessing\forking.py", line 199, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "E:\python27\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "E:\python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "E:\python27\lib\pickle.py", line 425, in save_reduce
    save(state)
  File "E:\python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "E:\python27\lib\pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "E:\python27\lib\pickle.py", line 687, in _batch_setitems
    save(v)
  File "E:\python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "E:\python27\lib\pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "E:\python27\lib\pickle.py", line 687, in _batch_setitems
    save(v)
  File "E:\python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "E:\python27\lib\pickle.py", line 731, in save_inst
    save(stuff)
  File "E:\python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "E:\python27\lib\pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "E:\python27\lib\pickle.py", line 687, in _batch_setitems
    save(v)
  File "E:\python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "E:\python27\lib\pickle.py", line 425, in save_reduce
    save(state)
  File "E:\python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "E:\python27\lib\pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "E:\python27\lib\pickle.py", line 687, in _batch_setitems
    save(v)
  File "E:\python27\lib\pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "E:\python27\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "E:\python27\lib\pickle.py", line 1384, in load
    return Unpickler(file).load()
  File "E:\python27\lib\pickle.py", line 864, in load
    dispatch[key](self)
  File "E:\python27\lib\pickle.py", line 886, in load_eof
    raise EOFError
EOFError

Process finished with exit code 1
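
  One reading of this log: the first traceback ends in "TypeError: can't pickle thread.lock objects", raised from the dump call in multiprocessing/forking.py. That suggests that on Windows the arguments handed to Process are pickled for the child, and that something among them holds a lock. A plausible candidate is the cache object, if it wraps a MongoClient, but that is an assumption and has not been verified here. The Python 3 sketch below only demonstrates the underlying limitation; can_pickle is a helper made up for this illustration.

```python
import pickle
import threading


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# Plain settings are fine to send to a child process.
print(can_pickle({"max_threads": 5, "timeout": 10}))
# → True

# Anything that holds a lock, directly or nested, is not.
print(can_pickle({"cache_lock": threading.Lock()}))
# → False
```

  If that reading is right, one possible workaround is to pass only plain values to processcrawler and let each child process construct its own MongoDb.MongoDb() cache; this has not been tested against the code above.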

 

4. Summary

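  To speed up downloading a large number of pages, we used multiple threads and multiple processes. Within a certain range, increasing the number of threads and processes noticeably improves download speed, but beyond a certain point the speed stops improving and actually drops, because switching between threads incurs considerable overhead.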
