Python Twisted, Reactor

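Contents

1. Twisted fundamentals
2. The asynchronous programming model and the Reactor
3. Network programming with Twisted
4. Process management with the reactor
5. Concurrent connections in Twisted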

 

1. Twisted fundamentals

0x1: The asynchronous programming model

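Event-driven programming is a programming paradigm in which the program's flow of execution is determined by external events. It is built around an event loop: when an external event occurs, a callback mechanism triggers the corresponding handler. The two other common paradigms are (single-threaded) synchronous programming and multithreaded programming.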

The asynchronous model

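In this model the tasks are interleaved with one another. Note that all of this happens under the control of a single thread. This is much simpler than the multithreaded model, because the programmer can always assume that only one task is running while the others are paused.
There is one more difference between the asynchronous model and the multithreaded model: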

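1. In a multithreaded program, the decision to suspend one thread and start another lies with the operating system, not the programmer. The programmer therefore has to assume that at any moment a thread may be suspended and another one started.
2. In the asynchronous model, by contrast, all events arrive asynchronously and are taken from a queue and processed one at a time. Another task can only run once the currently running task explicitly gives up control. This is what makes the model simpler than the multithreaded one.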
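The explicit hand-off described in point 2 can be sketched with plain Python generators, without Twisted. Each hypothetical task runs until it reaches `yield`, which is where it gives up control. A round-robin loop in a single thread then decides which task runs next. Task names and step counts are made up for illustration.

```python
from collections import deque


def task(name, steps, trace):
    """A cooperative task: does one unit of work, then yields control."""
    for i in range(steps):
        trace.append(f"{name}{i}")
        yield  # explicitly hand control back to the scheduler


def run_round_robin(tasks):
    """Single-threaded scheduler: resume each task in turn until all finish."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)       # run the task until its next yield
        except StopIteration:
            continue            # the task finished; drop it
        ready.append(current)   # otherwise put it back in the queue


trace = []
run_round_robin([task("A", 3, trace), task("B", 2, trace), task("C", 1, trace)])
print(trace)
```

The only switch points are the `yield` statements, so the interleaving is deterministic: `['A0', 'B0', 'C0', 'A1', 'B1', 'A2']`.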

0x2: Advantages of asynchronous programming

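1. In the single-threaded synchronous model, tasks run one after another. If one task blocks on I/O, all the others must wait until it finishes before they can run in turn. This explicit ordering and serialized behavior are easy to reason about. However, if the tasks do not depend on each other and still have to wait for each other, the program is slowed down for no good reason.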

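2. In the multithreaded version, each task runs in its own thread. The threads are managed by the operating system. They may run in parallel on a multiprocessor system or be interleaved on a single processor, so other threads can keep running while one thread is blocked on a resource. This is more efficient than a synchronous program doing the same work, but the programmer must write code that protects shared resources from simultaneous access by several threads. Multithreaded programs are also harder to reason about. They have to handle thread safety with synchronization mechanisms such as locks, reentrant functions or thread-local storage, and a faulty implementation leads to subtle bugs.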

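Compared with the synchronous model, the asynchronous model pays off when:

1. There are many tasks, so that at any given moment at least one of them can make progress
2. The tasks perform a lot of I/O, so a synchronous program would waste a lot of time blocked on it
3. The tasks are largely independent of one another, so there is little interaction between them
//These conditions mostly arise on busy network servers in a client-server architecture (such as web servers)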
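To make the second condition concrete, here is a rough timing sketch. It uses the standard library's asyncio as a stand-in for an event loop, not Twisted. The 0.1 s of "I/O" per task is simulated with sleeps and is an arbitrary assumption. Run sequentially with blocking calls, the three waits add up; on an event loop they overlap inside a single thread.

```python
import asyncio
import time

DELAY = 0.1  # simulated I/O latency per task in seconds (arbitrary)


def blocking_io(name):
    time.sleep(DELAY)             # blocks the whole thread
    return name


async def non_blocking_io(name):
    await asyncio.sleep(DELAY)    # hands control back to the event loop
    return name


def run_sync(names):
    start = time.perf_counter()
    results = [blocking_io(n) for n in names]
    return results, time.perf_counter() - start


async def run_async(names):
    start = time.perf_counter()
    results = await asyncio.gather(*(non_blocking_io(n) for n in names))
    return list(results), time.perf_counter() - start


names = ["task1", "task2", "task3"]
sync_results, sync_time = run_sync(names)
async_results, async_time = asyncio.run(run_async(names))
print(f"sequential: {sync_time:.2f}s, event loop: {async_time:.2f}s")
```

On an otherwise idle machine this typically prints about 0.3 s for the sequential run and about 0.1 s for the event-loop run.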

Relevant Link:

https://likebeta.gitbooks.io/twisted-intro-cn/content/zh/p01.html

 

2. The asynchronous programming model and the Reactor

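1. The asynchronous client connects to all servers at once, instead of one at a time as the synchronous client does, and then waits for new events to arrive
2. The sockets used for communication are non-blocking, which is done by calling setblocking(0)
3. The select method from the select module reports which of the monitored sockets have data ready to be received; if none do, it blocks
4. When reading from a server, the client reads as much data as it can from the socket until it would block, then moves on to the next socket that has data (if any). This means we have to keep track of the data received from each server separately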
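The four steps above can be sketched with the standard `select` and `socket` modules. This minimal sketch assumes the connections already exist: `socket.socketpair()` stands in for connections to two hypothetical servers. `read_all` is an illustrative name, not part of any library.

```python
import select
import socket


def read_all(socks):
    """Read from several non-blocking sockets with select() until every peer closes.

    Returns {index: bytes_received} so data is tracked per server.
    """
    index = {s: i for i, s in enumerate(socks)}
    received = {i: b"" for i in range(len(socks))}
    open_socks = list(socks)
    for s in socks:
        s.setblocking(False)                  # step 2: non-blocking sockets
    while open_socks:
        # step 3: block until at least one monitored socket is readable
        readable, _, _ = select.select(open_socks, [], [])
        for s in readable:
            # step 4: drain this socket until it would block, then move on
            while True:
                try:
                    chunk = s.recv(4096)
                except BlockingIOError:
                    break                     # nothing more for now
                if not chunk:                 # empty read: the peer closed
                    open_socks.remove(s)
                    s.close()
                    break
                received[index[s]] += chunk
    return received


# Step 1 stand-in: two already-connected socket pairs play two "servers".
pairs = [socket.socketpair() for _ in range(2)]
for i, (server, _) in enumerate(pairs):
    server.sendall(b"poem %d" % i)
    server.close()
result = read_all([client for _, client in pairs])
print(result)
```

Expected output: `{0: b'poem 0', 1: b'poem 1'}`.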

The process above can be generalized into a pattern: the reactor pattern.

The reactor pattern

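This loop is a "reactor", because it waits for events to happen and then reacts to them. For the same reason it is also called an event loop. Interactive systems all perform I/O, so the loop is sometimes also called a select loop, after the select call used to wait for I/O. In the select loop of our program, then, an event means that data has arrived on one of the sockets.
Note that select is not the only function for waiting on I/O; it is simply one of the oldest. Newer APIs that do the same job with better performance have been implemented on various systems. Performance aside, they all do the same thing: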

1. Monitor a set of sockets (file descriptors)
2. Block the program
3. Until at least one of them is ready for I/O

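A real implementation of the reactor pattern factors the loop out into a separate abstraction with the following capabilities:

1. Monitor a set of file descriptors involved in I/O
2. Repeatedly report the file descriptors that are ready for I/O
3. Handle all the different kinds of I/O events that can occur on different systems
4. Provide a clean abstraction so that code using the reactor hardly has to think about it
5. Provide implementations of common protocols that can be used outside the abstraction layer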

0x1: The asynchronous event model in Twisted

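Twisted implements the reactor design pattern: in a single-threaded environment, it dispatches events from multiple event sources to their respective event handlers.
The core of Twisted is the reactor event loop. The reactor is aware of network, file system and timer events. It waits for these events and then handles them. In doing so it abstracts away platform-specific behavior and offers a uniform interface, which makes it easy to respond to events anywhere in the network stack.
In essence, the reactor does the following: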

while True:
    timeout = time_until_next_timed_event()
    events = wait_for_events(timeout)
    events += timed_events_until(now())
    for event in events:
        event.process()
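The pseudocode can be turned into a small runnable toy. The standard `selectors` module plays `wait_for_events`, and a heap of deadlines holds the timed events. `MiniReactor`, `call_later` and `add_reader` are hypothetical names chosen for this sketch (loosely echoing Twisted's `callLater`). It is not how Twisted's reactor is written, and it leaves out error handling and write events entirely.

```python
import heapq
import itertools
import selectors
import socket
import time


class MiniReactor:
    """Toy event loop: selectors for I/O readiness plus a heap of timed calls."""

    def __init__(self):
        self._selector = selectors.DefaultSelector()
        self._timers = []                 # heap of (deadline, seq, func)
        self._seq = itertools.count()     # tie-breaker for equal deadlines
        self._running = False

    def call_later(self, delay, func):
        deadline = time.monotonic() + delay
        heapq.heappush(self._timers, (deadline, next(self._seq), func))

    def add_reader(self, sock, callback):
        self._selector.register(sock, selectors.EVENT_READ, callback)

    def stop(self):
        self._running = False

    def run(self):
        self._running = True
        while self._running:
            if not self._timers and not self._selector.get_map():
                break                     # nothing left to wait for
            # timeout = time_until_next_timed_event()
            timeout = None
            if self._timers:
                timeout = max(0.0, self._timers[0][0] - time.monotonic())
            # events = wait_for_events(timeout)
            if self._selector.get_map():
                ready = self._selector.select(timeout)
                events = [(key.data, (key.fileobj,)) for key, _ in ready]
            else:
                time.sleep(timeout)       # only timers are pending
                events = []
            # events += timed_events_until(now())
            now = time.monotonic()
            while self._timers and self._timers[0][0] <= now:
                _, _, func = heapq.heappop(self._timers)
                events.append((func, ()))
            # for event in events: event.process()
            for handler, args in events:
                handler(*args)


loop = MiniReactor()
a, b = socket.socketpair()
log = []


def on_readable(sock):
    log.append(("read", sock.recv(1024)))


loop.add_reader(b, on_readable)
loop.call_later(0.01, lambda: a.sendall(b"ping"))
loop.call_later(0.2, lambda: log.append(("timer", None)))
loop.call_later(0.3, loop.stop)
loop.run()
a.close()
b.close()
print(log)
```

The timer writes to one end of the pair, the next `select` wakes up on the other end, and the later timers fire afterwards. The output is `[('read', b'ping'), ('timer', None)]`.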

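In the Twisted releases this material describes, the default reactor on all platforms was based on the poll API (newer releases default to epoll on Linux; see section 5). Twisted also supports several platform-specific, high-volume multiplexing APIs. These include the KQueue reactor, based on FreeBSD's kqueue mechanism; the epoll reactor, for systems that support the epoll interface (at the time, Linux 2.6); and the IOCP reactor, based on Windows I/O completion ports.
When implementing polling, Twisted has to take into account: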

1. Limitations of networks and file systems
2. Buffering behavior
3. How to detect lost connections
4. What to return when errors occur

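Twisted's reactor implementations also take care to use the underlying non-blocking APIs correctly and to handle the various edge cases properly. Python does not expose the IOCP API, so Twisted has to maintain its own implementation.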

0x2: Deferreds

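A Deferred object is an abstract representation of the idea that a result does not exist yet. It also helps manage the chain of callbacks needed to produce that result. A Deferred returned from a function is a promise that the function will produce a result at some point. The returned Deferred holds references to every callback registered for the event, so only this one object has to be passed between functions. This is much simpler than tracking all the callbacks separately.
A Deferred contains a pair of callback chains: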

1. One for callbacks that handle a successful operation
2. One for callbacks (errbacks) that handle a failed operation

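Initially both chains of a Deferred are empty. As the event is handled, each stage adds a success callback and a failure callback. When the asynchronous result arrives, the Deferred is "fired". The callbacks are then invoked in the order they were added, with a successful result going down the success callbacks and a failure going down the failure callbacks.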
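A toy sketch of the two chains, written from scratch so it runs without Twisted. `MiniDeferred` and its snake_case methods are hypothetical. The real `Deferred` uses `addCallback`/`addErrback`/`addCallbacks`, wraps errors in `Failure` objects and does much more. The sketch only shows the routing rule: a value travels down the success callbacks and an exception down the error callbacks. An errback that returns normally puts the chain back on the success path.

```python
class MiniDeferred:
    """Toy version of the two-chain idea behind Twisted's Deferred.

    Each stage adds a (callback, errback) pair. When fired, the result walks
    the pairs in order. A handler's return value becomes the new result, and
    a handler that raises switches the result onto the error path.
    """

    def __init__(self):
        self._chain = []            # list of (callback, errback) pairs
        self._fired = False
        self._result = None

    def add_callbacks(self, callback, errback=None):
        self._chain.append((callback, errback))
        if self._fired:
            self._run()             # already fired: run new handlers at once
        return self

    def add_callback(self, callback):
        return self.add_callbacks(callback, None)

    def add_errback(self, errback):
        return self.add_callbacks(None, errback)

    def callback(self, result):
        self._fire(result)

    def errback(self, exc):
        self._fire(exc)             # exc must be an Exception instance here

    def _fire(self, result):
        if self._fired:
            raise RuntimeError("already fired")
        self._fired = True
        self._result = result
        self._run()

    def _run(self):
        while self._chain:
            callback, errback = self._chain.pop(0)
            failed = isinstance(self._result, Exception)
            handler = errback if failed else callback
            if handler is None:
                continue            # no handler on this path: pass result on
            try:
                self._result = handler(self._result)
            except Exception as exc:
                self._result = exc  # switch to the error path


def build(seen):
    d = MiniDeferred()
    d.add_callback(str.upper)
    d.add_callback(lambda text: 1 / 0 if text == "BAD" else text)
    d.add_errback(lambda exc: "recovered from " + type(exc).__name__)
    d.add_callback(seen.append)
    return d


ok, bad, err = [], [], []
build(ok).callback("good")
build(bad).callback("bad")
build(err).errback(ValueError("boom"))
print(ok, bad, err)
```

`ok` ends up as `['GOOD']`, because the errback is skipped. `bad` gets `['recovered from ZeroDivisionError']`, and `err` gets `['recovered from ValueError']`, because an error fired directly skips every success callback until the first errback.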

0x3: Transports

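A transport represents the connection between two communicating endpoints on a network. It describes the details of that connection: whether it is stream- or datagram-oriented, flow control, and reliability. TCP, UDP and Unix domain sockets are examples of transports. Transports are designed to be "minimal functional units with maximum reusability". They are also decoupled from protocol implementations, which lets many protocols use the same type of transport. Transports implement the ITransport interface, which includes the following methods:

1. write: writes data to the physical connection, in order, without blocking
2. writeSequence: writes a sequence of byte strings to the physical connection
3. loseConnection: writes all pending data, then closes the connection
4. getPeer: returns the address of the remote end of the connection
5. getHost: returns the address of the local end of the connection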

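Separating transports from protocols also makes both layers easier to test. A transport can be simulated by simply writing to a string, and the protocol's output checked that way.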
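A sketch of that idea: a hypothetical `MemoryTransport` that offers the five methods listed above but writes into an in-memory buffer, so whatever a protocol writes can be inspected directly. Twisted ships its own helper for this purpose (`StringTransport`); the class below is an independent illustration. It returns plain tuples from `getPeer`/`getHost`, where Twisted returns address objects.

```python
class MemoryTransport:
    """Hypothetical in-memory transport with the five ITransport-style methods.

    Writes are queued, as a non-blocking write would be, and only reach
    `sent` when flush() stands in for the event loop draining the buffer.
    """

    def __init__(self, host=("127.0.0.1", 8123), peer=("127.0.0.1", 50000)):
        self._host = host
        self._peer = peer
        self.pending = []           # accepted by write(), not yet on the "wire"
        self.sent = b""             # what reached the "physical connection"
        self.disconnected = False

    def write(self, data):
        if self.disconnected:
            return                  # simplification: ignore writes after close
        self.pending.append(data)   # queue in order; never blocks

    def writeSequence(self, chunks):
        for chunk in chunks:
            self.write(chunk)

    def flush(self):
        self.sent += b"".join(self.pending)
        self.pending.clear()

    def loseConnection(self):
        self.flush()                # pending data is written first...
        self.disconnected = True    # ...then the connection is closed

    def getPeer(self):
        return self._peer

    def getHost(self):
        return self._host


t = MemoryTransport()
t.write(b"HELLO ")
t.writeSequence([b"WORLD", b"\r\n"])
before_close = t.sent               # nothing has been "sent" yet
t.loseConnection()
t.write(b"ignored")
print(before_close, t.sent, t.getPeer())
```

Expected output: `b'' b'HELLO WORLD\r\n' ('127.0.0.1', 50000)`. The queued data is only flushed by `loseConnection`, which mirrors the "write pending data, then close" contract listed above.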

0x4: Protocols

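A protocol describes how to handle network events asynchronously. HTTP, DNS and IMAP are examples of application-layer protocols. Protocols implement the IProtocol interface, which includes the following methods:

1. makeConnection: connects the protocol to a transport and the server
2. connectionMade: called once the connection has been established
3. dataReceived: called whenever data is received
4. connectionLost: called when the connection is closed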
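The call order of these four methods can be exercised by hand, with the test code playing the role of the reactor. Both classes below are hypothetical stand-ins written without Twisted: `UpperEcho` is shaped like a protocol, and `RecordingTransport` only records writes. In Twisted, `makeConnection` stores the transport and then calls `connectionMade`, and the sketch mirrors that. The `reason` passed to `connectionLost` would be a `Failure` in Twisted rather than a string.

```python
class RecordingTransport:
    """Hypothetical minimal transport that just records what is written."""

    def __init__(self):
        self.written = []
        self.closed = False

    def write(self, data):
        self.written.append(data)

    def loseConnection(self):
        self.closed = True


class UpperEcho:
    """Protocol-shaped class exposing the four IProtocol-style methods."""

    def makeConnection(self, transport):
        self.transport = transport   # bind the transport first...
        self.connectionMade()        # ...then announce the connection

    def connectionMade(self):
        self.transport.write(b"ready\n")

    def dataReceived(self, data):
        if data.strip() == b"quit":
            self.transport.loseConnection()
        else:
            self.transport.write(data.upper())

    def connectionLost(self, reason):
        self.lost_reason = reason


transport = RecordingTransport()
proto = UpperEcho()
proto.makeConnection(transport)          # what the reactor does on connect
proto.dataReceived(b"hello\n")           # ...when bytes arrive
proto.dataReceived(b"quit\n")
proto.connectionLost("connection done")  # ...when the connection closes
print(transport.written, transport.closed)
```

Expected output: `[b'ready\n', b'HELLO\n'] True`. Because the protocol only talks to `self.transport`, no socket is needed to test it.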

Relevant Link:

https://likebeta.gitbooks.io/twisted-intro-cn/content/zh/p02.html
https://likebeta.gitbooks.io/twisted-intro-cn/content/zh/p04.html
http://blog.csdn.net/hanhuili/article/details/9389433
http://blog.sina.com.cn/s/blog_704b6af70100py9n.html

 

3. Network programming with Twisted

0x1: Writing Servers

from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor

class Chat(LineReceiver):

    def __init__(self, users):
        self.users = users
        self.name = None
        self.state = "GETNAME"

    def connectionMade(self):
        # Python 3: LineReceiver sends and receives bytes, not str
        self.sendLine(b"What's your name?")

    def connectionLost(self, reason):
        if self.name in self.users:
            del self.users[self.name]

    def lineReceived(self, line):
        if self.state == "GETNAME":
            self.handle_GETNAME(line)
        else:
            self.handle_CHAT(line)

    def handle_GETNAME(self, name):
        if name in self.users:
            self.sendLine(b"Name taken, please choose another.")
            return
        self.sendLine(b"Welcome, %s!" % (name,))
        self.name = name
        self.users[name] = self
        self.state = "CHAT"

    def handle_CHAT(self, message):
        message = b"<%s> %s" % (self.name, message)
        for name, protocol in self.users.items():
            if protocol != self:
                protocol.sendLine(message)


class ChatFactory(Factory):

    def __init__(self):
        self.users = {} # maps user names to Chat instances

    def buildProtocol(self, addr):
        return Chat(self.users)


reactor.listenTCP(8123, ChatFactory())
reactor.run()

0x2: Writing Clients

Twisted is a framework designed to be very flexible, and let you write powerful clients. The cost of this flexibility is a few layers in the way to writing your client

1. single-use clients

In many cases, the protocol only needs to connect to the server once, and the code just wants to get a connected instance of the protocol. In those cases twisted.internet.endpoints provides the appropriate API, and in particular connectProtocol which takes a protocol instance rather than a factory.

from twisted.internet import reactor
from twisted.internet.protocol import Protocol
from twisted.internet.endpoints import TCP4ClientEndpoint, connectProtocol

class Greeter(Protocol):
    def sendMessage(self, msg):
        # msg must be bytes under Python 3
        self.transport.write(b"MESSAGE %s\n" % msg)

def gotProtocol(p):
    p.sendMessage(b"Hello")
    reactor.callLater(1, p.sendMessage, b"This is sent in a second")
    reactor.callLater(2, p.transport.loseConnection)

point = TCP4ClientEndpoint(reactor, "localhost", 1234)
d = connectProtocol(point, Greeter())
d.addCallback(gotProtocol)
reactor.run()

2. ClientFactory

Still, there’s plenty of code out there that uses lower-level APIs, and a few features (such as automatic reconnection) have not been re-implemented with endpoints yet, so in some cases they may be more convenient to use.
To use the lower-level connection APIs, you will need to call one of the reactor.connect* methods directly. For these cases, you need a ClientFactory. The ClientFactory is in charge of creating the Protocol and also receives events relating to the connection state. This allows it to do things like reconnect in the event of a connection error.

from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ClientFactory
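from sys import stdout

# Example address (assumption): the original snippet left host and port
# undefined; point them at the server you want to connect to.
host, port = "localhost", 8000

class Echo(Protocol):
    def dataReceived(self, data):
        # data is bytes in Python 3; decode it for the text-mode stdout
        stdout.write(data.decode("utf-8", "replace"))

class EchoClientFactory(ClientFactory):
    def startedConnecting(self, connector):
        print('Started to connect.')

    def buildProtocol(self, addr):
        print('Connected.')
        return Echo()

    def clientConnectionLost(self, connector, reason):
        print('Lost connection.  Reason:', reason)

    def clientConnectionFailed(self, connector, reason):
        print('Connection failed. Reason:', reason)

reactor.connectTCP(host, port, EchoClientFactory())
reactor.run()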

3. A Higher-Level Example: ircLogBot

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.


"""
An example IRC log bot - logs a channel's events to a file.

If someone says the bot's name in the channel followed by a ':',
e.g.

    <foo> logbot: hello!

the bot will reply:

    <logbot> foo: I am a log bot

Run this script with two arguments, the channel name the bot should
connect to, and file to log to, e.g.:

    $ python ircLogBot.py test test.log

will log channel #test to the file 'test.log'.

To run the script:

    $ python ircLogBot.py <channel> <file>
"""


# twisted imports
from twisted.words.protocols import irc
from twisted.internet import reactor, protocol
from twisted.python import log

# system imports
import time, sys


class MessageLogger:
    """
    An independent logger class (because separation of application
    and protocol logic is a good thing).
    """
    def __init__(self, file):
        self.file = file

    def log(self, message):
        """Write a message to the file."""
        timestamp = time.strftime("[%H:%M:%S]", time.localtime(time.time()))
        self.file.write('%s %s\n' % (timestamp, message))
        self.file.flush()

    def close(self):
        self.file.close()


class LogBot(irc.IRCClient):
    """A logging IRC bot."""
    
    nickname = "twistedbot"
    
    def connectionMade(self):
        irc.IRCClient.connectionMade(self)
        self.logger = MessageLogger(open(self.factory.filename, "a"))
        self.logger.log("[connected at %s]" % 
                        time.asctime(time.localtime(time.time())))

    def connectionLost(self, reason):
        irc.IRCClient.connectionLost(self, reason)
        self.logger.log("[disconnected at %s]" % 
                        time.asctime(time.localtime(time.time())))
        self.logger.close()


    # callbacks for events

    def signedOn(self):
        """Called when bot has successfully signed on to server."""
        self.join(self.factory.channel)

    def joined(self, channel):
        """This will get called when the bot joins the channel."""
        self.logger.log("[I have joined %s]" % channel)

    def privmsg(self, user, channel, msg):
        """This will get called when the bot receives a message."""
        user = user.split('!', 1)[0]
        self.logger.log("<%s> %s" % (user, msg))
        
        # Check to see if they're sending me a private message
        if channel == self.nickname:
            msg = "It isn't nice to whisper!  Play nice with the group."
            self.msg(user, msg)
            return

        # Otherwise check to see if it is a message directed at me
        if msg.startswith(self.nickname + ":"):
            msg = "%s: I am a log bot" % user
            self.msg(channel, msg)
            self.logger.log("<%s> %s" % (self.nickname, msg))

    def action(self, user, channel, msg):
        """This will get called when the bot sees someone do an action."""
        user = user.split('!', 1)[0]
        self.logger.log("* %s %s" % (user, msg))

    # irc callbacks

    def irc_NICK(self, prefix, params):
        """Called when an IRC user changes their nickname."""
        old_nick = prefix.split('!')[0]
        new_nick = params[0]
        self.logger.log("%s is now known as %s" % (old_nick, new_nick))


    # For fun, override the method that determines how a nickname is changed on
    # collisions. The default method appends an underscore.
    def alterCollidedNick(self, nickname):
        """
        Generate an altered version of a nickname that caused a collision in an
        effort to create an unused related name for subsequent registration.
        """
        return nickname + '^'



class LogBotFactory(protocol.ClientFactory):
    """A factory for LogBots.

    A new protocol instance will be created each time we connect to the server.
    """

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename

    def buildProtocol(self, addr):
        p = LogBot()
        p.factory = self
        return p

    def clientConnectionLost(self, connector, reason):
        """If we get disconnected, reconnect to server."""
        connector.connect()

    def clientConnectionFailed(self, connector, reason):
        print("connection failed:", reason)
        reactor.stop()


if __name__ == '__main__':
    # initialize logging
    log.startLogging(sys.stdout)
    
    # create factory protocol and application
    f = LogBotFactory(sys.argv[1], sys.argv[2])

    # connect factory to this host and port
    reactor.connectTCP("irc.freenode.net", 6667, f)

    # run bot
    reactor.run()

4. Persistent Data in the Factory

When the protocol is created, it gets a reference to the factory as self.factory. It can then access attributes of the factory in its logic.
Factories have a default implementation of buildProtocol. It does the same thing the example above does using the protocol attribute of the factory to create the protocol instance. In the example above, the factory could be rewritten to look like this:

class LogBotFactory(protocol.ClientFactory):
    protocol = LogBot

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename
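What that default `buildProtocol` does can be sketched in a few lines without Twisted. It instantiates the class stored in the `protocol` attribute and gives the new instance a `factory` back-reference, so every protocol built by the same factory shares the factory's state. `MiniFactory`, `ChannelGreeter` and `ChannelGreeterFactory` are hypothetical names for this illustration.

```python
class MiniFactory:
    """Sketch of the default buildProtocol: instantiate self.protocol, attach self."""

    protocol = None  # subclasses set this to a protocol class

    def buildProtocol(self, addr):
        p = self.protocol()
        p.factory = self        # the back-reference the protocol relies on
        return p


class ChannelGreeter:
    """Hypothetical protocol that reads shared state through self.factory."""

    def greeting(self):
        return "joining " + self.factory.channel


class ChannelGreeterFactory(MiniFactory):
    protocol = ChannelGreeter

    def __init__(self, channel):
        self.channel = channel  # persistent data shared by every connection


f = ChannelGreeterFactory("#test")
first = f.buildProtocol(("127.0.0.1", 1111))   # one instance per connection
second = f.buildProtocol(("127.0.0.1", 2222))
print(first.greeting(), first is not second, first.factory is second.factory)
```

Expected output: `joining #test True True`. Each connection gets a fresh protocol object, while the channel name lives once on the factory.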

Relevant Link:

http://twisted.readthedocs.org/en/latest/core/howto/clients.html

 

4. Process management with the reactor

Along with connection to servers across the internet, Twisted also connects to local processes with much the same API.
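Keep in mind that the reactor is a programming paradigm, and Twisted is a network programming framework built on this asynchronous event model. The same asynchronous event model can also be used to manage process events.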

0x1: Example: feeding text to wc and counting its lines

#!/usr/bin/env python

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

from twisted.internet import protocol
from twisted.internet import reactor

class MyPP(protocol.ProcessProtocol):
    def __init__(self, verses):
        self.verses = verses
        self.data = b""  # Python 3: the child's output arrives as bytes
    def connectionMade(self):
        print("connectionMade!")
        for i in range(self.verses):
            self.transport.write(b"Aleph-null bottles of beer on the wall,\n" +
                                 b"Aleph-null bottles of beer,\n" +
                                 b"Take one down and pass it around,\n" +
                                 b"Aleph-null bottles of beer on the wall.\n")
        self.transport.closeStdin() # tell them we're done
    def outReceived(self, data):
        print("outReceived! with %d bytes!" % len(data))
        self.data = self.data + data
    def errReceived(self, data):
        print("errReceived! with %d bytes!" % len(data))
    def inConnectionLost(self):
        print("inConnectionLost! stdin is closed! (we probably did it)")
    def outConnectionLost(self):
        print("outConnectionLost! The child closed their stdout!")
        # now is the time to examine what they wrote
        #print("I saw them write:", self.data)
        # wc reading stdin prints "lines words chars"; split() ignores the padding
        lines, words, chars = self.data.split()[:3]
        print("I saw %s lines" % lines.decode())
    def errConnectionLost(self):
        print("errConnectionLost! The child closed their stderr.")
    def processExited(self, reason):
        print("processExited, status %d" % (reason.value.exitCode,))
    def processEnded(self, reason):
        print("processEnded, status %d" % (reason.value.exitCode,))
        print("quitting")
        reactor.stop()

pp = MyPP(10)
reactor.spawnProcess(pp, "wc", ["wc"], {})
reactor.run()

0x2: Example: decrypting with gpg over extra file descriptors

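from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import ProcessProtocol

class GPGProtocol(ProcessProtocol):
    def __init__(self, crypttext, passphrase):
        # both arguments are bytes; the original snippet read self.passphrase
        # without ever setting it, so it is passed in explicitly here
        self.crypttext = crypttext
        self.passphrase = passphrase
        self.plaintext = b""
        self.status = b""
    def connectionMade(self):
        # child fd 3 carries the passphrase, child fd 0 (stdin) the ciphertext
        self.transport.writeToChild(3, self.passphrase)
        self.transport.closeChildFD(3)
        self.transport.writeToChild(0, self.crypttext)
        self.transport.closeChildFD(0)
    def childDataReceived(self, childFD, data):
        if childFD == 1: self.plaintext += data  # decrypted data on stdout
        if childFD == 4: self.status += data     # --status-fd 4 output
    def processEnded(self, status):
        rc = status.value.exitCode
        if rc == 0:
            self.deferred.callback(self)
        else:
            self.deferred.errback(rc)

def decrypt(crypttext, passphrase):
    gp = GPGProtocol(crypttext, passphrase)
    gp.deferred = Deferred()
    # GnuPG 2.1+ only honours --passphrase-fd together with
    # --pinentry-mode loopback; drop those two arguments for GnuPG 1.x
    cmd = ["gpg", "--decrypt", "--passphrase-fd", "3", "--status-fd", "4",
           "--batch", "--pinentry-mode", "loopback"]
    p = reactor.spawnProcess(gp, cmd[0], cmd, env=None,
                             childFDs={0:"w", 1:"r", 2:2, 3:"w", 4:"r"})
    return gp.deferred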

Relevant Link:

http://twistedmatrix.com/documents/12.2.0/core/howto/process.html

 

5. Concurrent connections in Twisted

Some time back I had to write a network server which needed to support ~50K concurrent clients on a single box. Server-client communication used a proprietary protocol on top of TCP, with a RawBinaryData struct as the messaging format. Clients exchanged periodic keep-alives, which the server used to check their health. As most of the operations were I/O-bound (socket/DB), we decided to use Python/Twisted to implement the server.
On performing load tests we found that the server could handle only 1024 clients, after which connections started failing. We increased the per-process limit on open files from 1024 to 100000 (ulimit -n 100000), and the connections still failed at 1024.

0x1: select limitation

select fails beyond 1024 fds because FD_SETSIZE is 1024. Twisted's default reactor seemed to be based on select. As a natural progression, poll was tried next to overcome the max open fd issue.

0x2: poll limitation

poll solves the max fd issue. But as the number of concurrent clients started increasing, performance dropped drastically. Poll implementation does O(n) operations internally and performance drops as number of fds increases.

0x3: epoll

The epoll reactor solved both problems and gave awesome performance. libevent is another library built on top of epoll (on Linux).
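The difference from poll can be sketched with Python's `select.epoll`, which is Linux only. The sketch assumes 50 idle socket pairs as stand-ins for client connections. They are registered with the kernel once, and a single `poll()` call then reports only the one socket that actually has data. With select/poll, by contrast, the whole fd list is passed in and scanned on every call. `epoll_ready_demo` is a hypothetical helper name.

```python
import select
import socket


def epoll_ready_demo(n_pairs=50, active=42):
    """Register many idle sockets once; epoll reports only the ready one.

    Linux-only (select.epoll). Returns (registered_count, ready_is_expected).
    """
    ep = select.epoll()
    pairs = [socket.socketpair() for _ in range(n_pairs)]
    try:
        for _, reader in pairs:
            # one-time registration: the kernel keeps this interest list
            ep.register(reader.fileno(), select.EPOLLIN)
        pairs[active][0].sendall(b"x")        # make exactly one socket readable
        ready = ep.poll(1.0)                  # list of (fd, event_mask)
        ready_fds = [fd for fd, _ in ready]
        return n_pairs, ready_fds == [pairs[active][1].fileno()]
    finally:
        ep.close()
        for writer, reader in pairs:
            writer.close()
            reader.close()


if hasattr(select, "epoll"):
    print(epoll_ready_demo())
```

On Linux this prints `(50, True)`: 50 sockets are registered, but the wait returns just the single ready one.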

0x4: Async frameworks

Do not waste time with select/poll-based approaches if the number of concurrent connections expected is above 1K. Following are some of the event-loop-based frameworks where this applies.

1. Eventlet (python)
2. Gevent (python): similar to Eventlet; uses libevent, which is built on top of epoll.
3. C++ ACE 
4. Java Netty
5. Ruby Eventmachine

0x5: Choosing a Reactor and GUI Toolkit Integration (newer Twisted)

Twisted provides a variety of implementations of the twisted.internet.reactor. The specialized implementations are suited for different purposes and are designed to integrate better with particular platforms.
The epoll()-based reactor is Twisted's default on Linux. Other platforms use poll(), or the most cross-platform reactor, select().
Platform-specific reactor implementations exist for:

Poll for Linux
Epoll for Linux 2.6
WaitForMultipleObjects (WFMO) for Win32
Input/Output Completion Port (IOCP) for Win32
KQueue for FreeBSD and Mac OS X
CoreFoundation for Mac OS X
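Python's standard `selectors` module makes a similar per-platform choice, and it offers a quick way to see which mechanisms a given machine supports. The preference order below is the one CPython uses to pick `selectors.DefaultSelector` (kqueue, then epoll, devpoll, poll, select). That is the standard library's logic, not Twisted's reactor selection, and `available_selectors` is a hypothetical helper. It only checks which classes exist, so it roughly rather than exactly matches the default-selection logic.

```python
import selectors

# Order in which CPython's selectors module prefers mechanisms when it
# defines DefaultSelector (standard-library logic, not Twisted's).
PREFERENCE = ["KqueueSelector", "EpollSelector", "DevpollSelector",
              "PollSelector", "SelectSelector"]


def available_selectors():
    """Names of the selector classes this Python build exposes, best first."""
    return [name for name in PREFERENCE if hasattr(selectors, name)]


print("available:", available_selectors())
print("default:  ", selectors.DefaultSelector.__name__)
```

On a typical Linux machine the default is `EpollSelector`; on macOS and FreeBSD it is `KqueueSelector`.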

1. Select()-based Reactor

The select reactor is the default on platforms that don't provide a better alternative that covers all use cases. If the select reactor is desired, it may be installed via:

from twisted.internet import selectreactor
selectreactor.install()

from twisted.internet import reactor

2. Poll-based Reactor

The PollReactor will work on any platform that provides select.poll. With larger numbers of connected sockets, it may provide for better performance than the SelectReactor.

from twisted.internet import pollreactor
pollreactor.install()

from twisted.internet import reactor

3. KQueue

The KQueue Reactor allows Twisted to use FreeBSD's kqueue mechanism for event scheduling

from twisted.internet import kqreactor
kqreactor.install()

from twisted.internet import reactor

4. WaitForMultipleObjects (WFMO) for Win32

from twisted.internet import win32eventreactor
win32eventreactor.install()

from twisted.internet import reactor

5. Input/Output Completion Port (IOCP) for Win32

Windows provides a fast, scalable event notification system known as IO Completion Ports, or IOCP for short. Twisted includes a reactor based on IOCP which is nearly complete.

from twisted.internet import iocpreactor
iocpreactor.install()

from twisted.internet import reactor

6. Epoll-based Reactor

The EPollReactor will work on any platform that provides epoll, today only Linux 2.6 and over. The implementation of the epoll reactor currently uses the Level Triggered interface, which is basically like poll() but scales much better.

from twisted.internet import epollreactor
epollreactor.install()

from twisted.internet import reactor

Relevant Link:

https://moythreads.com/wordpress/2009/12/22/select-system-call-limitation/
http://pipeit.blogspot.com/2011/07/select-poll-and-epoll-twisted-story.html
http://twistedmatrix.com/documents/13.2.0/core/howto/choosing-reactor.html#auto2

 

Copyright (c) 2016 LittleHann All rights reserved
