異步編程
重要概念
同步、異步
阻塞、非阻塞
區別
聯繫
同步IO、異步IO、IO多路複用
IO兩個階段
IO模型
同步IO函數
阻塞IO
非阻塞IO
IO多路複用
異步IO
Python中的IO多路複用
selectors庫
#在selects模塊源碼最下面有以下代碼 # Choose the best implementation, roughly: # epoll|kqueue|devpoll > poll > select. # select() also can't accept a FD > FD_SETSIZE (usually around 1024) if 'KqueueSelector' in globals(): DefaultSelector = KqueueSelector elif 'EpollSelector' in globals(): DefaultSelector = EpollSelector elif 'DevpollSelector' in globals(): DefaultSelector = DevpollSelector elif 'PollSelector' in globals(): DefaultSelector = PollSelector else: DefaultSelector = SelectSelector
# Usage example: a callback-driven TCP server skeleton built on selectors.
import logging
import selectors
import socket
import threading

FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT)

# Upper bound (seconds) for one select() call. Without a timeout the worker
# thread stays blocked inside select() and never notices the stop event.
SELECT_TIMEOUT = 0.5


def accept(sock: socket.socket, mask):
    """Callback for the listening socket; accept one pending connection.

    The callback signature is chosen by this program: the select loop below
    calls ``callback(fileobj, mask)``.

    Args:
        sock: the listening socket that became readable.
        mask: bitwise OR of the events that fired.
    """
    conn, addr = sock.accept()
    conn.setblocking(False)  # never block the select loop
    # Placeholder: the example stops here and does not register ``conn``.


def read(conn: socket.socket, mask):
    """Callback placeholder for readable client connections (not yet used)."""
    pass


# Build the default selector, i.e. the best implementation for this platform.
selector = selectors.DefaultSelector()

# Create the TCP server. The instance is named ``server`` so that it does not
# rebind (shadow) the imported ``socket`` module.
server = socket.socket()
server.bind(('0.0.0.0', 9999))
server.listen()
logging.info(server)
server.setblocking(False)  # non-blocking

# Register the listening socket for read events; register() returns a
# SelectorKey that carries the file object, the watched events and the data
# (here: the callback).
key = selector.register(server, selectors.EVENT_READ, accept)
logging.info(key)

e = threading.Event()


def select(e: threading.Event):
    """Run the select loop until ``e`` is set, dispatching ready callbacks."""
    while not e.is_set():
        # Wait for monitored events; returns a list of (key, mask) tuples,
        # or an empty list when the timeout expires.
        events = selector.select(SELECT_TIMEOUT)
        if not events:
            continue
        print('-' * 30)
        for key, mask in events:
            logging.info(key)
            logging.info(mask)
            callback = key.data  # the callback stored at registration time
            callback(key.fileobj, mask)


select_thread = threading.Thread(target=select, args=(e,), name='select')
select_thread.start()


def main():
    """Read console commands; on 'quit' stop the loop and release resources."""
    while not e.is_set():
        cmd = input('>>>')
        if cmd.strip() == 'quit':
            e.set()
            # Let the worker leave select() before the selector is mutated
            # and closed underneath it.
            select_thread.join()
            logging.info("{}".format(list(selector.get_map().items())))

            # Collect first: unregistering while iterating the map would
            # mutate it during iteration.
            fobjs = []
            for fd, key in selector.get_map().items():
                print(fd, key)
                print(key.fileobj)
                fobjs.append(key.fileobj)

            for fobj in fobjs:
                selector.unregister(fobj)
                fobj.close()
            selector.close()


if __name__ == '__main__':
    main()
練習
import datetime
import logging
import selectors
import socket
import threading

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT)


