asyncio 是 Python 3.4 版本引入的標準庫,直接內置了對異步 IO 的支持。
asyncio 的編程模型就是一個消息循環。我們從 asyncio 模塊中直接獲取一個 EventLoop 的引用,然後把需要執行的協程扔到 EventLoop 中執行,就實現了異步 IO。
此模塊爲編寫單線程併發代碼提供基礎架構:使用協程,對套接字和其他資源進行 I/O 多路複用,運行網絡客戶端和服務器,以及其他相關的基元。
包內容的詳細列表如下:
- 模仿 concurrent.futures 模塊、但適合在事件循環中使用的 Future 類
- 基於 yield from(PEP 380)的協程和任務,幫助以順序的方式編寫併發代碼
- Future 和協程的取消操作支持
- 模仿 threading 模塊裏那些同步原語、用於單線程內協程之間的同步原語
中文文檔:http://python.usyiyi.cn/translate/python_352/library/asyncio.html
官方文檔:https://docs.python.org/3/library/asyncio.html
關於 asyncio 模塊的詳細使用說明這裏不再贅述,下面將爲大家展示一些例子,作爲快速學習之用:
1、使用 AbstractEventLoop.call_soon() 方法來安排回調的示例。回調顯示 "Hello World",然後停止事件循環:
import asyncio


def hello_world(loop):
    """Print a greeting, then stop *loop* so that ``run_forever()`` returns."""
    print('Hello World')
    loop.stop()


# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # Schedule a call to hello_world()
    loop.call_soon(hello_world, loop)

    # Blocking call interrupted by loop.stop()
    loop.run_forever()
finally:
    loop.close()
2、回調示例:每秒顯示當前日期。回調使用 AbstractEventLoop.call_later() 方法在 5 秒內重新計劃自身,然後停止事件循環:
import asyncio
import datetime


def display_date(end_time, loop):
    """Print the current time and reschedule itself one second later.

    Once the next tick would land at or after *end_time* (measured on the
    ``loop.time()`` clock), stop *loop* instead of rescheduling.
    """
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()


# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # Schedule the first call to display_date()
    end_time = loop.time() + 5.0
    loop.call_soon(display_date, end_time, loop)

    # Blocking call interrupted by loop.stop()
    loop.run_forever()
finally:
    loop.close()
3、使用 AbstractEventLoop.add_reader() 方法等待文件描述符接收到一些數據,然後關閉事件循環:
import asyncio

try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

# Create a pair of connected file descriptors
rsock, wsock = socketpair()

# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)


def reader():
    """Read the pending data from ``rsock``, print it and stop the loop."""
    data = rsock.recv(100)
    print("Received:", data.decode())
    # We are done: unregister the file descriptor
    loop.remove_reader(rsock)
    # Stop the event loop
    loop.stop()


try:
    # Register the file descriptor for read event
    loop.add_reader(rsock, reader)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Run the event loop
    loop.run_forever()
finally:
    # We are done, close sockets and the event loop
    rsock.close()
    wsock.close()
    loop.close()
4、使用 AbstractEventLoop.add_signal_handler() 方法爲信號 SIGINT 和 SIGTERM 註冊處理程序:
import asyncio
import functools
import os
import signal


def ask_exit(signame):
    """Report the received signal and stop the module-level event loop."""
    print("got signal %s: exit" % signame)
    loop.stop()


# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
for signame in ('SIGINT', 'SIGTERM'):
    loop.add_signal_handler(getattr(signal, signame),
                            functools.partial(ask_exit, signame))

print("Event loop running forever, press Ctrl+C to interrupt.")
print("pid %s: send SIGINT or SIGTERM to exit." % os.getpid())
try:
    loop.run_forever()
finally:
    loop.close()

# This example only works on UNIX.
5、組合 Future 和協程函數(coroutine function)的示例:
協程函數負責計算(需要 1 秒),並將結果存儲到 Future。run_until_complete() 方法等待 Future 的完成。
import asyncio


