深刻flask中的request

緣起

  在使用flask的時候一直比較納悶request是什麼原理,他是如何保證多線程狀況下對不一樣請求參數的隔離的。json

準備知識

  在講request以前首先須要先理解一下werkzeug.local中的幾個類,由於request就是基於這幾個類來搞事情的。flask

# -*- coding: utf-8 -*-
import copy
from werkzeug._compat import PY2, implements_bool

# since each thread has its own greenlet we can just use those as identifiers
# for the context.  If greenlets are not available we fall back to the
# current thread ident depending on where it is.
try:
    from greenlet import getcurrent as get_ident
except ImportError:
    try:
        from thread import get_ident
    except ImportError:
        from _thread import get_ident


def release_local(local):
    """Releases the contents of the local for the current context.
    This makes it possible to use locals without a manager.

    Example::

        >>> loc = Local()
        >>> loc.foo = 42
        >>> release_local(loc)
        >>> hasattr(loc, 'foo')
        False

    With this function one can release :class:`Local` objects as well
    as :class:`LocalStack` objects.  However it is not possible to
    release data held by proxies that way, one always has to retain
    a reference to the underlying local object in order to be able
    to release it.

    .. versionadded:: 0.6.1
    """
    local.__release_local__()


class Local(object):

    """
    用一個大字典實現局部上下文
        不一樣的線程或者greenlet調用該local對象時獲取的值都是本線程或者greenlet獨享的
        實際上就是爲每個線程或者協程在字典裏單獨開闢出了一個空間(實際上就是一個鍵值對,鍵就是線程或者greenlet的惟一標識),
        這空間用來存儲單個線程(或者greenlet)的私有變量

    """
    __slots__ = ('__storage__', '__ident_func__')

    def __init__(self):
        object.__setattr__(self, '__storage__', {})
        object.__setattr__(self, '__ident_func__', get_ident)

    def __iter__(self):
        return iter(self.__storage__.items())

    def __call__(self, proxy):
        """Create a proxy for a name."""
        return LocalProxy(self, proxy)

    def __release_local__(self):
        self.__storage__.pop(self.__ident_func__(), None)

    def __getattr__(self, name):
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        ident = self.__ident_func__()
        storage = self.__storage__
        try:
            storage[ident][name] = value
        except KeyError:
            storage[ident] = {name: value}

    def __delattr__(self, name):
        try:
            del self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)


class LocalStack(object):

    """
    LocalStack也是一個棧相關的局部上下文,底層實現是基於Local類。
        注意一下他的pop方法,若是當前棧的長度爲1,pop時會清空當前線程(greenlet)在底層的Local中所對應的"鍵值對"的
    """

    def __init__(self):
        self._local = Local()

    def __release_local__(self):
        self._local.__release_local__()

    def _get__ident_func__(self):
        return self._local.__ident_func__

    def _set__ident_func__(self, value):
        object.__setattr__(self._local, '__ident_func__', value)
    __ident_func__ = property(_get__ident_func__, _set__ident_func__)
    del _get__ident_func__, _set__ident_func__

    def __call__(self):
        def _lookup():
            rv = self.top
            if rv is None:
                raise RuntimeError('object unbound')
            return rv
        return LocalProxy(_lookup)

    def push(self, obj):
        """Pushes a new item to the stack"""
        rv = getattr(self._local, 'stack', None)
        if rv is None:
            self._local.stack = rv = []
        rv.append(obj)
        return rv

    def pop(self):
        """Removes the topmost item from the stack, will return the
        old value or `None` if the stack was already empty.
        """
        stack = getattr(self._local, 'stack', None)
        if stack is None:
            return None
        elif len(stack) == 1:
            release_local(self._local)
            return stack[-1]
        else:
            return stack.pop()

    @property
    def top(self):
        """The topmost item on the stack.  If the stack is empty,
        `None` is returned.
        """
        try:
            return self._local.stack[-1]
        except (AttributeError, IndexError):
            return None


