一、asyncio 多線程
3.4版本加入標準庫併發
asyncio 底層基於selectors 實現,看似庫,其實就是一個框架,包含異步IO,事件循環,協程,任務等待等內容。 框架
二、問題引出異步
def a(): for x in range(3): print(x) def b(): for x in 'abc': print(x) a() b() # 運行結果 0 1 2 a b c
這是一個串行的程序。async
def a(): for x in range(3): print(x) yield def b(): for x in 'abc': print(x) yield x = a() y = b() for i in range(3): next(x) next(y)
三、事件循環:tcp
事件循環是asyncio 提供的核心運行機制ide
四、協程函數
五、協程的使用oop
3.4引入asyncio ,使用裝飾器測試
asyncio.sleep(0.001):也是一個coroutine,是一個生成器函數,yield值
1 import asyncio 2 3 @asyncio.coroutine 4 def sleep(x): # 協程函數 5 for i in range(3): 6 print('sleep {}'.format(i)) 7 yield from asyncio.sleep(x) 8 9 loop = asyncio.get_event_loop() 10 loop.run_until_complete(sleep(3)) # 將sleep(3) 封裝成Task對象執行 11 loop.close() 12 print('===================')
結果:每一秒打印一個,最終打印 ========
1 sleep 0 2 sleep 1 3 sleep 2 4 ===================
將生成器函數,轉換爲協程函數,就能夠在時間循環中執行了。
測試:
1 import asyncio 2 3 @asyncio.coroutine 4 def sleep(x): 5 for i in range(3): 6 print('sleeP {}'.format(i)) 7 yield from asyncio.sleep(x) 8 9 loop = asyncio.get_event_loop() 10 11 #本身封裝 task 對象 12 task = loop.create_task(sleep(3)) 13 print(1, task) 14 loop.run_until_complete(task) 15 print(2, task) 16 loop.close() 17 print('======== end =======')
結果:
1 1 <Task pending coro=<sleep() running at E:/code_pycharm/tt10.py:23>> 2 sleeP 0 3 sleeP 1 4 sleeP 2 5 2 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:23> result=None> 6 ======== end =======
測試:添加回調函數,知道運行完,返回結果(異步非阻塞)
1 import asyncio 2 3 @asyncio.coroutine 4 def sleep(x): 5 for i in range(3): 6 print('sleeP {}'.format(i)) 7 yield from asyncio.sleep(0.001) 8 # 給一個result 9 return 2000 10 11 def cb(future): # 回調函數 12 print(4, future,'===') 13 print(5, future.result()) 14 15 loop = asyncio.get_event_loop() 16 17 #本身封裝 task 對象 18 task = loop.create_task(sleep(3)) 19 task.add_done_callback(cb)# 註冊了一個回調函數 20 print(1, task) 21 loop.run_until_complete(task) 22 print(2, task) 23 print(3, task.result()) # 獲取結果 24 loop.close() 25 print('======== end =======')
結果:打印2 以前,先執行了回調函數,且獲得最終結果以前,一直在運行
1 1 <Task pending coro=<sleep() running at E:/code_pycharm/tt10.py:42> cb=[cb() at E:/code_pycharm/tt10.py:50]> 2 sleeP 0 3 sleeP 1 4 sleeP 2 5 4 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000> === 6 5 2000 7 2 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000> 8 3 2000 9 ======== end =======
測試:多任務:
1 import asyncio 2 3 @asyncio.coroutine 4 def sleep(x): 5 for i in range(3): 6 print('sleeP {}'.format(i)) 7 yield from asyncio.sleep(0.001) 8 # 給一個result 9 return 2000 10 11 @asyncio.coroutine 12 def b(): 13 for x in 'abc': 14 print(x) 15 yield from asyncio.sleep(0.001) 16 17 18 def cb(future): # 回調函數 19 print(4, future,'===') 20 print(5, future.result()) 21 22 loop = asyncio.get_event_loop() 23 24 #本身封裝 task 對象 25 task = loop.create_task(sleep(3)) 26 task.add_done_callback(cb)# 註冊了一個回調函數 27 print(1, task) 28 # 固定套路,多任務 29 tasks = [task, b()] 30 ret = loop.run_until_complete(asyncio.wait(tasks)) 31 32 print(2, task) 33 print(3, task.result()) # 獲取結果 34 print(6, ret) 35 loop.close() 36 print('======== end =======')
結果:
1 1 <Task pending coro=<sleep() running at E:/code_pycharm/tt10.py:42> cb=[cb() at E:/code_pycharm/tt10.py:57]> 2 sleeP 0 3 a 4 sleeP 1 5 b 6 sleeP 2 7 c 8 4 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000> === 9 5 2000 10 2 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000> 11 3 2000 12 6 ({<Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000>, <Task finished coro=<b() done, defined at E:/code_pycharm/tt10.py:50> result=None>}, set()) 13 ======== end =======
能夠看出,返回一個元組,把以前的任務都會放在裏邊
因此獲取每一個任務的result的方式:
一、將任務封裝爲task,經過回調函數,或者,直接調用result()
二、經過任務列表返回的結果,遍歷獲取
print(6, ret[0]) for i in ret[0]: print(i.result())
3.5版本以後,Python提供關鍵字async,await,在語言上原生支持協程
1 import asyncio 2 3 async def sleep(x): 4 for i in range(3): 5 print('sleeP {}'.format(i)) 6 await asyncio.sleep(0.001) 7 # 給一個result 8 return 2000 9 10 async def b(): 11 for x in 'abc': 12 print(x) 13 await asyncio.sleep(0.001) 14 15 16 def cb(future): # 回調函數 17 print(4, future,'===') 18 print(5, future.result()) 19 20 loop = asyncio.get_event_loop() 21 22 #本身封裝 task 對象 23 task = loop.create_task(sleep(3)) 24 task.add_done_callback(cb)# 註冊了一個回調函數 25 print(1, task) 26 27 tasks = [task, b()] 28 ret = loop.run_until_complete(asyncio.wait(tasks)) 29 30 print(2, task) 31 print(3, task.result()) # 獲取結果 32 print(6, ret[0]) 33 for i in ret[0]: 34 print(i.result()) 35 loop.close() 36 print('======== end =======')
async def 用來定義協程函數,iscoroutinefunction() 返回True,協程函數中能夠不包含await,async關鍵字,但不能使用yield 關鍵字
如同生成器函數調用返生成器對象同樣,協程函數調用 也會返回一個對象稱爲協程對象,iscoroutine()返回True。
await語句以後是awaitable對象,能夠是協程或者實現了__await__()方法的對象,await會暫停當前協程執行,使用loop調度其餘協程。
tcp ECho server:
1 import asyncio 2 3 async def handle(reader:asyncio.StreamReader, writer:asyncio.StreamWriter): 4 while True: 5 data = await reader.read(1024) 6 print(dir(reader)) 7 print(dir(writer)) 8 client = writer.get_extra_info('peername') 9 message = '{} your msg {}'.format(client, data.decode()).encode() 10 writer.write(message) 11 await writer.drain() # 注意不是flush 方法 12 loop = asyncio.get_event_loop() 13 ip = '127.0.0.1' 14 port = 9999 15 crt = asyncio.start_server(handle, ip, port, loop=loop) 16 17 server = loop.run_until_complete(crt) 18 print(server) 19 try: 20 print('=========') 21 loop.run_forever() 22 except KeyboardInterrupt: 23 pass 24 finally: 25 server.close() 26 loop.run_until_complete(server.wait_closed()) 27 loop.close()
六、aiohttp庫(異步的)
pip install aiohttp
文檔:https://aiohttp.readthedocs.io/en/stable/
http server
http client