zg手冊 之 twisted 開發(2)-- Deferreds 組件

Deferreds 異步回調序列

Deferred 本質上是一個回調函數的集合,twisted 提供了對函數延遲調用的機制。python

在 twisted 內部,使用 Deferred 對象管理回調序列。當異步請求結果返回時,使用 Deferred 對象調用回調序列中的函數。react

這裏經過幾個例子來了解 Deferreds 使用的方式和工做的原理。web


先看一個簡單的例子

from twisted.internet import reactor, defer

def getDummyData(x):
    """建立一個 Deferred 對象,並返回這個對象"""
    d = defer.Deferred()
    # 2 秒鐘後執行 Deferred 回調函數序列,把 x * 3 做爲參數傳遞給回調序列中的第一個函數
    # reactor.callLater 是一個定時延遲調用的方法
    reactor.callLater(2, d.callback, x * 3)
    return d

def printData(result):
    """    打印結果    """
    print resultd = getDummyData(3)

# 添加回調函數到回調函數序列中
d.addCallback(printData)

# 4 秒鐘後中止 reactor 循環(退出進程)
reactor.callLater(4, reactor.stop)

# 開始 Twisted reactor 事件循環
reactor.run()

使用 Deferred 的方式:

  1. d = defer.Deferred(), 建立 Deferred 對象。網絡

  2. d.addCallback(printData), 添加回調函數,能夠添加多個函數到回調序列中。app

  3. d.callback(result),回調序列開始執行(使用result做爲參數)。python2.7


在 Deferred 中,d.addCallbacks/a.addBoth 等函數均可以添加回調函數到序列中。還能夠添加 errback(錯誤回調函數)在回調函數出錯時進行處理。機器學習


Deferred 的內部機制

雖然調用 addCallback 的時候只傳入參數 callback, 可是回調序列中 callback/errback 老是成對出現的。twisted 內部的源代碼以下。異步

def addCallbacks(self, callback, errback=None,
                      callbackArgs=None, callbackKeywords=None,
                      errbackArgs=None, errbackKeywords=None):
         """
         Add a pair of callbacks (success and error) to this L{Deferred}.
         These will be executed when the 'master' callback is run.
         @return: C{self}.
         @rtype: a L{Deferred}"""
         assert callable(callback)
         assert errback == None or callable(errback)

         # 主要是這裏成對添加 callback/errback
         cbs = ((callback, callbackArgs, callbackKeywords),
                (errback or (passthru), errbackArgs, errbackKeywords))
         self.callbacks.append(cbs)
 
         if self.called:
             self._runCallbacks()
         return self


當回調序列中的函數發生異常,或者返回twisted.python.failure.Failure 實例時, 下一個callback/errback回調函數對中的錯誤回調函數會被調用。函數

錯誤回調函數的例子:學習

#!/usr/bin/env python
# coding: utf-8

from twisted.internet import defer, reactor
from pprint import pprint

def cback1(result):
    raise Exception('cback1')

def cerr1(failure):
    print 'cerr1: %s' % str(failure)

def cback2(result):
    return 'succ cback2'

def cerr2(failure):
    print 'cerr2: %s' % str(failure)

d = defer.Deferred()
d.addCallbacks(callback=cback1, errback=cerr1)
d.addCallbacks(callback=cback2, errback=cerr2)
d.addCallback(callback=lambda ign:pprint('cback3'))

# 執行回調函數序列
d.callback('run')

# 退出事件循環
reactor.callLater(2, reactor.stop)
reactor.run()

運行結果

user@host:~/test $ python testErr.py
cerr2: [Failure instance: Traceback: <type 'exceptions.Exception'>: cback1
testErr.py:26:<module>
/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:382:callback
/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:490:_startRunCallbacks
--- <exception caught here> ---
/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:577:_runCallbacks
testErr.py:8:cback1]'cback3'

從例子中看運行機制:

  1. 在回調序列執行過程當中,當前回調函數的返回值就是下一個回調函數的第一個參數,d.callback('run') 使用字符串'run'做爲第一個回調函數的參數。

  2. 當回調函數 cback1 出現異常,下一個錯誤處理函數 cerr2 被調用。

  3. 當 cerr2 返回後,若是返回值不是 twisted.python.failure.Failure, 調用將轉回到下個 callback ( cback3 )函數,若是返回值是 Failure 對象,則繼續調用下一個的錯誤處理函數。


Deferred 結構圖


Deferred 調用圖


Deferred 回調函數的參數傳遞和回調函數內異步操做方法

添加到 Deferred 中的回調函數能夠傳入參數,而且在回調函數內能夠執行異步操做(異步網絡請求)。

下面是一個從 京東 獲取書籍信息的例子: 先從搜索的列表頁獲取一個書籍信息,再從詳情頁獲取詳細信息。

#!/usr/bin/env python
# coding: utf-8

import sys
import bs4 as bs

from twisted.web.client import getPage
from twisted.internet import reactor
from twisted.python.util import println
from twisted.internet import defer