# TCP Server
class ChatServer:
    """Chat server that multiplexes all sockets through one selector thread.

    Every message received from a client is timestamped and broadcast to all
    connected clients.
    """

    # Upper bound (seconds) for one select() call so the loop can observe
    # stop(); an unbounded select() would block until the next socket event.
    SELECT_TIMEOUT = 0.5

    def __init__(self, ip='0.0.0.0', port=9999):
        self.addr = (ip, port)
        self.socket = socket.socket()
        self.event = threading.Event()
        self.clients = {}
        self.selector = selectors.DefaultSelector()  # create the selector
        self._thread = None  # select-loop thread, set by start()

    def start(self):
        """Start listening and launch the select loop in a daemon thread."""
        self.socket.bind(self.addr)
        self.socket.listen()
        self.socket.setblocking(False)
        self.selector.register(self.socket, selectors.EVENT_READ, self.accept)
        self._thread = threading.Thread(target=self.select, name='select',
                                        daemon=True)
        self._thread.start()

    def select(self):
        """Dispatch ready sockets to their callbacks until stop() is called."""
        while not self.event.is_set():
            # Returns (key, mask) tuples, or an empty list on timeout.
            events = self.selector.select(self.SELECT_TIMEOUT)
            if not events:
                continue
            print("*" * 30)
            for key, mask in events:
                logging.info(key)
                logging.info(mask)
                callback = key.data  # callback stored at registration time
                callback(key.fileobj)

    # Callback
    def accept(self, sock: socket.socket):
        """Accept a client connection and watch it for incoming data."""
        conn, addr = sock.accept()
        conn.setblocking(False)
        # Register every client socket so its reads are monitored.
        self.selector.register(conn, selectors.EVENT_READ, self.recv)

    # Callback
    def recv(self, sock: socket.socket):
        """Receive one chunk from a client and broadcast it to all clients."""
        try:
            data = sock.recv(1024)
            peer = sock.getpeername()
        except BlockingIOError:
            return  # spurious wake-up: nothing to read yet
        except OSError:
            data = b''  # broken connection: handle like a disconnect
        # Client closed the connection or asked to quit: unregister and close.
        if not data or data.strip() == b'quit':
            self._drop(sock)
            return
        msg = self._format_message(datetime.datetime.now(), peer, data)
        # Broadcast. Snapshot the receivers first because a failed send
        # unregisters the client, which mutates the selector map.
        receivers = [key.fileobj for key in self.selector.get_map().values()
                     if key.data == self.recv]  # excludes the listening socket
        for client in receivers:
            try:
                client.send(msg)
            except BlockingIOError:
                logging.warning("send buffer full, message skipped: %s", client)
            except OSError:
                logging.exception("send failed, dropping client: %s", client)
                self._drop(client)

    @staticmethod
    def _format_message(now, peer, data):
        """Build the broadcast payload: timestamp, sender address, text.

        Args:
            now: datetime used for the timestamp.
            peer: sender address; the first two items are host and port.
            data: raw bytes received from the sender.

        Returns:
            The encoded message as bytes, ready for socket.send().
        """
        host, port = peer[:2]
        text = data.decode(errors='replace')
        return "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(
            now, host, port, text).encode()

    def _drop(self, sock):
        """Unregister and close a client socket; safe to call more than once."""
        try:
            self.selector.unregister(sock)
        except (KeyError, ValueError):
            pass  # already unregistered earlier in the same event batch
        sock.close()

    # Stop the service
    def stop(self):
        """Stop the select loop, then close every registered socket."""
        self.event.set()
        worker = self._thread
        # Wait for the loop to leave select() before mutating the selector.
        if worker is not None and worker is not threading.current_thread():
            worker.join()
        fobjs = [key.fileobj for key in self.selector.get_map().values()]
        for fobj in fobjs:
            self.selector.unregister(fobj)
            fobj.close()
        self.selector.close()


def main():
    """Run the chat server until 'quit' is typed on the console."""
    cs = ChatServer()
    cs.start()

    while True:
        cmd = input(">>>")
        if cmd.strip() == 'quit':
            cs.stop()
            break
        logging.info(threading.enumerate())


if __name__ == '__main__':
    main()
# -*- coding: utf-8 -*-
import socket
import threading
import datetime
import logging
import selectors
from queue import Queue

FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT)


class ChatServer:
    """Selector-based chat server with one outgoing queue per client.

    Client sockets are registered with ``(handler, Queue)`` as their data.
    Received chunks are put on every client's queue and are sent when that
    client's socket becomes writable.
    """

    # Upper bound (seconds) for one select() call so the loop can observe
    # stop() even when no socket event arrives.
    SELECT_TIMEOUT = 0.5

    def __init__(self, ip='127.0.0.1', port=9999):
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}
        self.event = threading.Event()
        self.selector = selectors.DefaultSelector()  # create the selector
        self._thread = None  # select-loop thread, set by start()

    def start(self):
        """Start listening and launch the select loop in its own thread."""
        self.sock.bind(self.addr)
        self.sock.listen()
        self.sock.setblocking(False)
        # Register the listening socket; its data is the accept callback.
        self.selector.register(self.sock, selectors.EVENT_READ, self.accept)
        self._thread = threading.Thread(target=self.select, name='selector')
        self._thread.start()

    def select(self):
        """Dispatch ready sockets to their callbacks until stop() is called."""
        while not self.event.is_set():
            # Returns (key, mask) tuples, or an empty list on timeout.
            events = self.selector.select(self.SELECT_TIMEOUT)
            for key, mask in events:
                if callable(key.data):
                    # Listening socket: data is the callback itself.
                    callback = key.data
                    callback(key.fileobj, mask)
                else:
                    # Client socket: data is (handler, queue).
                    callback = key.data[0]
                    callback(key, mask)

    def accept(self, sock: socket.socket, mask):
        """Accept a client connection and register it with its own queue."""
        conn, raddr = sock.accept()
        conn.setblocking(False)
        self.clients[raddr] = (self.handler, Queue())
        # Watch reads only. Write interest is enabled on demand, because a
        # socket is almost always writable and a permanent EVENT_WRITE
        # registration makes select() return immediately in a busy loop.
        self.selector.register(conn, selectors.EVENT_READ, self.clients[raddr])

    def handler(self, key: selectors.SelectorKey, mask):
        """Handle read and/or write readiness for one client socket."""
        sock = key.fileobj
        outbox = key.data[1]
        if mask & selectors.EVENT_READ:
            try:
                data = sock.recv(1024)
            except BlockingIOError:
                data = None  # spurious wake-up: nothing to read yet
            except OSError:
                data = b''  # broken connection: handle like a disconnect
            if data is not None:
                # Client closed the connection or asked to quit.
                if not data or data.strip() == b'quit':
                    self._drop(sock, key.data)
                    return
                # Snapshot the map: modify() may re-register entries.
                for k in list(self.selector.get_map().values()):
                    logging.info(k)
                    if isinstance(k.data, tuple):
                        k.data[1].put(data)
                        # There is now something to send to this client.
                        self.selector.modify(
                            k.fileobj,
                            selectors.EVENT_READ | selectors.EVENT_WRITE,
                            k.data)
        if mask & selectors.EVENT_WRITE:
            if not outbox.empty():
                try:
                    sock.send(outbox.get())
                except OSError:
                    self._drop(sock, key.data)
                    return
            if outbox.empty():
                # Nothing left to send: stop watching for writability.
                self.selector.modify(sock, selectors.EVENT_READ, key.data)

    def _drop(self, sock, entry):
        """Unregister and close a client socket and forget its clients entry."""
        self.selector.unregister(sock)
        sock.close()
        for raddr, value in list(self.clients.items()):
            if value is entry:
                del self.clients[raddr]

    # Stop the service
    def stop(self):
        """Stop the select loop, then close every registered socket."""
        self.event.set()
        worker = self._thread
        # Wait for the loop to leave select() before mutating the selector.
        if worker is not None and worker is not threading.current_thread():
            worker.join()
        fobjs = [key.fileobj for key in self.selector.get_map().values()]
        for fobj in fobjs:
            self.selector.unregister(fobj)
            fobj.close()
        self.selector.close()


