該模塊基於asyncore簡化了異步客戶端和服務器,並使其更容易元素處理由任意的字符串結束,或者是可變長度的的協議。它提供了抽象類async_chat,提供collect_incoming_data()和found_terminator()方法。循環和asyncore的同樣,有2種信道:asyncore.dispatcher和asynchat.async_chat,能夠自由混合信道。一般asyncore.dispatcher服務器通道在接收到鏈接請求時產生新的asynchat.async_chat通道對象。html
class asynchat.async_chat:這個類是asyncore.dispatcher的抽象子類。通常使用其collect_incoming_data()和found_terminator()方法,asyncore.dispatcher的方法均可以使用,儘管不是全部在消息/響應上下文都有意義。python
async_chat.close_when_done()推None到生產者FIFO。當此None彈出的fifo時,通道將關閉。程序員
async_chat.collect_incoming_data(data):數據到達。默認引起NotImplementedError異常。使用時必須重載。web
async_chat.discard_buffers():在緊急狀況下方法將丟棄在輸入和/或輸出緩衝器和生產者FIFO持有數據。shell
async_chat.found_terminator():當輸入的數據流相匹配set_terminator()中設定的終止條件時調用。默認引起NotImplementedError異常。使用時必須重載。緩衝輸入的數據在實例屬性是可用的。服務器
async_chat.get_terminator():返回當前通道的結束符。網絡
async_chat.push(data):往通道的FIFO中添加數據,固然可使用本身製做的更復雜的方案來實現加密和分塊之類的。session
async_chat.push_with_producer(producer):往通道的FIFO中添加數據生產者FIFO中添加生產者。當全部當前生產者隊列用盡以後將調用其more()方法併發送數據到遠端。併發
async_chat.set_terminator(term) 設定終止符號。類型以下:app
string: Will call found_terminator() when the string is found in the input stream。
integer: Will call found_terminator() when the indicated number of characters have been received。
None: The channel continues to collect data forever。
ac_in_buffer_size異步輸入緩衝區的大小(默認4096)。
ac_out_buffer_size異步輸出緩衝區的大小(默認4096)。
像asyncore.dispatcher, async_chat定義了一組由select()調用後的socket條件分析生成的事件。輪詢循環開始後async_chat對象的方法被事件處理框架調用,不須要程序員干預。。兩個類的屬性能夠修改,以提升性能,甚至節省內存。
不像asyncore.dispatcher, async_chat容許您定義生產者的先進先出隊列(FIFO)。生產者有惟一的方法中more(),返回通道上要傳輸的數據。生產者沒有數據時返回空字符串。這時async_chat對象從FIFO移除生產者並開始使用下一個生產者,若是有的話。當生產者FIFO爲空時handle_write()什麼都不會作。通道對象的set_terminator()方法來識別遠端的結束,重要的斷點,輸入。
爲了創建一個有效的async_chat子類,方法collect_incoming_data()和found_terminator()必須處理該通道異步接收數據。相關方法以下:
class asynchat.fifo([list=None]): 輔助類asynchat.fifo的方法以下:
is_empty(): Returns True if and only if the fifo is empty.
first(): Returns the least-recently push()ed item from the fifo.
push(data): Adds the given data (which may be a string or a producer object) to the producer fifo.
pop(): If the fifo is not empty, returns True, first(), deleting the popped item. Returns False, None for an empty fifo.
下例說明如何經過async_chat讀取HTTP請求。 Web服務器能夠爲每一個傳入客戶端鏈接建立一個http_request_handler對象。請注意, 初始的信道終結符設置爲HTTP頭的末尾空行和一個表示正在讀取頭的標誌。
報頭讀完後,若是請求的類型是POST的(這代表還有其餘數據),則內容長度:頭用來設置數值終止符。
數據整理完畢, 設置terminator爲None,確保web客戶端不會讀取多餘的數據以後調用handle_request()進行數據處理。
class http_request_handler(asynchat.async_chat): def __init__(self, sock, addr, sessions, log): asynchat.async_chat.__init__(self, sock=sock) self.addr = addr self.sessions = sessions self.ibuffer = [] self.obuffer = "" self.set_terminator("\r\n\r\n") self.reading_headers = True self.handling = False self.cgi_data = None self.log = log def collect_incoming_data(self, data): """Buffer the data""" self.ibuffer.append(data) def found_terminator(self): if self.reading_headers: self.reading_headers = False self.parse_headers("".join(self.ibuffer)) self.ibuffer = [] if self.op.upper() == "POST": clen = self.headers.getheader("content-length") self.set_terminator(int(clen)) else: self.handling = True self.set_terminator(None) self.handle_request() elif not self.handling: self.set_terminator(None) # browsers sometimes over-send self.cgi_data = parse(self.headers, "".join(self.ibuffer)) self.handling = True self.ibuffer = [] self.handle_request()
注意:這並非一個不添加代碼就能夠實際運行的實例。
下面實例來源於《The Python Standard Library by Example 2011》,更詳細。
asynchat_echo_server.py:
import asyncoreimport loggingimport socketfrom asynchat_echo_handler import EchoHandlerclass EchoServer(asyncore.dispatcher): """Receives connections and establishes handlers for each client. """ def __init__(self, address): asyncore.dispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.bind(address) self.address = self.socket.getsockname() self.listen(1) return def handle_accept(self): # Called when a client connects to our socket client_info = self.accept() EchoHandler(sock=client_info[0]) # Only deal with one client at a time, # so close as soon as the handler is set up. # Under normal conditions, the server # would run forever or until it received # instructions to stop. self.handle_close() return def handle_close(self): self.close()
asynchat_echo_handler.py: EchoHandler繼承asynchat.async_chat,收發能夠自動處理,不過須要處理以下東東:
處理輸入數據,重載handle_incoming_data()
認識輸入數據的結束符,經過set_terminator()
收到完整信息後的處理:found_terminator()
發送數據push()
該示例應用程序有兩種工做模式。它要麼等待像"ECHO length\n"之類的命令,要麼等待要顯示的數據,經過實例變量process_data切換模式。一旦找到完整的命令,處理程序切換到消息處理模式,並等待接收的完整文本。當全部的數據可用時,它被推入到輸出信道。數據被髮送處理程序被設置爲關閉。
import asynchatimport loggingclass EchoHandler(asynchat.async_chat): """Handles echoing messages from a single client. """ # Artificially reduce buffer sizes to illustrate # sending and receiving partial messages. ac_in_buffer_size = 128 ac_out_buffer_size = 128 def __init__(self, sock): self.received_data = [] self.logger = logging.getLogger('EchoHandler') asynchat.async_chat.__init__(self, sock) # Start looking for the ECHO command self.process_data = self._process_command self.set_terminator('\n') return def collect_incoming_data(self, data): """Read an incoming message from the client and put it into the outgoing queue. """ self.logger.debug( 'collect_incoming_data() -> (%d bytes) %r', len(data), data) self.received_data.append(data) def found_terminator(self): """The end of a command or message has been seen.""" self.logger.debug('found_terminator()') self.process_data() def _process_command(self): """Have the full ECHO command""" command = ''.join(self.received_data) self.logger.debug('_process_command() %r', command) command_verb, command_arg = command.strip().split(' ') expected_data_len = int(command_arg) self.set_terminator(expected_data_len) self.process_data = self._process_message self.received_data = [] def _process_message(self): """Have read the entire message.""" to_echo = ''.join(self.received_data) self.logger.debug('_process_message() echoing %r', to_echo) self.push(to_echo) # Disconnect after sending the entire response # since we only want to do one thing at a time self.close_when_done()
客戶端和處理器相似。要發送的消息先傳遞給客戶端的構造函數。當socket鏈接創建後,調用handle_connect()發送命令和消息數據。
命令是直接push,消息文本則使用生產者類。生產者有輪詢機制把數據塊發送到網絡。當生產者返回空字符串,寫中止。
import asynchatimport loggingimport socketclass EchoClient(asynchat.async_chat): """Sends messages to the server and receives responses. """ # Artificially reduce buffer sizes to show # sending and receiving partial messages. ac_in_buffer_size = 128 ac_out_buffer_size = 128 def __init__(self, host, port, message): self.message = message self.received_data = [] self.logger = logging.getLogger('EchoClient') asynchat.async_chat.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.logger.debug('connecting to %s', (host, port)) self.connect((host, port)) return def handle_connect(self): self.logger.debug('handle_connect()') # Send the command self.push('ECHO %d\n' % len(self.message)) # Send the data self.push_with_producer( EchoProducer(self.message, buffer_size=self.ac_out_buffer_size) ) # We expect the data to come back as-is, # so set a length-based terminator self.set_terminator(len(self.message)) def collect_incoming_data(self, data): """Read an incoming message from the client and add it to the outgoing queue. """ self.logger.debug( 'collect_incoming_data() -> (%d) %r', len(data), data) self.received_data.append(data) def found_terminator(self): self.logger.debug('found_terminator()') received_message = ''.join(self.received_data) if received_message == self.message: self.logger.debug('RECEIVED COPY OF MESSAGE') else: self.logger.debug('ERROR IN TRANSMISSION') self.logger.debug('EXPECTED %r', self.message) self.logger.debug('RECEIVED %r', received_message) returnclass EchoProducer(asynchat.simple_producer): logger = logging.getLogger('EchoProducer') def more(self): response = asynchat.simple_producer.more(self) self.logger.debug('more() -> (%s bytes) %r', len(response), response) return response
主程序:
import asyncoreimport loggingimport socketfrom asynchat_echo_server import EchoServerfrom asynchat_echo_client import EchoClient logging.basicConfig(level=logging.DEBUG, format='%(name)-11s: %(message)s', )address = ('localhost', 0) # let the kernel give us a portserver = EchoServer(address)ip, port = server.address # find out what port we were givenmessage_data = open('lorem.txt', 'r').read()client = EchoClient(ip, port, message=message_data)asyncore.loop()
輔助文本:
# vi lorem.txt Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec egestas, enim et consectetuer ullamcorper, lectus ligula rutrum leo, a elementum elit tortor eu quam.
執行結果:
# ./asynchat_echo_main.py EchoClient : connecting to ('127.0.0.1', 58974)EchoClient : handle_connect()EchoHandler: collect_incoming_data() -> (8 bytes) 'ECHO 166'EchoHandler: found_terminator()EchoHandler: _process_command() 'ECHO 166'EchoProducer: more() -> (128 bytes) 'Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec\negestas, enim et consectetuer ullamcorper, lectus ligula rutrum\n'EchoHandler: collect_incoming_data() -> (128 bytes) 'Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec\negestas, enim et consectetuer ullamcorper, lectus ligula rutrum\n'EchoProducer: more() -> (38 bytes) 'leo, a elementum elit tortor eu quam.\n'EchoHandler: collect_incoming_data() -> (38 bytes) 'leo, a elementum elit tortor eu quam.\n'EchoHandler: found_terminator()EchoHandler: _process_message() echoing 'Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec\negestas, enim et consectetuer ullamcorper, lectus ligula rutrum\nleo, a elementum elit tortor eu quam.\n'EchoProducer: more() -> (0 bytes) ''EchoClient : collect_incoming_data() -> (128) 'Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec\negestas, enim et consectetuer ullamcorper, lectus ligula rutrum\n'EchoClient : collect_incoming_data() -> (38) 'leo, a elementum elit tortor eu quam.\n'EchoClient : found_terminator()EchoClient : RECEIVED COPY OF MESSAGE
《The Python Standard Library by Example 2011》
本站地址:python自動化測試http://automationtesting.sinaapp.com python開發自動化測試羣113938272和開發測試羣6089740 微博 http://weibo.com/cizhenshi