@implements_bool
class LocalProxy(object):
    """
    代理模式: 給目標對象提供一個代理對象,並由代理對象控制對目標對象的引用
    """

    """Acts as a proxy for a werkzeug local.  Forwards all operations to
    a proxied object.  The only operations not supported for forwarding
    are right handed operands and any kind of assignment.

    Example usage::

        from werkzeug.local import Local
        l = Local()

        # these are proxies
        request = l('request')
        user = l('user')


        from werkzeug.local import LocalStack
        _response_local = LocalStack()

        # this is a proxy
        response = _response_local()

    Whenever something is bound to l.user / l.request the proxy objects
    will forward all operations.  If no object is bound a :exc:`RuntimeError`
    will be raised.

    To create proxies to :class:`Local` or :class:`LocalStack` objects,
    call the object as shown above.  If you want to have a proxy to an
    object looked up by a function, you can (as of Werkzeug 0.6.1) pass
    a function to the :class:`LocalProxy` constructor::

        session = LocalProxy(lambda: get_current_request().session)

    .. versionchanged:: 0.6.1
       The class can be instantiated with a callable as well now.
    """
    __slots__ = ('__local', '__dict__', '__name__', '__wrapped__')

    def __init__(self, local, name=None):
        # __local 會被重命名爲 _LocalProxy__local
        object.__setattr__(self, '_LocalProxy__local', local)
        object.__setattr__(self, '__name__', name)
        if callable(local) and not hasattr(local, '__release_local__'):
            # "local" is a callable that is not an instance of Local or
            # LocalManager: mark it as a wrapped function.
            object.__setattr__(self, '__wrapped__', local)

    def _get_current_object(self):
        """Return the current object.  This is useful if you want the real
        object behind the proxy at a time for performance reasons or because
        you want to pass the object into a different context.
        """
        if not hasattr(self.__local, '__release_local__'):
            return self.__local()
        try:
            return getattr(self.__local, self.__name__)
        except AttributeError:
            raise RuntimeError('no object bound to %s' % self.__name__)

    @property
    def __dict__(self):
        try:
            return self._get_current_object().__dict__
        except RuntimeError:
            raise AttributeError('__dict__')

    def __repr__(self):
        try:
            obj = self._get_current_object()
        except RuntimeError:
            return '<%s unbound>' % self.__class__.__name__
        return repr(obj)

    def __bool__(self):
        try:
            return bool(self._get_current_object())
        except RuntimeError:
            return False

    def __unicode__(self):
        try:
            return unicode(self._get_current_object())  # noqa
        except RuntimeError:
            return repr(self)

    def __dir__(self):
        try:
            return dir(self._get_current_object())
        except RuntimeError:
            return []

    def __getattr__(self, name):
        if name == '__members__':
            return dir(self._get_current_object())
        return getattr(self._get_current_object(), name)

    def __setitem__(self, key, value):
        self._get_current_object()[key] = value

    def __delitem__(self, key):
        del self._get_current_object()[key]

    if PY2:
        __getslice__ = lambda x, i, j: x._get_current_object()[i:j]

        def __setslice__(self, i, j, seq):
            self._get_current_object()[i:j] = seq

        def __delslice__(self, i, j):
            del self._get_current_object()[i:j]

    __setattr__ = lambda x, n, v: setattr(x._get_current_object(), n, v)
    __delattr__ = lambda x, n: delattr(x._get_current_object(), n)
    __str__ = lambda x: str(x._get_current_object())
    __lt__ = lambda x, o: x._get_current_object() < o
    __le__ = lambda x, o: x._get_current_object() <= o
    __eq__ = lambda x, o: x._get_current_object() == o
    __ne__ = lambda x, o: x._get_current_object() != o
    __gt__ = lambda x, o: x._get_current_object() > o
    __ge__ = lambda x, o: x._get_current_object() >= o
    __cmp__ = lambda x, o: cmp(x._get_current_object(), o)  # noqa
    __hash__ = lambda x: hash(x._get_current_object())
    __call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw)
    __len__ = lambda x: len(x._get_current_object())
    __getitem__ = lambda x, i: x._get_current_object()[i]
    __iter__ = lambda x: iter(x._get_current_object())
    __contains__ = lambda x, i: i in x._get_current_object()
    __add__ = lambda x, o: x._get_current_object() + o
    __sub__ = lambda x, o: x._get_current_object() - o
    __mul__ = lambda x, o: x._get_current_object() * o
    __floordiv__ = lambda x, o: x._get_current_object() // o
    __mod__ = lambda x, o: x._get_current_object() % o
    __divmod__ = lambda x, o: x._get_current_object().__divmod__(o)
    __pow__ = lambda x, o: x._get_current_object() ** o
    __lshift__ = lambda x, o: x._get_current_object() << o
    __rshift__ = lambda x, o: x._get_current_object() >> o
    __and__ = lambda x, o: x._get_current_object() & o
    __xor__ = lambda x, o: x._get_current_object() ^ o
    __or__ = lambda x, o: x._get_current_object() | o
    __div__ = lambda x, o: x._get_current_object().__div__(o)
    __truediv__ = lambda x, o: x._get_current_object().__truediv__(o)
    __neg__ = lambda x: -(x._get_current_object())
    __pos__ = lambda x: +(x._get_current_object())
    __abs__ = lambda x: abs(x._get_current_object())
    __invert__ = lambda x: ~(x._get_current_object())
    __complex__ = lambda x: complex(x._get_current_object())
    __int__ = lambda x: int(x._get_current_object())
    __long__ = lambda x: long(x._get_current_object())  # noqa
    __float__ = lambda x: float(x._get_current_object())
    __oct__ = lambda x: oct(x._get_current_object())
    __hex__ = lambda x: hex(x._get_current_object())
    __index__ = lambda x: x._get_current_object().__index__()
    __coerce__ = lambda x, o: x._get_current_object().__coerce__(x, o)
    __enter__ = lambda x: x._get_current_object().__enter__()
    __exit__ = lambda x, *a, **kw: x._get_current_object().__exit__(*a, **kw)
    __radd__ = lambda x, o: o + x._get_current_object()
    __rsub__ = lambda x, o: o - x._get_current_object()
    __rmul__ = lambda x, o: o * x._get_current_object()
    __rdiv__ = lambda x, o: o / x._get_current_object()
    if PY2:
        __rtruediv__ = lambda x, o: x._get_current_object().__rtruediv__(o)
    else:
        __rtruediv__ = __rdiv__
    __rfloordiv__ = lambda x, o: o // x._get_current_object()
    __rmod__ = lambda x, o: o % x._get_current_object()
    __rdivmod__ = lambda x, o: x._get_current_object().__rdivmod__(o)
    __copy__ = lambda x: copy.copy(x._get_current_object())
    __deepcopy__ = lambda x, memo: copy.deepcopy(x._get_current_object(), memo)
