REST framework 源碼分析-認證 鑑權 限流

rest framework 重寫dispatch()方法源碼分析

  1. 根據CBV的源碼運行流程,仍是執行dispatch()方法,只是rest framework插件 重寫dispatch() 方法
  • rest_framework/views.py/APIView.dispatch()
  • 對傳入的request參數進行擴展
def dispatch(self, request, *args, **kwargs):
    """ `.dispatch()` is pretty much the same as Django's regular dispatch, but with extra hooks for startup, finalize, and exception handling. """
    """ 擴展了django原生的dispatch()方法 """
    self.args = args
    self.kwargs = kwargs
    # 擴展reqeust
    request = self.initialize_request(request, *args, **kwargs)
    self.request = request
    self.headers = self.default_response_headers  # deprecate?
    
    # 這裏和原生的dispatch()基本同樣
    # 重寫了initial()方法
    try:
        self.initial(request, *args, **kwargs)

        # Get the appropriate handler method
        if request.method.lower() in self.http_method_names:
            handler = getattr(self, request.method.lower(),
                              self.http_method_not_allowed)
        else:
            handler = self.http_method_not_allowed

        response = handler(request, *args, **kwargs)

    except Exception as exc:
        response = self.handle_exception(exc)
        
    self.response = self.finalize_response(request, response, *args, **kwargs)
    return self.response
複製代碼
  1. 經過initialize_request()方法擴展request,從新新封裝了原生request、authenticators(認證對象列表) 等成員
def initialize_request(self, request, *args, **kwargs):
    """ Returns the initial request object. """
    parser_context = self.get_parser_context(request)
    
    # 增長了authenticators等成員
    return Request(
        request,
        parsers=self.get_parsers(),
        authenticators=self.get_authenticators(),
        negotiator=self.get_content_negotiator(),
        parser_context=parser_context
    )
複製代碼
  1. get_authenticators()方法找到setting中指定的認證類,或者在View中重寫的認證類
def get_authenticators(self):
    """ Instantiates and returns the list of authenticators that this view can use. """
    # 列表生成式,生成了相應的類的對象的列表:[對象,對象,...]
    return [auth() for auth in self.authentication_classes]
        
class APIView(View):

    # The following policies may be set at either globally, or per-view.
    # 下面的策略 可能在setting中設置,或者重寫在每一個View中
    ...
    authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
    throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES
    permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES
    ...
複製代碼
  1. 回到dispatch()方法,在從新擴展封裝了新的request以後,使用了 initial() 方法,初始化。在初始化的方法中,執行了認證、鑑權、限流這三個方法。
def initial(self, request, *args, **kwargs):
        """ Runs anything that needs to occur prior to calling the method handler. """
        ...

        # Ensure that the incoming request is permitted
        # 執行了認證、鑑權、限流這三個方法
        self.perform_authentication(request)
        self.check_permissions(request)
        self.check_throttles(request)
複製代碼

rest framework 認證 源碼分析

源碼分析

接上回,restframework重寫的dispatch()方法中,執行了inital()函數。其中perform_authentication(request) 方法實現了請求的認證功能。redis

  1. perform_authentication()函數中執行了Request類(rest_framework.reqeust.py中定義的類)的對象request(新封裝的request)的user方法
def perform_authentication(self, request):
    """ Perform authentication on the incoming request. Note that if you override this and simply 'pass', then authentication will instead be performed lazily, the first time either `request.user` or `request.auth` is accessed. """
    # 執行新封裝的request對象的的user方法(是個property因此不用user() )
    request.user
複製代碼
  1. 通過一些判斷以後,跳轉到 _authenticate() 方法
@property
def user(self):
    """ Returns the user associated with the current request, as authenticated by the authentication classes provided to the request. """
    if not hasattr(self, '_user'):
        with wrap_attributeerrors():
            # 通過一些判斷以後,跳轉到_authenticate()方法
            self._authenticate()
    return self._user
複製代碼
  1. 在Request類的_authenticate()方法中,執行 authenticators(認證對象列表) 中的每個認證對象的 authenticate() 方法。
