Python_學習之多協程html
1、yield和yield from區別
2、gevent構建多協程
3、asyncio構建多協程
一、名詞簡介
二、經常使用方法(api)
Loop
await
Task
Future
三、asyncio 經過yield from構建多任務協程
四、asyncio經過async和await【官方推薦】
五、asyncio.run() 構建循環事件【官方推薦】
六、實例操做
6.一、批量異步處理類型相同的數據?
6.二、但願不等待全部完成了纔對結果處理,而是消費了就對結果進行處理?
6.三、動態的異步消費,好比動態接收mq或redis插入的數據?
6.四、如何將線程池或進程池與協程一塊兒用,且讓不支持協程的模塊支持?
6.5. 在flask中應用
6.六、代替requests實現的aiohttp支持異步python
前面已經分佈介紹了Python中常常用的多線程和多線程,有興趣能夠參考一下,但願能給你一點幫助,本文主要記錄下Python中的多線程用法。web
協程又稱微線程,是一種用戶態的上下文切換技術,由程序(用戶)自身控制代碼與代碼之間的切換,效率高,安全,無鎖機制。redis
Python中實現的方式主要如下幾種:編程
yield,yield from 生成器json
greenlet爲第三方模塊【沒有解決遇IO自動切換,只是阻塞】,生產中主要用gevent模塊來實現flask
asyncio,Python3.4引入的模塊【主流】api
async&awiat ,Python3.5中引入的兩個關鍵字,結合asynio模塊使用【主流】緩存
1、yield和yield from區別安全
def study_yield1(items): """一次所有返回個列表""" yield items def study_yield2(items): """一個一個返回,yield from 等價於 for i in items: yield i""" yield from items item = ["I", "Like", "Python"] for i in study_yield1(item): print(i) # ['I', 'Like', 'Python'] for j in study_yield2(item): print(j) # 'I', 'Like', 'Python' # 函數中有yield關鍵字,便是生成器函數,遇到yield關鍵字就返回後面的值,同時記錄狀態,下次調用從該狀態處執行,而不是從新開始 # yield 直接返回一個列表,而yield from 是一個一個返回,本質將後面可迭代對象轉成生成器了,所以後面可接生成器函數 # yield from 內部解決了不少異常
2、gevent構建多協程
官方文檔:http://www.gevent.org/index.html 安裝 pip install gevent # 多任務 def asynchronous_func(func_name, job_list): """多少個任務,就開啓多少個協程處理""" g_list = [gevent.spawn(func_name, job) for job in job_list] gevent.joinall(g_list) return [g.value for g in g_list]
3、asyncio構建多協程
官方文檔:https://docs.python.org/zh-cn/3.8/library/asyncio.html
借用廖雪峯老師的話,asyncio的編程模型就是一個消息循環,咱們從asyncio模塊中直接獲取一個EventLoop的引用,而後把須要執行的協程扔到EventLoop中執行,就實現了異步IO。
Python3.4引入asyncio,經過裝飾器@asyncio.coroutine標識函數是一個協程,使用yield from來驅動即遇IO切換到另外一個任務。
一、名詞簡介
異步IO:發起一個IO操做,因其耗時,不用等其結束,能夠作其餘的事情,結束時會發來通知告知。
事件循環loop:管理全部的事件【任務】,在整個程序運行過程當中不斷循環執行並追蹤事件發生的順序將它們放到隊列中,空閒時,調用相應的事件處理者來處理這些事件。
任務對象Task:是Future的子類,做用是將一個協程打包成一個task對象,爲這個協程自動排一個日程準備當即執行,並追蹤其狀態。
結果對象Future:表示一個異步運算的最終結果的處理,asyncio 中須要 Future 對象以便容許經過 async/await 使用基於回調的代碼。
可等待對象:若是一個對象能夠在await語句中使用,那麼他就是可等待對象。主要有三種類型:協程、任務Task、Future
二、經常使用方法(api)
Loop
關於循環的文章可參考:https://mp.weixin.qq.com/s/fCWQAT-O27mbi8UvKIrjWw
loop = asyncio.get_event_loop()
獲取一個標準事件循環loop對象,全部協程都是經過它來循環做業的,能夠把它理解爲一個隊列
loop.run_until_complete(future_obj)
阻塞調用,入參爲Future對象,做用是運行全部的協程,直到全部的協程都處理完了返回結果或異常才結束。
loop.close()
關閉事件循環,清除全部隊列並當即關閉執行器,不會等沒有完成的任務完成,冪等【相同的參數執行相同的函數結果必須同樣】不可逆。
await
asyncio.wait(可等待對象awaitable,timeout=None)
內部將咱們傳入的任務封裝成task對象,返回值爲元祖,一個爲完成的done列表,一個爲還未完成的pending列表,若是設置timeout,在規定時間內,沒返回的都放到未完成列表中,協程返回的結果順序是無序的,完成的結果調用d.result()方法獲取任務的返回值
asyncio.gather(可等待對象)
功能通asyncio.wait(),但返回的結果順序,是按放入任務的順序,有序的
asyncio.sleep(秒數,result="默認返回的結果")
模擬IO操做,這種休眠不會阻塞事件循環,前面加上await後將控制權交給主事件循環,不能用time.sleep(),因其會釋放GIL,從而阻塞整個主線程,繼而阻塞整個事件循環。
Task
Task:是Future的子類,做用是將一個協程打包成一個task對象,爲這個協程自動排一個日程準備當即執行【白話就是將多個協程任務,排一個時間表自動併發的執行(遇到io就自動切另外一個任務)】。
底層接口:loop.create_task()
asyncio.create_task(協程)
將一個協程打包爲一個task,排入日程準備執行,返回task對象,Python3.7加入的,3.7以前經過asyncio.ensure_future(協程)實現
Future
Future:表示一個異步運算的最終結果,是一個awaitable對象,協程能夠等待 Future 對象直到它們有結果或異常集合或被取消。在 asyncio 中須要 Future 對象以便容許經過 async/await 使用基於回調的代碼。
asyncio.create_future(協程)
三、asyncio 經過yield from構建多任務協程
Python3.4引入asyncio,經過裝飾器@asyncio.coroutine標識函數是一個協程,使用yield from來驅動即遇IO切換到另外一個任務。
# 最多見用法 import time import asyncio @asyncio.coroutine def task(n): print(f"in {task.__name__}:{n} start") yield from asyncio.sleep(n) # 模擬IO時間 print(f"in {task.__name__}:{n} end") return f"01_{n}" @asyncio.coroutine def main(): # 構建任務集合 tasks = [ asyncio.ensure_future(task(1)), asyncio.ensure_future(task(2)), ] #或者以下也行,加入asyncio.wait() 會自動將協程轉成task對象 #tasks = [task(1), task(2)] print(tasks) done, pending = yield from asyncio.wait(tasks,timeout=None) # 返回兩個列表,done是完成的任務返回的結果列表,pending是未完成的列表,調用result()獲得返回值 # timeout默認爲None,表示一直等任務都完成,若是設置了時間,則規定時間沒有返回則放入未完成列表中 # 完成列表每一個元素調用result()方法就能夠獲得對應任務的結果 task_result = [d.result() for d in done] return task_result for d in done: print(f"協程任務結果爲:{d.result()}") if __name__ == '__main__': start = time.time() # 建立主線程的事件循環對象 loop = asyncio.get_event_loop() # 裝載任務 result = loop.run_until_complete(main()) # 關閉循環 loop.close() print(f"任務結果爲:{result}") print(f"總耗時:{time.time()-start}") """結果爲: [<Task pending name='Task-2' coro=<task() running at ...>>, <Task pending name='Task-3' coro=<task() running at...>>] def main(): in task:1 start in task:2 start in task:1 end in task:2 end 協程任務結果爲:01_2 協程任務結果爲:01_1 任務結果爲:['01_2', '01_1'] 總耗時:2.002303123474121 """
四、asyncio經過async和await【官方推薦】
python3.5後官方推薦爲了區分生成器和協程,其實就是將@asyncio.coroutine 換成了 async,yield from 換成了 await
import asyncio async def task01(n): print(1) await asyncio.sleep(n) print(2) return n async def task02(n): print(3) await asyncio.sleep(n) print(4) return n if __name__ == '__main__': tasks = [task01(3), task02(2)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) loop.close()
五、asyncio.run() 構建循環事件【官方推薦】
python3.7纔有run方法
""" asyncio.run() 函數用來運行最高層級的入口點 "main()" 函數 此函數運行傳入的協程,負責管理 asyncio 事件循環並 完結異步生成器。 當有其餘 asyncio 事件循環在同一線程中運行時,此函數不能被調用。 若是 debug 爲 True,事件循環將以調試模式運行。 此函數---->老是會建立一個新的事件循環並在結束時關閉之<-----。 它應當被用做 asyncio 程序的主入口點,理想狀況下應當只被調用一次。 """ import asyncio async def task(m): print(f"start{m}") await asyncio.sleep(m) # IO耗時,經過await 掛起當前協程,事件循環去執行其餘的協程,等IO耗時結束了,再繼續。 print(f"end{m}") return m async def main1(): # 入口 # 建立多個任務 task1 = asyncio.create_task(task(3)) task2 = asyncio.create_task(task(2)) ret1 = await task1 ret2 = await task2 ret = asyncio.run(main1()) # 寫法2: async def main2(): tasks = [task(3), task(2)] done, p = await asyncio.wait(tasks, timeout=None) return [d.result() for d in done] ret = asyncio.run(main2()) # 寫法3: tasks = [task(3), task(2)] done,p = asyncio.run(asyncio.wait(tasks)) ret = [d.result() for d in done] # 不容許這樣寫tasks = [asyncio.create_task(task(3)), asyncio.create_task(task(2))] """ 由於這時候尚未loop,看create_task源碼 def create_task(coro, *, name=None): """Schedule the execution of a coroutine object in a spawn task. Return a Task object. """ loop = events.get_running_loop() task = loop.create_task(coro) _set_task_name(task, name) return task """ 注:經過asyncio模塊執行異步,全部的流程第一步是先有事件循環,而後才能將可等待對象封裝成task對象,放入事件循環,才能實現異步,決不容許先建task對象,而後建loop,這樣會報錯
六、實例操做
6.一、批量異步處理類型相同的數據?
import asyncio async def task(data): """消費消息,解析消息,處理消息,返回處理結果成功失敗""" # 假設data的格式爲{"order_id":訂單號,"num":數量} await asyncio.sleep(2) if data["num"]: return {"order_id": data["order_id"], "check": 1} else: return {"order_id": data["order_id"], "check": 0} if __name__ == '__main__': import time import random start_time = time.time() messages = [{"order_id": str(o), "num": random.choice([0, 1])} for o in range(835001, 835501)] jobs = [task(d) for d in messages] done, pending = asyncio.run(asyncio.wait(jobs)) print(f"消費結果爲:{[r.result() for r in done]}") print(f"總耗時:{time.time()-start_time}")
6.二、但願不等待全部完成了纔對結果處理,而是消費了就對結果進行處理?
import asyncio success_count = 0 fail_count = 0 async def task(data): """消費消息,解析消息,處理消息,返回處理結果成功失敗""" # 假設data的格式爲{"order_id":訂單號,"num":數量} print(f"開始執行:{data}") await asyncio.sleep(2) if data["num"]: res = {"order_id": data["order_id"], "check": 1} else: res = {"order_id": data["order_id"], "check": 0} print(f"執行完成:{data}") return res def my_call_back(future): """對消費的消息結果進行實時處理,好比統計成功率,用於實時可視化展現""" time.sleep(1) result = future.result() global success_count, fail_count if result["check"] == 1: # 能夠是操做緩存 success_count += 1 else: fail_count += 1 print(f"{result['order_id']}如今成功數:{success_count}") print(f"{result['order_id']}如今失敗數:{fail_count}") if __name__ == '__main__': import time import random start_time = time.time() messages = [{"order_id": str(o), "num": random.choice([0, 1])} for o in range(835001, 835006)] jobs = [] loop = asyncio.get_event_loop() for m in messages: job = loop.create_task(task(m)) job.add_done_callback(my_call_back) # 加入回調函數 jobs.append(job) loop.run_until_complete(asyncio.wait(jobs)) loop.close() print(f"總耗時:{time.time() - start_time}") print(f"最終成功數:{success_count}") print(f"最終失敗數:{fail_count}") # 注意:回調函數不能是協程,不能是協程,不能是協程!!!
6.三、動態的異步消費,好比動態接收mq或redis插入的數據?
# 生產者代碼忽略 # 消費者 """ 解決方案是:建立一個線程,用於讓事件循環永遠執行 """ import asyncio import threading def always_run_loop(event_loop): asyncio.set_event_loop(event_loop) event_loop.run_forever() def get_redis(): """獲取鏈接""" import redis conn_pool = redis.ConnectionPool(host="127.0.0.1", port=6379, max_connections=10) return redis.Redis(connection_pool=conn_pool) async def call_back_task(data): """消費數據""" print(data) await asyncio.sleep(2) return data if __name__ == '__main__': redis_pool = get_redis() loop = asyncio.new_event_loop() loop_th = threading.Thread(target=always_run_loop, args=(loop,)) loop_th.setDaemon(True) loop_th.start() while True: message = redis_pool.rpop("check") if message: # 異步動態添加到協程中 asyncio.run_coroutine_threadsafe(call_back_task(message), loop)
6.四、如何將線程池或進程池與協程一塊兒用,且讓不支持協程的模塊支持?
import asyncio import requests async def download_image(url): # 發送網絡請求,下載圖片(遇到網絡下載圖片的IO請求,自動化切換到其餘任務) print("開始下載:", url) loop = asyncio.get_event_loop() # requests模塊默認不支持異步操做,因此就使用線程池來配合實現了。 future = loop.run_in_executor(None, requests.get, url) response = await future print('下載完成') # 圖片保存到本地文件 file_name = url.rsplit('_')[-1] with open(file_name, mode='wb') as file_object: file_object.write(response.content) if __name__ == '__main__': url_list = [ 'https://img.yituyu.com/gallery/1110/01_PnhbzecG.jpg', 'https://img.yituyu.com/gallery/1110/02_rWAsk0kY.JPG', 'https://img.yituyu.com/gallery/1114/00_8Q85y28B.jpg' ] tasks = [download_image(url) for url in url_list] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
6.5. 在flask中應用
import asyncio from flask import Flask app = Flask(__name__) async def first(): await asyncio.sleep(20) return 'first' async def second(): await asyncio.sleep(10) return 'second' async def third(): await asyncio.sleep(10) return 'third' def ordinary_generator(): import sys if not sys.platform.startswith("win"): import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) for future in asyncio.as_completed([first(), second(), third()]): print('reached') yield loop.run_until_complete(future) @app.route('/') def healthcheck(): """ Retrieves the health of the service. """ import time time_s = time.time() for element in ordinary_generator(): print(element) print(f"{time.time() - time_s}") return "Health check passed" if __name__ == '__main__': app.run(debug=True)
6.六、代替requests實現的aiohttp支持異步
pip install aiohttp
官方文檔:https://docs.aiohttp.org/en/stable/
import asyncio import aiohttp # 客戶端使用 async def get_page(session, url): async with session.get(url) as response: if response.status == 200: text = await response.content.read() with open(f"圖片-{url.split('/')[-1]}", "wb") as f: f.write(text) f.flush() async def my_main(): async with aiohttp.ClientSession() as session: urls = [ "http://pic1.win4000.com/wallpaper/4/53ec50e410310.jpg", "http://pic1.win4000.com/m00/a5/d1/8ab24d2d749ad08fe2b99830d5b30065.jpg", "http://pic1.win4000.com/m00/f8/40/a0f4ea98e5b518c410b189a36704f459.jpg" ] tasks = [asyncio.create_task(get_page(session, url)) for url in urls] await asyncio.wait(tasks) asyncio.run(my_main()) # 服務端 from aiohttp import web async def health_check(request): print(f"version:請求HTTP版本->{request.version}") print(f"method:請求HTTP方法->{request.method}") print(f"scheme:是http仍是https->{request.scheme}") print(f"secure:是不是https,返回bool->{request.secure}") print(f"host:服務器地址->{request.host}") print(f"remote:請求來源的地址->{request.remote}") print(f"url:請求url全路徑->{request.url}") print(f"rel_url:請求url相對路徑,無host->{request.rel_url}") print(f"path_qs:包含路徑及參數->{request.path_qs}") print(f"path:url解碼過的->{request.path}") print(f"raw_path:url未解碼前的信息->{request.raw_path}") print(f"query:獲取get請求url中的參數->{request.query}") print(f"query_string:原始請求數據->{request.query_string}") print(f"headers:獲取header頭部信息->{request.headers}") print(f"content_type:獲取請求消息的格式->{request.content_type}") print(f"keep_alive:是否保持長連接->{request.keep_alive}") print(f"cookies:{request.cookies}") print(f"content:{request.content}") print(f"match_info:{request.match_info}->路由解析結果") return web.Response(text="aiohttp server is ok") async def save_db(request): data = await request.post() print(f"獲取post請求體:{data}") return web.json_response({"key": 1111}) # 建立應用實例 app = web.Application() # 註冊路由 app.add_routes( [ web.get('/', health_check), web.get('/{version}', health_check), web.post('/send', save_db) ] ) if __name__ == '__main__': web.run_app(app, host="127.0.0.88", port=9527)