async def slow_operation(future):
    """Wait one second, then store the result string in *future*."""
    await asyncio.sleep(1)
    future.set_result('Future is done!')


# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    future = loop.create_future()
    # Keep a reference to the task so it stays alive while the loop runs.
    task = loop.create_task(slow_operation(future))
    loop.run_until_complete(future)
    print(future.result())
finally:
    loop.close()
6、使用 Future.add_done_callback() 方法以不同的方式編寫前面的示例,以明確描述控制流:
在此示例中,Future 用於將 slow_operation() 連接到 got_result():當 slow_operation() 完成時,got_result() 將連同結果一起被調用。
import asyncio


async def slow_operation(future):
    """Wait one second, then store the result string in *future*."""
    await asyncio.sleep(1)
    future.set_result('Future is done!')


def got_result(future):
    """Done-callback: print the future's result and stop the event loop."""
    print(future.result())
    loop.stop()


# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    future = loop.create_future()
    # Keep a reference to the task so it stays alive while the loop runs.
    task = loop.create_task(slow_operation(future))
    future.add_done_callback(got_result)
    loop.run_forever()
finally:
    loop.close()
7、並行執行 3 個任務(A、B、C)的示例:
任務在創建時自動計劃執行。所有任務完成後,事件循環停止。
import asyncio


async def factorial(name, number):
    """Compute ``number!`` step by step, sleeping one second per step.

    Progress and the final value are printed, prefixed with the task *name*.
    """
    f = 1
    for i in range(2, number + 1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))


# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    tasks = [
        loop.create_task(factorial("A", 2)),
        loop.create_task(factorial("B", 3)),
        loop.create_task(factorial("C", 4)),
    ]
    loop.run_until_complete(asyncio.gather(*tasks))
finally:
    loop.close()
output:
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
8、TCP 回顯客戶端使用 AbstractEventLoop.create_connection() 方法,TCP 回顯服務器使用 AbstractEventLoop.create_server() 方法
客戶端:
事件循環運行兩次。在這個簡短的例子中,優先使用 run_until_complete() 方法,以便在服務器沒有監聽時直接引發異常,而不必寫一個短的協程來處理異常並停止運行循環。在 run_until_complete() 退出時,循環不再運行,因此在發生錯誤時不需要停止循環。
import asyncio


class EchoClientProtocol(asyncio.Protocol):
    """Send one message on connect, print the reply, stop the loop on close."""

    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()


# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
message = 'Hello World!'
try:
    coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                                  '127.0.0.1', 8888)
    # First run: the connection error propagates from here if no server
    # is listening, and the loop is still closed by the finally clause.
    loop.run_until_complete(coro)
    # Second run: keep going until connection_lost() stops the loop.
    loop.run_forever()
finally:
    loop.close()
服務器:
Transport.close() 可以在 WriteTransport.write() 之後立即調用,即使數據尚未在套接字上發送:兩種方法都是異步的。不需要 yield from,因爲這些傳輸方法不是協程。
import asyncio


class EchoServerClientProtocol(asyncio.Protocol):
    """Echo the first chunk received back to the client, then disconnect."""

    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()


# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # Each client connection will create a new protocol instance
    coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
    server = loop.run_until_complete(coro)

    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
finally:
    loop.close()
9、UDP 回顯客戶端使用 AbstractEventLoop.create_datagram_endpoint() 方法,UDP 回顯服務器同樣使用 AbstractEventLoop.create_datagram_endpoint() 方法
客戶端:
import asyncio


class EchoClientProtocol(asyncio.DatagramProtocol):
    """Send one datagram, print the reply, then close the socket."""

    def __init__(self, message, loop):
        self.message = message
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Socket closed, stop the event loop")
        # Stop the loop this protocol was given rather than looking up a
        # global one, which may be a different loop (or none at all).
        self.loop.stop()


# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
message = "Hello World!"
try:
    connect = loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, loop),
        remote_addr=('127.0.0.1', 9999))
    transport, protocol = loop.run_until_complete(connect)
    try:
        loop.run_forever()
    finally:
        transport.close()
