Python開發【模塊】:aiohttp(一)

AIOHTTP

用於asyncio和Python的異步HTTP客戶端/服務器html

 

主要特色:python

  • 支持客戶端和HTTP服務器。
  • 支持服務器WebSockets和 客戶端WebSockets開箱即用,沒有回調地獄。
  • Web服務器具備中間件, 信號和可插拔路由。

 

 入門

客戶端:web

import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())  

服務端:redis

from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)

app = web.Application()
app.add_routes([web.get('/', handle),
                web.get('/{name}', handle)])

web.run_app(app)

  

 Web服務

 一、post、get接收參數數據數據庫

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import asyncio
from aiohttp import web


class Application(object):
    '''
    aiohttp 接口
    '''

    def __init__(self):
        pass

    async def prepare_init(self):
        '''
        預加載
        :return:
        '''
        self.hearders_setting = [
            web.post('/', self.post),
            web.get('/', self.get),
        ]

    async def get(self,request):
        print(request.app['db'])
        arguments = request.query
        print(arguments)
        # < MultiDictProxy('model': 'aiohttp') >
        return web.Response(text='get')

    async def post(self,request):
        arguments = await request.post()
        print(arguments)
        # < MultiDictProxy('model': 'aiohttp') >
        # 獲取ip地址
        print(request.transport.get_extra_info('peername'))
        # ('127.0.0.1', 53245)
        print(request.path)
        # /
        print(request.raw_path)
        return web.Response(text='post')

    async def app_factory(self):
        '''
        配置app
        :return:
        '''
        await self.prepare_init()
        app = web.Application()
        app.add_routes(self.hearders_setting)
        app['db'] = 'db'
        return app

    def run_forever(self):
        '''
        開啓端口
        :return:
        '''
        web.run_app(self.app_factory())


Application().run_forever()
1 import requests
2 
3 
4 requests.post(url='http://127.0.0.1:8080/',data={'model':'aiohttp'},headers={'Content-Type': 'application/x-www-form-urlencoded'})
5 requests.get(url='http://127.0.0.1:8080/',params={'model':'aiohttp'},headers={'Content-Type': 'application/x-www-form-urlencoded'})
請求腳本

 

二、web處理程序取消json

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import asyncio
from aiohttp import web

async def something():
    await asyncio.sleep(2)
    print('handle out')

async def post(request):
    arguments = await request.post()
    await something()
    return web.Response(text='post')

注:當客戶端請求過程當中,中斷請求,此時handle out 並不會打印執行,直接取消運行;服務器

官方:await 若是客戶端斷開鏈接而不讀取整個響應的BODY,則能夠取消每一個 Web處理程序執行。這種行爲與經典的WSGI框架(如Flask和Django)大相徑庭websocket

有時它是一種理想的行爲:在處理GET請求時,代碼可能從數據庫或其餘Web資源獲取數據,提取可能很慢。網絡

  取消此很是好:對等已經斷開鏈接,沒有理由經過從DB獲取數據而沒有任何機會將其發送回對等來浪費時間和資源(內存等)。session

  但有時取消很糟糕:根據POST要求,一般須要將數據保存到數據庫,而無論對等關閉。

取消預防能夠經過如下幾種方式實施:

  • 應用於asyncio.shield()將數據保存到DB的協同程序
  • 產生DB保存的新任務
  • 使用aiojobs或其餘第三方庫

①  shield

import asyncio
from aiohttp import web

async def something():
    await asyncio.sleep(2)
    print('handle out')

async def post(request):
    arguments = await request.post()
    await asyncio.shield(something())
    return web.Response(text='post')

asyncio.shield()工做得很好。惟一的缺點是你須要將Web處理程序分紅兩個異步函數:一個用於處理程序自己,另外一個用於受保護的代碼。

async def handler(request):
    await asyncio.shield(write_to_redis(request))
    await asyncio.shield(write_to_postgres(request))
    return web.Response(text='OK')

在REDIS中保存數據後可能會發生取消, write_to_postgres不會被調用

② 建立任務

import asyncio
from aiohttp import web

async def something():
    await asyncio.sleep(2)
    print('handle out')

async def post(request):
    arguments = await request.post()
    request.loop.create_task(something())
    return web.Response(text='post')

產生一項新任務的狀況要糟糕得多:沒有地方能夠await 產生任務 ;request.loop.create_task(something()) 這個沒有辦法awit獲取值,若是加上awit,則致使中斷時,something中止工做

 

三、數據共享

aiohttp.web不鼓勵使用全局變量,每一個變量都應該有本身的非全局上下文。

