When there are a large number of URLs to download, crawling them serially is slow, so we need to crawl with multiple threads or multiple processes, or even deploy a distributed crawler.
1. Multi-threaded crawler
With the code below, crawling 61 URLs took 16-25 s with three threads and 41-55 s with five threads (switching between threads also costs time).
#coding:utf-8
import threading
import multiprocessing
import requests
import csv
import time
from datetime import datetime
import urlparse


# download delay for requests to the same domain
class Throttle(object):
    def __init__(self, delay):
        self.delay = delay
        self.domains = {}

    def wait(self, url):
        domain = urlparse.urlparse(url).netloc  # extract the domain from the URL
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            sleep_secs = self.delay - (datetime.now() - last_accessed).seconds
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        self.domains[domain] = datetime.now()


# page download
def download(url, user_agent=None, proxies=None, num_retries=3):
    response = requests.get(url, headers={'User-Agent': user_agent}, proxies=proxies)
    if response.status_code and 500 <= response.status_code < 600:
        # retry up to three times on server-side (5xx) errors
        if num_retries > 0:
            response = download(url, user_agent, proxies, num_retries - 1)
    return response


# read the file of URLs to crawl
def get_urls():
    url_lists = []
    with open('urls.csv', 'r') as f:
        reader = csv.reader(f)
        for row in reader:
            url_lists.append(row[0].replace(' ', ''))
    return url_lists


def thread_crawler(max_threads=5):
    urls = get_urls()

    def process_url():
        throttle = Throttle(3)  # one throttle per thread, so per-domain timestamps persist between URLs
        while True:
            try:
                url = urls.pop()
            except IndexError as e:
                print e
                break
            else:
                throttle.wait(url)
                response = download(url)
                print url, response.status_code

    # keep at most max_threads worker threads alive until the URL list is exhausted
    threads = []
    while urls or threads:
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads) < max_threads and urls:
            thread = threading.Thread(target=process_url)  # pass the function itself, do not call it
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        # time.sleep(2)


if __name__ == '__main__':
    thread_crawler()
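All of the scripts in this post read their URL list from urls.csv via get_urls(), which takes the first column of every row. A minimal sketch of how a test file in that assumed layout could be generated (the URLs below are hypothetical placeholders):

import csv

# hypothetical URLs, one per row in the first column, matching what get_urls() expects
sample_urls = [
    'http://example.com/page1',
    'http://example.com/page2',
    'http://example.org/page3',
]

with open('urls.csv', 'wb') as f:  # 'wb' follows the Python 2 csv convention used in this post
    writer = csv.writer(f)
    for url in sample_urls:
        writer.writerow([url])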
2. Multi-process crawler
2.1 Multiprocessing
With two processes, the code below crawled 61 URLs in 20-30 s on average.
#coding:utf-8
import threading
import multiprocessing
import requests
import csv
import time
from datetime import datetime
import urlparse


# download delay for requests to the same domain
class Throttle(object):
    def __init__(self, delay):
        self.delay = delay
        self.domains = {}

    def wait(self, url):
        domain = urlparse.urlparse(url).netloc  # extract the domain from the URL
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            sleep_secs = self.delay - (datetime.now() - last_accessed).seconds
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        self.domains[domain] = datetime.now()


# page download
def download(url, user_agent=None, proxies=None, num_retries=3):
    response = requests.get(url, headers={'User-Agent': user_agent}, proxies=proxies)
    if response.status_code and 500 <= response.status_code < 600:
        # retry up to three times on server-side (5xx) errors
        if num_retries > 0:
            response = download(url, user_agent, proxies, num_retries - 1)
    return response


# read the file of URLs to crawl
def get_urls():
    url_lists = []
    with open('urls.csv', 'r') as f:
        reader = csv.reader(f)
        for row in reader:
            url_lists.append(row[0].replace(' ', ''))
    return url_lists


# worker run in each process: keep taking URLs from the shared queue until it is empty
def process_url(url_queue):
    throttle = Throttle(3)  # one throttle per process, so per-domain timestamps persist between URLs
    while url_queue.qsize() > 0:
        try:
            url = url_queue.get()
        except Exception as e:
            print e
        else:
            throttle.wait(url)
            response = download(url)
            print url, response.status_code


def process_crawler():
    num_cpus = multiprocessing.cpu_count()
    q = multiprocessing.Queue()
    urls = get_urls()
    for url in urls:
        q.put(url)
    start = time.time()
    print "start time: %s" % start
    processes = []
    for i in range(num_cpus):
        process = multiprocessing.Process(target=process_url, args=(q,))
        process.start()
        processes.append(process)
    for p in processes:
        p.join()
    end = time.time()
    print "end time: %s" % end
    print "crawled %s urls, time spent: %ss" % (len(urls), int(end - start))


if __name__ == '__main__':
    # on Windows the __name__ == '__main__' guard is required, otherwise multiprocessing raises an error
    process_crawler()
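For comparison, the same fan-out over processes can be written more compactly with multiprocessing.Pool, letting the pool hand out the URLs instead of a shared Queue. This is a sketch rather than the author's code, and it assumes the download and get_urls helpers above are defined in the same module:

import multiprocessing


def fetch(url):
    # runs in a worker process; per-domain throttling would have to be applied here as well
    response = download(url)
    return url, response.status_code


def pool_crawler():
    urls = get_urls()
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    results = pool.map(fetch, urls)  # blocks until every URL has been fetched
    pool.close()
    pool.join()
    for url, status in results:
        print url, status


if __name__ == '__main__':
    pool_crawler()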
2.2 Multiprocessing plus multithreading
With two processes, each running three threads, the code below downloaded 61 URLs in 10-15 s on average.
#coding:utf-8
import threading
import multiprocessing
import requests
import csv
import time
from datetime import datetime
import urlparse


# download delay for requests to the same domain
class Throttle(object):
    def __init__(self, delay):
        self.delay = delay
        self.domains = {}

    def wait(self, url):
        domain = urlparse.urlparse(url).netloc  # extract the domain from the URL
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            sleep_secs = self.delay - (datetime.now() - last_accessed).seconds
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        self.domains[domain] = datetime.now()


# page download
def download(url, user_agent=None, proxies=None, num_retries=3):
    response = requests.get(url, headers={'User-Agent': user_agent}, proxies=proxies)
    if response.status_code and 500 <= response.status_code < 600:
        # retry up to three times on server-side (5xx) errors
        if num_retries > 0:
            response = download(url, user_agent, proxies, num_retries - 1)
    return response


# read the file of URLs to crawl
def get_urls():
    url_lists = []
    with open('urls.csv', 'r') as f:
        reader = csv.reader(f)
        for row in reader:
            url_lists.append(row[0].replace(' ', ''))
    return url_lists


def thread_crawler(url_queue, max_threads=5):
    def process_url():
        throttle = Throttle(3)  # one throttle per thread, so per-domain timestamps persist between URLs
        while url_queue.qsize() > 0:
            try:
                url = url_queue.get_nowait()  # do not block when the queue is empty
                # print url_queue.qsize()
            except Exception as e:
                print e
                break
            else:
                throttle.wait(url)
                response = download(url)
                print url, response.status_code

    # keep at most max_threads worker threads alive until the queue is drained
    threads = []
    while url_queue.qsize() > 0 or threads:
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads) < max_threads and url_queue.qsize() > 0:
            thread = threading.Thread(target=process_url)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        # time.sleep(2)


def process_crawler():
    num_cpus = multiprocessing.cpu_count()
    q = multiprocessing.Queue()
    for url in get_urls():
        q.put(url)
    start = time.time()
    print "start time: %s" % start
    processes = []
    for i in range(num_cpus):
        process = multiprocessing.Process(target=thread_crawler, args=(q, 3))
        process.start()
        processes.append(process)
    for p in processes:
        p.join()
    end = time.time()
    print "end time: %s" % end
    print "crawled 61 urls, time spent: %ss" % (int(end - start))


if __name__ == '__main__':
    # on Windows the __name__ == '__main__' guard is required, otherwise multiprocessing raises an error
    process_crawler()
2.3 MongoDB-based URL queue (distributed?)
The queue of URLs to crawl can be kept in MongoDB; multiple machines can then pull URLs from it to crawl, which increases concurrency and speeds up the crawl.
Multi-threaded version, with the URL queue stored in MongoDB:
#coding:utf-8
import threading
import multiprocessing
import requests
import csv
import time
import urlparse
from datetime import datetime, timedelta
from pymongo import MongoClient, errors


class MongQueue(object):
    # three URL states: outstanding (to download), processing (downloading), complete (downloaded)
    OUTSTANDING, PROCESSING, COMPLETE = (1, 2, 3)

    def __init__(self, client=None, timeout=300):
        self.client = MongoClient('127.0.0.1', 27017) if client is None else client
        self.db = self.client.urls_db
        self.timeout = timeout

    def isEmpty(self):
        record = self.db.crawl_queue.find_one({'status': {'$ne': self.COMPLETE}})  # $ne: not equal
        if not record:
            return True
        else:
            return False

    def push(self, url):
        try:
            self.db.crawl_queue.insert({'_id': url, 'status': self.OUTSTANDING})
        except errors.DuplicateKeyError as e:
            print 'the url is already in the queue!'

    def pop(self):
        # atomically claim an outstanding URL and mark it as processing
        record = self.db.crawl_queue.find_and_modify(
            query={'status': self.OUTSTANDING},
            update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
        )
        if record:
            # print record['_id'], record['status']
            return record['_id']
        else:
            self.repair()
            raise KeyError()

    def complete(self, url):
        self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

    def repair(self):
        # release URLs that have been "processing" for longer than the timeout
        record = self.db.crawl_queue.find_and_modify(
            query={'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
                   'status': {'$ne': self.COMPLETE}},
            update={'$set': {'status': self.OUTSTANDING}}
        )
        if record:
            print 'Released:%s' % record['_id']

    def clear(self):
        self.db.crawl_queue.drop()


# download delay for requests to the same domain
class Throttle(object):
    def __init__(self, delay):
        self.delay = delay
        self.domains = {}

    def wait(self, url):
        domain = urlparse.urlparse(url).netloc  # extract the domain from the URL
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            sleep_secs = self.delay - (datetime.now() - last_accessed).seconds
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        self.domains[domain] = datetime.now()


# page download
def download(url, user_agent=None, proxies=None, num_retries=3):
    response = requests.get(url, headers={'User-Agent': user_agent}, proxies=proxies)
    if response.status_code and 500 <= response.status_code < 600:
        # retry up to three times on server-side (5xx) errors
        if num_retries > 0:
            response = download(url, user_agent, proxies, num_retries - 1)
    return response


# read the file of URLs to crawl
def get_urls():
    url_lists = []
    with open('urls.csv', 'r') as f:
        reader = csv.reader(f)
        for row in reader:
            url_lists.append(row[0].replace(' ', ''))
    return url_lists


def thread_crawler(url_queue, max_threads=5):
    def process_url():
        throttle = Throttle(3)  # one throttle per thread, so per-domain timestamps persist between URLs
        while not url_queue.isEmpty():
            try:
                url = url_queue.pop()
            except KeyError as e:
                print e
                break
            else:
                throttle.wait(url)
                response = download(url)
                url_queue.complete(url)
                print url, response.status_code

    # keep at most max_threads worker threads alive until the queue is drained
    threads = []
    start = time.time()
    print "start time: %s" % start
    while (not url_queue.isEmpty()) or threads:
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads) < max_threads and (not url_queue.isEmpty()):
            thread = threading.Thread(target=process_url)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        # time.sleep(2)
    end = time.time()
    print "end time: %s" % end
    print "crawled 61 urls, time spent: %ss" % (int(end - start))


if __name__ == '__main__':
    url_queue = MongQueue()
    url_queue.clear()
    for url in get_urls():
        url_queue.push(url)
    thread_crawler(url_queue)
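The "distributed" part comes from pointing MongoClient at a shared MongoDB host rather than 127.0.0.1. A minimal sketch of what a second machine could run, assuming the script above is saved as crawler.py and that 192.168.1.100 is a placeholder address for the machine hosting the queue:

#coding:utf-8
from pymongo import MongoClient
from crawler import MongQueue, thread_crawler  # crawler.py is a hypothetical name for the script above

QUEUE_HOST = '192.168.1.100'  # placeholder address of the shared MongoDB server

if __name__ == '__main__':
    # do not clear() or push() here: the queue is filled once by the first machine,
    # this worker only claims and completes URLs
    client = MongoClient(QUEUE_HOST, 27017)
    url_queue = MongQueue(client=client)
    thread_crawler(url_queue)

This also assumes the MongoDB instance is configured to accept connections from other machines on the network.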
Multi-process plus multi-threaded version, with the URL queue stored in MongoDB:
#coding:utf-8
import threading
import multiprocessing
import requests
import csv
import time
import urlparse
from datetime import datetime, timedelta
from pymongo import MongoClient, errors


class MongQueue(object):
    # three URL states: outstanding (to download), processing (downloading), complete (downloaded)
    OUTSTANDING, PROCESSING, COMPLETE = (1, 2, 3)

    def __init__(self, client=None, timeout=300):
        self.client = MongoClient('127.0.0.1', 27017) if client is None else client
        self.db = self.client.urls_db
        self.timeout = timeout

    def isEmpty(self):
        record = self.db.crawl_queue.find_one({'status': {'$ne': self.COMPLETE}})  # $ne: not equal
        if not record:
            return True
        else:
            return False

    def push(self, url):
        try:
            self.db.crawl_queue.insert({'_id': url, 'status': self.OUTSTANDING})
        except errors.DuplicateKeyError as e:
            print 'the url is already in the queue!'

    def pop(self):
        # atomically claim an outstanding URL and mark it as processing
        record = self.db.crawl_queue.find_and_modify(
            query={'status': self.OUTSTANDING},
            update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
        )
        if record:
            # print record['_id'], record['status']
            return record['_id']
        else:
            self.repair()
            raise KeyError()

    def complete(self, url):
        self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

    def repair(self):
        # release URLs that have been "processing" for longer than the timeout
        record = self.db.crawl_queue.find_and_modify(
            query={'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
                   'status': {'$ne': self.COMPLETE}},
            update={'$set': {'status': self.OUTSTANDING}}
        )
        if record:
            print 'Released:%s' % record['_id']

    def clear(self):
        self.db.crawl_queue.drop()


# download delay for requests to the same domain
class Throttle(object):
    def __init__(self, delay):
        self.delay = delay
        self.domains = {}

    def wait(self, url):
        domain = urlparse.urlparse(url).netloc  # extract the domain from the URL
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            sleep_secs = self.delay - (datetime.now() - last_accessed).seconds
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        self.domains[domain] = datetime.now()


# page download
def download(url, user_agent=None, proxies=None, num_retries=3):
    response = requests.get(url, headers={'User-Agent': user_agent}, proxies=proxies)
    if response.status_code and 500 <= response.status_code < 600:
        # retry up to three times on server-side (5xx) errors
        if num_retries > 0:
            response = download(url, user_agent, proxies, num_retries - 1)
    return response


# read the file of URLs to crawl
def get_urls():
    url_lists = []
    with open('urls.csv', 'r') as f:
        reader = csv.reader(f)
        for row in reader:
            url_lists.append(row[0].replace(' ', ''))
    return url_lists


def thread_crawler(max_threads=5):
    url_queue = MongQueue()  # each worker process opens its own connection to the shared queue

    def process_url():
        throttle = Throttle(3)  # one throttle per thread, so per-domain timestamps persist between URLs
        while not url_queue.isEmpty():
            try:
                url = url_queue.pop()
            except KeyError as e:
                print e
                break
            else:
                throttle.wait(url)
                response = download(url)
                url_queue.complete(url)
                print url, response.status_code

    # keep at most max_threads worker threads alive until the queue is drained
    threads = []
    while (not url_queue.isEmpty()) or threads:
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads) < max_threads and (not url_queue.isEmpty()):
            thread = threading.Thread(target=process_url)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
        # time.sleep(2)


def process_crawler():
    num_cpus = multiprocessing.cpu_count()
    start = time.time()
    print "start time: %s" % start
    processes = []
    for i in range(num_cpus):
        process = multiprocessing.Process(target=thread_crawler)
        process.start()
        processes.append(process)
    for p in processes:
        p.join()
    end = time.time()
    print "end time: %s" % end
    print "crawled 61 urls, time spent: %ss" % (int(end - start))


if __name__ == '__main__':
    # on Windows the __name__ == '__main__' guard is required, otherwise multiprocessing raises an error
    # note: this script assumes the crawl_queue collection has already been filled with URLs
    # (for example by the previous script or by another machine)
    process_crawler()