Python-asyncio

一、asyncio 多線程

  3.4版本加入標準庫併發

  asyncio 底層基於selectors 實現,看似庫,其實就是一個框架,包含異步IO,事件循環,協程,任務等待等內容。   框架

二、問題引出異步

def a():
    for x in range(3):
        print(x)

def b():
    for x in 'abc':
        print(x)
a()
b()

# 運行結果
0
1
2
a
b
c

 

這是一個串行的程序。async

def a():
    for x in range(3):
        print(x)
        yield

def b():
    for x in 'abc':
        print(x)
        yield

x = a()
y = b()
for i in range(3):
    next(x)
    next(y)

 

三、事件循環:tcp

  事件循環是asyncio 提供的核心運行機制ide

  

四、協程函數

  • 協程不是進程,也不是線程,它是用戶空間調度的完成併發處理的方式
  • 進程,線程由操做系統完成調度,而協程是線程內完成調度。它不須要更多的線程,天然也沒有多線程切換帶來的開銷
  • 協程是非搶佔式調度,只有一個協程主動讓出控制權,另外一個協程纔會被調度
  • 協程不須要使用鎖機制,由於在同一個線程中執行。
  • 多CPU下,能夠使用多進程和協程配合,既能進程併發,又能發揮協程在單線程中的 優點
  • Python中協程是基於生成器的。

五、協程的使用oop

  3.4引入asyncio ,使用裝飾器測試

  asyncio.sleep(0.001):也是一個coroutine,是一個生成器函數,yield值
 1 import asyncio
 2 
 3 @asyncio.coroutine
 4 def sleep(x): #  協程函數
 5     for i in range(3):
 6         print('sleep {}'.format(i))
 7         yield from asyncio.sleep(x)
 8 
 9 loop = asyncio.get_event_loop()
10 loop.run_until_complete(sleep(3)) # 將sleep(3) 封裝成Task對象執行
11 loop.close()
12 print('===================')

    結果:每一秒打印一個,最終打印 ========

1 sleep 0
2 sleep 1
3 sleep 2
4 ===================

 

  將生成器函數,轉換爲協程函數,就能夠在時間循環中執行了。

  測試:

 1 import asyncio
 2 
 3 @asyncio.coroutine
 4 def sleep(x):
 5     for i in range(3):
 6         print('sleeP {}'.format(i))
 7         yield from asyncio.sleep(x)
 8 
 9 loop = asyncio.get_event_loop()
10 
11 #本身封裝 task 對象
12 task = loop.create_task(sleep(3))
13 print(1, task)
14 loop.run_until_complete(task)
15 print(2, task)
16 loop.close()
17 print('======== end =======')

  結果:

1 1 <Task pending coro=<sleep() running at E:/code_pycharm/tt10.py:23>>
2 sleeP 0
3 sleeP 1
4 sleeP 2
5 2 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:23> result=None>
6 ======== end =======

 

  測試:添加回調函數,知道運行完,返回結果(異步非阻塞)

 1 import asyncio
 2 
 3 @asyncio.coroutine
 4 def sleep(x):
 5     for i in range(3):
 6         print('sleeP {}'.format(i))
 7         yield from asyncio.sleep(0.001)
 8     # 給一個result
 9     return 2000
10 
11 def cb(future): # 回調函數
12     print(4, future,'===')
13     print(5, future.result())
14 
15 loop = asyncio.get_event_loop()
16 
17 #本身封裝 task 對象
18 task = loop.create_task(sleep(3))
19 task.add_done_callback(cb)# 註冊了一個回調函數
20 print(1, task)
21 loop.run_until_complete(task)
22 print(2, task)
23 print(3, task.result()) # 獲取結果
24 loop.close()
25 print('======== end =======')

 

  結果:打印2 以前,先執行了回調函數,且獲得最終結果以前,一直在運行

1 1 <Task pending coro=<sleep() running at E:/code_pycharm/tt10.py:42> cb=[cb() at E:/code_pycharm/tt10.py:50]>
2 sleeP 0
3 sleeP 1
4 sleeP 2
5 4 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000> ===
6 5 2000
7 2 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000>
8 3 2000
9 ======== end =======

 

 

  測試:多任務:

 1 import asyncio
 2 
 3 @asyncio.coroutine
 4 def sleep(x):
 5     for i in range(3):
 6         print('sleeP {}'.format(i))
 7         yield from asyncio.sleep(0.001)
 8     # 給一個result
 9     return 2000
