用於asyncio和Python的異步HTTP客戶端/服務器html
主要特色:python
客戶端:web
import aiohttp import asyncio async def fetch(session, url): async with session.get(url) as response: return await response.text() async def main(): async with aiohttp.ClientSession() as session: html = await fetch(session, 'http://python.org') print(html) loop = asyncio.get_event_loop() loop.run_until_complete(main())
服務端:redis
from aiohttp import web async def handle(request): name = request.match_info.get('name', "Anonymous") text = "Hello, " + name return web.Response(text=text) app = web.Application() app.add_routes([web.get('/', handle), web.get('/{name}', handle)]) web.run_app(app)
一、post、get接收參數數據數據庫
#!/usr/bin/env python # -*- coding:utf-8 -*- import asyncio from aiohttp import web class Application(object): ''' aiohttp 接口 ''' def __init__(self): pass async def prepare_init(self): ''' 預加載 :return: ''' self.hearders_setting = [ web.post('/', self.post), web.get('/', self.get), ] async def get(self,request): print(request.app['db']) arguments = request.query print(arguments) # < MultiDictProxy('model': 'aiohttp') > return web.Response(text='get') async def post(self,request): arguments = await request.post() print(arguments) # < MultiDictProxy('model': 'aiohttp') > # 獲取ip地址 print(request.transport.get_extra_info('peername')) # ('127.0.0.1', 53245) print(request.path) # / print(request.raw_path) return web.Response(text='post') async def app_factory(self): ''' 配置app :return: ''' await self.prepare_init() app = web.Application() app.add_routes(self.hearders_setting) app['db'] = 'db' return app def run_forever(self): ''' 開啓端口 :return: ''' web.run_app(self.app_factory()) Application().run_forever()
1 import requests 2 3 4 requests.post(url='http://127.0.0.1:8080/',data={'model':'aiohttp'},headers={'Content-Type': 'application/x-www-form-urlencoded'}) 5 requests.get(url='http://127.0.0.1:8080/',params={'model':'aiohttp'},headers={'Content-Type': 'application/x-www-form-urlencoded'})
二、web處理程序取消json
#!/usr/bin/env python # -*- coding:utf-8 -*- import asyncio from aiohttp import web async def something(): await asyncio.sleep(2) print('handle out') async def post(request): arguments = await request.post() await something() return web.Response(text='post')
注:當客戶端請求過程當中,中斷請求,此時handle out 並不會打印執行,直接取消運行;服務器
官方:await
若是客戶端斷開鏈接而不讀取整個響應的BODY,則能夠取消每一個 Web處理程序執行。這種行爲與經典的WSGI框架(如Flask和Django)大相徑庭websocket
有時它是一種理想的行爲:在處理GET
請求時,代碼可能從數據庫或其餘Web資源獲取數據,提取可能很慢。網絡
取消此很是好:對等已經斷開鏈接,沒有理由經過從DB獲取數據而沒有任何機會將其發送回對等來浪費時間和資源(內存等)。session
但有時取消很糟糕:根據POST
要求,一般須要將數據保存到數據庫,而無論對等關閉。
取消預防能夠經過如下幾種方式實施:
asyncio.shield()
將數據保存到DB的協同程序① shield
import asyncio from aiohttp import web async def something(): await asyncio.sleep(2) print('handle out') async def post(request): arguments = await request.post() await asyncio.shield(something()) return web.Response(text='post')
asyncio.shield()
工做得很好。惟一的缺點是你須要將Web處理程序分紅兩個異步函數:一個用於處理程序自己,另外一個用於受保護的代碼。
async def handler(request): await asyncio.shield(write_to_redis(request)) await asyncio.shield(write_to_postgres(request)) return web.Response(text='OK')
在REDIS中保存數據後可能會發生取消, write_to_postgres
不會被調用
② 建立任務
import asyncio from aiohttp import web async def something(): await asyncio.sleep(2) print('handle out') async def post(request): arguments = await request.post() request.loop.create_task(something()) return web.Response(text='post')
產生一項新任務的狀況要糟糕得多:沒有地方能夠await
產生任務 ;request.loop.create_task(something()) 這個沒有辦法awit獲取值,若是加上awit,則致使中斷時,something中止工做
三、數據共享
aiohttp.web
不鼓勵使用全局變量,每一個變量都應該有本身的非全局上下文。
因此,Application
並Request
支持一個collections.abc.MutableMapping
接口(即它們是相似dict的對象),容許它們用做數據存儲。
① 應用程序的配置
要存儲相似全局變量,請隨意將它們保存在 Application
實例中:
app['my_private_key'] = data
並在Web處理程序中獲取它:
async def handler(request): data = request.app['my_private_key']
在嵌套應用程序的狀況下,所需的查找策略可能以下:
爲此,請使用Request.config_dict
只讀屬性:
async def handler(request): data = request.config_dict['my_private_key']
② 請求的存儲
Variables that are only needed for the lifetime of a Request
, can be stored in a Request
:
async def handler(request): request['my_private_key'] = "data" ...
這對於中間件和 信號處理程序來講很是有用,能夠存儲數據以供鏈中的下一個處理程序進一步處理。
③ 響應的存儲
StreamResponse
和Response
對象也支持collections.abc.MutableMapping
接口。當您但願在處理程序中的全部工做完成後與信號和中間件共享數據時,這很是有用:
async def handler(request): [ do all the work ] response['my_metric'] = 123 return response
四、中間件
aiohttp.web
提供了一種經過中間件自定義請求處理程序的強大機制 。
一箇中間件是能夠修改請求或響應中的協程。例如,這是一個附加 到響應的簡單中間件:'wink'
from aiohttp.web import middleware @middleware async def middleware(request, handler): resp = await handler(request) resp.text = resp.text + ' wink' return resp
注意:該示例不適用於流式響應或websockets
每一箇中間件都應該接受兩個參數,一個request
實例和一個處理程序,並返回響應或引起異常。若是異常不是HTTPException
它的實例,則500
HTTPInternalServerError
在處理中間件鏈以後將 其轉換爲。
警告:第二個參數應該徹底命名爲handler
建立時Application
,這些中間件將傳遞給僅限關鍵字的middlewares
參數:
app = web.Application(middlewares=[middleware_1, middleware_2])
在內部,經過以相反的順序將中間件鏈應用於原始處理程序來構造單個請求處理程序,並由RequestHandler
做爲常規處理程序調用。
因爲中間件自己就是協程,所以await
在建立新的處理程序時可能會執行額外的 調用,例如調用數據庫等。
中間件一般會調用處理程序,可是他們可能會選擇忽略它,例如,若是用戶沒有訪問底層資源的權限,則顯示403 Forbidden頁面或引起HTTPForbidden
異常。它們還可能呈現處理程序引起的錯誤,執行一些預處理或後處理,如處理CORS等。
如下代碼演示了中間件的執行順序:
from aiohttp import web async def test(request): print('Handler function called') return web.Response(text="Hello") @web.middleware async def middleware1(request, handler): print('Middleware 1 called') response = await handler(request) print('Middleware 1 finished') return response @web.middleware async def middleware2(request, handler): print('Middleware 2 called') response = await handler(request) print('Middleware 2 finished') return response app = web.Application(middlewares=[middleware1, middleware2]) app.router.add_get('/', test) web.run_app(app)
輸出
Middleware 1 called Middleware 2 called Handler function called Middleware 2 finished Middleware 1 finished
中間件的常見用途是實現自定義錯誤頁面。如下示例將使用JSON響應呈現404錯誤,由於可能適合JSON REST服務:
from aiohttp import web @web.middleware async def error_middleware(request, handler): try: response = await handler(request) if response.status != 404: return response message = response.message except web.HTTPException as ex: if ex.status != 404: raise message = ex.reason return web.json_response({'error': message}) app = web.Application(middlewares=[error_middleware])
中間件工廠
一箇中間件工廠是建立與傳遞參數的中間件功能。例如,這是一個簡單的中間件工廠:
def middleware_factory(text): @middleware async def sample_middleware(request, handler): resp = await handler(request) resp.text = resp.text + text return resp return sample_middleware
請記住,與常規中間件相反,您須要中間件工廠的結果而不是功能自己。所以,當將中間件工廠傳遞給應用程序時,您實際須要調用它:
app = web.Application(middlewares=[middleware_factory(' wink')])
五、信號
雖然middleware能夠在準備響應以前或以後定製請求處理程序,但在準備響應時不能定製響應。For this aiohttp.web
provides signals.
例如,中間件只能爲未準備好的 響應更改HTTP標頭(請參閱參考資料StreamResponse.prepare()
),但有時咱們須要一個鉤子來更改流式響應和WebSockets的HTTP標頭。這能夠經過訂閱Application.on_response_prepare
信號來完成 :
async def on_prepare(request, response): response.headers['My-Header'] = 'value' app.on_response_prepare.append(on_prepare)
此外,能夠訂閱Application.on_startup
和 Application.on_cleanup
信號以進行應用程序組件設置並相應地拆除。
如下示例將正確初始化並配置aiopg鏈接引擎:
from aiopg.sa import create_engine async def create_aiopg(app): app['pg_engine'] = await create_engine( user='postgre', database='postgre', host='localhost', port=5432, password='' ) async def dispose_aiopg(app): app['pg_engine'].close() await app['pg_engine'].wait_closed() app.on_startup.append(create_aiopg) app.on_cleanup.append(dispose_aiopg)
信號處理程序不該返回值,但能夠修改傳入的可變參數。
信號處理程序將按順序運行,以便添加它們。從aiohttp 3.0開始,全部處理程序必須是異步的。
Application.on_startup
/ Application.on_cleanup
對仍有陷阱:信號處理程序彼此獨立。
E.g. we have [create_pg, create_redis]
in startup signal and [dispose_pg,dispose_redis]
in cleanup.
If, for example, create_pg(app)
call fails create_redis(app)
is not called. But on application cleanup both dispose_pg(app)
and dispose_redis(app)
are still called:
清理信號不知道啓動/清理對及其執行狀態。
解決方案是Application.cleanup_ctx
用法:
async def pg_engine(app): app['pg_engine'] = await create_engine( user='postgre', database='postgre', host='localhost', port=5432, password='' ) yield app['pg_engine'].close() await app['pg_engine'].wait_closed() app.cleanup_ctx.append(pg_engine)
屬性是列表生成器,代碼以前 yield
是(稱爲上初始化階段的啓動),碼 以後yield
被上執行清理。生成器必須只有一個yield
。
aiohttp保證當且僅當啓動代碼成功完成時才調用 清理代碼。
Python 3.6+支持異步生成器,在Python 3.5上請使用async_generator 庫。
版本3.1中的新功能。
七、後臺任務
有時須要在應用程序啓動後執行一些異步操做。
更重要的是,在一些複雜的系統中,可能須要在事件循環中運行一些後臺任務以及應用程序的請求處理程序。例如,監聽消息隊列或其餘網絡消息/事件源(例如,ZeroMQ,Redis Pub / Sub,AMQP等)以對應用程序內的接收消息做出反應。
例如,後臺任務能夠在zmq.SUB
套接字上偵聽ZeroMQ ,處理並將檢索到的消息轉發到經過WebSocket鏈接的客戶端,這些客戶端存儲在應用程序中的某個位置(例如,在application['websockets']
列表中)。
爲了運行這種短時間和長期運行的後臺任務,aiohttp提供了註冊Application.on_startup
將與應用程序的請求處理程序一塊兒運行的信號處理程序的能力。
例如,須要運行一個快速任務和兩個長時間運行的任務,這些任務將一直存在,直到應用程序處於活動狀態。相應的後臺任務能夠註冊爲Application.on_startup
信號處理程序,以下例所示:
async def listen_to_redis(app): try: sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop) ch, *_ = await sub.subscribe('news') async for msg in ch.iter(encoding='utf-8'): # Forward message to all connected websockets: for ws in app['websockets']: ws.send_str('{}: {}'.format(ch.name, msg)) except asyncio.CancelledError: pass finally: await sub.unsubscribe(ch.name) await sub.quit() async def start_background_tasks(app): app['redis_listener'] = app.loop.create_task(listen_to_redis(app)) async def cleanup_background_tasks(app): app['redis_listener'].cancel() await app['redis_listener'] app = web.Application() app.on_startup.append(start_background_tasks) app.on_cleanup.append(cleanup_background_tasks) web.run_app(app)
任務listen_to_redis()
將永遠運行。要正確關閉它,Application.on_cleanup
信號處理程序可用於向其發送取消。
八、處理異常錯誤
https://aiohttp-demos.readthedocs.io/en/latest/tutorial.html#aiohttp-demos-polls-middlewares
九、request中獲取add_routes中url
async def post(self, request): url = [resource._path for resource in request.app.router._resources] print(url) # ['/', '/6773', '/', '/1234/'] Response = web.Response(text='post') return Response