class Goods(object):
    '''商品信息分析類'''
    _urlFmt='http://search.jd.com/search?keyword=%s&enc=utf-8&rt=1&book=y&area=1&wtype=1'

    _name=''
    _id=''
    _price=0
    _detail=''
    _detailUrl=''

    def __init__(self, keyword):
        self._url = self._urlFmt % keyword

    def getGoodsUrl(self):
        return self._url

    def getGoodsDetailUrl(self):
        return self._detailUrl

    def getGoods(self, page):
        soup = bs.BeautifulSoup(page)

        # 獲取商品名稱,詳情頁url
        infos = soup.find_all(class_='info')
        info = infos[0].select('dt.p-name > a')[0]
        self._name = info.get_text().strip()
        self._detailUrl = info['href']

    def getGoodsDetail(self, page):
        #print page
        soup = bs.BeautifulSoup(page)

        # 獲取價格
        price = soup.select('#summary-price strong')[0]
        self._price = price.get_text().strip()

        # 獲取詳細信息
        details = soup.select('#product-detail-1')[0].strings
        self._detail = ''.join(details)
        return True

    def printResult(self, IsPrint=True):
        if IsPrint:
            print self._name
            print self._price
            print self._detailUrl
            print ' ' * 10, self._detail

def main():
    errback=lambda error:(println("an error occurred", error),reactor.stop())
    def goodsCallback(page, goods):
        ''''''
        #print page
        goods.getGoods(page)
        d2 = getPage(goods.getGoodsDetailUrl())
        d2.addCallbacks(callback=goods.getGoodsDetail, errback=errback)
        return d2

    # 根據關鍵詞建立商品信息對象
    goods = Goods(sys.argv[1])

    # 異步查詢商品信息
    d1 = getPage(goods.getGoodsUrl())

    # 添加獲取商品信息的回調函數到 defer 對象
    d1.addCallbacks(callback=goodsCallback, errback=errback, callbackArgs=[goods])
    d1.addCallbacks(callback=lambda successed:(goods.printResult(successed)), errback=errback)

    # 結束主事件監控循環
    d1.addCallbacks(callback=lambda ign: reactor.stop(), errback=errback)

    reactor.run()

if __name__ == '__main__':
    main()

運行例子程序:

python goodsJD.py 機器學習

例子中 Deferred 對象的使用:

  1. d.addCallbacks 的 callbackArgs 參數接收一個 list 做爲添加的回調函數的參數,errbackArgs 也是一個 list, 做爲添加的錯誤處理函數的參數。

  2. 在回調序列中的執行過程當中,若是回調函數返回了一個 Deferred 對象,例如上面的 goodsCallback 函數。在新返回的 d2 回調序列被執行完以前,d1的調用序列會暫停執行,在 d2 被執行完以後,d1 回調序列纔會繼續執行。(這個功能對於嵌套的異步請求很重要,例如我有三個請求A,B,C, 這三個請求有依賴關係,必須根據 A 的響應發送 B 請求,根據 B 的響應發送 C 請求)


DeferredList 對象的使用

有時候須要等待幾個異步事件完成,而後再執行某些任務。這時候 DeferredList 就能夠發揮做用了。

看官網的一個例子,我修改並註釋一些代碼

#!/usr/bin/env python
# coding: utf-8

from twisted.internet import defer

# DeferredList 回調函數,參數是它包含的 Deferred 執行後的結果組成的 list
def printResult(result):
    for (success, value) in result:
        if success:
            print 'Success:', value
        else:
            print 'Failure:', value.getErrorMessage()

# deferred1 的回調函數
def printDeferred(result):
    print 'deferred1: %s' % result
    return 'deferred1: %s' % result

# 建立三個新的 Deferred
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred3 = defer.Deferred()

# 打包三個 Deferred 對象到 DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True)

# 添加回調函數到 DeferredList,
# 當它包含的 Deferred 都執行完後,將執行 DeferredList 這個調用序列
dl.addCallback(printResult)

# 注意這個回調函數添加的位置,
# 由於在 defer.DeferredList 後面,因此這個回調將不影響 DeferredList 結果
# 只有添加在 defer.DeferredList 以前,結果纔會傳遞給 defer.DeferredList 的回調序列
deferred1.addCallback(printDeferred)

# 調用三個回調序列
deferred1.callback('one')
deferred2.errback(Exception('bang!'))
deferred3.callback('three')

運行結果

user@host ~/test/twisted $ python testDeferredList.py
deferred1: one
Success: one
Failure: bang!
Success: three

DeferredList 運行須要注意的地方

  1. DeferredList 打包的回調序列 deferred1,deferred2,deferred3,這幾個序列中的回調函數要在打包以前(建立DeferredList對象以前)添加,(我例子中添加的位置是不正確的!!!)

  2. DeferredList 的回調函數接收的參數是個 list, list 中的每一個元素是 (succ, result),表示 deferred1,deferred2,deferred3 執行成功的標誌和最後結果


原文連接:http://www.hopez.org/blog/9/1403586037

相關文章
相關標籤/搜索