def main():
    """Run the chat server until 'quit' is typed on the console."""
    cs = ChatServer()
    cs.start()

    while True:
        cmd = input(">>>")
        if cmd == 'quit':
            cs.stop()
            break
        logging.info(threading.enumerate())


if __name__ == '__main__':
    main()
asyncio
問題的引出
def a():
    """Print 0, 1 and 2, one value per line."""
    for number in (0, 1, 2):
        print(number)


def b():
    """Print a, b and c, one letter per line."""
    for letter in ("a", "b", "c"):
        print(letter)


a()
b()

# The output is always:
# 0 1 2 a b c
import threading
import time


def a():
    """Print 0, 1 and 2, pausing 1 ms before each value."""
    for number in (0, 1, 2):
        time.sleep(0.001)
        print(number)


def b():
    """Print a, b and c, pausing 1 ms before each letter."""
    for letter in ("a", "b", "c"):
        time.sleep(0.001)
        print(letter)


# Run both functions concurrently, each in its own named thread.
threading.Thread(name='a', target=a).start()
threading.Thread(name='b', target=b).start()

# Output of a run:
# a 0 b 1 c 2
# Multiprocessing version
import multiprocessing
import time


def a():
    """Print 0, 1 and 2, pausing 1 ms before each value."""
    for number in (0, 1, 2):
        time.sleep(0.001)
        print(number)


def b():
    """Print a, b and c, pausing 1 ms before each letter."""
    for letter in ("a", "b", "c"):
        time.sleep(0.001)
        print(letter)


if __name__ == '__main__':
    # Run each function in its own named process.
    multiprocessing.Process(name='a', target=a).start()
    multiprocessing.Process(name='b', target=b).start()

# Output of a run:
# 0 1 a 2 b c
# Generator version
def a():
    """Generator: print the next of 0, 1, 2 and then hand control back."""
    for number in (0, 1, 2):
        print(number)
        yield


def b():
    """Generator: print the next of a, b, c and then hand control back."""
    for letter in ("a", "b", "c"):
        print(letter)
        yield


x = a()
y = b()

# Advance the two generators alternately, three steps each.
for i in (0, 1, 2):
    next(x)
    next(y)

# Output:
# 0 a 1 b 2 c
事件循環
協程
協程的使用
import asyncio


# NOTE(review): this is the generator-based coroutine style. asyncio.coroutine
# appears to have been deprecated in Python 3.8 and removed in 3.11 — confirm
# the interpreter version this example is meant to run on.
@asyncio.coroutine
def sleep(x):  # coroutine function
    # Print a counter three times, yielding to asyncio.sleep(x) after each one.
    for i in range(3):
        print('sleep {}'.format(i))
        yield from asyncio.sleep(x)


loop = asyncio.get_event_loop()
# Run the coroutine to completion on the event loop, then close the loop.
loop.run_until_complete(sleep(3))
loop.close()
import asyncio


async def sleep(x):
    """Print a counter three times, awaiting ``asyncio.sleep(x)`` after each.

    Args:
        x: delay in seconds passed to asyncio.sleep after every print.
    """
    for i in range(3):
        print('sleep {}'.format(i))
        await asyncio.sleep(x)


if __name__ == '__main__':
    # asyncio.run creates the event loop, runs the coroutine to completion and
    # closes the loop. The guard keeps an import of this module from blocking
    # for the whole run.
    asyncio.run(sleep(3))
import asyncio
import threading


async def sleep(x):
    """Print a counter ``x`` times, awaiting ``asyncio.sleep(x)`` after each.

    Args:
        x: both the number of iterations and the delay in seconds.
    """
    for i in range(x):
        print('sleep {}'.format(i))
        await asyncio.sleep(x)


async def showthread(x, interval=2):
    """Print the list of live threads ``x`` times.

    Args:
        x: number of iterations.
        interval: delay in seconds awaited after every print.
    """
    for i in range(x):
        print(threading.enumerate())
        await asyncio.sleep(interval)


