Python之Scrapy框架源碼解析

接下來會寫一個按照Scrapy框架的原理流程實現自定義的Scrapy框架,然後再看源碼的時候更便於閱讀。html

前戲

Scrapy內部實現併發操做採用的是twisted模塊,簡單實現一個小DEMOreact

from twisted.internet import reactor    # 事件循環(終止條件,全部的socket都已經移除)
from twisted.web.client import getPage  # socket對象(若是下載完成,自動從事件循環中移除...)
from twisted.internet import defer      # defer.Deferred 特殊的socket對象 (不會發請求,手動移除)


def response(body):
    print(body)


@defer.inlineCallbacks
def task():
    url = 'http://www.baidu.com'
    d = getPage(url.encode('utf-8'))
    d.addCallback(response)
    yield d

task()
reactor.run()  # 開啓事件循環

在 Twisted 中,有一種特殊的對象用於實現事件循環。這個對象叫作 reactor。能夠把反應器(reactor)想象爲 Twisted 程序的中樞神經。除了分發事件循環以外,反應器還作不少重要的工做:定時任務、線程、創建網絡鏈接、監聽鏈接。爲了讓反應器能夠正常工做,須要啓動事件循環。web

from twisted.internet import reactor    # 事件循環(終止條件,全部的socket都已經移除)
from twisted.web.client import getPage  # socket對象(若是下載完成,自動從事件循環中移除...)
from twisted.internet import defer      # defer.Deferred 特殊的socket對象 (不會發請求,手動移除)


#########################
# 1.利用getPage建立socket
# 2.將socket添加到事件循環中
# 3.開始事件循環(自動結束)
#########################
def response(content):
    print(content)

# 該裝飾器裝飾的內容,只要yield是一個阻塞的對象都會轉交給reactor接手
@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    d = getPage(url.encode('utf-8'))
    d.addCallback(response)
    yield d
    url = "http://www.baidu.com"
    d = getPage(url.encode('utf-8'))
    d.addCallback(response)
    yield d

def done(*args,**kwargs):
    reactor.stop()

li = []
for i in range(10):
    d = task()
    li.append(d)
# DeferredList也屬於defer的對象,也會轉交給reactor接手
dd = defer.DeferredList(li)
# 給它增長了一個回調函數
dd.addBoth(done)

reactor.run()

自定製爬蟲CrazyScrapy

from twisted.internet import reactor     # 事件循環(終止條件,全部的socket都已經移除)
from twisted.web.client import getPage  # socket對象(若是下載完成,自動從時間循環中移除...)
from twisted.internet import defer      # defer.Deferred 特殊的socket對象 (不會發請求,手動移除)


# 自定義一個Request 類
class Request(object):
    def __init__(self, url, callback):
        """
        初始化接受url和callback回調函數
        :param url: 請求的url
        :param callback: 獲取內容後的callback
        """
        self.url = url
        self.callback = callback


# 響應對象
class HttpResponse(object):
    def __init__(self, content, request):
        """
        初始化相應內容
        :param content: 下載 下來的響應的content
        :param request: response對應的request
        """
        # 響應的內容
        self.content = content
        # 響應的請求
        self.request = request
        # response對應的request
        self.url = request.url
        # 將內容轉換爲文本
        self.text = str(content, encoding='utf-8')


class ChoutiSpider(object):
    """
    初始化頂一個小蜘蛛
    """
    name = 'chouti'

    # 蜘蛛一開始的執行方法
    def start_requests(self):
        start_url = ['http://www.baidu.com', 'http://www.bing.com', ]
        for url in start_url:
            yield Request(url, self.parse)

    # 收到response後的解析函數
    def parse(self, response):
        print(response)  # response是下載的頁面
        yield Request('http://www.cnblogs.com', callback=self.parse)



import queue
# 這裏是調度器
Q = queue.Queue()


# 定義了一個引擎類
class Engine(object):
    def __init__(self):
        # 引擎關閉
        self._close = None
        # 最大的請求數
        self.max = 5
        # 正在爬的請求
        self.crawlling = []

    # 拿着相應的回調函數
    def get_response_callback(self, content, request):
        """

        :param content: 響應的content
        :param request: 響應對應的request
        :return:
        """
        # 一旦執行回調函數,就能夠從調度中拿走這個請求
        self.crawlling.remove(request)
        # 將內容封裝成 一個 HttpResponse對象
        rep = HttpResponse(content, request)
        # 調用請求時的回調函數,將封裝的HttpResponse傳遞進去
        result = request.callback(rep)
        import types
        # 查看回調函數是否繼續返回迭代器對象
        if isinstance(result, types.GeneratorType):
            # 將回調函數 新的請求放到調度器
            for req in result:
                Q.put(req)

    # 從調度器取請求,執行,下載,並控制最大併發數
    def _next_request(self):
        """
        去取request對象,併發送請求
        最大併發數限制
        :return:
        """
        print(self.crawlling, Q.qsize())
        # 若是調度器的長度爲0,並且處於正在爬取的數目也爲 0 ,那麼就說明該關閉了
        if Q.qsize() == 0 and len(self.crawlling) == 0:
            # 直接調用 defer.Deferred().callback(None)就會關閉defer
            self._close.callback(None)
            return

        # 若是正在爬取的數目超過了最大的併發限制,直接返回
        if len(self.crawlling) >= self.max:
            return
        # 若是沒有達到併發限制,就執行如下內容
        while len(self.crawlling) < self.max:
            try:
                # 從 調度器 取一個請求 任務
                req = Q.get(block=False)
                # 把拿到的請求放到 正在爬取的列表中
                self.crawlling.append(req)
                # 獲取相應的頁面
                d = getPage(req.url.encode('utf-8'))
                # 頁面下載完成,get_response_callback,調用用戶spider中定義的parse方法,而且將新請求添加到調度器
                d.addCallback(self.get_response_callback, req)
                # 未達到最大併發數,能夠再去調度器中獲取Request
                # 繼續給d添加回調函數,這個回調函數能夠是匿名函數
                d.addCallback(lambda _: reactor.callLater(0, self._next_request))
            except Exception as e:
                print(e)
                return

    @defer.inlineCallbacks
    def crawl(self, spider):
        # 將start_requests包含的生成器,初始Request對象添加到調度器
        start_requests = iter(spider.start_requests())
        while True:
            try:
                # 拿到每一個request,放到調度器中
                request = next(start_requests)
                Q.put( request)
            except StopIteration as e:
                break

        # 去調度器中取request,併發送請求
        # self._next_request()

        reactor.callLater(0, self._next_request)
        # 初始化self._close
        self._close = defer.Deferred()
        yield self._close

# 初始化一個抽屜爬蟲
spider = ChoutiSpider()

_active = set()
# 實例化一個引擎對象
engine = Engine()

# 引擎對象 調用 crawl方法,運行指定的spider
d = engine.crawl(spider)

# 將crawl方法放到set中
_active.add(d)
# 實例化一個DeferredList,將_active 內容放進去,返回一個defer.Deferred()對象,若defer.Deferred()被關閉,dd就爲空
dd = defer.DeferredList(_active)

# 一旦dd裏面爲空,就調用reactor.stop()方法
dd.addBoth(lambda a: reactor.stop())

# 讓它run起來
reactor.run()
DEMO

定製版CrazyScrapy:點擊下載網絡

更多文檔參見:http://scrapy-chs.readthedocs.io/zh_CN/latest/index.html併發

相關文章
相關標籤/搜索