finally:
    loop.close()
服務器:
import asyncio


class EchoServerProtocol(asyncio.DatagramProtocol):
    """Send every received datagram back to its sender.

    Deriving from ``asyncio.DatagramProtocol`` supplies no-op defaults for
    the callbacks this class does not define (``error_received()``,
    ``connection_lost()``).
    """

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    print("Starting UDP server")
    # One protocol instance will be created to serve all client requests
    listen = loop.create_datagram_endpoint(
        EchoServerProtocol, local_addr=('127.0.0.1', 9999))
    transport, protocol = loop.run_until_complete(listen)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        transport.close()
finally:
    loop.close()
10、使用協議和 AbstractEventLoop.create_connection() 方法等待套接字接收數據,然後關閉事件循環:
import asyncio

try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

# Create a pair of connected sockets
rsock, wsock = socketpair()

# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)


class MyProtocol(asyncio.Protocol):
    """Print the first chunk of data received, then close the transport."""

    transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport (it will call connection_lost())
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed, stop the event loop
        loop.stop()


try:
    # Register the socket to wait for data
    connect_coro = loop.create_connection(MyProtocol, sock=rsock)
    transport, protocol = loop.run_until_complete(connect_coro)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Run the event loop
    loop.run_forever()
finally:
    # We are done, close sockets and the event loop
    rsock.close()
    wsock.close()
    loop.close()
11、TCP 回顯客戶端使用 asyncio.open_connection() 函數,TCP 回顯服務器使用 asyncio.start_server() 函數
客戶端:
import asyncio


async def tcp_echo_client(message, loop=None):
    """Send *message* to the echo server on 127.0.0.1:8888 and print the reply.

    *loop* is accepted only for backward compatibility and is ignored: the
    stream API uses the running event loop, and passing ``loop=`` to
    ``asyncio.open_connection()`` is no longer supported on recent Python
    versions.
    """
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

    print('Send: %r' % message)
    writer.write(message.encode())

    data = await reader.read(100)
    print('Received: %r' % data.decode())

    print('Close the socket')
    writer.close()


message = 'Hello World!'
# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(tcp_echo_client(message, loop))
finally:
    loop.close()
服務器:
import asyncio


async def handle_echo(reader, writer):
    """Handle one client: read up to 100 bytes, echo them back, disconnect."""
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))

    print("Send: %r" % message)
    writer.write(data)
    await writer.drain()

    print("Close the client socket")
    writer.close()


# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # No loop= argument: it is no longer accepted by asyncio.start_server()
    # on recent Python versions; the server binds to the loop that runs it.
    coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888)
    server = loop.run_until_complete(coro)

    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
finally:
    loop.close()
12、在命令行中獲取 URL 的 HTTP 頭的簡單示例:
import asyncio
import sys
import urllib.parse


async def print_http_headers(url):
    """Send a ``HEAD`` request for *url* and print each response header line.

    An explicit port in *url* is honoured; otherwise 443 is used for
    ``https`` URLs and 80 for everything else.
    """
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        connect = asyncio.open_connection(url.hostname, url.port or 443,
                                          ssl=True)
    else:
        connect = asyncio.open_connection(url.hostname, url.port or 80)
    reader, writer = await connect
    try:
        # The Host header must carry the port when the URL names one.
        if url.port is None:
            host = url.hostname
        else:
            host = '%s:%d' % (url.hostname, url.port)
        query = ('HEAD {path} HTTP/1.0\r\n'
                 'Host: {hostname}\r\n'
                 '\r\n').format(path=url.path or '/', hostname=host)
        writer.write(query.encode('latin-1'))
        while True:
            line = await reader.readline()
            if not line:
                break
            line = line.decode('latin1').rstrip()
            if line:
                print('HTTP header> %s' % line)
    finally:
        # Ignore the body, close the socket
        writer.close()


