接下來會寫一個按照Scrapy框架的原理流程實現自定義的Scrapy框架,然後再看源碼的時候更便於閱讀。html
前戲
Scrapy內部實現併發操做採用的是twisted模塊,簡單實現一個小DEMOreact
from twisted.internet import reactor # 事件循環(終止條件,全部的socket都已經移除) from twisted.web.client import getPage # socket對象(若是下載完成,自動從事件循環中移除...) from twisted.internet import defer # defer.Deferred 特殊的socket對象 (不會發請求,手動移除) def response(body): print(body) @defer.inlineCallbacks def task(): url = 'http://www.baidu.com' d = getPage(url.encode('utf-8')) d.addCallback(response) yield d task() reactor.run() # 開啓事件循環
在 Twisted 中,有一種特殊的對象用於實現事件循環。這個對象叫作 reactor。能夠把反應器(reactor)想象爲 Twisted 程序的中樞神經。除了分發事件循環以外,反應器還作不少重要的工做:定時任務、線程、創建網絡鏈接、監聽鏈接。爲了讓反應器能夠正常工做,須要啓動事件循環。web
from twisted.internet import reactor # 事件循環(終止條件,全部的socket都已經移除) from twisted.web.client import getPage # socket對象(若是下載完成,自動從事件循環中移除...) from twisted.internet import defer # defer.Deferred 特殊的socket對象 (不會發請求,手動移除) ######################### # 1.利用getPage建立socket # 2.將socket添加到事件循環中 # 3.開始事件循環(自動結束) ######################### def response(content): print(content) # 該裝飾器裝飾的內容,只要yield是一個阻塞的對象都會轉交給reactor接手 @defer.inlineCallbacks def task(): url = "http://www.baidu.com" d = getPage(url.encode('utf-8')) d.addCallback(response) yield d url = "http://www.baidu.com" d = getPage(url.encode('utf-8')) d.addCallback(response) yield d def done(*args,**kwargs): reactor.stop() li = [] for i in range(10): d = task() li.append(d) # DeferredList也屬於defer的對象,也會轉交給reactor接手 dd = defer.DeferredList(li) # 給它增長了一個回調函數 dd.addBoth(done) reactor.run()
自定製爬蟲CrazyScrapy
from twisted.internet import reactor # 事件循環(終止條件,全部的socket都已經移除) from twisted.web.client import getPage # socket對象(若是下載完成,自動從時間循環中移除...) from twisted.internet import defer # defer.Deferred 特殊的socket對象 (不會發請求,手動移除) # 自定義一個Request 類 class Request(object): def __init__(self, url, callback): """ 初始化接受url和callback回調函數 :param url: 請求的url :param callback: 獲取內容後的callback """ self.url = url self.callback = callback # 響應對象 class HttpResponse(object): def __init__(self, content, request): """ 初始化相應內容 :param content: 下載 下來的響應的content :param request: response對應的request """ # 響應的內容 self.content = content # 響應的請求 self.request = request # response對應的request self.url = request.url # 將內容轉換爲文本 self.text = str(content, encoding='utf-8') class ChoutiSpider(object): """ 初始化頂一個小蜘蛛 """ name = 'chouti' # 蜘蛛一開始的執行方法 def start_requests(self): start_url = ['http://www.baidu.com', 'http://www.bing.com', ] for url in start_url: yield Request(url, self.parse) # 收到response後的解析函數 def parse(self, response): print(response) # response是下載的頁面 yield Request('http://www.cnblogs.com', callback=self.parse) import queue # 這裏是調度器 Q = queue.Queue() # 定義了一個引擎類 class Engine(object): def __init__(self): # 引擎關閉 self._close = None # 最大的請求數 self.max = 5 # 正在爬的請求 self.crawlling = [] # 拿着相應的回調函數 def get_response_callback(self, content, request): """ :param content: 響應的content :param request: 響應對應的request :return: """ # 一旦執行回調函數,就能夠從調度中拿走這個請求 self.crawlling.remove(request) # 將內容封裝成 一個 HttpResponse對象 rep = HttpResponse(content, request) # 調用請求時的回調函數,將封裝的HttpResponse傳遞進去 result = request.callback(rep) import types # 查看回調函數是否繼續返回迭代器對象 if isinstance(result, types.GeneratorType): # 將回調函數 新的請求放到調度器 for req in result: Q.put(req) # 從調度器取請求,執行,下載,並控制最大併發數 def _next_request(self): """ 去取request對象,併發送請求 最大併發數限制 :return: """ print(self.crawlling, Q.qsize()) # 若是調度器的長度爲0,並且處於正在爬取的數目也爲 0 ,那麼就說明該關閉了 if Q.qsize() == 0 and len(self.crawlling) == 0: # 直接調用 defer.Deferred().callback(None)就會關閉defer self._close.callback(None) return # 若是正在爬取的數目超過了最大的併發限制,直接返回 if len(self.crawlling) >= self.max: return # 若是沒有達到併發限制,就執行如下內容 while len(self.crawlling) < self.max: try: # 從 調度器 取一個請求 任務 req = Q.get(block=False) # 把拿到的請求放到 正在爬取的列表中 self.crawlling.append(req) # 獲取相應的頁面 d = getPage(req.url.encode('utf-8')) # 頁面下載完成,get_response_callback,調用用戶spider中定義的parse方法,而且將新請求添加到調度器 d.addCallback(self.get_response_callback, req) # 未達到最大併發數,能夠再去調度器中獲取Request # 繼續給d添加回調函數,這個回調函數能夠是匿名函數 d.addCallback(lambda _: reactor.callLater(0, self._next_request)) except Exception as e: print(e) return @defer.inlineCallbacks def crawl(self, spider): # 將start_requests包含的生成器,初始Request對象添加到調度器 start_requests = iter(spider.start_requests()) while True: try: # 拿到每一個request,放到調度器中 request = next(start_requests) Q.put( request) except StopIteration as e: break # 去調度器中取request,併發送請求 # self._next_request() reactor.callLater(0, self._next_request) # 初始化self._close self._close = defer.Deferred() yield self._close # 初始化一個抽屜爬蟲 spider = ChoutiSpider() _active = set() # 實例化一個引擎對象 engine = Engine() # 引擎對象 調用 crawl方法,運行指定的spider d = engine.crawl(spider) # 將crawl方法放到set中 _active.add(d) # 實例化一個DeferredList,將_active 內容放進去,返回一個defer.Deferred()對象,若defer.Deferred()被關閉,dd就爲空 dd = defer.DeferredList(_active) # 一旦dd裏面爲空,就調用reactor.stop()方法 dd.addBoth(lambda a: reactor.stop()) # 讓它run起來 reactor.run()
定製版CrazyScrapy:點擊下載網絡
更多文檔參見:http://scrapy-chs.readthedocs.io/zh_CN/latest/index.html併發