因此,ApplicationRequest 支持一個collections.abc.MutableMapping接口(即它們是相似dict的對象),容許它們用做數據存儲。

① 應用程序的配置

要存儲相似全局變量,請隨意將它們保存在 Application實例中:

app['my_private_key'] = data

並在Web處理程序中獲取它

async def handler(request):
    data = request.app['my_private_key']

嵌套應用程序狀況下,所需的查找策略可能以下:

  1. 搜索當前嵌套應用程序中的鍵。
  2. 若是未找到密鑰,請繼續在父應用程序中進行搜索。

爲此,請使用Request.config_dict只讀屬性:

async def handler(request):
    data = request.config_dict['my_private_key']

② 請求的存儲

Variables that are only needed for the lifetime of a Request, can be stored in a Request:

async def handler(request):
  request['my_private_key'] = "data"
  ...

這對於中間件和 信號處理程序來講很是有用,能夠存儲數據以供鏈中的下一個處理程序進一步處理。

 ③ 響應的存儲

StreamResponseResponse對象也支持collections.abc.MutableMapping接口。當您但願在處理程序中的全部工做完成後與信號和中間件共享數據時,這很是有用:

async def handler(request):
  [ do all the work ]
  response['my_metric'] = 123
  return response

 

四、中間件

aiohttp.web提供了一種經過中間件自定義請求處理程序的強大機制 。

一箇中間件是能夠修改請求或響應中的協程。例如,這是一個附加 到響應的簡單中間件:'wink'

from aiohttp.web import middleware

@middleware
async def middleware(request, handler):
    resp = await handler(request)
    resp.text = resp.text + ' wink'
    return resp

注意:該示例不適用於流式響應或websockets

每一箇中間件都應該接受兩個參數,一個request實例和一個處理程序,並返回響應或引起異常。若是異常不是HTTPException它的實例,則500 HTTPInternalServerError在處理中間件鏈以後將 其轉換爲。

警告:第二個參數應該徹底命名爲handler

建立時Application,這些中間件將傳遞給僅限關鍵字的middlewares參數:

app = web.Application(middlewares=[middleware_1,
                                   middleware_2])

在內部,經過以相反的順序將中間件鏈應用於原始處理程序來構造單個請求處理程序,並由RequestHandler做爲常規處理程序調用。

因爲中間件自己就是協程,所以await在建立新的處理程序時可能會執行額外的 調用,例如調用數據庫等。

中間件一般會調用處理程序,可是他們可能會選擇忽略它,例如,若是用戶沒有訪問底層資源的權限,則顯示403 Forbidden頁面或引起HTTPForbidden異常。它們還可能呈現處理程序引起的錯誤,執行一些預處理或後處理,如處理CORS等。

如下代碼演示了中間件的執行順序:

from aiohttp import web

async def test(request):
    print('Handler function called')
    return web.Response(text="Hello")

@web.middleware
async def middleware1(request, handler):
    print('Middleware 1 called')
    response = await handler(request)
    print('Middleware 1 finished')
    return response

@web.middleware
async def middleware2(request, handler):
    print('Middleware 2 called')
    response = await handler(request)
    print('Middleware 2 finished')
    return response


app = web.Application(middlewares=[middleware1, middleware2])
app.router.add_get('/', test)
web.run_app(app) 

輸出

Middleware 1 called
Middleware 2 called
Handler function called
Middleware 2 finished
Middleware 1 finished

中間件的常見用途是實現自定義錯誤頁面。如下示例將使用JSON響應呈現404錯誤,由於可能適合JSON REST服務:

from aiohttp import web

@web.middleware
async def error_middleware(request, handler):
    try:
        response = await handler(request)
        if response.status != 404:
            return response
        message = response.message
    except web.HTTPException as ex:
        if ex.status != 404:
            raise
        message = ex.reason
    return web.json_response({'error': message})

app = web.Application(middlewares=[error_middleware])

中間件工廠

一箇中間件工廠是建立與傳遞參數的中間件功能。例如,這是一個簡單的中間件工廠:

def middleware_factory(text):
    @middleware
    async def sample_middleware(request, handler):
        resp = await handler(request)
        resp.text = resp.text + text
        return resp
    return sample_middleware

請記住,與常規中間件相反,您須要中間件工廠的結果而不是功能自己。所以,當將中間件工廠傳遞給應用程序時,您實際須要調用它:

app = web.Application(middlewares=[middleware_factory(' wink')])

  

五、信號

雖然middleware能夠在準備響應以前或以後定製請求處理程序,但在準備響應時不能定製響應。For this aiohttp.web provides signals.