async def main():
    """Run both coroutines concurrently on the same event loop."""
    # gather() wraps the coroutines in tasks itself; handing bare coroutines
    # to asyncio.wait() is rejected by newer Python versions.
    await asyncio.gather(showthread(3), sleep(3))


if __name__ == '__main__':
    asyncio.run(main())

# Sample output (the thread id differs between runs):
# [<_MainThread(MainThread, started 21012)>]
# sleep 0
# [<_MainThread(MainThread, started 21012)>]
# sleep 1
# [<_MainThread(MainThread, started 21012)>]
# sleep 2
# Coroutine version
import asyncio
import inspect


async def a():
    """Print 'a.x 0..2', yielding to the event loop after every print."""
    for x in range(3):
        print('a.x', x)
        # sleep(0) suspends this coroutine once so other tasks can run.
        await asyncio.sleep(0)


async def b():
    """Print 'b.x a..c', yielding to the event loop after every print."""
    for x in 'abc':
        print('b.x', x)
        await asyncio.sleep(0)


async def main():
    """Run a() and b() concurrently; they alternate at every suspension."""
    await asyncio.gather(a(), b())


if __name__ == '__main__':
    print(inspect.iscoroutinefunction(a))
    print(inspect.iscoroutinefunction(b))

    # The event loop
    asyncio.run(main())

# Output:
# True
# True
# a.x 0
# b.x a
# a.x 1
# b.x b
# a.x 2
# b.x c
TCP Echo Server舉例
import asyncio


# TCP Echo Server example
async def handle(reader, writer):
    """Echo every chunk received from one client, prefixed with its address.

    Args:
        reader: stream used to read from the client.
        writer: stream used to write to the client.
    """
    client = writer.get_extra_info('peername')
    # Show the available stream attributes once per connection.
    print(dir(reader))
    print(dir(writer))
    try:
        while True:
            data = await reader.read(1024)
            if not data:
                break  # empty read means EOF: the client closed the connection
            message = "{} Your msg {}".format(
                client, data.decode(errors='replace')).encode()
            writer.write(message)
            await writer.drain()
    finally:
        # Always release the connection, also when reading or writing fails.
        writer.close()
        await writer.wait_closed()


async def main(ip='127.0.0.1', port=9999):
    """Start the echo server on (ip, port) and serve until cancelled."""
    server = await asyncio.start_server(handle, ip, port)
    print(server)  # an asyncio Server object wrapping the listening socket(s)
    async with server:
        await server.serve_forever()


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
aiohttp庫
HTTP Server
from aiohttp import web


async def indexhandle(request: web.Request):
    """Handle GET / by replying with the request path and status 201."""
    # Handlers must return a Response; web.Request is the incoming object.
    return web.Response(text=request.path, status=201)


async def handle(request: web.Request):
    """Handle GET /{id} by replying with the captured id ('0000' if absent)."""
    print(request.match_info)
    print(request.query_string)  # e.g. http://127.0.0.1:9977/1?name=12301
    return web.Response(text=request.match_info.get('id', '0000'), status=200)


app = web.Application()
app.router.add_get("/", indexhandle)  # http://127.0.0.1:9977/
app.router.add_get("/{id}", handle)  # http://127.0.0.1:9977/12301

if __name__ == '__main__':
    # Guarded so that importing this module does not start the server.
    web.run_app(app, host='0.0.0.0', port=9977)
HTTP Client
import asyncio

from aiohttp import ClientSession


async def get_html(url: str):
    """Fetch ``url`` with an HTTP GET and print the status and body text.

    Args:
        url: address to request.
    """
    # Both context managers release their resources on exit: the session's
    # connections and the response.
    async with ClientSession() as session:
        async with session.get(url) as res:
            print(res.status)
            print(await res.text())


url = 'http://127.0.0.1/ziroom-web'

if __name__ == '__main__':
    # asyncio.run creates the loop, runs the coroutine and closes the loop.
    # The guard keeps an import of this module from issuing a request.
    asyncio.run(get_html(url))