(5) Crawlers: Concurrent Downloading

  When a large number of URLs needs to be downloaded, crawling them serially is slow; multi-threaded and multi-process crawling, and ultimately a distributed crawler, can be used to speed things up.

1. Multi-threaded crawler

    With the code below, crawling 61 URLs took 16-25 s with three threads and 41-55 s with five threads (switching between threads also costs time).

#coding:utf-8
import threading
import multiprocessing
import requests
import csv
import time
from datetime import datetime
import urlparse

# Delay between downloads to the same domain
class Throttle(object):
    def __init__(self,delay):
        self.delay = delay
        self.domains={}

    def wait(self,url):
        domain = urlparse.urlparse(url).netloc  # extract the domain from the URL
        last_accessed = self.domains.get(domain)
        if self.delay>0 and last_accessed!=None:
            sleep_secs = self.delay-(datetime.now()-last_accessed).seconds
            if sleep_secs>0:
                time.sleep(sleep_secs)
        self.domains[domain]=datetime.now()

# Page download
def download(url,user_agent=None,proxies=None,num_retries=3):
    response=requests.get(url,headers={'User-Agent':user_agent},proxies=proxies)
    if response.status_code and 500<=response.status_code<600:  # retry up to three times on server-side (5xx) errors
        if num_retries > 0:
            response = download(url,user_agent,proxies,num_retries-1)
    return response


# Read the file of URLs to crawl
def get_urls():
    url_lists=[]
    with open('urls.csv','r') as f:
        reader = csv.reader(f)
        for row in reader:
            url_lists.append(row[0].replace(' ',''))
    return url_lists

def thread_crawler(max_threads=5):
    urls = get_urls()

    def process_url():
        throttle = Throttle(3)  # one Throttle per worker thread, so the per-domain delay is kept
        while True:
            try:
                url = urls.pop()
            except IndexError as e:
                print e
                break
            else:
                throttle.wait(url)
                response=download(url)
                print url, response.status_code

    threads=[]
    while urls or threads:
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads)<max_threads and urls:
            thread = threading.Thread(target=process_url)  # pass the function itself, do not call it here
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        #time.sleep(2)

if __name__=='__main__':
    thread_crawler()
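
    The same thread-based download can also be written more compactly with a thread pool. Below is a minimal sketch, not from the original post, using multiprocessing.dummy (the multiprocessing API backed by threads); it assumes the download, get_urls and Throttle helpers defined above, and the pool size of 3 is arbitrary.

# Sketch: thread-pool variant of the crawler above (assumes download, get_urls, Throttle).
from multiprocessing.dummy import Pool as ThreadPool

throttle = Throttle(3)  # shared per-domain delay; plain dict access, not lock-protected

def fetch(url):
    throttle.wait(url)
    response = download(url)
    print url, response.status_code
    return response.status_code

if __name__ == '__main__':
    pool = ThreadPool(3)           # three worker threads
    pool.map(fetch, get_urls())    # blocks until every URL has been fetched
    pool.close()
    pool.join()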

2. Multi-process crawler

  2.1 Multi-process

     With two processes, the code below crawls the 61 URLs in 20-30 s on average.

#coding:utf-8

import threading
import multiprocessing
import requests
import csv
import time
from datetime import datetime
import urlparse

# Delay between downloads to the same domain
class Throttle(object):
    def __init__(self,delay):
        self.delay = delay
        self.domains={}

    def wait(self,url):
        domain = urlparse.urlparse(url).netloc  # extract the domain from the URL
        last_accessed = self.domains.get(domain)
        if self.delay>0 and last_accessed!=None:
            sleep_secs = self.delay-(datetime.now()-last_accessed).seconds
            if sleep_secs>0:
                time.sleep(sleep_secs)
        self.domains[domain]=datetime.now()

# Page download
def download(url,user_agent=None,proxies=None,num_retries=3):
    response=requests.get(url,headers={'User-Agent':user_agent},proxies=proxies)
    if response.status_code and 500<=response.status_code<600:  # retry up to three times on server-side (5xx) errors
        if num_retries > 0:
            response = download(url,user_agent,proxies,num_retries-1)
    return response

def get_urls():
    url_lists=[]
    with open('urls.csv','r') as f:
        reader = csv.reader(f)
        for row in reader:
            url_lists.append(row[0].replace(' ',''))
    return url_lists

def process_url(url_queue):
    throttle = Throttle(3)  # one Throttle per worker process, so the per-domain delay is kept
    while url_queue.qsize()>0:
        try:
            url = url_queue.get()
        except Exception as e:
            print e
        else:
            throttle.wait(url)
            response=download(url)
            print url, response.status_code

def process_crawler():
    num_cpus = multiprocessing.cpu_count()
    q = multiprocessing.Queue()
    urls = get_urls()
    for url in urls:
        q.put(url)
    start = time.time()
    print "開始時間:%s"%start
    processes=[]
    for i in range(num_cpus):
        process = multiprocessing.Process(target=process_url,args=(q,))
        process.start()
        processes.append(process)
    for p in processes:
        p.join()
    end = time.time()
    print "結束時間:%s" %end
    print "爬取%s個url,消耗時間:%s"%(len(urls),int(end-start))
if __name__ == '__main__':  # windows系統下運行必須加__name__=="__main__",不然多進程報錯
    process_crawler()
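
    Two caveats about the worker loop above: multiprocessing.Queue.qsize() is not implemented on every platform (it raises NotImplementedError on macOS), and checking the size before a blocking get() is racy, because another process can drain the queue in between and leave this worker blocked forever. Below is a sketch, not the original code, of the same loop driven by the Queue.Empty exception instead, assuming the helpers above.

from Queue import Empty  # multiprocessing.Queue raises Queue.Empty (from the Queue module in Python 2)

def process_url(url_queue):
    throttle = Throttle(3)                  # one Throttle per worker process
    while True:
        try:
            url = url_queue.get(timeout=5)  # give up after 5 s of an empty queue
        except Empty:
            break                           # queue drained, let the process exit
        throttle.wait(url)
        response = download(url)
        print url, response.status_code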

  2.2 Multi-process plus multi-thread

    With two processes, each running three threads, the code below downloads the 61 URLs in 10-15 s on average.

#coding:utf-8

import threading
import multiprocessing
import requests
import csv
import time
from datetime import datetime
import urlparse

# Delay between downloads to the same domain
class Throttle(object):
    def __init__(self,delay):
        self.delay = delay
        self.domains={}

    def wait(self,url):
        domain = urlparse.urlparse(url).netloc  # extract the domain from the URL
        last_accessed = self.domains.get(domain)
        if self.delay>0 and last_accessed!=None:
            sleep_secs = self.delay-(datetime.now()-last_accessed).seconds
            if sleep_secs>0:
                time.sleep(sleep_secs)
        self.domains[domain]=datetime.now()

# Page download
def download(url,user_agent=None,proxies=None,num_retries=3):
    response=requests.get(url,headers={'User-Agent':user_agent},proxies=proxies)
    if response.status_code and 500<=response.status_code<600:  # retry up to three times on server-side (5xx) errors
        if num_retries > 0:
            response = download(url,user_agent,proxies,num_retries-1)
    return response

def get_urls():
    url_lists=[]
    with open('urls.csv','r') as f:
        reader = csv.reader(f)
        for row in reader:
            url_lists.append(row[0].replace(' ',''))
    return url_lists

def thread_crawler(url_queue,max_threads=5):

    def process_url():
        throttle = Throttle(3)  # one Throttle per worker thread, so the per-domain delay is kept
        while url_queue.qsize()>0:
            try:
                url = url_queue.get_nowait()  # do not block when the queue is empty
                #print url_queue.qsize()
            except Exception as e:
                print e
                break
            else:
                throttle.wait(url)
                response=download(url)
                print url, response.status_code
    threads=[]
    while url_queue.qsize()>0 or threads:
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads)<max_threads and url_queue.qsize()>0:
            thread = threading.Thread(target=process_url)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        #time.sleep(2)

def process_crawler():
    num_cpus = multiprocessing.cpu_count()
    q = multiprocessing.Queue()
    for url in get_urls():
        q.put(url)
    start = time.time()
    print "開始時間:%s" % start
    processes=[]
    for i in range(num_cpus):
        process = multiprocessing.Process(target=thread_crawler,args=(q,3))
        process.start()
        processes.append(process)
    for p in processes:
        p.join()
    end = time.time()
    print "結束時間:%s" % end
    print "爬取61個url,消耗時間:%s" % (int(end - start))
if __name__ == '__main__':  # windows系統下運行必須加__name__=="__main__",不然多進程報錯
    process_crawler()    
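
    The manual Process bookkeeping in 2.1 and 2.2 can also be delegated to a process pool. Below is a minimal sketch, not the original code, using multiprocessing.Pool, again assuming the download, get_urls and Throttle helpers above.

import multiprocessing

throttle = Throttle(3)  # module-level, so each worker process gets its own copy

def fetch(url):
    throttle.wait(url)
    response = download(url)
    print url, response.status_code
    return response.status_code

if __name__ == '__main__':  # the __main__ guard is still required on Windows
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    status_codes = pool.map(fetch, get_urls())  # distributes URLs across the worker processes
    pool.close()
    pool.join()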

  2.3 A MongoDB-backed URL queue (distributed?)

    The queue of URLs to crawl can be hosted in MongoDB, so that several machines can pull URLs from it at the same time, increasing concurrency and speeding up the crawl.

    Multi-threaded version, with the URL queue stored in MongoDB:

#coding:utf-8

import threading
import multiprocessing
import requests
import csv
import time
import urlparse
from datetime import datetime, timedelta
from pymongo import MongoClient,errors

class MongQueue(object):
    OUTSTANDING, PROCESSING, COMPLETE=(1,2,3)  # the three URL states: waiting to be downloaded, being downloaded, downloaded

    def __init__(self,client=None,timeout=300):
        self.client = MongoClient('127.0.0.1',27017) if client is None else client
        self.db = self.client.urls_db
        self.timeout = timeout

    def isEmpty(self):
        record = self.db.crawl_queue.find_one({'status':{'$ne':self.COMPLETE}})  #$ne: not equal
        if not record:
            return True
        else:
            return False

    def push(self,url):
        try:
            self.db.crawl_queue.insert({'_id':url,'status':self.OUTSTANDING})
        except errors.DuplicateKeyError as e:
            print 'the url is already in the queue!'
            pass

    def pop(self):

        record = self.db.crawl_queue.find_and_modify(
            query={'status':self.OUTSTANDING},
            update={'$set':{'status':self.PROCESSING,'timestamp':datetime.now()}}
        )
        if record:
            # print record['_id'],record['status']
            return record['_id']
        else:
            self.repair()
            raise KeyError()

    def complete(self,url):
        self.db.crawl_queue.update({'_id':url},
                                   {'$set':{'status':self.COMPLETE}})

    def repair(self):
        record = self.db.crawl_queue.find_and_modify(
            query={'timestamp':{'$lt':datetime.now()-timedelta(seconds=self.timeout)},
                   'status':{'$ne':self.COMPLETE}},
            update={'$set':{'status':self.OUTSTANDING}}
        )
        if record:
            print 'Released:%s'%record['_id']
    def clear(self):
        self.db.crawl_queue.drop()

# Delay between downloads to the same domain
class Throttle(object):
    def __init__(self,delay):
        self.delay = delay
        self.domains={}

    def wait(self,url):
        domain = urlparse.urlparse(url).netloc  # extract the domain from the URL
        last_accessed = self.domains.get(domain)
        if self.delay>0 and last_accessed!=None:
            sleep_secs = self.delay-(datetime.now()-last_accessed).seconds
            if sleep_secs>0:
                time.sleep(sleep_secs)
        self.domains[domain]=datetime.now()

# Page download
def download(url,user_agent=None,proxies=None,num_retries=3):
    response=requests.get(url,headers={'User-Agent':user_agent},proxies=proxies)
    if response.status_code and 500<=response.status_code<600:  # retry up to three times on server-side (5xx) errors
        if num_retries > 0:
            response = download(url,user_agent,proxies,num_retries-1)
    return response

def get_urls():
    url_lists=[]
    with open('urls.csv','r') as f:
        reader = csv.reader(f)
        for row in reader:
            url_lists.append(row[0].replace(' ',''))
    return url_lists


def thread_crawler(url_queue, max_threads=5):

    def process_url():
        throttle = Throttle(3)  # one Throttle per worker thread, so the per-domain delay is kept
        while not url_queue.isEmpty():
            try:
                url = url_queue.pop()
            except KeyError as e:
                print e
                break
            else:
                throttle.wait(url)
                response=download(url)
                url_queue.complete(url)
                print url, response.status_code
    threads=[]
    start = time.time()
    print "開始時間:%s" % start
    while (not url_queue.isEmpty()) or threads:
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads)<max_threads and (not url_queue.isEmpty()):
            thread = threading.Thread(target=process_url)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        #time.sleep(2)
    end = time.time()
    print "結束時間:%s" % end
    print "爬取61個url,消耗時間:%s" % (int(end - start))

if __name__ == '__main__':  # windows系統下運行必須加__name__=="__main__",不然多進程報錯
    url_queue = MongQueue()
    url_queue.clear()
    for url in get_urls():
        url_queue.push(url)
    thread_crawler(url_queue)
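
    A note on the MongoDB calls: insert, update and find_and_modify are legacy pymongo methods that have been removed from recent pymongo releases. With pymongo 3 and later, the same operations, including the atomic claim of one OUTSTANDING url in pop, can be sketched as below, keeping the collection name and status codes used above.

from datetime import datetime
from pymongo import MongoClient, errors

client = MongoClient('127.0.0.1', 27017)
crawl_queue = client.urls_db.crawl_queue

def push(url):
    try:
        crawl_queue.insert_one({'_id': url, 'status': 1})           # 1 = OUTSTANDING
    except errors.DuplicateKeyError:
        print 'the url is already in the queue!'

def pop():
    # Atomically flip one OUTSTANDING url to PROCESSING and return its _id.
    record = crawl_queue.find_one_and_update(
        {'status': 1},
        {'$set': {'status': 2, 'timestamp': datetime.now()}})       # 2 = PROCESSING
    if record is None:
        raise KeyError()
    return record['_id']

def complete(url):
    crawl_queue.update_one({'_id': url}, {'$set': {'status': 3}})   # 3 = COMPLETE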

    Multi-process plus multi-thread version, with the URL queue stored in MongoDB:

#coding:utf-8

import threading
import multiprocessing
import requests
import csv
import time
import urlparse
from datetime import datetime, timedelta
from pymongo import MongoClient,errors

class MongQueue(object):
    OUTSTANDING, PROCESSING, COMPLETE=(1,2,3)  # the three URL states: waiting to be downloaded, being downloaded, downloaded

    def __init__(self,client=None,timeout=300):
        self.client = MongoClient('127.0.0.1',27017) if client is None else client
        self.db = self.client.urls_db
        self.timeout = timeout

    def isEmpty(self):
        record = self.db.crawl_queue.find_one({'status':{'$ne':self.COMPLETE}})  #$ne: not equal
        if not record:
            return True
        else:
            return False

    def push(self,url):
        try:
            self.db.crawl_queue.insert({'_id':url,'status':self.OUTSTANDING})
        except errors.DuplicateKeyError as e:
            print 'the url is already in the queue!'
            pass

    def pop(self):

        record = self.db.crawl_queue.find_and_modify(
            query={'status':self.OUTSTANDING},
            update={'$set':{'status':self.PROCESSING,'timestamp':datetime.now()}}
        )
        if record:
            # print record['_id'],record['status']
            return record['_id']
        else:
            self.repair()
            raise KeyError()

    def complete(self,url):
        self.db.crawl_queue.update({'_id':url},
                                   {'$set':{'status':self.COMPLETE}})

    def repair(self):
        record = self.db.crawl_queue.find_and_modify(
            query={'timestamp':{'$lt':datetime.now()-timedelta(seconds=self.timeout)},
                   'status':{'$ne':self.COMPLETE}},
            update={'$set':{'status':self.OUTSTANDING}}
        )
        if record:
            print 'Released:%s'%record['_id']
    def clear(self):
        self.db.crawl_queue.drop()

# Delay between downloads to the same domain
class Throttle(object):
    def __init__(self,delay):
        self.delay = delay
        self.domains={}

    def wait(self,url):
        domain = urlparse.urlparse(url).netloc  # extract the domain from the URL
        last_accessed = self.domains.get(domain)
        if self.delay>0 and last_accessed!=None:
            sleep_secs = self.delay-(datetime.now()-last_accessed).seconds
            if sleep_secs>0:
                time.sleep(sleep_secs)
        self.domains[domain]=datetime.now()

# Page download
def download(url,user_agent=None,proxies=None,num_retries=3):
    response=requests.get(url,headers={'User-Agent':user_agent},proxies=proxies)
    if response.status_code and 500<=response.status_code<600:  # retry up to three times on server-side (5xx) errors
        if num_retries > 0:
            response = download(url,user_agent,proxies,num_retries-1)
    return response

def get_urls():
    url_lists=[]
    with open('urls.csv','r') as f:
        reader = csv.reader(f)
        for row in reader:
            url_lists.append(row[0].replace(' ',''))
    return url_lists


def thread_crawler(max_threads=5):
    url_queue = MongQueue()
    def process_url():
        throttle = Throttle(3)  # one Throttle per worker thread, so the per-domain delay is kept
        while not url_queue.isEmpty():
            try:
                url = url_queue.pop()
            except KeyError as e:
                print e
                break
            else:
                throttle.wait(url)
                response=download(url)
                url_queue.complete(url)
                print url, response.status_code
    threads=[]
 
    while (not url_queue.isEmpty()) or threads:
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads)<max_threads and (not url_queue.isEmpty()):
            thread = threading.Thread(target=process_url)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        #time.sleep(2)

def process_crawler():
    num_cpus = multiprocessing.cpu_count()
    start = time.time()
    print "開始時間:%s" % start
    processes=[]
    for i in range(num_cpus):
        process = multiprocessing.Process(target=thread_crawler)
        process.start()
        processes.append(process)
    for p in processes:
        p.join()
    end = time.time()
    print "結束時間:%s" % end
    print "爬取61個url,消耗時間:%s" % (int(end - start))
if __name__ == '__main__':  # windows系統下運行必須加__name__=="__main__",不然多進程報錯
      process_crawler()
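
    Because the queue lives in MongoDB rather than in the memory of one process, other machines can join the crawl simply by running the same worker code against the shared database; only the client address changes. Below is a sketch for a second machine, reusing the MongQueue and the multi-threaded thread_crawler(url_queue) variant from section 2.3 above ('mongo-server.example.com' is a placeholder hostname, not from the original post).

from pymongo import MongoClient

if __name__ == '__main__':
    # Point MongQueue at the shared MongoDB server instead of the local one.
    # 'mongo-server.example.com' is a hypothetical hostname; replace it with the real server.
    client = MongoClient('mongo-server.example.com', 27017)
    url_queue = MongQueue(client=client)
    # Do not call clear() or push() here; the first machine already populated the queue.
    thread_crawler(url_queue)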