def _authenticate(self):
    """ Attempt to authenticate the request using each authentication instance in turn. """
    """ 嘗試去執行每個認證對象實例中的authenticate()方法,返回認證結果 """
    # 獲取每個認證對象實例
    for authenticator in self.authenticators:
        try:
            # 使用認證對象的authenticate()方法
            user_auth_tuple = authenticator.authenticate(self)
        except exceptions.APIException:
            # 若是認證對象沒有寫authenticate()方法,拋出異常_not_authenticated()
            self._not_authenticated()
            raise
        
        # 若是寫了authenticate()方法,而且執行後返回的不是None
        # 則給request對象實例生成3個成員 self._authenticator, self.user, self.auth
        if user_auth_tuple is not None:
            self._authenticator = authenticator
            self.user, self.auth = user_auth_tuple
            return
        
        # 若是執行authenticate()方法以後返回的是None
        # 則繼續循環,執行對象列表中下一個 認證對象的方法,直到最後一個對象

    self._not_authenticated()
複製代碼
  • authenticate()的返回值應該是(should be)一個元祖,元祖的值 (self.force_user, self.force_token)
  • 返回的值是最終經過認證的用戶和token,這些會做爲成員變量賦值給request,能夠在view中調用self.user, self.auth
def authenticate(self, request):
    return (self.force_user, self.force_token)
複製代碼

使用示例

  1. 自定義一個認證類,繼承BaseAuthentication類
    • 若是這是認證類列表調用的最後一個類,則認證成功:返回 (user,token)
    • 若是認證列表後續還有別的認證類須要疊加認證,則認證成功:返回None,交由下一個類處理
    • 認證失敗,拋出異常
  • 自定義認證類
from rest_framework.authentication import BaseAuthentication,exceptions

class MyAuthentication(BaseAuthentication):
    
    # 必需要重寫authenticate()、authenticate_header()方法
    def authenticate(self, request):
        # 重新的request中若是找不到成員,會繼續從原生_request中查找
        token = request.GET.get('token')
        # 檢查token,實際應該與數據庫中token比對,這裏簡寫
        if token and token=='123456abc':
            # 返回元祖(user, token)
            return ('clay', token)
        else:
            # 若是沒有token或者token錯誤,拋出認證失敗的異常
            raise exceptions.AuthenticationFailed('用戶認證失敗')
            
    def authenticate_header(self, request):
        pass
複製代碼
  • 在View中重寫authentication_classes列表,調用認證類
class AssetsView(APIView):
    # 重點看這一行,重寫authentication_classes列表,調用認證類
    # 自定義的類MyAuthentication此時是最後一個認證類,因此必須返回(user, token)
    authentication_classes = [MyAuthentication, ]

    def get(self,request,*args,**kwargs):
        """ 返回帶code、msg的assets json數據 :param requset: :param args: :param kwargs: :return: """

        # 提取數據庫中assets信息, 整合成字典(先使用土法1)
        asset_list = []

        for asset in models.Assets.objects.all():
            asset_dict = {
                'id': asset.id,
                'name': asset.name,
            }
            asset_list.append(asset_dict)

        ret = [
                  {
                      'code': 1000,
                      'msg': 'SUCCESS: get assets info summary',
                      # 調用認證成功返回的元祖
                      'user': request.user,
                      'token':  request.auth,
                  }
              ] + asset_list
        # 返回數據庫中所有assets概覽
        # 返回帶狀態碼,成功響應默認就是200,能夠修改
        return HttpResponse(json.dumps(ret), status=200)
複製代碼
  • 想要調用認證類,也能夠再全局的setting.py文件中設置認證類的路徑
  • 建議建立一個專門的文件夾來存放這些類
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": ['restframework_class.auth.MyAuthentication', 'restframework_class.auth.MyAuthentication2'] 
}
複製代碼

rest framework 鑑權 源碼分析

源碼分析

restframework重寫的dispatch()方法中,執行了inital()函數。inital()中check_permissions(request) 方法實現了請求的鑑權、權限控制功能。數據庫

  1. check_permissions(request)函數中,循環了權限類的對象列表,依次執行權限對象的 has_permission() 方法
    • 若是有權限,則繼續循環使用下一個權限對象再來檢查request
    • 若是沒有權限,執行permission_denied()函數,拋出異常(經過權限對象的message變量來自定義異常信息)
