from .base_events import * from .coroutines import * #協程模塊,能夠將函數裝飾爲協程 from .events import * #事件模塊,事件循環和任務調度都將使用到他 from .futures import * #異步併發模塊,該模塊對task封裝了許多方法,表明未來執行或沒有執行的任務的結果。它和task上沒有本質上的區別 from .locks import * #異步保證資源同步 from .protocols import * from .queues import * from .streams import * from .subprocess import * from .tasks import * #建立任務,是對協程的封裝,能夠查看協程的狀態。能夠將任務集合 from .transports import *
import asyncio
import time


async def func1(num):
    """Suspend for five seconds without blocking the event loop.

    ``num`` only labels the call; the body does not use it.
    """
    # Native-coroutine spelling of the legacy ``@asyncio.coroutine`` /
    # ``yield from`` pair, which was removed in Python 3.11.
    await asyncio.sleep(5)


if __name__ == "__main__":
    begin = time.time()
    # Create the loop explicitly: relying on asyncio.get_event_loop() to
    # create one implicitly is deprecated and fails on recent Pythons.
    loop = asyncio.new_event_loop()
    try:
        # Bind both coroutines to this loop before gathering them, so that
        # gather() never has to look up an implicit "current" loop.
        task = [loop.create_task(func1(1)), loop.create_task(func1(2))]
        loop.run_until_complete(asyncio.gather(*task))
    finally:
        loop.close()
    end = time.time()
    # Both sleeps overlap, so the elapsed time is about 5 s, not 10 s.
    print(end - begin)
1 before---func1---- 2 before---func1---- 1 after---func1---- 2 after---func1---- 5.00528621673584
import asyncio
import time


async def func1(num):
    """Print a marker line for ``num`` and return ``None``."""
    # ``async def`` defines a coroutine function. Calling it only builds a
    # coroutine object; the body runs once an event loop drives it.
    print(num, 'before---func1----')


if __name__ == "__main__":
    begin = time.time()

    coroutine = func1(2)

    # Explicit loop creation; the implicit loop of asyncio.get_event_loop()
    # is deprecated and unavailable on recent Pythons.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(coroutine)
    finally:
        # Close the loop even if the coroutine raised.
        loop.close()

    end = time.time()
    print(end - begin)
func1(2) #因爲使用async異步關鍵字,因此不能直接運行
D:/MyPython/day25/mq/ RuntimeWarning: coroutine 'func1' was never awaited
print(type(func1),type(coroutine)) #<class 'function'> <class 'coroutine'>
coroutine = func1(2)
try:
    # Drive the coroutine by hand: send(None) resumes it until it suspends
    # or finishes.
    coroutine.send(None)
except StopIteration:
    # A coroutine that runs to completion signals it with StopIteration;
    # that is the expected outcome here, so it is deliberately ignored.
    pass
class BaseEventLoop(events.AbstractEventLoop):

    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
        self._check_closed()

        # Remember whether the caller passed a bare coroutine, i.e. whether
        # the Task below is created here rather than owned by the caller.
        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:  # intentionally bare: the exception is always re-raised
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()
import asyncio
import time


async def func1(num):
    """Print a marker line for ``num`` and return ``None``."""
    print(num, 'before---func1----')


if __name__ == "__main__":
    begin = time.time()

    coroutine = func1(2)

    # Explicit loop creation; asyncio.get_event_loop() without a current
    # loop is deprecated and fails on recent Pythons.
    loop = asyncio.new_event_loop()
    try:
        # Wrap the coroutine in a Task scheduled on this loop.
        task = loop.create_task(coroutine)
        print(task)  # pending: the loop has not run yet
        loop.run_until_complete(task)
    finally:
        loop.close()
    print(task)  # finished

    end = time.time()
    print(end - begin)
print(task) #pending
print(task) #finished
CORO_CREATED 2 before---func1---- <Task finished coro=<func1() done, defined at D:/MyPython/day25/mq/> result=None> CORO_CLOSED
isinstance(task, asyncio.Future)
async def func1(num):
return "recv num %s"%num
def callback(future):
if __name__ == "__main__":
begin = time.time()
coroutine1 = func1(1)
loop = asyncio.get_event_loop()
end = time.time()
1 before---func1----
recv num 1
async def func1(num):
    """Print a marker line for ``num`` and return ``"recv num <num>"``."""
    print(num, 'before---func1----')
    return "recv num %s" % num


if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(1)

    # Explicit loop creation; the implicit loop of asyncio.get_event_loop()
    # is deprecated and unavailable on recent Pythons.
    loop = asyncio.new_event_loop()
    try:
        # Passing ``loop=`` binds the Task to this loop instead of an
        # implicit "current" one.
        task1 = asyncio.ensure_future(coroutine1, loop=loop)
        loop.run_until_complete(task1)
        print(task1)
        # The coroutine's return value is available from the finished Task.
        print(task1.result())
    finally:
        loop.close()

    end = time.time()
    print(end - begin)