werkzeug.local部分源碼

先來說Local對象

1 建立一個Local對象api

local = Local()

剛建立後, 這個local_context中負責存儲局部上下文變量的storage是一個空字典restful

local.__storage__ = {}

咱們用iden1, inde2 .... indeN 來表示n個同屬於一個進程的線程(或者greenlet), 假如當前的線程(或者greenlet)的id爲iden1, 咱們來操做一下localsession

local.name = "iden1_name"

實際執行的代碼是:多線程

local.__storage__.setdefault("iden1", {})["name"] = "iden1_name"

這個local中負責存儲局部上下文變量的storage就變成了這樣:app

local.__storage__ = {
    "iden1": {
        "name": "iden1_name"
    }
}

當咱們在不一樣的線程(或者greenlet)中操做後,local就可能會變成這樣ide

local.__storage__ = {
    "iden1": {...},
    "iden2": {...},
    ...
    "idenN": {...}
}

local對象有一個__release_local__方法, 執行該方法會清理掉當前線程(或者greenlet)對應的存儲空間, 假如當前的線程(或者greenlet)的id爲iden1,
當咱們執行完__release_local__方法後, local的存儲空間就會變成這樣: 函數

# 已經沒有iden1了  
local.__storage__ = {
    "iden2": {...},
    ...
    "idenN": {...}
}

local還定義了__call__方法, 當咱們執行local()後會返回一個LocalStack對象post

 

LocalStack對象

LocalStack底層使用的是Local,而後在Local實例中實現了一個棧