def check_permissions(self, request):
    """ Check if the request should be permitted. Raises an appropriate exception if the request is not permitted. 檢查請求是否被權限限制。 若是請求沒有權限,拋出對應的異常 """
    for permission in self.get_permissions():
        if not permission.has_permission(request, self):
            self.permission_denied(
                request, message=getattr(permission, 'message', None)
            )
複製代碼
  • permission_classes能夠在全局的setting中設置,也能夠再每一個類中重寫
def get_permissions(self):
    """ Instantiates and returns the list of permissions that this view requires. 列表生產式,來生成權限類的列表 """
    return [permission() for permission in self.permission_classes]
複製代碼
  • 若是沒有權限,執行permission_denied()函數,拋出異常,經過權限對象的message變量來自定義異常信息,message是權限對象中的變量。
def permission_denied(self, request, message=None):
    """ If request is not permitted, determine what kind of exception to raise. """
    if request.authenticators and not request.successful_authenticator:
        raise exceptions.NotAuthenticated()
    # 拋出PermissionDenied(detail=message)異常,出入權限對象的message變量
    raise exceptions.PermissionDenied(detail=message)
複製代碼

使用示例

  1. 自定義的權限類繼 承BasePermission
    • 在重寫 has_permission() 方法時,須要引入 request, view 參數,能夠根據 request, view 來作權限控制
    • 在重寫 has_object_permission() 方法時, 須要引入 request, view, obj 參數,能夠根據 request, view, obj 來作權限控制
class BasePermission(object):
    """ A base class from which all permission classes should inherit. """

    def has_permission(self, request, view):
        """ Return `True` if permission is granted, `False` otherwise. """
        return True

    def has_object_permission(self, request, view, obj):
        """ Return `True` if permission is granted, `False` otherwise. """
        return True
複製代碼
  • 自定義權限類,示例:
from rest_framework.permissions import BasePermission

class NetPermission(BasePermission):
    message = "必須是網絡工程是才能訪問!"
    def has_permission(self, request, view):
        if request.user.role != 0:
            return False
        return True
複製代碼
  1. 在View中重寫permission_classes權限類列表,列表中幾個權限類表示要通過幾層權限檢查(從左至右依次檢查)
class AssetsView(APIView):
    authentication_classes = [MyAuthentication, ]
    # 關注這一行
    # 在View中重寫permission_classes權限類列表,幾個類表示要通過幾層權限檢查
    permission_classes = [NetPermission, ]

    def get(self,request,*args,**kwargs):
        ...
        return HttpResponse(json.dumps(ret), status=200)
複製代碼
  • 或者在全局setting.py中指定權限類的路徑,對全站的View進行默認的權限檢查
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": ['plugin.restframework.authentication.MyAuthentication', 'plugin.restframework.authentication.MyAuthentication2'] ,
    "DEFAULT_PERMISSION_CLASSES": ['restframework.permissions.MyPermission', ] 
}
複製代碼

細化的權限控制(針對view ,針對obj)

rest framework 限流 源碼分析

源碼分析