1 before---func1---- <Task finished coro=<func1() done, defined at D:/MyPython/day25/mq/> result='recv num 1'> recv num 1 0.0030002593994140625
import asyncio
import time


async def func1(num):
    """Print a marker, sleep ``num`` seconds, return ``"recv num <num>"``."""
    print(num, 'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s" % num


if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(5)
    coroutine2 = func1(3)

    # Explicit loop creation; the implicit loop of asyncio.get_event_loop()
    # is deprecated and unavailable on recent Pythons.
    loop = asyncio.new_event_loop()
    try:
        task1 = asyncio.ensure_future(coroutine1, loop=loop)
        task2 = asyncio.ensure_future(coroutine2, loop=loop)

        # gather() registers several tasks at once so they run concurrently;
        # asyncio.wait() is used the same way.
        tasks = asyncio.gather(task1, task2)
        loop.run_until_complete(tasks)
    finally:
        loop.close()

    end = time.time()
    print(end - begin)
# Bind each Task to the loop that will run it; ensure_future() without
# ``loop=`` falls back to an implicit current loop, which is deprecated.
task1 = asyncio.ensure_future(coroutine1, loop=loop)
task2 = asyncio.ensure_future(coroutine2, loop=loop)
# gather() combines the tasks into one awaitable.
tasks = asyncio.gather(task1, task2)
loop.run_until_complete(tasks)
# Bind each Task to the loop that will run it; ensure_future() without
# ``loop=`` falls back to an implicit current loop, which is deprecated.
task1 = asyncio.ensure_future(coroutine1, loop=loop)
task2 = asyncio.ensure_future(coroutine2, loop=loop)
# wait() is itself a coroutine; it returns a (done, pending) pair of sets.
tasks = asyncio.wait([task1, task2])
loop.run_until_complete(tasks)
done, pending = yield from asyncio.wait(fs)
import asyncio

import aiohttp


async def fetch_async(url):
    """GET ``url`` with aiohttp and print the URL, status code and body text."""
    print(url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.status)
            print(await resp.text())


# Explicit loop creation; the implicit loop of asyncio.get_event_loop() is
# deprecated and unavailable on recent Pythons.
event_loop = asyncio.new_event_loop()
try:
    # NOTE(review): both URLs are empty strings here -- fill in real
    # targets before running.
    tasks = [
        event_loop.create_task(fetch_async('')),
        event_loop.create_task(fetch_async('')),
    ]
    results = event_loop.run_until_complete(asyncio.gather(*tasks))
finally:
    # Close the loop even if a request failed.
    event_loop.close()
import asyncio
import time


async def func1(num):
    """Print a marker, sleep ``num`` seconds, return ``"recv num <num>"``."""
    print(num, 'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s" % num


async def main():
    """Run three ``func1`` calls concurrently and print every result."""
    # ensure_future() is called while the loop is running, so the tasks are
    # scheduled on that running loop.
    tasks = [
        asyncio.ensure_future(func1(5)),
        asyncio.ensure_future(func1(3)),
        asyncio.ensure_future(func1(4)),
    ]

    # wait() returns (done, pending); with the default return_when it only
    # returns after every task has finished.
    dones, _pendings = await asyncio.wait(tasks)

    # ``dones`` is a set, so results are printed in no particular order.
    for task in dones:
        print("Task ret: ", task.result())


if __name__ == "__main__":
    begin = time.time()

    # Explicit loop creation; the implicit loop of asyncio.get_event_loop()
    # is deprecated and unavailable on recent Pythons.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

    end = time.time()
    print(end - begin)
5 before---func1---- 3 before---func1---- 4 before---func1---- Task ret: recv num 4 Task ret: recv num 5 Task ret: recv num 3 5.000285863876343
results = await asyncio.gather(*tasks) for result in results: print("Task ret: ",result)
async def main():
    """Run three ``func1`` calls concurrently and return their results."""
    tasks = [
        asyncio.ensure_future(func1(5)),
        asyncio.ensure_future(func1(3)),
        asyncio.ensure_future(func1(4)),
    ]
    # gather() returns the results as a list ordered like its arguments.
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    begin = time.time()

    # Explicit loop creation; the implicit loop of asyncio.get_event_loop()
    # is deprecated and unavailable on recent Pythons.
    loop = asyncio.new_event_loop()
    try:
        # run_until_complete() hands back whatever main() returned.
        results = loop.run_until_complete(main())
        for result in results:
            print("Task ret: ", result)
    finally:
        loop.close()

    end = time.time()
    print(end - begin)
return await asyncio.wait(tasks) ---------------------------------------------------- dones,pendings = loop.run_until_complete(main()) for task in dones: print("Task ret: ",task.result())
Return an iterator whose values are coroutines. #返回一個迭代器,其產生的值是協程
When waiting for the yielded coroutines you'll get the results (or exceptions!) of the original Futures (or coroutines), in the order in which and as soon as they complete.
import asyncio
import time


async def func1(num):
    """Print a marker, sleep ``num`` seconds, return ``"recv num <num>"``."""
    print(num, 'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s" % num


async def main():
    """Run three ``func1`` calls and print each result as soon as it is ready."""
    tasks = [
        asyncio.ensure_future(func1(5)),
        asyncio.ensure_future(func1(3)),
        asyncio.ensure_future(func1(4)),
    ]

    # as_completed() yields awaitables in completion order, so the shortest
    # sleep is reported first.
    for task in asyncio.as_completed(tasks):
        result = await task
        print("Task ret: ", result)


if __name__ == "__main__":
    begin = time.time()

    # Explicit loop creation; the implicit loop of asyncio.get_event_loop()
    # is deprecated and unavailable on recent Pythons.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

    end = time.time()
    print(end - begin)
import asyncio
import time


async def func1(num):
    """Print a marker, sleep ``num`` seconds, return ``"recv num <num>"``."""
    print(num, 'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s" % num


if __name__ == "__main__":
    begin = time.time()

    # Explicit loop creation; the implicit loop of asyncio.get_event_loop()
    # is deprecated and unavailable on recent Pythons.
    loop = asyncio.new_event_loop()
    tasks = [
        asyncio.ensure_future(func1(5), loop=loop),
        asyncio.ensure_future(func1(3), loop=loop),
        asyncio.ensure_future(func1(4), loop=loop),
    ]

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt:
        # asyncio.Task.all_tasks() was removed in Python 3.9;
        # asyncio.all_tasks(loop) returns the loop's unfinished tasks.
        unfinished = asyncio.all_tasks(loop)
        print(unfinished)
        for task in unfinished:
            # cancel() returns True when the cancellation was requested.
            print(task.cancel())
        # stop() followed by run_forever() runs the callbacks that are
        # already scheduled and then returns, which lets the tasks process
        # their cancellation before the loop is closed.
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

    end = time.time()
    print(end - begin)
5 before---func1---- 3 before---func1---- 4 before---func1---- {<Task pending coro=<func1() running at> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<loc als>._on_completion() at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\]>, < Task pending coro=<wait() running at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks .py:361> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<func1() running at> wait _for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at C:\Users\Administrator\AppData\Loca l\Programs\Python\Python35\lib\asyncio\]>, <Task pending coro=<func1() running at> wait_f or=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at C:\Users\Administrator\AppData\Local\ Programs\Python\Python35\lib\asyncio\]>} #未處理,剛剛掛起爲pending狀態 True #返回True,表示cancel取消成功 True True True 3.014172315597534
True表示cancel成功,loop stop以後還須要再次開啓事件循環,最後再close,否則還會拋出異常:
Task was destroyed but it is pending!
# asyncio.Task.all_tasks() was removed in Python 3.9; asyncio.all_tasks(loop)
# returns the unfinished tasks of the given loop.
for task in asyncio.all_tasks(loop):
    print(task)           # state before the cancellation request
    print(task.cancel())  # True when the cancellation was requested
    print(task)           # state right after the request
<Task pending coro=<func1() running at> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<loca ls>._on_completion() at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\]> True <Task pending coro=<func1() running at> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion () at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\]>
import asyncio
import time