建立一個LocalStack對象

local_stack = LocalStack()

該對象的local屬性就是一個Local實例

isinstance(local_stack.local, Local) is True

local_stack的棧存儲在他的local屬性中, 當咱們調用local_stack.push(some_obj)的時候, 其實是執行了

local_stack.local.stack.append(some_obj) if hasattr(local_stack.local, "stack") else local_stack.local.stack = [some_obj]

假如當前的線程(或者greenlet)的id爲iden1, 咱們push一個對象request_ctx_obj, 而後又push一個對象request_ctx_obj2, 那麼local_stack.local就會是這樣:

local_stack.local.__storage__ = {
    "iden1": {
        "stack": [request_ctx_obj, request_ctx_obj2]
    }
}

假如當前的線程(或者greenlet)的id爲iden1,咱們在調用local_stack.top()方法時,實際上執行的是:

return local_stack.local.stack[-1]

須要注意的是:

  若是咱們當前所處的線程(或者greenlet)中以前並無進行過push的話,那麼咱們調用local_stack.top()方法返回的結果是None

當咱們執行local_stack.pop()時, 實際上執行的是

local_stack.local.stack.pop()

須要注意兩點:
  1 若是當前線程(或者greenlet)中以前沒有push過, 那麼pop()方法會返回None
  2 若是當前線程(或者greenlet)中的棧中只有一個對象, 那麼本次pop()還會清理掉stack(實際上執行了local_stack.local.__release_local__方法),
  假如當前的線程(或者greenlet)的id爲iden2的話,沒有pop()以前是這樣的:

local_stack.local.__storage__ = {
    "iden1": {
        "stack": [request_ctx_obj, request_ctx_obj2]
    }
    "iden2": {
        "stack": [request_ctx_obj3]
    }
}

執行pop()則會將當前線程(或者greenlet)的局部上下文存儲空間清理掉, 變爲這樣:

local_stack.local.__storage__ = {
    "iden1": {
        "stack": [request_ctx_obj, request_ctx_obj2]
    }
}

LocalStack也提供了__call__方法, 執行該方法會生成一個LocalProxy對象

 

LocalProxy

LocalProxy實現了代理模式, 給目標對象提供一個代理對象,並由代理對象控制對目標對象的引用。
根據傳入參數的類型以及數量的不一樣他會有兩種表現形式.
第一種,第一個參數傳入一個Local實例, 而後第二個參數傳入想要代理的對象的名稱:
  咱們執行下面的語句:

local_1 = Local()
local_proxy1 = LocalProxy(local_1, "age")

  local_proxy1所代理的實際上就是local_1實例中的age屬性了。
  假如當前的線程(或者greenlet)的id爲iden1,那麼咱們執行local_proxy1 = 12, 實際執行的就是local_1.age = 12

第二種,只傳入一個函數,經過該函數能夠獲取到想要代理的對象:
  咱們執行下面的語句:

local_2 = Local()

def _find_raw_obj():
    return local_2.name

local_proxy2 = LocalProxy(_find_raw_obj)

  local_proxy2所代理的實際上就是local_2實例中的name屬性

 

flask源碼剖析

request源碼

def _lookup_req_object(name):
    top = _request_ctx_stack.top
    if top is None:
        raise RuntimeError(_request_ctx_err_msg)
    return getattr(top, name)


# context locals
_request_ctx_stack = LocalStack()
request = LocalProxy(partial(_lookup_req_object, 'request'))

只要看懂了文章上半部分的local,這裏實際上很簡單,

  _request_ctx_stack是一個LocalStack實例,而咱們每次調用request.some_attr 的時候其實是執行_request_ctx_stack.top.some_attr

再來看一下當請求過來的時候,flask是如何處理的:

# flask.app
class Flask(_PackageBoundObject):

    request_class = Request

    def request_context(self, environ):
        return RequestContext(self, environ)

    def wsgi_app(self, environ, start_response):
        # self.request_context是一個RequestContext實例
        ctx = self.request_context(environ)
        error = None
        try:
            try:
