catalogue
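1. Twisted Fundamentals
2. Asynchronous Programming Patterns and the Reactor
3. Network Programming with Twisted
4. Process Management with the Reactor
5. Concurrent Connections in Twisted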
1. Twisted Fundamentals
0x1: The Asynchronous Programming Model
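Event-driven programming is a programming paradigm in which the program's flow of execution is determined by external events. It is characterized by an event loop that uses callbacks to trigger the appropriate handling whenever an external event occurs. The other two common paradigms are (single-threaded) synchronous programming and multithreaded programming.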
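In the asynchronous model, tasks are interleaved with one another, and, importantly, all of this happens under the control of a single thread. This is much simpler than the multithreaded model, because the programmer can always assume that only one task is running while the others are suspended.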
The asynchronous model also differs from the multithreaded model in one more way:
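1. In a multithreaded program, the decision to suspend one thread and run another is made by the operating system, not by the programmer. The programmer therefore has to assume that any thread may be suspended at any moment in favor of another.
2. In the asynchronous model, by contrast, events arrive asynchronously and are queued, and the event loop takes them off the queue and processes them one at a time. A task runs until it explicitly gives up control, and only then can another task run. This is what makes the asynchronous model simpler than the multithreaded one.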
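To make "explicitly gives up control" concrete, here is a minimal sketch using plain Python generators rather than Twisted: each task runs until it reaches a `yield`, which is the only point where another task can take over. The `task` and `run_round_robin` functions are hypothetical names chosen for this illustration.

```python
from collections import deque


def task(name, steps, trace):
    """A cooperative task: it runs until it hits `yield`, then hands control back."""
    for i in range(steps):
        trace.append(f"{name}{i}")
        yield  # the only place where this task gives up control


def run_round_robin(tasks):
    """Single-threaded scheduler: resume each task in turn until all are done."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)       # run the task up to its next yield
        except StopIteration:
            continue            # the task finished; drop it
        ready.append(current)   # it yielded; queue it again


trace = []
run_round_robin([task("A", 2, trace), task("B", 3, trace)])
print(trace)  # ['A0', 'B0', 'A1', 'B1', 'B2']
```

Because a task can never be interrupted between two `yield`s, the code between them needs no locks, which is the simplification described above.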
0x2: Advantages of Asynchronous Programming
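1. In the single-threaded synchronous model, tasks run strictly one after another. If a task blocks on I/O, every other task has to wait until it finishes before they can run in turn. This explicit ordering and serialized behavior is easy to reason about, but if the tasks do not depend on one another and still have to wait for each other, the program is slowed down unnecessarily.
2. In the multithreaded version, each task runs in its own thread. The threads are managed by the operating system and may run in parallel on a multiprocessor system or be interleaved on a single processor, so other threads can keep running while one is blocked on a resource. This is more efficient than an equivalent synchronous program, but the programmer must write code that protects shared resources from being accessed by several threads at once. Multithreaded programs are harder to reason about because they have to deal with thread safety through synchronization mechanisms such as locks, reentrant functions, and thread-local storage, and getting this wrong leads to subtle bugs.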
Compared with the synchronous model, the asynchronous model pays off when:
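1. There are many tasks, so at any given moment at least one of them is likely able to make progress.
2. The tasks perform a lot of I/O, so a synchronous program would waste a great deal of time blocked.
3. The tasks are largely independent of one another, so there is little need for them to interact.
// These conditions are typical of busy servers in client/server systems, such as web servers.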
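The I/O-heavy case can be illustrated with a small stdlib sketch. It uses `asyncio` (Python's built-in event loop, chosen instead of Twisted only so the example has no dependencies), with `asyncio.sleep` standing in for a hypothetical 0.1 s I/O wait per task. Run one after another the waits add up; interleaved on a single event loop they overlap.

```python
import asyncio
import time


async def fake_io(delay):
    # stands in for a network or disk wait; the loop can run other tasks meanwhile
    await asyncio.sleep(delay)
    return delay


async def sequential(n, delay):
    for _ in range(n):
        await fake_io(delay)   # each wait must finish before the next one starts


async def interleaved(n, delay):
    await asyncio.gather(*(fake_io(delay) for _ in range(n)))  # the waits overlap


def timed(coro):
    start = time.perf_counter()
    asyncio.run(coro)
    return time.perf_counter() - start


seq = timed(sequential(3, 0.1))   # roughly 3 * 0.1 s
con = timed(interleaved(3, 0.1))  # roughly 0.1 s
print(f"sequential: {seq:.2f}s, interleaved: {con:.2f}s")
```

Everything still runs in one thread; the saving comes purely from not sitting idle while a task waits.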
Relevant Link:
https://likebeta.gitbooks.io/twisted-intro-cn/content/zh/p01.html
2. Asynchronous Programming Patterns and the Reactor
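1. The asynchronous client connects to all of the servers at once, rather than one at a time as the synchronous client does, and once connected it waits for new events.
2. The sockets used for communication are put into non-blocking mode by calling setblocking(0).
3. The select method from the select module is used to wait until at least one of the monitored sockets has data ready to read; while none is ready, the call blocks.
4. When reading from a server, the client reads as much data as it can from that socket until the read would block, and then moves on to the next socket that has data (if any). This means the client has to keep track of how much data it has received from each server.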
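The four points above can be sketched with the standard `socket` and `select` modules. To stay self-contained, the sketch assumes `socket.socketpair()` connections as stand-ins for real server connections, with each hypothetical "server" sending a payload and closing; the client reads each ready socket until `recv` would block and keeps a separate buffer per connection.

```python
import select
import socket

# Three local socket pairs stand in for connections to three servers.
pairs = [socket.socketpair() for _ in range(3)]
clients = [client for client, _ in pairs]
for client in clients:
    client.setblocking(False)  # same as setblocking(0): recv never blocks

# Each "server" sends its data and closes its end of the connection.
for i, (_, server) in enumerate(pairs):
    server.sendall(f"poem {i} ".encode() * 500)
    server.close()

received = {sock: b"" for sock in clients}  # one buffer per connection
open_socks = list(clients)
while open_socks:
    # Block until at least one monitored socket has data (or EOF) to read.
    readable, _, _ = select.select(open_socks, [], [])
    for sock in readable:
        while True:  # drain this socket until the read would block
            try:
                chunk = sock.recv(4096)
            except BlockingIOError:
                break  # nothing more right now; go service other sockets
            if not chunk:  # empty read: the peer closed the connection
                open_socks.remove(sock)
                sock.close()
                break
            received[sock] += chunk

for i, sock in enumerate(clients):
    print(i, len(received[sock]))
```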
The process above can be generalized into a pattern: the reactor pattern.
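This loop is a "reactor" because it waits for events to happen and then reacts to them. For the same reason it is also called an event loop. Since interactive systems all have to perform I/O, the loop is sometimes also called a select loop, because the select call is what is used to wait for I/O. So in the select loop of the client described above, an event means that data has arrived on a socket.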
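Note that select is not the only function for waiting on I/O; it is simply one of the oldest. Newer APIs that do the same job with better performance have been implemented on various systems. Performance aside, they all do the same thing: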
1. Monitor a set of sockets (file descriptors)
2. Block the program
3. Until at least one I/O operation is ready
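Python's standard `selectors` module illustrates the point that these APIs are interchangeable: `selectors.DefaultSelector` picks the most efficient mechanism the platform offers (for example epoll on Linux or kqueue on BSD and macOS), while the calling code stays the same. A minimal sketch, with a `socket.socketpair()` standing in for a network connection:

```python
import selectors
import socket

sel = selectors.DefaultSelector()
print(type(sel).__name__)  # e.g. EpollSelector on Linux

reader, writer = socket.socketpair()
reader.setblocking(False)
sel.register(reader, selectors.EVENT_READ)  # 1. monitor a file descriptor

# Nothing has been written yet, so a zero timeout reports no ready descriptors.
idle = sel.select(timeout=0)

writer.send(b"ping")
events = sel.select(timeout=1)  # 2./3. block until at least one fd is ready
ready = [key.fileobj for key, mask in events if mask & selectors.EVENT_READ]
data = reader.recv(4)
print(ready == [reader], data)  # True b'ping'

sel.unregister(reader)
sel.close()
reader.close()
writer.close()
```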
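A real implementation of the reactor pattern abstracts the loop out into an independent component with the following capabilities:
1. Monitor a set of file descriptors associated with I/O operations
2. Continuously report which file descriptors are ready for I/O
3. Handle all the different I/O events that can occur on different systems
4. Provide clean abstractions, so that you need to think less about the reactor while using it
5. Provide implementations of common protocols that can be used outside the abstraction layer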
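0x1: The Asynchronous Event Model in Twisted
Twisted implements the reactor design pattern, which, within a single thread, dispatches events produced by multiple event sources to their respective event handlers.
The core of Twisted is the reactor event loop. The reactor is aware of network, file system, and timer events. It waits for these events and then handles them, abstracting away platform-specific behavior and providing a uniform interface that makes it easy to respond to events anywhere in the network stack.
Essentially, the reactor does the following: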
```python
while True:
    timeout = time_until_next_timed_event()
    events = wait_for_events(timeout)
    events += timed_events_until(now())
    for event in events:
        event.process()
```
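The loop above can be turned into a small runnable model. This is not Twisted's implementation: `ToyReactor` and its `call_later`/`add_reader`/`remove_reader`/`stop` methods are hypothetical names loosely modeled on Twisted's `callLater`, `addReader`, `removeReader` and `stop`. It uses `selectors` for `wait_for_events` and a heap of deadlines for the timed events, and assumes a POSIX-like platform.

```python
import functools
import heapq
import itertools
import selectors
import socket
import time


class ToyReactor:
    """Hypothetical single-threaded reactor: I/O readiness plus timed calls."""

    def __init__(self):
        self._selector = selectors.DefaultSelector()
        self._timers = []              # heap of (deadline, seq, callback)
        self._seq = itertools.count()  # tie-breaker so callbacks are never compared
        self._running = False

    def call_later(self, delay, callback):
        deadline = time.monotonic() + delay
        heapq.heappush(self._timers, (deadline, next(self._seq), callback))

    def add_reader(self, sock, callback):
        self._selector.register(sock, selectors.EVENT_READ, callback)

    def remove_reader(self, sock):
        self._selector.unregister(sock)

    def stop(self):
        self._running = False

    def run(self):
        self._running = True
        while self._running:
            # timeout = time_until_next_timed_event()  (None: no timers, wait for I/O)
            timeout = None
            if self._timers:
                timeout = max(0.0, self._timers[0][0] - time.monotonic())
            # events = wait_for_events(timeout)
            ready = self._selector.select(timeout)
            events = [functools.partial(key.data, key.fileobj) for key, _ in ready]
            # events += timed_events_until(now())
            now = time.monotonic()
            while self._timers and self._timers[0][0] <= now:
                events.append(heapq.heappop(self._timers)[2])
            # for event in events: event.process()
            for event in events:
                event()


log = []
toy = ToyReactor()
a, b = socket.socketpair()


def on_readable(sock):
    log.append(("read", sock.recv(100)))
    toy.remove_reader(sock)
    toy.stop()


toy.add_reader(a, on_readable)
toy.call_later(0.02, lambda: b.send(b"hello"))  # makes `a` readable later
toy.call_later(0.01, lambda: log.append(("timer", 0.01)))
toy.run()
a.close()
b.close()
print(log)  # [('timer', 0.01), ('read', b'hello')]
```

Timers and I/O callbacks all run on the same thread, one at a time, which is why a callback can safely call `stop()` or `remove_reader()` without locking.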
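At the time this description was written, Twisted's default reactor on all platforms was based on the poll API (newer releases default to the epoll-based reactor on Linux). Twisted also supports several platform-specific, high-volume multiplexing APIs: the KQueue reactor, based on FreeBSD's kqueue mechanism; the epoll reactor, for systems that support the epoll interface (Linux 2.6 and later); and the IOCP reactor, based on Windows Input/Output Completion Ports.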
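Among the polling implementation details Twisted has to take into account are:
1. Network and file system limits
2. Buffering behavior
3. How to detect a lost connection
4. The values returned when an error occurs
Twisted's reactor implementations also take care to use the underlying non-blocking APIs correctly and to handle the various edge cases properly. Because Python does not expose the IOCP API, Twisted has to maintain its own implementation.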
0x2: Deferreds
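A Deferred object is an abstraction of the idea of a result that does not exist yet. It also helps manage the chain of callbacks needed to produce that result. When returned from a function, a Deferred is a promise that the function will produce a result at some point. The returned Deferred holds references to all the callbacks registered for the event, so only this one object needs to be passed between functions, which is much easier to keep track of than managing every callback separately.
A Deferred contains a pair of callback chains:
1. one for callbacks that handle success
2. one for callbacks that handle failure (errbacks)
Initially, both chains are empty. As the event is processed, each stage adds its success callback and its failure callback. When an asynchronous result arrives, the Deferred is "fired", and either the success callbacks or the failure callbacks are invoked as appropriate, in the order in which they were added.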
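The two chains and the "fire once, then run the callbacks in order" behavior can be modeled in a few lines. `MiniDeferred` is a hypothetical, heavily simplified stand-in written for illustration, not Twisted's `Deferred`: it has no `Failure` objects, no nested-Deferred chaining, and much less error checking. As in Twisted, each callback receives the previous stage's result, an exception switches processing to the errback chain, and an errback that returns normally switches it back.

```python
class MiniDeferred:
    def __init__(self):
        self._chain = []      # (callback, errback) pairs, in the order added
        self._fired = False
        self._result = None
        self._is_error = False

    def addCallbacks(self, callback, errback=None):
        self._chain.append((callback, errback))
        if self._fired:
            self._run()       # already fired: run the new stage right away
        return self

    def addCallback(self, callback):
        return self.addCallbacks(callback, None)

    def addErrback(self, errback):
        return self.addCallbacks(None, errback)

    def callback(self, result):
        self._fire(result, is_error=False)

    def errback(self, exc):
        self._fire(exc, is_error=True)

    def _fire(self, result, is_error):
        if self._fired:
            raise RuntimeError("already fired")
        self._fired, self._result, self._is_error = True, result, is_error
        self._run()

    def _run(self):
        while self._chain:
            callback, errback = self._chain.pop(0)
            handler = errback if self._is_error else callback
            if handler is None:
                continue  # no handler for this outcome here: pass the result through
            try:
                self._result = handler(self._result)
                self._is_error = False  # a handler that returns normally recovers
            except Exception as exc:
                self._result, self._is_error = exc, True


trace = []
d = MiniDeferred()
d.addCallback(lambda x: x * 2)
d.addCallback(lambda x: 1 / 0 if x > 5 else x)          # raises for large values
d.addCallback(lambda x: trace.append(("skipped", x)))   # skipped after the error
d.addErrback(lambda exc: type(exc).__name__)            # recovers with a string
d.addCallback(trace.append)
d.callback(4)
d.addCallback(lambda r: trace.append("late"))           # runs at once: d already fired
print(trace)  # ['ZeroDivisionError', 'late']
```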
0x3: Transports
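A transport represents the connection between two endpoints communicating over a network. Transports describe the details of the connection, such as whether it is stream-oriented or datagram-oriented, flow control, and reliability. TCP, UDP, and Unix sockets are examples of transports. They are designed to be "minimal functional units that are maximally reusable", and they are decoupled from protocol implementations, which lets many protocols use the same type of transport. Transports implement the ITransport interface, which includes the following methods:
1. write: writes data to the physical connection, in order, without blocking
2. writeSequence: writes a list of strings to the physical connection
3. loseConnection: writes out all pending data and then closes the connection
4. getPeer: returns the address of the remote end of the connection
5. getHost: returns the address of the local end of the connection
Separating transports from protocols also makes each of the two layers easier to test. A transport can be simulated simply by writing to a string, and the data written can then be checked.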
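A fake transport of this kind takes only a few lines. `BufferTransport` below is a hypothetical class written for this sketch (Twisted ships its own test helper, `StringTransport`, for the same purpose). It models the method list above: `write` only queues data, `loseConnection` flushes pending data before closing, and the addresses are plain tuples rather than Twisted's address objects.

```python
class BufferTransport:
    """Hypothetical in-memory transport: 'sends' data by appending to a buffer."""

    def __init__(self, host=("127.0.0.1", 9000), peer=("127.0.0.1", 50000)):
        self._host, self._peer = host, peer
        self._pending = []  # data accepted by write() but not yet "sent"
        self.sent = b""     # what has reached the "wire", in order
        self.closed = False

    def write(self, data):
        if self.closed:
            return                  # writes after close are dropped
        self._pending.append(data)  # non-blocking: just queue it

    def writeSequence(self, seq):
        for data in seq:
            self.write(data)

    def flush(self):
        # stands in for the reactor draining the buffer when the socket is writable
        self.sent += b"".join(self._pending)
        self._pending.clear()

    def loseConnection(self):
        self.flush()        # pending data is written first...
        self.closed = True  # ...then the connection is closed

    def getPeer(self):
        return self._peer

    def getHost(self):
        return self._host


t = BufferTransport()
t.write(b"HELLO ")
t.writeSequence([b"a", b"b", b"c"])
before = t.sent      # b'': the writes are only queued so far
t.loseConnection()   # flushes, then closes
t.write(b"late")     # ignored: the connection is already closed
print(before, t.sent, t.closed)  # b'' b'HELLO abc' True
```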
0x4: Protocols
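Protocols describe how to process network events asynchronously. HTTP, DNS, and IMAP are examples of application-layer protocols. Protocols implement the IProtocol interface, which includes the following methods:
1. makeConnection: makes a connection to a transport and a server
2. connectionMade: called once the connection has been established
3. dataReceived: called whenever data is received
4. connectionLost: called when the connection is closed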
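The order in which these methods are called can be seen by driving a protocol by hand, the way a reactor would. `MiniProtocol`, `UpperEcho` and `RecordingTransport` are hypothetical classes for this sketch; in Twisted you would subclass `twisted.internet.protocol.Protocol`, whose `makeConnection` likewise stores the transport and then calls `connectionMade`. Here `reason` is a plain string, whereas Twisted passes a `Failure`.

```python
class MiniProtocol:
    """Hypothetical base class mirroring the IProtocol call sequence."""
    transport = None

    def makeConnection(self, transport):
        self.transport = transport  # remember the transport...
        self.connectionMade()       # ...then notify the subclass

    def connectionMade(self):
        pass

    def dataReceived(self, data):
        pass

    def connectionLost(self, reason):
        pass


class UpperEcho(MiniProtocol):
    def __init__(self):
        self.events = []

    def connectionMade(self):
        self.events.append("connectionMade")
        self.transport.write(b"ready\n")

    def dataReceived(self, data):
        self.events.append("dataReceived")
        self.transport.write(data.upper())  # reply through the transport

    def connectionLost(self, reason):
        self.events.append("connectionLost: %s" % reason)


class RecordingTransport:
    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


# Drive the protocol the way a reactor would.
proto, transport = UpperEcho(), RecordingTransport()
proto.makeConnection(transport)
proto.dataReceived(b"hello")
proto.dataReceived(b"world")
proto.connectionLost("peer closed")
print(proto.events)
print(transport.written)  # [b'ready\n', b'HELLO', b'WORLD']
```

The protocol never touches a socket, so it can be tested entirely in memory.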
Relevant Link:
https://likebeta.gitbooks.io/twisted-intro-cn/content/zh/p02.html
https://likebeta.gitbooks.io/twisted-intro-cn/content/zh/p04.html
http://blog.csdn.net/hanhuili/article/details/9389433
http://blog.sina.com.cn/s/blog_704b6af70100py9n.html
3. Network Programming with Twisted
0x1: Writing Servers
```python
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver


class Chat(LineReceiver):

    def __init__(self, users):
        self.users = users  # shared dict: user name -> Chat instance
        self.name = None
        self.state = "GETNAME"

    def connectionMade(self):
        self.sendLine(b"What's your name?")

    def connectionLost(self, reason):
        if self.name in self.users:
            del self.users[self.name]

    def lineReceived(self, line):
        # line is bytes on Python 3
        if self.state == "GETNAME":
            self.handle_GETNAME(line)
        else:
            self.handle_CHAT(line)

    def handle_GETNAME(self, name):
        if name in self.users:
            self.sendLine(b"Name taken, please choose another.")
            return
        self.sendLine(b"Welcome, %s!" % (name,))
        self.name = name
        self.users[name] = self
        self.state = "CHAT"

    def handle_CHAT(self, message):
        message = b"<%s> %s" % (self.name, message)
        for name, protocol in self.users.items():
            if protocol != self:
                protocol.sendLine(message)


class ChatFactory(Factory):

    def __init__(self):
        self.users = {}  # maps user names to Chat instances

    def buildProtocol(self, addr):
        return Chat(self.users)


reactor.listenTCP(8123, ChatFactory())
reactor.run()
```
0x2: Writing Clients
Twisted is a framework designed to be very flexible and to let you write powerful clients. The cost of this flexibility is a few layers in the way to writing your client.
1. single-use clients
In many cases, the protocol only needs to connect to the server once, and the code just wants to get a connected instance of the protocol. In those cases twisted.internet.endpoints provides the appropriate API, and in particular connectProtocol which takes a protocol instance rather than a factory.
```python
from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ClientEndpoint, connectProtocol
from twisted.internet.protocol import Protocol


class Greeter(Protocol):
    def sendMessage(self, msg):
        # transports carry bytes on Python 3
        self.transport.write(b"MESSAGE %s\n" % msg)


def gotProtocol(p):
    p.sendMessage(b"Hello")
    reactor.callLater(1, p.sendMessage, b"This is sent in a second")
    reactor.callLater(2, p.transport.loseConnection)


point = TCP4ClientEndpoint(reactor, "localhost", 1234)
d = connectProtocol(point, Greeter())
d.addCallback(gotProtocol)
reactor.run()
```
2. ClientFactory
Still, there’s plenty of code out there that uses lower-level APIs, and a few features (such as automatic reconnection) have not been re-implemented with endpoints yet, so in some cases they may be more convenient to use.
To use the lower-level connection APIs, you will need to call one of the reactor.connect* methods directly. For these cases, you need a ClientFactory. The ClientFactory is in charge of creating the Protocol and also receives events relating to the connection state. This allows it to do things like reconnect in the event of a connection error.
```python
import sys

from twisted.internet import reactor
from twisted.internet.protocol import ClientFactory, Protocol


class Echo(Protocol):
    def dataReceived(self, data):
        # data arrives as bytes; decode it before writing to the text stdout
        sys.stdout.write(data.decode("utf-8", errors="replace"))


class EchoClientFactory(ClientFactory):
    def startedConnecting(self, connector):
        print("Started to connect.")

    def buildProtocol(self, addr):
        print("Connected.")
        return Echo()

    def clientConnectionLost(self, connector, reason):
        print("Lost connection. Reason:", reason)

    def clientConnectionFailed(self, connector, reason):
        print("Connection failed. Reason:", reason)


# host and port were left undefined in the original snippet; placeholder values
host, port = "localhost", 8000
reactor.connectTCP(host, port, EchoClientFactory())
reactor.run()
```
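A ClientFactory's connection-state callbacks are where a reconnection policy lives. Reconnecting immediately, as the ircLogBot example does, can hammer a server that is down, so a common choice is exponential backoff. The sketch below is a hypothetical, dependency-free helper that only computes the delays; Twisted's own `ReconnectingClientFactory` implements a jittered exponential backoff along these lines.

```python
import random


def backoff_delays(attempts, initial=1.0, factor=2.0, max_delay=60.0,
                   jitter=0.0, rng=None):
    """Hypothetical helper: the wait before each of `attempts` reconnects.

    Each delay is the previous one multiplied by `factor` and capped at
    `max_delay`; an optional relative `jitter` (0.1 means +/-10%) keeps many
    clients that dropped at the same moment from reconnecting in lockstep.
    """
    rng = rng or random.Random()
    delays = []
    delay = initial
    for _ in range(attempts):
        spread = rng.uniform(-jitter, jitter) * delay if jitter else 0.0
        delays.append(min(max_delay, delay + spread))
        delay = min(max_delay, delay * factor)
    return delays


print(backoff_delays(8))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

Inside a ClientFactory, `clientConnectionLost` or `clientConnectionFailed` could then schedule `reactor.callLater(delay, connector.connect)` with the next delay instead of calling `connector.connect()` straight away.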
3. A Higher-Level Example: ircLogBot
```python
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
An example IRC log bot - logs a channel's events to a file.

If someone says the bot's name in the channel followed by a ':',
e.g.

    <foo> logbot: hello!

the bot will reply:

    <logbot> foo: I am a log bot

Run this script with two arguments, the channel name the bot should
connect to, and file to log to, e.g.:

    $ python ircLogBot.py test test.log

will log channel #test to the file 'test.log'.

To run the script:

    $ python ircLogBot.py <channel> <file>
"""

# twisted imports
from twisted.words.protocols import irc
from twisted.internet import reactor, protocol
from twisted.python import log

# system imports
import time, sys


class MessageLogger:
    """
    An independent logger class (because separation of application
    and protocol logic is a good thing).
    """
    def __init__(self, file):
        self.file = file

    def log(self, message):
        """Write a message to the file."""
        timestamp = time.strftime("[%H:%M:%S]", time.localtime(time.time()))
        self.file.write('%s %s\n' % (timestamp, message))
        self.file.flush()

    def close(self):
        self.file.close()


class LogBot(irc.IRCClient):
    """A logging IRC bot."""

    nickname = "twistedbot"

    def connectionMade(self):
        irc.IRCClient.connectionMade(self)
        self.logger = MessageLogger(open(self.factory.filename, "a"))
        self.logger.log("[connected at %s]" %
                        time.asctime(time.localtime(time.time())))

    def connectionLost(self, reason):
        irc.IRCClient.connectionLost(self, reason)
        self.logger.log("[disconnected at %s]" %
                        time.asctime(time.localtime(time.time())))
        self.logger.close()

    # callbacks for events

    def signedOn(self):
        """Called when bot has successfully signed on to server."""
        self.join(self.factory.channel)

    def joined(self, channel):
        """This will get called when the bot joins the channel."""
        self.logger.log("[I have joined %s]" % channel)

    def privmsg(self, user, channel, msg):
        """This will get called when the bot receives a message."""
        user = user.split('!', 1)[0]
        self.logger.log("<%s> %s" % (user, msg))

        # Check to see if they're sending me a private message
        if channel == self.nickname:
            msg = "It isn't nice to whisper! Play nice with the group."
            self.msg(user, msg)
            return

        # Otherwise check to see if it is a message directed at me
        if msg.startswith(self.nickname + ":"):
            msg = "%s: I am a log bot" % user
            self.msg(channel, msg)
            self.logger.log("<%s> %s" % (self.nickname, msg))

    def action(self, user, channel, msg):
        """This will get called when the bot sees someone do an action."""
        user = user.split('!', 1)[0]
        self.logger.log("* %s %s" % (user, msg))

    # irc callbacks

    def irc_NICK(self, prefix, params):
        """Called when an IRC user changes their nickname."""
        old_nick = prefix.split('!')[0]
        new_nick = params[0]
        self.logger.log("%s is now known as %s" % (old_nick, new_nick))

    # For fun, override the method that determines how a nickname is changed on
    # collisions. The default method appends an underscore.
    def alterCollidedNick(self, nickname):
        """
        Generate an altered version of a nickname that caused a collision in an
        effort to create an unused related name for subsequent registration.
        """
        return nickname + '^'


class LogBotFactory(protocol.ClientFactory):
    """A factory for LogBots.

    A new protocol instance will be created each time we connect to the server.
    """

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename

    def buildProtocol(self, addr):
        p = LogBot()
        p.factory = self
        return p

    def clientConnectionLost(self, connector, reason):
        """If we get disconnected, reconnect to server."""
        connector.connect()

    def clientConnectionFailed(self, connector, reason):
        print("connection failed:", reason)
        reactor.stop()


if __name__ == '__main__':
    # initialize logging
    log.startLogging(sys.stdout)

    # create factory protocol and application
    f = LogBotFactory(sys.argv[1], sys.argv[2])

    # connect factory to this host and port
    reactor.connectTCP("irc.freenode.net", 6667, f)

    # run bot
    reactor.run()
```
4. Persistent Data in the Factory
When the protocol is created, it gets a reference to the factory as self.factory. It can then access attributes of the factory in its logic.
Factories have a default implementation of buildProtocol. It does the same thing as the example above, using the protocol attribute of the factory to create the protocol instance. In the example above, the factory could be rewritten to look like this:
```python
class LogBotFactory(protocol.ClientFactory):
    protocol = LogBot

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename
```
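The default `buildProtocol` described above amounts to "instantiate `self.protocol`, then attach the factory". The dependency-free sketch below models that behavior with hypothetical `MiniFactory`, `MiniLogBot` and `MiniLogBotFactory` classes; Twisted's `Factory.buildProtocol` follows the same idea, but this is not its code.

```python
class MiniFactory:
    protocol = None  # subclasses set this to a protocol class

    def buildProtocol(self, addr):
        p = self.protocol()  # one new protocol instance per connection
        p.factory = self     # gives the protocol access to shared, persistent state
        return p


class MiniLogBot:
    factory = None

    def describe(self):
        # settings that outlive a single connection live on the factory
        return "joining #%s, logging to %s" % (self.factory.channel,
                                               self.factory.filename)


class MiniLogBotFactory(MiniFactory):
    protocol = MiniLogBot

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename


f = MiniLogBotFactory("test", "test.log")
p1, p2 = f.buildProtocol(None), f.buildProtocol(None)
print(p1.describe())                        # joining #test, logging to test.log
print(p1 is p2, p1.factory is p2.factory)   # False True
```

Each connection gets a fresh protocol object, while everything that must survive reconnects (channel, log file name) stays on the single factory.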
Relevant Link:
http://twisted.readthedocs.org/en/latest/core/howto/clients.html
4. Process Management with the Reactor
Along with connection to servers across the internet, Twisted also connects to local processes with much the same API.
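It is important to understand that the reactor is a programming paradigm, and Twisted is a network programming framework built on this asynchronous, event-driven model. In the same way, the reactor model can also be applied to handling process events.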
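The same single-threaded loop works for a child process's pipes. This stdlib sketch (an assumption-laden illustration, not Twisted's implementation; it needs a POSIX platform, because `selectors` cannot wait on pipes on Windows) spawns a hypothetical Python child that writes to stdout and stderr and exits with status 3. The comments map each step loosely onto the ProcessProtocol callbacks used in the Twisted example below.

```python
import os
import selectors
import subprocess
import sys

# Hypothetical child: writes to both stdout and stderr, then exits with status 3.
child_code = "import sys; sys.stdout.write('out'); sys.stderr.write('err'); sys.exit(3)"
proc = subprocess.Popen([sys.executable, "-c", child_code],
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

sel = selectors.DefaultSelector()
sel.register(proc.stdout, selectors.EVENT_READ, "stdout")
sel.register(proc.stderr, selectors.EVENT_READ, "stderr")

received = {"stdout": b"", "stderr": b""}
while sel.get_map():                    # loop until both pipes are closed
    for key, _ in sel.select():
        chunk = os.read(key.fd, 4096)   # read whatever is available
        if chunk:
            received[key.data] += chunk  # like outReceived / errReceived
        else:
            sel.unregister(key.fileobj)  # EOF: like outConnectionLost / errConnectionLost
            key.fileobj.close()

status = proc.wait()                    # like processEnded
sel.close()
print(received, status)  # {'stdout': b'out', 'stderr': b'err'} 3
```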
0x1: Example
```python
#!/usr/bin/env python
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

from twisted.internet import protocol, reactor


class MyPP(protocol.ProcessProtocol):
    def __init__(self, verses):
        self.verses = verses
        self.data = b""

    def connectionMade(self):
        print("connectionMade!")
        for i in range(self.verses):
            self.transport.write(b"Aleph-null bottles of beer on the wall,\n" +
                                 b"Aleph-null bottles of beer,\n" +
                                 b"Take one down and pass it around,\n" +
                                 b"Aleph-null bottles of beer on the wall.\n")
        self.transport.closeStdin()  # tell them we're done

    def outReceived(self, data):
        print("outReceived! with %d bytes!" % len(data))
        self.data = self.data + data

    def errReceived(self, data):
        print("errReceived! with %d bytes!" % len(data))

    def inConnectionLost(self):
        print("inConnectionLost! stdin is closed! (we probably did it)")

    def outConnectionLost(self):
        print("outConnectionLost! The child closed their stdout!")
        # now is the time to examine what they wrote
        # print("I saw them write:", self.data)
        # wc prints "lines words chars", possibly padded with spaces;
        # split() copes with any amount of leading or trailing whitespace
        lines, words, chars = self.data.split()[:3]
        print("I saw %s lines" % lines.decode())

    def errConnectionLost(self):
        print("errConnectionLost! The child closed their stderr.")

    def processExited(self, reason):
        print("processExited, status %d" % (reason.value.exitCode,))

    def processEnded(self, reason):
        print("processEnded, status %d" % (reason.value.exitCode,))
        print("quitting")
        reactor.stop()


pp = MyPP(10)
reactor.spawnProcess(pp, "wc", ["wc"], {})
reactor.run()
```
0x2: Example
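```python
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import ProcessProtocol


class GPGProtocol(ProcessProtocol):
    def __init__(self, crypttext, passphrase):
        self.crypttext = crypttext
        # the original snippet read self.passphrase without ever setting it;
        # passing it in here is one way to fill that gap
        self.passphrase = passphrase
        self.plaintext = b""
        self.status = b""

    def connectionMade(self):
        # fd 3 carries the passphrase (--passphrase-fd 3); close it so gpg sees EOF
        self.transport.writeToChild(3, self.passphrase)
        self.transport.closeChildFD(3)
        # fd 0 (gpg's stdin) carries the ciphertext
        self.transport.writeToChild(0, self.crypttext)
        self.transport.closeChildFD(0)

    def childDataReceived(self, childFD, data):
        if childFD == 1:  # stdout: the decrypted plaintext
            self.plaintext += data
        if childFD == 4:  # status lines from --status-fd 4
            self.status += data

    def processEnded(self, status):
        rc = status.value.exitCode
        if rc == 0:
            self.deferred.callback(self)
        else:
            # status is a Failure wrapping ProcessTerminated, which carries exitCode
            self.deferred.errback(status)


def decrypt(crypttext, passphrase):
    gp = GPGProtocol(crypttext, passphrase)
    gp.deferred = Deferred()
    cmd = ["gpg", "--decrypt", "--passphrase-fd", "3", "--status-fd", "4",
           "--batch"]
    reactor.spawnProcess(gp, cmd[0], cmd, env=None,
                         childFDs={0: "w", 1: "r", 2: 2, 3: "w", 4: "r"})
    return gp.deferred
```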
Relevant Link:
http://twistedmatrix.com/documents/12.2.0/core/howto/process.html
5. Concurrent Connections in Twisted
Some time back I had to write a network server which needed to support ~50K concurrent clients on a single box. Server-client communication used a proprietary protocol on top of TCP, with a RawBinaryData struct as the message format. Clients exchanged periodic keep-alives, which the server used to check their health. As most of the operations were I/O-bound (socket/db), we decided to use Python/Twisted to implement the server.
Load tests showed that the server could handle only 1024 clients, after which connections started failing. Raising the per-process limit on open files from 1024 to 100000 (ulimit -n 100000) did not help: connections still failed at 1024.
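For reference, a process can raise its own open-file limit, the equivalent of `ulimit -n`, through Python's `resource` module. This is a minimal sketch assuming a Unix-like system (the module does not exist on Windows); the target of 100000 is taken from the text above, and an unprivileged process can raise its soft limit only up to the hard limit. As the story shows, a higher limit does not help while the reactor itself is built on select.

```python
import resource

TARGET = 100000  # the value used with `ulimit -n` above

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
# An unprivileged process may raise its soft limit only up to the hard limit.
wanted = TARGET if hard == resource.RLIM_INFINITY else min(TARGET, hard)
if soft != resource.RLIM_INFINITY and wanted > soft:
    try:
        resource.setrlimit(resource.RLIMIT_NOFILE, (wanted, hard))
    except (ValueError, OSError) as exc:  # e.g. a platform-specific cap
        print("could not raise the limit:", exc)
new_soft = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
print("soft limit: %s -> %s (hard limit %s)" % (soft, new_soft, hard))
```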
0x1: select limitation
select fails beyond 1024 fds: it cannot watch file descriptors numbered 1024 or higher, because FD_SETSIZE is 1024 (on Linux). Twisted's default reactor appeared to be based on select, so the natural next step was to try poll to overcome the open-fd limit.
0x2: poll limitation
poll removes the fd limit. But as the number of concurrent clients grew, performance dropped drastically: every poll call hands the kernel the whole list of fds and scans all of them, an O(n) operation, so performance degrades as the number of fds increases.
0x3: epoll
The epoll reactor solved both problems and performed extremely well. libevent is another library built on top of epoll.
0x4: Async frameworks
Don't waste time on select/poll-based approaches if you expect more than about 1K concurrent connections. The same advice applies to the following event-loop-based frameworks:
1. Eventlet (Python)
2. Gevent (Python): similar to Eventlet; it used libevent, which is built on top of epoll (later gevent releases switched to libev)
3. C++ ACE
4. Java Netty
5. Ruby EventMachine
0x5: Choosing a Reactor and GUI Toolkit Integration (newer Twisted)
Twisted provides a variety of implementations of the twisted.internet.reactor. The specialized implementations are suited for different purposes and are designed to integrate better with particular platforms.
The epoll()-based reactor is Twisted's default on Linux. Other platforms use poll(), or the most cross-platform reactor, select().
Platform-specific reactor implementations exist for:
Poll for Linux
Epoll for Linux 2.6
WaitForMultipleObjects (WFMO) for Win32
Input/Output Completion Port (IOCP) for Win32
KQueue for FreeBSD and Mac OS X
CoreFoundation for Mac OS X
1. Select()-based Reactor
The select reactor is the default on platforms that don't provide a better alternative that covers all use cases. If the select reactor is desired, it may be installed via:
```python
from twisted.internet import selectreactor
selectreactor.install()

from twisted.internet import reactor
```
2. Poll-based Reactor
The PollReactor will work on any platform that provides select.poll. With larger numbers of connected sockets, it may provide for better performance than the SelectReactor.
```python
from twisted.internet import pollreactor
pollreactor.install()

from twisted.internet import reactor
```
3. KQueue
The KQueue Reactor allows Twisted to use FreeBSD's kqueue mechanism for event scheduling
```python
from twisted.internet import kqreactor
kqreactor.install()

from twisted.internet import reactor
```
4. WaitForMultipleObjects (WFMO) for Win32
```python
from twisted.internet import win32eventreactor
win32eventreactor.install()

from twisted.internet import reactor
```
5. Input/Output Completion Port (IOCP) for Win32
Windows provides a fast, scalable event notification system known as IO Completion Ports, or IOCP for short. Twisted includes a reactor based on IOCP which is nearly complete.
```python
from twisted.internet import iocpreactor
iocpreactor.install()

from twisted.internet import reactor
```
6. Epoll-based Reactor
The EPollReactor will work on any platform that provides epoll, today only Linux 2.6 and over. The implementation of the epoll reactor currently uses the Level Triggered interface, which is basically like poll() but scales much better.
```python
from twisted.internet import epollreactor
epollreactor.install()

from twisted.internet import reactor
```
Relevant Link:
https://moythreads.com/wordpress/2009/12/22/select-system-call-limitation/
http://pipeit.blogspot.com/2011/07/select-poll-and-epoll-twisted-story.html
http://twistedmatrix.com/documents/13.2.0/core/howto/choosing-reactor.html#auto2
Copyright (c) 2016 LittleHann All rights reserved