url = sys.argv[1]
# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    task = loop.create_task(print_http_headers(url))
    loop.run_until_complete(task)
finally:
    loop.close()
# 用法:
python example.py http://example.com/path/page.html
# 使用 HTTPS:
python example.py https://example.com/path/page.html
13、協程使用 open_connection() 函數等待套接字接收數據:
import asyncio

try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair


async def wait_for_data(loop):
    """Wait for data on one end of a socket pair and print what arrives.

    *loop* is the event loop running this coroutine; it is used to schedule
    the simulated send on the other end of the pair.
    """
    # Create a pair of connected sockets
    rsock, wsock = socketpair()
    try:
        # Register the open socket to wait for data. No loop= argument: it
        # is no longer accepted by asyncio.open_connection() on recent
        # Python versions.
        reader, writer = await asyncio.open_connection(sock=rsock)

        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())

        # Wait for data
        data = await reader.read(100)

        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()
    finally:
        # Close the second socket
        wsock.close()


# Create the loop explicitly: recent Python versions no longer create one
# implicitly inside asyncio.get_event_loop().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(wait_for_data(loop))
finally:
    loop.close()
14、子進程協議的示例,用於獲取子進程的輸出並等待子進程退出。子進程由 AbstractEventLoop.subprocess_exec() 方法創建:
import asyncio
import sys


class DateProtocol(asyncio.SubprocessProtocol):
    """Collect the child's piped output and signal when it has finished.

    ``exit_future`` is resolved only after both the pipe has been closed and
    the process has exited, so ``output`` is complete by then.
    """

    def __init__(self, exit_future):
        self.exit_future = exit_future
        self.output = bytearray()
        self.pipe_closed = False
        self.exited = False

    def pipe_connection_lost(self, fd, exc):
        self.pipe_closed = True
        self.check_for_exit()

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def process_exited(self):
        self.exited = True
        # process_exited() can be called before pipe_connection_lost():
        # wait until both have happened before reporting completion.
        self.check_for_exit()

    def check_for_exit(self):
        """Resolve ``exit_future`` once the pipe is closed and the child exited."""
        if self.pipe_closed and self.exited and not self.exit_future.done():
            self.exit_future.set_result(True)


async def get_date(loop):
    """Run a child Python process that prints the current date; return it."""
    code = 'import datetime; print(datetime.datetime.now())'
    exit_future = loop.create_future()

    # Create the subprocess controlled by the protocol DateProtocol,
    # redirect the standard output into a pipe
    transport, protocol = await loop.subprocess_exec(
        lambda: DateProtocol(exit_future),
        sys.executable, '-c', code,
        stdin=None, stderr=None)

    # Wait for the subprocess exit using the process_exited() method
    # of the protocol
    await exit_future

    # Close the stdout pipe
    transport.close()

    # Read the output which was collected by the pipe_data_received()
    # method of the protocol
    data = bytes(protocol.output)
    return data.decode('ascii').rstrip()


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
else:
    # Create the loop explicitly: recent Python versions no longer create
    # one implicitly inside asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
    date = loop.run_until_complete(get_date(loop))
    print("Current date: %s" % date)
finally:
    loop.close()
15、使用 Process 類控制子進程、並使用 StreamReader 類從標準輸出讀取的示例。子進程由 create_subprocess_exec() 函數創建:
import asyncio.subprocess
import sys


async def get_date():
    """Run a child Python process that prints the current date; return it."""
    code = 'import datetime; print(datetime.datetime.now())'

    # Create the subprocess, redirect the standard output into a pipe
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c', code,
        stdout=asyncio.subprocess.PIPE)

    # Read one line of output
    data = await proc.stdout.readline()
    line = data.decode('ascii').rstrip()

    # Wait for the subprocess exit
    await proc.wait()
    return line


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
else:
    # Create the loop explicitly: recent Python versions no longer create
    # one implicitly inside asyncio.get_event_loop().
    loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
    date = loop.run_until_complete(get_date())
    print("Current date: %s" % date)
finally:
    loop.close()