# 2 執行了request_ctx_stack.push(ctx) ctx.push()
# 3 處理請求獲得響應 response
= self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except: error = sys.exc_info()[1] raise return response(environ, start_response) finally: if self.should_ignore_error(error): error = None
# 4 request_ctx_stack.pop(ctx) ctx.auto_pop(error)
# 1 當請求來的時候會執行app.__call__()方法
def __call__(self, environ, start_response): """The WSGI server calls the Flask application object as the WSGI application. This calls :meth:`wsgi_app` which can be wrapped to applying middleware.""" return self.wsgi_app(environ, start_response) # flask.ctx class RequestContext(object): def __init__(self, app, environ, request=None): self.app = app if request is None:
       # request是一個Request對象 request
= app.request_class(environ) self.request = request self._implicit_app_ctx_stack = [] def push(self): top = _request_ctx_stack.top if top is not None and top.preserved: top.pop(top._preserved_exc) # Before we push the request context we have to ensure that there # is an application context. app_ctx = _app_ctx_stack.top if app_ctx is None or app_ctx.app != self.app: app_ctx = self.app.app_context() app_ctx.push() self._implicit_app_ctx_stack.append(app_ctx) else: self._implicit_app_ctx_stack.append(None) if hasattr(sys, 'exc_clear'): sys.exc_clear()
# 2.1 這裏是重點 _request_ctx_stack.push(self)
def pop(self, exc=_sentinel): app_ctx = self._implicit_app_ctx_stack.pop() try: if not self._implicit_app_ctx_stack: self.preserved = False self._preserved_exc = None if exc is _sentinel: exc = sys.exc_info()[1] self.app.do_teardown_request(exc) request_close = getattr(self.request, 'close', None) if request_close is not None: request_close() finally: rv = _request_ctx_stack.pop()# Get rid of the app as well if necessary. if app_ctx is not None: app_ctx.pop(exc) assert rv is self, 'Popped wrong request context. ' \ '(%r instead of %r)' % (rv, self)

 當請求過來時:

  1 將請求封裝爲一個RequestContext實例

  2 而後將請求的environ封裝成Request對象

  3 執行_request_ctx_stack.push(RequestContext實例)

  4 處理請求獲得響應

  5 執行_request_ctx_stack.pop()

  6 返回結果

 

看到這裏,大致原理咱們也就懂了。

 

來點深刻的高級用法

需求

工做中使用到flask flask-restful,有這樣的場景:

  1 首先是遵循restful

  2 我但願全部接口有統一的參數傳遞格式,相似於這樣:    

    timestamp: int                     # 以秒爲單位 
    token:  str                           # 這個就不用說了
    data: str                          # base64.encode(json.dumps(原始參數數據))
    signature: md5(data + string(timestamp) + key)        # 簽名,注:key爲加密密鑰

假如是登錄接口,客戶端須要給我傳遞手機號以及驗證碼,我但願格式是json,因此原始參數數據大概是這樣:

{"mobile": "12377776666", "code": "1234"}

  3 我但願全部接口有統一的響應格式,相似於這樣:

{
    "code": 0, # 這成功。 1 -9999 錯誤
    "message": "", 
    "data": {
     "mobile": "sdfasdf",
     "code": ""
    }
}

  4 我但願請求參數數據通過統一的參數檢測以後, request的args(若是請求方式爲get) 或者form(若是請求方式爲post) 或者values屬性變爲原始參數數據.這樣就能夠正常使用RequestParser()

實現

如下是本人的實現源碼(生產環境測試無問題,但有幾處須要改進的地方):

import time
import json
import base64
import traceback
import logging
from hashlib import md5
from flask import request
from functools import wraps
from flask_restful import reqparse
from flask.globals import _request_ctx_stack
from werkzeug.exceptions import HTTPException
from werkzeug.datastructures import ImmutableMultiDict, CombinedMultiDict, MultiDict


logger = logging.getLogger("api")


def get_user_info(token):
    """根據token獲取用戶信息,這個本身實現吧"""
    pass


class ServerRequestParser(reqparse.RequestParser):

    def parse_args(self, req=None, strict=False):
        try:
            # print(request.values)
            return super().parse_args(req, strict)
        except HTTPException as e:
            raise ServerException(3, str(e.data))