async def func1(num):
    """Print a marker, sleep ``num`` seconds, return ``"recv num <num>"``."""
    print(num, 'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s" % num


async def main():
    """Run three ``func1`` calls concurrently and print every result."""
    tasks = [
        asyncio.ensure_future(func1(5)),
        asyncio.ensure_future(func1(3)),
        asyncio.ensure_future(func1(4)),
    ]
    dones, _pendings = await asyncio.wait(tasks)
    for task in dones:
        print("Task ret: ", task.result())


if __name__ == "__main__":
    begin = time.time()

    # Explicit loop creation; the implicit loop of asyncio.get_event_loop()
    # is deprecated and unavailable on recent Pythons.
    loop = asyncio.new_event_loop()
    task = asyncio.ensure_future(main(), loop=loop)
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        # Same idea as cancelling each task one by one, but all unfinished
        # tasks are grouped with gather() and cancelled in a single call.
        # asyncio.Task.all_tasks() was removed in Python 3.9, hence
        # asyncio.all_tasks(loop).
        print(asyncio.gather(*asyncio.all_tasks(loop)).cancel())
        # stop() followed by run_forever() runs the callbacks that are
        # already scheduled and then returns, so the cancellations are
        # processed before the loop is closed.
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

    end = time.time()
    print(end - begin)
5 before---func1---- 3 before---func1---- 4 before---func1---- <class 'asyncio.tasks._GatheringFuture'> True 3.008172035217285
import asyncio
import time


async def func1(num):
    """Print a marker, sleep ``num`` seconds, return ``"recv num <num>"``."""
    print(num, 'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s" % num


if __name__ == "__main__":
    begin = time.time()

    # Explicit loop creation; the implicit loop of asyncio.get_event_loop()
    # is deprecated and unavailable on recent Pythons.
    loop = asyncio.new_event_loop()
    tasks = [
        asyncio.ensure_future(func1(5), loop=loop),
        asyncio.ensure_future(func1(3), loop=loop),
        asyncio.ensure_future(func1(4), loop=loop),
    ]

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt:
        # Group the tasks with gather() and cancel them in a single call.
        print(asyncio.gather(*tasks).cancel())
        # stop() followed by run_forever() runs the callbacks that are
        # already scheduled and then returns, so the cancellations are
        # processed before the loop is closed.
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

    end = time.time()
    print(end - begin)
5 before---func1---- 3 before---func1---- 4 before---func1---- True 3.008171796798706
import asyncio
import time


async def func1(num):
    """Print a marker, sleep ``num`` seconds, return ``"recv num <num>"``."""
    print(num, 'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s" % num


if __name__ == "__main__":
    begin = time.time()

    # Explicit loop creation; the implicit loop of asyncio.get_event_loop()
    # is deprecated and unavailable on recent Pythons.
    loop = asyncio.new_event_loop()
    tasks = [
        asyncio.ensure_future(func1(5), loop=loop),
        asyncio.ensure_future(func1(3), loop=loop),
        asyncio.ensure_future(func1(4), loop=loop),
    ]

    try:
        loop.run_until_complete(asyncio.wait(tasks))
        # run_forever() keeps the loop alive after the tasks are done; it
        # only returns once loop.stop() is called, so Ctrl-C is the way out.
        loop.run_forever()
    except KeyboardInterrupt:
        # Interrupting is the expected way to end this demo.
        pass
    finally:
        loop.close()

    end = time.time()
    print(end - begin)
import asyncio
import threading
import time


def func1(num):
    """Print a marker, block for ``num`` seconds, return ``"recv num <num>"``."""
    print(num, 'before---func1----')
    # time.sleep() blocks the thread that runs the loop, so callbacks queued
    # on the same loop execute one after another.
    time.sleep(num)
    return "recv num %s" % num


def start_loop(loop):
    """Make ``loop`` the calling thread's event loop and run it until stopped."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


if __name__ == "__main__":
    begin = time.time()

    # Create the event loop in the current thread (it is not running yet).
    new_loop = asyncio.new_event_loop()

    # Run the event loop in a separate thread.
    t = threading.Thread(target=start_loop, args=(new_loop,))
    t.start()

    # call_soon_threadsafe() is the thread-safe way to queue a callback on
    # a loop that runs in another thread.
    new_loop.call_soon_threadsafe(func1, 3)
    new_loop.call_soon_threadsafe(func1, 2)
    new_loop.call_soon_threadsafe(func1, 6)

    end = time.time()
    # The current thread was not blocked, so this prints a very small value.
    print(end - begin)

    # Queue the stop request behind the callbacks above, then wait for the
    # worker thread and release the loop; otherwise the process never exits.
    new_loop.call_soon_threadsafe(new_loop.stop)
    t.join()
    new_loop.close()
3 before---func1---- 0.02800154685974121 2 before---func1---- 6 before---func1----
import asyncio
import threading
import time


async def func1(num):
    """Print a marker, sleep ``num`` seconds, return ``"recv num <num>"``."""
    print(num, 'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s" % num


def start_loop(loop):
    """Make ``loop`` the calling thread's event loop and run it until stopped."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    # Create the event loop in the current thread (it is not running yet).
    new_loop = asyncio.new_event_loop()

    # Run the event loop in a separate thread.
    t = threading.Thread(target=start_loop, args=(new_loop,))
    t.start()

    # run_coroutine_threadsafe() must be given a coroutine object; it
    # returns a concurrent.futures.Future for the result.
    pending_results = [
        asyncio.run_coroutine_threadsafe(coroutine1, new_loop),
        asyncio.run_coroutine_threadsafe(coroutine2, new_loop),
        asyncio.run_coroutine_threadsafe(coroutine3, new_loop),
    ]

    end = time.time()
    # The current thread was not blocked, so this prints a very small value.
    print(end - begin)

    # Wait for the coroutines to finish, then stop the loop and join the
    # worker thread; otherwise the process never exits.
    for pending in pending_results:
        pending.result()
    new_loop.call_soon_threadsafe(new_loop.stop)
    t.join()
    new_loop.close()
5 before---func1---- 3 before---func1---- 4 before---func1---- 0.010000467300415039