restframework重寫的dispatch()方法中,執行了inital()函數。inital()中check_throttles((request) 方法實現了請求的訪問頻率控制功能。django

  1. check_throttles(request)函數中,循環了限流類的對象列表,依次執行限流對象的 allow_request() 方法
def check_throttles(self, request):
    """ Check if request should be throttled. Raises an appropriate exception if the request is throttled. 檢查請求是否應該被節流。 若是被節流,則拋出相應異常。 """
    # 遍歷 限流對象列表,若是返回Fasle,則被限流,拋出異常(可傳參數throttle.wait()的返回值)
    for throttle in self.get_throttles():
        # 若是被節流,返回False,則拋出相應異常
        if not throttle.allow_request(request, self):
            self.throttled(request, throttle.wait())
複製代碼
  • 限流對象列表生成式
def get_throttles(self):
    """ Instantiates and returns the list of throttles that this view uses. """
    # 能夠在view中重寫throttle_classes,指定限流對象列表
    # 也能夠在setting.py中定義
    return [throttle() for throttle in self.throttle_classes]
複製代碼
  • 若是被節流,則經過throttled()方法,拋出相應異常,可傳入wait做爲等待時間的參數
def throttled(self, request, wait):
    """ If request is throttled, determine what kind of exception to raise. """
    # wait參數,出入的值是 節流類的wait()方法的返回值(單位:秒)
    raise exceptions.Throttled(wait)
複製代碼

使用示例

  1. 自定義限流類,繼承BaseThrottle類,重寫 allow_request()wait() 這兩個方法
from rest_framework.throttling import BaseThrottle

import time

# 訪問記錄,key是IP,value是訪問時間的列表。應該記錄在redis緩存中,這裏簡單寫在內存裏。
REQ_RECORD = {}

class MyThrottle(BaseThrottle):
    """ 自定義限流類:10秒內只能訪問3次,超過就限流 返回True,容許請求訪問 返回False,禁止請求訪問 """
    
    
    # 經過 self.history這個對象的成員變量,
    # 在allow_request()和 wait()這兩個成員方法之間傳遞history的值
    def __init__(self):
        self.history = None

    def allow_request(self, request, view):
        remote_addr = request.META.get('REMOTE_ADDR')
        timer = time.time()
        if remote_addr not in REQ_RECORD:
            REQ_RECORD[remote_addr]=[timer]
            return True

        history = REQ_RECORD[remote_addr]
        self.history = history
        # 若是歷史訪問時間列表的最老的訪問時間 在10秒以前,
        # 將最老的訪問時間 從歷史訪問時間列表 中移除
        while history and history[-1] < timer - 10:
            history.pop()

        # 10秒內的訪問記錄,是否超過3次
        # 若是沒有超過,則記錄此次訪問,並返回True,容許訪問
        # 若是超過,則返回False,禁止訪問
        if len(history) < 3:
            history.insert(0, timer)
            return True
        return False

    def wait(self):
        timer = time.time()
        return 10 - (timer - self.history[-1])
    
複製代碼
  1. 在View中調用限流類
class AssetsView(APIView):
    authentication_classes = [MyAuthentication, ]
    permission_classes = [NetPermission, ]
    # 關注這一行
    # 在View中重寫throttle_classes限流類列表,通常只寫一個限流,
    # 或者不限流,使列表爲空,throttle_classes = []
    throttle_classes = [MyThrottle, ]

    def get(self,request,*args,**kwargs):
        ...
        return HttpResponse(json.dumps(ret), status=200)
複製代碼
  • 或者在setting.py中指定全站默認使用的限流類的路徑
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": ['plugin.restframework.authentication.MyAuthentication', 'plugin.restframework.authentication.MyAuthentication2'],
    "DEFAULT_PERMISSION_CLASSES": ['restframework.permissions.MyPermission', ],
    "DEFAULT_THROTTLE_CLASSES": ['plugin.restframework.throttling.MyThrottle']
}
複製代碼

內置的限流類

與認證類、鑑權類一般繼承BaseXXX類,高度自定義不一樣,限流類我我的以爲繼承restframework提供的其餘內置的類更方便json

  • 例如繼承 SimpleRateThrottle
class ZklTrottle(SimpleRateThrottle):
    # 設定規定時間內能訪問的次數,例如 3/m, 1/s, 1000/h, 9999/day
    # 一般設定在setting.py中
    THROTTLE_RATES = {
        "Zkl": '5/m'
    }
    # 指定scope值爲 查找THROTTLE_RATES的key
    scope = "Zkl"
    # 定義標識一個用戶的參數 
    # get_ident(request)是BaseThrottle類中的方法,返回remote_addr,
    # 即便用訪問源IP做爲一個用戶的標識
    def get_cache_key(self, request, view):
        return self.get_ident(request)
        
        # return request.user.pk # 一般也使用requser.user做爲標識一個用戶的ID
複製代碼
相關文章
相關標籤/搜索