class ServerException(Exception):
    code_msg = {
        -1: "未知錯誤",
        -2: "非法請求",
        1: "缺乏token",
        2: "非法token",
        3: "參數錯誤: %s"
    }

    def __init__(self, code, *args):
        self.code = code
        self.args = args

    def to_json(self):
        return {
            "code": self.code,
            "message": self.code_msg[self.code] % self.args,
            "data": {}
        }


class ServerRequest(object):
    """
    該類主要是配合uniform_verification_wechat方法,對flask.wrappers.Request對象作封裝後進行替換
    """

    def __init__(self, json_data):
        self._raw_request = request._get_current_object()
        self.args = ImmutableMultiDict()
        self.form = ImmutableMultiDict()
        self.json = {}
        if request.method.lower() == "get":
            self.args = json_data
        elif request.content_type == "application/json":
            self.json = json_data
        else:
            self.form = json_data
        setattr(_request_ctx_stack._local.stack[-1], "request", self)

    def __getattr__(self, item):
        return getattr(self._raw_request, item)

    @property
    def values(self):
        args = []
        for d in self.args, self.form:
            if not isinstance(d, MultiDict):
                d = MultiDict(d)
            args.append(d)
        return CombinedMultiDict(args)


uniform_wechat_parser = reqparse.RequestParser()
uniform_wechat_parser.add_argument("timestamp", type=int, required=True)
uniform_wechat_parser.add_argument("signature", required=True)
uniform_wechat_parser.add_argument("data", required=True)
uniform_wechat_parser.add_argument("token", required=True)


def uniform_verification_wechat(check_token=True):

    def wrapper(func):
        current_time = time.time()

        @wraps(func)
        def inner(*args, **kwargs):
            request_data_dict = uniform_wechat_parser.parse_args()
            request_log_info = "url: %s, method: %s, request_data: %s" % (
                request.url, request.method, json.dumps(request_data_dict).encode())
            signature = request_data_dict["signature"]
            timestamp = request_data_dict["timestamp"]
            data = request_data_dict["data"]
            token = request_data_dict.get("token", None)
            try:
                if current_time - timestamp >= 120:
                    raise ServerException(-2)
                _strings = "%s%s%s" % (data, timestamp, "密鑰key")
                if signature != md5(_strings.encode()).hexdigest():
                    raise ServerException(-2)
                try:
                    data = json.loads(base64.b64decode(data.encode()).decode())
                except Exception:
                    raise ServerException(-2)
                request_log_info = "url: %s, method: %s, request_data: %s" % (
                    request.url, request.method, json.dumps(data).encode())
                user_info = {}
                if check_token:
                    if not token:
                        raise ServerException(1)
                    user_info = get_user_info(token)
                    if user_info is None:
                        raise ServerException(2)
                    logger.info("checking token... %s" % user_info)

                request.user_info = user_info
                ServerRequest(data)
                _result = func(*args, **kwargs)
                result = {
                    "code": 0,
                    "message": "",
                    "data": _result
                }
            except ServerException as e:
                result = e.to_json()
            except Exception as e:
                if hasattr(e, "to_json"):
                    result = e.to_json()
                else:
                    logger.info(traceback.print_exc())
                    result = ServerException(-1).to_json()
            response_log_info = result
            logger.info("%s, %s" % (request_log_info, json.dumps(response_log_info).encode()))
            return result

        return inner

    return wrapper


from flask import Flask
from flask_restful import Resource, Api

app = Flask(__name__)
api = Api(app)


class Hello(Resource):
    hello_parser = ServerRequestParser()
    hello_parser.add_argument("name", required=True)

    @uniform_verification_wechat(False)
    def get(self):
        args = self.hello_parser.parse_args()
        return {'hello': args["name"]}


api.add_resource(Hello, '/')

if __name__ == "__main__":
    app.run(debug=True)
統一參數解析處理

 

參考:

  https://blog.tonyseek.com/post/the-context-mechanism-of-flask/

  flask源碼

  flask-restful源碼

相關文章
相關標籤/搜索