10 
11 @asyncio.coroutine
12 def b():
13     for x in 'abc':
14         print(x)
15         yield from asyncio.sleep(0.001)
16 
17 
18 def cb(future): # 回調函數
19     print(4, future,'===')
20     print(5, future.result())
21 
22 loop = asyncio.get_event_loop()
23 
24 #本身封裝 task 對象
25 task = loop.create_task(sleep(3))
26 task.add_done_callback(cb)# 註冊了一個回調函數
27 print(1, task)
28 # 固定套路,多任務
29 tasks = [task, b()] 30 ret = loop.run_until_complete(asyncio.wait(tasks)) 31 
32 print(2, task)
33 print(3, task.result()) # 獲取結果
34 print(6, ret)
35 loop.close()
36 print('======== end =======')

  結果:

 1 1 <Task pending coro=<sleep() running at E:/code_pycharm/tt10.py:42> cb=[cb() at E:/code_pycharm/tt10.py:57]>
 2 sleeP 0 3 a 4 sleeP 1 5 b 6 sleeP 2 7 c  8 4 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000> ===
 9 5 2000
10 2 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000>
11 3 2000
12 6 ({<Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000>, <Task finished coro=<b() done, defined at E:/code_pycharm/tt10.py:50> result=None>}, set()) 13 ======== end =======

 

  能夠看出,返回一個元組,把以前的任務都會放在裏邊

    因此獲取每一個任務的result的方式:

      一、將任務封裝爲task,經過回調函數,或者,直接調用result()

      二、經過任務列表返回的結果,遍歷獲取        

print(6, ret[0])
for i in ret[0]:
    print(i.result())

 

  3.5版本以後,Python提供關鍵字async,await,在語言上原生支持協程

 1 import asyncio
 2 
 3 async def sleep(x):
 4     for i in range(3):
 5         print('sleeP {}'.format(i))
 6         await asyncio.sleep(0.001)
 7     # 給一個result
 8     return 2000
 9 
10 async def b():
11     for x in 'abc':
12         print(x)
13         await asyncio.sleep(0.001)
14 
15 
16 def cb(future): # 回調函數
17     print(4, future,'===')
18     print(5, future.result())
19 
20 loop = asyncio.get_event_loop()
21 
22 #本身封裝 task 對象
23 task = loop.create_task(sleep(3))
24 task.add_done_callback(cb)# 註冊了一個回調函數
25 print(1, task)
26 
27 tasks = [task, b()]
28 ret = loop.run_until_complete(asyncio.wait(tasks))
29 
30 print(2, task)
31 print(3, task.result()) # 獲取結果
32 print(6, ret[0])
33 for i in ret[0]:
34     print(i.result())
35 loop.close()
36 print('======== end =======')

 

  async def 用來定義協程函數,iscoroutinefunction() 返回True,協程函數中能夠不包含await,async關鍵字,但不能使用yield 關鍵字

  如同生成器函數調用返生成器對象同樣,協程函數調用 也會返回一個對象稱爲協程對象,iscoroutine()返回True。

  await語句以後是awaitable對象,能夠是協程或者實現了__await__()方法的對象,await會暫停當前協程執行,使用loop調度其餘協程。

 

  tcp  ECho server:

 1 import asyncio
 2 
 3 async def handle(reader:asyncio.StreamReader, writer:asyncio.StreamWriter):
 4     while True:
 5         data = await reader.read(1024)
 6         print(dir(reader))
 7         print(dir(writer))
 8         client = writer.get_extra_info('peername')
 9         message = '{} your msg {}'.format(client, data.decode()).encode()
10         writer.write(message)
11         await writer.drain() # 注意不是flush 方法
12 loop = asyncio.get_event_loop()
13 ip = '127.0.0.1'
14 port = 9999
15 crt = asyncio.start_server(handle, ip, port, loop=loop)
16 
17 server = loop.run_until_complete(crt)
18 print(server)
19 try:
20     print('=========')
21     loop.run_forever()
22 except KeyboardInterrupt:
23     pass
24 finally:
25     server.close()
26     loop.run_until_complete(server.wait_closed())
27     loop.close()
View Code

 

 

六、aiohttp庫(異步的)

pip install aiohttp

文檔:https://aiohttp.readthedocs.io/en/stable/

http server

http client

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息