在使用flask的時候一直比較納悶request是什麼原理,他是如何保證多線程狀況下對不一樣請求參數的隔離的。json
在講request以前首先須要先理解一下werkzeug.local中的幾個類,由於request就是基於這幾個類來搞事情的。flask
# -*- coding: utf-8 -*- import copy from werkzeug._compat import PY2, implements_bool # since each thread has its own greenlet we can just use those as identifiers # for the context. If greenlets are not available we fall back to the # current thread ident depending on where it is. try: from greenlet import getcurrent as get_ident except ImportError: try: from thread import get_ident except ImportError: from _thread import get_ident def release_local(local): """Releases the contents of the local for the current context. This makes it possible to use locals without a manager. Example:: >>> loc = Local() >>> loc.foo = 42 >>> release_local(loc) >>> hasattr(loc, 'foo') False With this function one can release :class:`Local` objects as well as :class:`LocalStack` objects. However it is not possible to release data held by proxies that way, one always has to retain a reference to the underlying local object in order to be able to release it. .. versionadded:: 0.6.1 """ local.__release_local__() class Local(object): """ 用一個大字典實現局部上下文 不一樣的線程或者greenlet調用該local對象時獲取的值都是本線程或者greenlet獨享的 實際上就是爲每個線程或者協程在字典裏單獨開闢出了一個空間(實際上就是一個鍵值對,鍵就是線程或者greenlet的惟一標識), 這空間用來存儲單個線程(或者greenlet)的私有變量 """ __slots__ = ('__storage__', '__ident_func__') def __init__(self): object.__setattr__(self, '__storage__', {}) object.__setattr__(self, '__ident_func__', get_ident) def __iter__(self): return iter(self.__storage__.items()) def __call__(self, proxy): """Create a proxy for a name.""" return LocalProxy(self, proxy) def __release_local__(self): self.__storage__.pop(self.__ident_func__(), None) def __getattr__(self, name): try: return self.__storage__[self.__ident_func__()][name] except KeyError: raise AttributeError(name) def __setattr__(self, name, value): ident = self.__ident_func__() storage = self.__storage__ try: storage[ident][name] = value except KeyError: storage[ident] = {name: value} def __delattr__(self, name): try: del self.__storage__[self.__ident_func__()][name] except KeyError: raise AttributeError(name) class LocalStack(object): """ LocalStack也是一個棧相關的局部上下文,底層實現是基於Local類。 注意一下他的pop方法,若是當前棧的長度爲1,pop時會清空當前線程(greenlet)在底層的Local中所對應的"鍵值對"的 """ def __init__(self): self._local = Local() def __release_local__(self): self._local.__release_local__() def _get__ident_func__(self): return self._local.__ident_func__ def _set__ident_func__(self, value): object.__setattr__(self._local, '__ident_func__', value) __ident_func__ = property(_get__ident_func__, _set__ident_func__) del _get__ident_func__, _set__ident_func__ def __call__(self): def _lookup(): rv = self.top if rv is None: raise RuntimeError('object unbound') return rv return LocalProxy(_lookup) def push(self, obj): """Pushes a new item to the stack""" rv = getattr(self._local, 'stack', None) if rv is None: self._local.stack = rv = [] rv.append(obj) return rv def pop(self): """Removes the topmost item from the stack, will return the old value or `None` if the stack was already empty. """ stack = getattr(self._local, 'stack', None) if stack is None: return None elif len(stack) == 1: release_local(self._local) return stack[-1] else: return stack.pop() @property def top(self): """The topmost item on the stack. If the stack is empty, `None` is returned. """ try: return self._local.stack[-1] except (AttributeError, IndexError): return None @implements_bool class LocalProxy(object): """ 代理模式: 給目標對象提供一個代理對象,並由代理對象控制對目標對象的引用 """ """Acts as a proxy for a werkzeug local. Forwards all operations to a proxied object. The only operations not supported for forwarding are right handed operands and any kind of assignment. Example usage:: from werkzeug.local import Local l = Local() # these are proxies request = l('request') user = l('user') from werkzeug.local import LocalStack _response_local = LocalStack() # this is a proxy response = _response_local() Whenever something is bound to l.user / l.request the proxy objects will forward all operations. If no object is bound a :exc:`RuntimeError` will be raised. To create proxies to :class:`Local` or :class:`LocalStack` objects, call the object as shown above. If you want to have a proxy to an object looked up by a function, you can (as of Werkzeug 0.6.1) pass a function to the :class:`LocalProxy` constructor:: session = LocalProxy(lambda: get_current_request().session) .. versionchanged:: 0.6.1 The class can be instantiated with a callable as well now. """ __slots__ = ('__local', '__dict__', '__name__', '__wrapped__') def __init__(self, local, name=None): # __local 會被重命名爲 _LocalProxy__local object.__setattr__(self, '_LocalProxy__local', local) object.__setattr__(self, '__name__', name) if callable(local) and not hasattr(local, '__release_local__'): # "local" is a callable that is not an instance of Local or # LocalManager: mark it as a wrapped function. object.__setattr__(self, '__wrapped__', local) def _get_current_object(self): """Return the current object. This is useful if you want the real object behind the proxy at a time for performance reasons or because you want to pass the object into a different context. """ if not hasattr(self.__local, '__release_local__'): return self.__local() try: return getattr(self.__local, self.__name__) except AttributeError: raise RuntimeError('no object bound to %s' % self.__name__) @property def __dict__(self): try: return self._get_current_object().__dict__ except RuntimeError: raise AttributeError('__dict__') def __repr__(self): try: obj = self._get_current_object() except RuntimeError: return '<%s unbound>' % self.__class__.__name__ return repr(obj) def __bool__(self): try: return bool(self._get_current_object()) except RuntimeError: return False def __unicode__(self): try: return unicode(self._get_current_object()) # noqa except RuntimeError: return repr(self) def __dir__(self): try: return dir(self._get_current_object()) except RuntimeError: return [] def __getattr__(self, name): if name == '__members__': return dir(self._get_current_object()) return getattr(self._get_current_object(), name) def __setitem__(self, key, value): self._get_current_object()[key] = value def __delitem__(self, key): del self._get_current_object()[key] if PY2: __getslice__ = lambda x, i, j: x._get_current_object()[i:j] def __setslice__(self, i, j, seq): self._get_current_object()[i:j] = seq def __delslice__(self, i, j): del self._get_current_object()[i:j] __setattr__ = lambda x, n, v: setattr(x._get_current_object(), n, v) __delattr__ = lambda x, n: delattr(x._get_current_object(), n) __str__ = lambda x: str(x._get_current_object()) __lt__ = lambda x, o: x._get_current_object() < o __le__ = lambda x, o: x._get_current_object() <= o __eq__ = lambda x, o: x._get_current_object() == o __ne__ = lambda x, o: x._get_current_object() != o __gt__ = lambda x, o: x._get_current_object() > o __ge__ = lambda x, o: x._get_current_object() >= o __cmp__ = lambda x, o: cmp(x._get_current_object(), o) # noqa __hash__ = lambda x: hash(x._get_current_object()) __call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw) __len__ = lambda x: len(x._get_current_object()) __getitem__ = lambda x, i: x._get_current_object()[i] __iter__ = lambda x: iter(x._get_current_object()) __contains__ = lambda x, i: i in x._get_current_object() __add__ = lambda x, o: x._get_current_object() + o __sub__ = lambda x, o: x._get_current_object() - o __mul__ = lambda x, o: x._get_current_object() * o __floordiv__ = lambda x, o: x._get_current_object() // o __mod__ = lambda x, o: x._get_current_object() % o __divmod__ = lambda x, o: x._get_current_object().__divmod__(o) __pow__ = lambda x, o: x._get_current_object() ** o __lshift__ = lambda x, o: x._get_current_object() << o __rshift__ = lambda x, o: x._get_current_object() >> o __and__ = lambda x, o: x._get_current_object() & o __xor__ = lambda x, o: x._get_current_object() ^ o __or__ = lambda x, o: x._get_current_object() | o __div__ = lambda x, o: x._get_current_object().__div__(o) __truediv__ = lambda x, o: x._get_current_object().__truediv__(o) __neg__ = lambda x: -(x._get_current_object()) __pos__ = lambda x: +(x._get_current_object()) __abs__ = lambda x: abs(x._get_current_object()) __invert__ = lambda x: ~(x._get_current_object()) __complex__ = lambda x: complex(x._get_current_object()) __int__ = lambda x: int(x._get_current_object()) __long__ = lambda x: long(x._get_current_object()) # noqa __float__ = lambda x: float(x._get_current_object()) __oct__ = lambda x: oct(x._get_current_object()) __hex__ = lambda x: hex(x._get_current_object()) __index__ = lambda x: x._get_current_object().__index__() __coerce__ = lambda x, o: x._get_current_object().__coerce__(x, o) __enter__ = lambda x: x._get_current_object().__enter__() __exit__ = lambda x, *a, **kw: x._get_current_object().__exit__(*a, **kw) __radd__ = lambda x, o: o + x._get_current_object() __rsub__ = lambda x, o: o - x._get_current_object() __rmul__ = lambda x, o: o * x._get_current_object() __rdiv__ = lambda x, o: o / x._get_current_object() if PY2: __rtruediv__ = lambda x, o: x._get_current_object().__rtruediv__(o) else: __rtruediv__ = __rdiv__ __rfloordiv__ = lambda x, o: o // x._get_current_object() __rmod__ = lambda x, o: o % x._get_current_object() __rdivmod__ = lambda x, o: x._get_current_object().__rdivmod__(o) __copy__ = lambda x: copy.copy(x._get_current_object()) __deepcopy__ = lambda x, memo: copy.deepcopy(x._get_current_object(), memo)
1 建立一個Local對象api
local = Local()
剛建立後, 這個local_context中負責存儲局部上下文變量的storage是一個空字典restful
local.__storage__ = {}
咱們用iden1, inde2 .... indeN 來表示n個同屬於一個進程的線程(或者greenlet), 假如當前的線程(或者greenlet)的id爲iden1, 咱們來操做一下localsession
local.name = "iden1_name"
實際執行的代碼是:多線程
local.__storage__.setdefault("iden1", {})["name"] = "iden1_name"
這個local中負責存儲局部上下文變量的storage就變成了這樣:app
local.__storage__ = { "iden1": { "name": "iden1_name" } }
當咱們在不一樣的線程(或者greenlet)中操做後,local就可能會變成這樣ide
local.__storage__ = { "iden1": {...}, "iden2": {...}, ... "idenN": {...} }
local對象有一個__release_local__方法, 執行該方法會清理掉當前線程(或者greenlet)對應的存儲空間, 假如當前的線程(或者greenlet)的id爲iden1,
當咱們執行完__release_local__方法後, local的存儲空間就會變成這樣: 函數
# 已經沒有iden1了 local.__storage__ = { "iden2": {...}, ... "idenN": {...} }
local還定義了__call__方法, 當咱們執行local()後會返回一個LocalStack對象post
LocalStack底層使用的是Local,而後在Local實例中實現了一個棧
建立一個LocalStack對象
local_stack = LocalStack()
該對象的local屬性就是一個Local實例
isinstance(local_stack.local, Local) is True
local_stack的棧存儲在他的local屬性中, 當咱們調用local_stack.push(some_obj)的時候, 其實是執行了
local_stack.local.stack.append(some_obj) if hasattr(local_stack.local, "stack") else local_stack.local.stack = [some_obj]
假如當前的線程(或者greenlet)的id爲iden1, 咱們push一個對象request_ctx_obj, 而後又push一個對象request_ctx_obj2, 那麼local_stack.local就會是這樣:
local_stack.local.__storage__ = { "iden1": { "stack": [request_ctx_obj, request_ctx_obj2] } }
假如當前的線程(或者greenlet)的id爲iden1,咱們在調用local_stack.top()方法時,實際上執行的是:
return local_stack.local.stack[-1]
須要注意的是:
若是咱們當前所處的線程(或者greenlet)中以前並無進行過push的話,那麼咱們調用local_stack.top()方法返回的結果是None
當咱們執行local_stack.pop()時, 實際上執行的是
local_stack.local.stack.pop()
須要注意兩點:
1 若是當前線程(或者greenlet)中以前沒有push過, 那麼pop()方法會返回None
2 若是當前線程(或者greenlet)中的棧中只有一個對象, 那麼本次pop()還會清理掉stack(實際上執行了local_stack.local.__release_local__方法),
假如當前的線程(或者greenlet)的id爲iden2的話,沒有pop()以前是這樣的:
local_stack.local.__storage__ = { "iden1": { "stack": [request_ctx_obj, request_ctx_obj2] } "iden2": { "stack": [request_ctx_obj3] } }
執行pop()則會將當前線程(或者greenlet)的局部上下文存儲空間清理掉, 變爲這樣:
local_stack.local.__storage__ = { "iden1": { "stack": [request_ctx_obj, request_ctx_obj2] } }
LocalStack也提供了__call__方法, 執行該方法會生成一個LocalProxy對象
LocalProxy實現了代理模式, 給目標對象提供一個代理對象,並由代理對象控制對目標對象的引用。
根據傳入參數的類型以及數量的不一樣他會有兩種表現形式.
第一種,第一個參數傳入一個Local實例, 而後第二個參數傳入想要代理的對象的名稱:
咱們執行下面的語句:
local_1 = Local() local_proxy1 = LocalProxy(local_1, "age")
local_proxy1所代理的實際上就是local_1實例中的age屬性了。
假如當前的線程(或者greenlet)的id爲iden1,那麼咱們執行local_proxy1 = 12, 實際執行的就是local_1.age = 12
第二種,只傳入一個函數,經過該函數能夠獲取到想要代理的對象:
咱們執行下面的語句:
local_2 = Local() def _find_raw_obj(): return local_2.name local_proxy2 = LocalProxy(_find_raw_obj)
local_proxy2所代理的實際上就是local_2實例中的name屬性
request源碼
def _lookup_req_object(name): top = _request_ctx_stack.top if top is None: raise RuntimeError(_request_ctx_err_msg) return getattr(top, name) # context locals _request_ctx_stack = LocalStack() request = LocalProxy(partial(_lookup_req_object, 'request'))
只要看懂了文章上半部分的local,這裏實際上很簡單,
_request_ctx_stack是一個LocalStack實例,而咱們每次調用request.some_attr 的時候其實是執行_request_ctx_stack.top.some_attr
再來看一下當請求過來的時候,flask是如何處理的:
# flask.app class Flask(_PackageBoundObject): request_class = Request def request_context(self, environ): return RequestContext(self, environ) def wsgi_app(self, environ, start_response): # self.request_context是一個RequestContext實例 ctx = self.request_context(environ) error = None try: try:
# 2 執行了request_ctx_stack.push(ctx) ctx.push()
# 3 處理請求獲得響應 response = self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except: error = sys.exc_info()[1] raise return response(environ, start_response) finally: if self.should_ignore_error(error): error = None
# 4 request_ctx_stack.pop(ctx) ctx.auto_pop(error)
# 1 當請求來的時候會執行app.__call__()方法 def __call__(self, environ, start_response): """The WSGI server calls the Flask application object as the WSGI application. This calls :meth:`wsgi_app` which can be wrapped to applying middleware.""" return self.wsgi_app(environ, start_response) # flask.ctx class RequestContext(object): def __init__(self, app, environ, request=None): self.app = app if request is None:
# request是一個Request對象 request = app.request_class(environ) self.request = request self._implicit_app_ctx_stack = [] def push(self): top = _request_ctx_stack.top if top is not None and top.preserved: top.pop(top._preserved_exc) # Before we push the request context we have to ensure that there # is an application context. app_ctx = _app_ctx_stack.top if app_ctx is None or app_ctx.app != self.app: app_ctx = self.app.app_context() app_ctx.push() self._implicit_app_ctx_stack.append(app_ctx) else: self._implicit_app_ctx_stack.append(None) if hasattr(sys, 'exc_clear'): sys.exc_clear()
# 2.1 這裏是重點 _request_ctx_stack.push(self) def pop(self, exc=_sentinel): app_ctx = self._implicit_app_ctx_stack.pop() try: if not self._implicit_app_ctx_stack: self.preserved = False self._preserved_exc = None if exc is _sentinel: exc = sys.exc_info()[1] self.app.do_teardown_request(exc) request_close = getattr(self.request, 'close', None) if request_close is not None: request_close() finally: rv = _request_ctx_stack.pop()# Get rid of the app as well if necessary. if app_ctx is not None: app_ctx.pop(exc) assert rv is self, 'Popped wrong request context. ' \ '(%r instead of %r)' % (rv, self)
當請求過來時:
1 將請求封裝爲一個RequestContext實例
2 而後將請求的environ封裝成Request對象
3 執行_request_ctx_stack.push(RequestContext實例)
4 處理請求獲得響應
5 執行_request_ctx_stack.pop()
6 返回結果
看到這裏,大致原理咱們也就懂了。
工做中使用到flask flask-restful,有這樣的場景:
1 首先是遵循restful
2 我但願全部接口有統一的參數傳遞格式,相似於這樣:
timestamp: int # 以秒爲單位
token: str # 這個就不用說了
data: str # base64.encode(json.dumps(原始參數數據))
signature: md5(data + string(timestamp) + key) # 簽名,注:key爲加密密鑰
假如是登錄接口,客戶端須要給我傳遞手機號以及驗證碼,我但願格式是json,因此原始參數數據大概是這樣:
{"mobile": "12377776666", "code": "1234"}
3 我但願全部接口有統一的響應格式,相似於這樣:
{ "code": 0, # 這成功。 1 -9999 錯誤 "message": "", "data": { "mobile": "sdfasdf", "code": "" } }
4 我但願請求參數數據通過統一的參數檢測以後, request的args(若是請求方式爲get) 或者form(若是請求方式爲post) 或者values屬性變爲原始參數數據.這樣就能夠正常使用RequestParser()
如下是本人的實現源碼(生產環境測試無問題,但有幾處須要改進的地方):
import time import json import base64 import traceback import logging from hashlib import md5 from flask import request from functools import wraps from flask_restful import reqparse from flask.globals import _request_ctx_stack from werkzeug.exceptions import HTTPException from werkzeug.datastructures import ImmutableMultiDict, CombinedMultiDict, MultiDict logger = logging.getLogger("api") def get_user_info(token): """根據token獲取用戶信息,這個本身實現吧""" pass class ServerRequestParser(reqparse.RequestParser): def parse_args(self, req=None, strict=False): try: # print(request.values) return super().parse_args(req, strict) except HTTPException as e: raise ServerException(3, str(e.data)) class ServerException(Exception): code_msg = { -1: "未知錯誤", -2: "非法請求", 1: "缺乏token", 2: "非法token", 3: "參數錯誤: %s" } def __init__(self, code, *args): self.code = code self.args = args def to_json(self): return { "code": self.code, "message": self.code_msg[self.code] % self.args, "data": {} } class ServerRequest(object): """ 該類主要是配合uniform_verification_wechat方法,對flask.wrappers.Request對象作封裝後進行替換 """ def __init__(self, json_data): self._raw_request = request._get_current_object() self.args = ImmutableMultiDict() self.form = ImmutableMultiDict() self.json = {} if request.method.lower() == "get": self.args = json_data elif request.content_type == "application/json": self.json = json_data else: self.form = json_data setattr(_request_ctx_stack._local.stack[-1], "request", self) def __getattr__(self, item): return getattr(self._raw_request, item) @property def values(self): args = [] for d in self.args, self.form: if not isinstance(d, MultiDict): d = MultiDict(d) args.append(d) return CombinedMultiDict(args) uniform_wechat_parser = reqparse.RequestParser() uniform_wechat_parser.add_argument("timestamp", type=int, required=True) uniform_wechat_parser.add_argument("signature", required=True) uniform_wechat_parser.add_argument("data", required=True) uniform_wechat_parser.add_argument("token", required=True) def uniform_verification_wechat(check_token=True): def wrapper(func): current_time = time.time() @wraps(func) def inner(*args, **kwargs): request_data_dict = uniform_wechat_parser.parse_args() request_log_info = "url: %s, method: %s, request_data: %s" % ( request.url, request.method, json.dumps(request_data_dict).encode()) signature = request_data_dict["signature"] timestamp = request_data_dict["timestamp"] data = request_data_dict["data"] token = request_data_dict.get("token", None) try: if current_time - timestamp >= 120: raise ServerException(-2) _strings = "%s%s%s" % (data, timestamp, "密鑰key") if signature != md5(_strings.encode()).hexdigest(): raise ServerException(-2) try: data = json.loads(base64.b64decode(data.encode()).decode()) except Exception: raise ServerException(-2) request_log_info = "url: %s, method: %s, request_data: %s" % ( request.url, request.method, json.dumps(data).encode()) user_info = {} if check_token: if not token: raise ServerException(1) user_info = get_user_info(token) if user_info is None: raise ServerException(2) logger.info("checking token... %s" % user_info) request.user_info = user_info ServerRequest(data) _result = func(*args, **kwargs) result = { "code": 0, "message": "", "data": _result } except ServerException as e: result = e.to_json() except Exception as e: if hasattr(e, "to_json"): result = e.to_json() else: logger.info(traceback.print_exc()) result = ServerException(-1).to_json() response_log_info = result logger.info("%s, %s" % (request_log_info, json.dumps(response_log_info).encode())) return result return inner return wrapper from flask import Flask from flask_restful import Resource, Api app = Flask(__name__) api = Api(app) class Hello(Resource): hello_parser = ServerRequestParser() hello_parser.add_argument("name", required=True) @uniform_verification_wechat(False) def get(self): args = self.hello_parser.parse_args() return {'hello': args["name"]} api.add_resource(Hello, '/') if __name__ == "__main__": app.run(debug=True)
https://blog.tonyseek.com/post/the-context-mechanism-of-flask/
flask源碼
flask-restful源碼