例如,中間件只能爲未準備好的 響應更改HTTP標頭(請參閱參考資料StreamResponse.prepare()),但有時咱們須要一個鉤子來更改流式響應和WebSockets的HTTP標頭。這能夠經過訂閱Application.on_response_prepare信號來完成 :

async def on_prepare(request, response):
    response.headers['My-Header'] = 'value'

app.on_response_prepare.append(on_prepare)

此外,能夠訂閱Application.on_startup和 Application.on_cleanup信號以進行應用程序組件設置並相應地拆除。

如下示例將正確初始化並配置aiopg鏈接引擎:

from aiopg.sa import create_engine

async def create_aiopg(app):
    app['pg_engine'] = await create_engine(
        user='postgre',
        database='postgre',
        host='localhost',
        port=5432,
        password=''
    )

async def dispose_aiopg(app):
    app['pg_engine'].close()
    await app['pg_engine'].wait_closed()

app.on_startup.append(create_aiopg)
app.on_cleanup.append(dispose_aiopg)

信號處理程序不該返回值,但能夠修改傳入的可變參數。

信號處理程序將按順序運行,以便添加它們。aiohttp 3.0開始,全部處理程序必須是異步的

 

六、清理上下文

Application.on_startupApplication.on_cleanup 對仍有陷阱:信號處理程序彼此獨立。

E.g. we have [create_pg, create_redis] in startup signal and [dispose_pg,dispose_redis] in cleanup.

If, for example, create_pg(app) call fails create_redis(app) is not called. But on application cleanup both dispose_pg(app) and dispose_redis(app) are still called: 

清理信號不知道啓動/清理對及其執行狀態。

解決方案是Application.cleanup_ctx用法:

async def pg_engine(app):
    app['pg_engine'] = await create_engine(
        user='postgre',
        database='postgre',
        host='localhost',
        port=5432,
        password=''
    )
    yield
    app['pg_engine'].close()
    await app['pg_engine'].wait_closed()

app.cleanup_ctx.append(pg_engine)

屬性是列表生成器,代碼以前 yield是(稱爲上初始化階段的啓動),碼 以後yield被上執行清理。生成器必須只有一個yield

aiohttp保證當且僅當啓動代碼成功完成時才調用 清理代碼。

Python 3.6+支持異步生成器,在Python 3.5上請使用async_generator 庫。

版本3.1中的新功能。

 

 七、後臺任務

有時須要在應用程序啓動後執行一些異步操做。

更重要的是,在一些複雜的系統中,可能須要在事件循環中運行一些後臺任務以及應用程序的請求處理程序。例如,監聽消息隊列或其餘網絡消息/事件源(例如,ZeroMQ,Redis Pub / Sub,AMQP等)以對應用程序內的接收消息做出反應。

例如,後臺任務能夠在zmq.SUB套接字上偵聽ZeroMQ ,處理並將檢索到的消息轉發到經過WebSocket鏈接的客戶端,這些客戶端存儲在應用程序中的某個位置(例如,在application['websockets']列表中)。

爲了運行這種短時間和長期運行的後臺任務,aiohttp提供了註冊Application.on_startup將與應用程序的請求處理程序一塊兒運行的信號處理程序的能力。

例如,須要運行一個快速任務和兩個長時間運行的任務,這些任務將一直存在,直到應用程序處於活動狀態。相應的後臺任務能夠註冊爲Application.on_startup 信號處理程序,以下例所示:

async def listen_to_redis(app):
    try:
        sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
        ch, *_ = await sub.subscribe('news')
        async for msg in ch.iter(encoding='utf-8'):
            # Forward message to all connected websockets:
            for ws in app['websockets']:
                ws.send_str('{}: {}'.format(ch.name, msg))
    except asyncio.CancelledError:
        pass
    finally:
        await sub.unsubscribe(ch.name)
        await sub.quit()


async def start_background_tasks(app):
    app['redis_listener'] = app.loop.create_task(listen_to_redis(app))


async def cleanup_background_tasks(app):
    app['redis_listener'].cancel()
    await app['redis_listener']


app = web.Application()
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
web.run_app(app)

任務listen_to_redis()將永遠運行。要正確關閉它,Application.on_cleanup信號處理程序可用於向其發送取消。

 

八、處理異常錯誤

https://aiohttp-demos.readthedocs.io/en/latest/tutorial.html#aiohttp-demos-polls-middlewares

 

九、request中獲取add_routes中url

    async def post(self, request):
        url = [resource._path for resource in request.app.router._resources]
        print(url)
        # ['/', '/6773', '/', '/1234/']
        Response = web.Response(text='post')
        return Response
相關文章
相關標籤/搜索