from .base_events import * from .coroutines import * #協程模塊,能夠將函數裝飾爲協程 from .events import * #事件模塊,事件循環和任務調度都將使用到他 from .futures import * #異步併發模塊,該模塊對task封裝了許多方法,表明未來執行或沒有執行的任務的結果。它和task上沒有本質上的區別 from .locks import * #異步保證資源同步 from .protocols import * from .queues import * from .streams import * from .subprocess import * from .tasks import * #建立任務,是對協程的封裝,能夠查看協程的狀態。能夠將任務集合 from .transports import *
1.當咱們給一個函數添加了async關鍵字,或者使用asyncio.coroutine裝飾器裝飾,就會把它變成一個異步函數。
2.每一個線程有一個事件循環,主線程調用asyncio.get_event_loop時會建立事件循環,
3.將任務封裝爲集合asyncio.gather(*args),以後一塊兒傳入事件循環中
4.要把異步的任務丟給這個循環的run_until_complete方法,事件循環會安排協同程序的執行。和方法名字同樣,該方法會等待異步的任務徹底執行纔會結束。
import asyncio,time
@asyncio.coroutine #設爲異步函數
def func1(num):
print(num,'before---func1----')
yield from asyncio.sleep(5)
print(num,'after---func1----')
task = [func1(1),func1(2)]
if __name__ == "__main__":
begin = time.time()
loop = asyncio.get_event_loop() #進入事件循環
loop.run_until_complete(asyncio.gather(*task)) #將協同程序註冊到事件循環中
loop.close()
end = time.time()
print(end-begin)
1 before---func1---- 2 before---func1---- 1 after---func1---- 2 after---func1---- 5.00528621673584
import asyncio,time async def func1(num): #使用async關鍵字定義一個協程,協程也是一種對象,不能直接運行,須要加入事件循環中,才能被調用。 print(num,'before---func1----') if __name__ == "__main__": begin = time.time() coroutine = func1(2) loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) loop.close() end = time.time() print(end-begin)
func1(2) #因爲使用async異步關鍵字,因此不能直接運行
D:/MyPython/day25/mq/multhread.py:15: RuntimeWarning: coroutine 'func1' was never awaited
func1(2)html
print(type(func1),type(coroutine)) #<class 'function'> <class 'coroutine'>
同:python---await/async關鍵字python
coroutine = func1(2) try: coroutine.send(None) except StopIteration: pass
協程對象不能直接運行,在註冊事件循環的時候,實際上是run_until_complete方法將協程包裝成爲了一個任務(task)對象.編程
task對象是Future類的子類,保存了協程運行後的狀態,用於將來獲取協程的結果session
class BaseEventLoop(events.AbstractEventLoop): def run_until_complete(self, future): """Run until the Future is done. If the argument is a coroutine, it is wrapped in a Task. WARNING: It would be disastrous to call run_until_complete() with the same coroutine twice -- it would wrap it in two different Tasks and that can't be good. Return the Future's result, or raise its exception. """ self._check_closed() new_task = not futures.isfuture(future) future = tasks.ensure_future(future, loop=self) if new_task: # An exception is raised if the future didn't complete, so there # is no need to log the "destroy pending task" message future._log_destroy_pending = False future.add_done_callback(_run_until_complete_cb) try: self.run_forever() except: if new_task and future.done() and not future.cancelled(): # The coroutine raised a BaseException. Consume the exception # to not log a warning, the caller doesn't have access to the # local task. future.exception() raise finally: future.remove_done_callback(_run_until_complete_cb) if not future.done(): raise RuntimeError('Event loop stopped before Future completed.') return future.result()
import asyncio,time async def func1(num): print(num,'before---func1----') if __name__ == "__main__": begin = time.time() coroutine = func1(2) loop = asyncio.get_event_loop() task = loop.create_task(coroutine) #建立了任務 print(task) #pending loop.run_until_complete(task) loop.close() print(task) #finished end = time.time() print(end-begin)
對於協程的4種狀態:python---協程理解併發
print(task) #pending
print(getcoroutinestate(coroutine))
loop.run_until_complete(task)
loop.close()
print(task) #finished
print(getcoroutinestate(coroutine))
CORO_CREATED 2 before---func1---- <Task finished coro=<func1() done, defined at D:/MyPython/day25/mq/multhread.py:4> result=None> CORO_CLOSED
深刻了解:關於Task,create_task(),ensure_future均可以用來建立任務,那麼應該使用哪一個?app
條件使用ensure_future,他是最外層函數,其中調用了create_task()方法,功能全面,而Task官方不推薦直接使用異步
isinstance(task, asyncio.Future)
將會輸出True。
async def func1(num):
print(num,'before---func1----')
return "recv num %s"%num
def callback(future):
print(future.result())
if __name__ == "__main__":
begin = time.time()
coroutine1 = func1(1)
loop = asyncio.get_event_loop()
task1=asyncio.ensure_future(coroutine1)
task1.add_done_callback(callback)
loop.run_until_complete(task1)
loop.close()
end = time.time()
print(end-begin)
1 before---func1----
recv num 1
0.004000186920166016async
能夠看到,coroutine執行結束時候會調用回調函數。並經過參數future獲取協程執行的結果。咱們建立的task和回調裏的future對象,其實是同一個對象。ide
async def func1(num): print(num,'before---func1----') return "recv num %s"%num if __name__ == "__main__": begin = time.time() coroutine1 = func1(1) loop = asyncio.get_event_loop() task1=asyncio.ensure_future(coroutine1) loop.run_until_complete(task1) print(task1) print(task1.result()) loop.close() end = time.time() print(end-begin)
1 before---func1---- <Task finished coro=<func1() done, defined at D:/MyPython/day25/mq/multhread.py:6> result='recv num 1'> recv num 1 0.0030002593994140625
使用async關鍵字定義的協程對象,使用await能夠針對耗時的操做進行掛起(是生成器中的yield的替代,可是本地協程函數不容許使用),讓出當前控制權。協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其餘協程也掛起,或者執行完畢,在進行下一個協程的執行函數
使用asyncio.sleep模擬阻塞操做。
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num if __name__ == "__main__": begin = time.time() coroutine1 = func1(5) coroutine2 = func1(3) loop = asyncio.get_event_loop() task1=asyncio.ensure_future(coroutine1) task2=asyncio.ensure_future(coroutine2) tasks = asyncio.gather(*[task1,task2]) #gather能夠實現同時註冊多個任務,實現併發操做。wait方法使用一致 loop.run_until_complete(tasks) loop.close() end = time.time() print(end-begin)
task1=asyncio.ensure_future(coroutine1) task2=asyncio.ensure_future(coroutine2) tasks = asyncio.gather(*[task1,task2]) loop.run_until_complete(tasks)
task1=asyncio.ensure_future(coroutine1) task2=asyncio.ensure_future(coroutine2) tasks = asyncio.wait([task1,task2]) loop.run_until_complete(tasks)
Usage:
done, pending = yield from asyncio.wait(fs)
二者的返回值也是不一樣的
import asyncio,aiohttp async def fetch_async(url): print(url) async with aiohttp.ClientSession() as session: async with session.get(url) as resp: print(resp.status) print(await resp.text()) tasks = [fetch_async('http://www.baidu.com/'), fetch_async('http://www.cnblogs.com/ssyfj/')] event_loop = asyncio.get_event_loop() results = event_loop.run_until_complete(asyncio.gather(*tasks)) event_loop.close()
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num async def main(): coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] dones, pendings = await asyncio.wait(tasks) for task in dones: #對已完成的任務集合進行操做 print("Task ret: ",task.result()) if __name__ == "__main__": begin = time.time() loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close() end = time.time() print(end-begin)
5 before---func1---- 3 before---func1---- 4 before---func1---- Task ret: recv num 4 Task ret: recv num 5 Task ret: recv num 3 5.000285863876343
results = await asyncio.gather(*tasks) for result in results: print("Task ret: ",result)
async def main(): coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] return await asyncio.gather(*tasks) if __name__ == "__main__": begin = time.time() loop = asyncio.get_event_loop() results = loop.run_until_complete(main()) for result in results: print("Task ret: ",result) loop.close() end = time.time() print(end-begin)
return await asyncio.wait(tasks) ---------------------------------------------------- dones,pendings = loop.run_until_complete(main()) for task in dones: print("Task ret: ",task.result())
Return an iterator whose values are coroutines. #返回一個可迭代的協程函數值 When waiting for the yielded coroutines you'll get the results (or exceptions!) of the original Futures (or coroutines), in the order in which and as soon as they complete.
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num async def main(): coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] for task in asyncio.as_completed(tasks): result = await task print("Task ret: ",result) if __name__ == "__main__": begin = time.time() loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close() end = time.time() print(end-begin)
future對象有幾個狀態:
建立future的時候,task爲pending,
事件循環調用執行的時候固然就是running,
調用完畢天然就是done,
若是須要中止事件循環,就須要先把task取消。
可使用asyncio.Task獲取事件循環的task
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num if __name__ == "__main__": begin = time.time() coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] loop = asyncio.get_event_loop() try: loop.run_until_complete(asyncio.wait(tasks)) except KeyboardInterrupt as e: print(asyncio.Task.all_tasks()) for task in asyncio.Task.all_tasks(): #獲取全部任務 print(task.cancel()) #單個任務取消 loop.stop() #須要先stop循環 loop.run_forever() #須要在開啓事件循環 finally: loop.close() #統一關閉 end = time.time() print(end-begin)
5 before---func1---- 3 before---func1---- 4 before---func1---- {<Task pending coro=<func1() running at multhread.py:5> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<loc als>._on_completion() at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks.py:428]>, < Task pending coro=<wait() running at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks .py:361> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<func1() running at multhread.py:5> wait _for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at C:\Users\Administrator\AppData\Loca l\Programs\Python\Python35\lib\asyncio\tasks.py:428]>, <Task pending coro=<func1() running at multhread.py:5> wait_f or=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at C:\Users\Administrator\AppData\Local\ Programs\Python\Python35\lib\asyncio\tasks.py:428]>} #未處理,剛剛掛起爲pending狀態 True #返回True,表示cancel取消成功 True True True 3.014172315597534
True表示cannel成功,loop stop以後還須要再次開啓事件循環,最後在close,否則還會拋出異常:
Task was destroyed but it is pending!
由於cancel後task的狀態依舊是pending
for task in asyncio.Task.all_tasks(): print(task) print(task.cancel()) print(task)
<Task pending coro=<func1() running at multhread.py:5> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<loca ls>._on_completion() at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks.py:428]> True <Task pending coro=<func1() running at multhread.py:5> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion () at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks.py:428]>
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num async def main(): coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] dones,pendings=await asyncio.wait(tasks) for task in dones: print("Task ret: ",task.result()) if __name__ == "__main__": begin = time.time() loop = asyncio.get_event_loop() task = asyncio.ensure_future(main()) try: loop.run_until_complete(task) except KeyboardInterrupt as e: print(asyncio.gather(*asyncio.Task.all_tasks()).cancel()) #咱們只是把上面的單個寫成了全部任務集合取消,和協程嵌套關係不大。上面也能夠這樣寫。不過協程嵌套能夠簡化代碼 loop.stop() loop.run_forever() finally: loop.close() end = time.time() print(end-begin)
5 before---func1---- 3 before---func1---- 4 before---func1---- <class 'asyncio.tasks._GatheringFuture'> True 3.008172035217285
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num if __name__ == "__main__": begin = time.time() coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] loop = asyncio.get_event_loop() try: loop.run_until_complete(asyncio.wait(tasks)) except KeyboardInterrupt as e: print(asyncio.gather(*tasks).cancel()) loop.stop() loop.run_forever() finally: loop.close() end = time.time() print(end-begin)
5 before---func1---- 3 before---func1---- 4 before---func1---- True 3.008171796798706
在當前線程中建立一個事件循環(不啓用,單純獲取標識),開啓一個新的線程,在新的線程中啓動事件循環。在當前線程依據事件循環標識,能夠向事件中添加協程對象。當前線程不會因爲事件循環而阻塞了。
上面在一個線程中執行的事件循環,只有咱們主動關閉事件close,事件循環纔會結束,會阻塞。
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num if __name__ == "__main__": begin = time.time() coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) loop.run_forever() end = time.time() print(end-begin)
import asyncio,time,threading def func1(num): print(num,'before---func1----') time.sleep(num) return "recv num %s"%num def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() if __name__ == "__main__": begin = time.time() new_loop = asyncio.new_event_loop() #在當前線程下建立時間循環,(未啓用) t = threading.Thread(target=start_loop,args=(new_loop,)) #開啓新的線程去啓動事件循環 t.start() new_loop.call_soon_threadsafe(func1,3) new_loop.call_soon_threadsafe(func1,2) new_loop.call_soon_threadsafe(func1,6) end = time.time() print(end-begin) #當前線程未阻塞,耗時0.02800154685974121
3 before---func1---- 0.02800154685974121 2 before---func1---- 6 before---func1----
import asyncio,time,threading async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() if __name__ == "__main__": begin = time.time() coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) new_loop = asyncio.new_event_loop() #在當前線程下建立時間循環,(未啓用) t = threading.Thread(target=start_loop,args=(new_loop,)) #開啓新的線程去啓動事件循環 t.start() asyncio.run_coroutine_threadsafe(coroutine1,new_loop) #傳參必須是協程對象 asyncio.run_coroutine_threadsafe(coroutine2,new_loop) asyncio.run_coroutine_threadsafe(coroutine3,new_loop) end = time.time() print(end-begin) #當前線程未阻塞,耗時0.010000467300415039
5 before---func1---- 3 before---func1---- 4 before---func1---- 0.010000467300415039
主線程經過run_coroutine_threadsafe新註冊協程對象。這樣就能在子線程中進行事件循環的併發操做,同時主線程又不會被block。