python-django rest framework框架之dispatch方法源碼分析

1.Django的 CBV 中在請求到來以後,都要執行dispatch方法,dispatch方法根據請求方式不一樣觸發 get/post/put等方法前端

class APIView(View):
    def dispatch(self, request, *args, **kwargs):#1.1 把wsgi的request進行封裝
        request = self.initialize_request(request, *args, **kwargs)
        self.request = request     #此時的self.request 是rest_framework的Request對象,它裏面比wsgi的request多了一些東西

        try:
            #1.2 進行 初始化     :版本,認證,權限,訪問頻率
            self.initial(request, *args, **kwargs)
#1.3 反射執行get等方法 if request.method.lower() in self.http_method_names: handler = getattr(self, request.method.lower(), self.http_method_not_allowed) else: handler = self.http_method_not_allowed response = handler(request, *args, **kwargs) except Exception as exc: response = self.handle_exception(exc) #1.4 把返回的response 再進行封裝 self.response = self.finalize_response(request, response, *args, **kwargs) #1.5 返回 , dispatch方法必定要有返回值,由於get等方法返回的結果要返回給前端 return self.response

第1.1步:python

from rest_framework.request import Request

class APIView(View):
    def initialize_request(self, request, *args, **kwargs):
       #返回了一個rest_framework的Request對象
        return Request(
            request,
       #1.1.1 parsers
=self.get_parsers(),
       #1.1.2 authenticators
=self.get_authenticators(),
       #1.1.3 negotiator
=self.get_content_negotiator(), )

第1.1.1步:django

  passapi

第1.1.2步:緩存

class APIView(View):
    def get_authenticators(self):
     #self.authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES 是從配置文件中取數據,從變量名不難看出有多是不少類的列表
return [auth() for auth in self.authentication_classes]
     #self.authenticators = 一個多個對象的列表

第1.2步:app

class APIView(View):
    def initial(self, request, *args, **kwargs):
     #1.2.0版本相關
     version, scheme = self.determine_version(request, *args, **kwargs)
     request.version, request.versioning_scheme = version, scheme
      # request.version, request.versioning_scheme = 版本,和相應的版本類對象
      # 1.2.0.0
      # 既然 request.version 是版本, 那request.versioning_scheme 表明什麼呢?

#1.2.1認證相關 self.perform_authentication(request) #1.2.2權限相關 self.check_permissions(request) #1.2.3訪問頻率相關 self.check_throttles(request)

第1.2.0步:ide

獲取版本數據有五種接收方式:url上傳參,url,子域名,namespace,請求頭post

from rest_framework.versioning import QueryParameterVersioning,URLPathVersioning,HostNameVersioning  
   方式一:
    # 基於url傳參  http://127.0.0.1:8001/api/users/?version=v1
    # versioning_class = QueryParameterVersioning

    方式二:
    # 基於URL http://127.0.0.1:8001/api/v2/users/
    # versioning_class = URLPathVersioning
     # url 配置: url(r'^api/(?P<version>[v1|v2]+)/', include('api.urls')),
    方式三:
    # 基於子域名 http://v1.luffy.com/users/
    # versioning_class = HostNameVersioning
  
 配置文件:
  REST_FRAMEWORK = {
  'VERSION_PARAM':'version',
  'DEFAULT_VERSION':'v1',
  'ALLOWED_VERSIONS':['v1','v2'],
  # 'DEFAULT_VERSIONING_CLASS':"rest_framework.versioning.HostNameVersioning",
  'DEFAULT_VERSIONING_CLASS':"rest_framework.versioning.URLPathVersioning",
  # 'DEFAULT_VERSIONING_CLASS':"rest_framework.versioning.QueryParameterVersioning",
  }
 

 

class APIView(View):
    def determine_version(self, request, *args, **kwargs):
        if self.versioning_class is None:     # self.versioning_class = api_settings.DEFAULT_VERSIONING_CLASS
        #若是沒有配置 版本相關的類
            return (None, None)
        scheme = self.versioning_class()      # 實例化版本類對象
        #調用對象的determine_version 方法
        return (scheme.determine_version(request, *args, **kwargs), scheme)

方式一的 determine_version 方法:ui

class QueryParameterVersioning(BaseVersioning):
   invalid_version_message = _('Invalid version in query parameter.')
def determine_version(self, request, *args, **kwargs): # 至關於 request.GET.get() version = request.query_params.get(self.version_param, self.default_version) #self.version_param = api_settings.VERSION_PARAM #self.default_version = api_settings.DEFAULT_VERSION
     # 見下面
if not self.is_allowed_version(version): raise exceptions.NotFound(self.invalid_version_message) return version

class BaseVersioning(object):
    def is_allowed_version(self, version):
     #若是沒有配置容許的版本列表,表明沒限制
        if not self.allowed_versions: #self.allowed_versions = api_settings.ALLOWED_VERSIONS
            return True
        return ((version is not None and version == self.default_version) or
                (version in self.allowed_versions))

方式二的 determine_version 方法:this

class URLPathVersioning(BaseVersioning):
    invalid_version_message = _('Invalid version in URL path.')
    
    def determine_version(self, request, *args, **kwargs):
        #從傳過來的字典中獲取版本
        version = kwargs.get(self.version_param, self.default_version)
        
        if not self.is_allowed_version(version):
            raise exceptions.NotFound(self.invalid_version_message)
        return version
        
        
class BaseVersioning(object):
    def is_allowed_version(self, version):
        if not self.allowed_versions:
            return True
        return ((version is not None and version == self.default_version) or
                (version in self.allowed_versions))

方式三的 determine_version 方法:

class HostNameVersioning(BaseVersioning):
    hostname_regex = re.compile(r'^([a-zA-Z0-9]+)\.[a-zA-Z0-9]+\.[a-zA-Z0-9]+$')
    invalid_version_message = _('Invalid version in hostname.')
    
    def determine_version(self, request, *args, **kwargs):
        hostname, separator, port = request.get_host().partition(':')
        match = self.hostname_regex.match(hostname)
        if not match:
            return self.default_version
        version = match.group(1)
        if not self.is_allowed_version(version):
            raise exceptions.NotFound(self.invalid_version_message)
        return version
        
class BaseVersioning(object):
    def is_allowed_version(self, version):
        if not self.allowed_versions:
            return True
        return ((version is not None and version == self.default_version) or
                (version in self.allowed_versions))

第1.2.0.0步:     request.versioning_scheme  是用來反向生成url 的

        urlpatterns = [
            url(r'^api/(?P<version>[v1|v2]+)/', include('api.urls')),
            url(r'^api/', include('api.urls')),
        ]
    
        urlpatterns = [
            url(r'^users/', views.UsersView.as_view(),name='u'),
        ]
        
        
        # 當前版本同樣的URL
        # url = request.versioning_scheme.reverse(viewname='u',request=request)    #不用傳版本的參數
        # print(url)

        # 當前版本不同的URL
        # from django.urls import reverse
        # url = reverse(viewname='u',kwargs={'version':'v2'})
        # print(url)

 方式一的 reverse方法

class QueryParameterVersioning(BaseVersioning):
    def reverse(self, viewname, args=None, kwargs=None, request=None, format=None, **extra):
        #調用基類的 reverse方法
        url = super(QueryParameterVersioning, self).reverse(
            viewname, args, kwargs, request, format, **extra
        )
        # 之因此在反向生成的時候不用傳版本參數, 是由於這步幫你處理了
        if request.version is not None:
            return replace_query_param(url, self.version_param, request.version)
        return url
        
class BaseVersioning(object):    
    def reverse(self, viewname, args=None, kwargs=None, request=None, format=None, **extra):
        return _reverse(viewname, args, kwargs, request, format, **extra)
        
def _reverse(viewname, args=None, kwargs=None, request=None, format=None, **extra):

    if format is not None:
        kwargs = kwargs or {}
        kwargs['format'] = format
    # 最後仍是調用了 django的 reverse方法 生成url
    url = django_reverse(viewname, args=args, kwargs=kwargs, **extra)
    if request:
        return request.build_absolute_uri(url)
    return url

 方式二的 reverse方法

class URLPathVersioning(BaseVersioning):
    def reverse(self, viewname, args=None, kwargs=None, request=None, format=None, **extra):
        if request.version is not None:
            kwargs = {} if (kwargs is None) else kwargs
            #給 version賦值
            kwargs[self.version_param] = request.version

        #調用基類的reverse方法
        return super(URLPathVersioning, self).reverse(
            viewname, args, kwargs, request, format, **extra
        )
        
class BaseVersioning(object):
    def reverse(self, viewname, args=None, kwargs=None, request=None, format=None, **extra):
        return _reverse(viewname, args, kwargs, request, format, **extra)
        
        
def _reverse(viewname, args=None, kwargs=None, request=None, format=None, **extra):

    if format is not None:
        kwargs = kwargs or {}
        kwargs['format'] = format
    #最後仍是調用了django的reverse方法生成url
    url = django_reverse(viewname, args=args, kwargs=kwargs, **extra)
    if request:
        return request.build_absolute_uri(url)
    return url

方式三的 reverse方法

方式三 HostNameVersioning 類中沒有 reverse方法,因此直接去基類中找
class BaseVersioning(object):
    def reverse(self, viewname, args=None, kwargs=None, request=None, format=None, **extra):
        return _reverse(viewname, args, kwargs, request, format, **extra)
         
def _reverse(viewname, args=None, kwargs=None, request=None, format=None, **extra):
    if format is not None:
        kwargs = kwargs or {}
        kwargs['format'] = format
    #最後仍是調用了django的reverse方法生成url
    url = django_reverse(viewname, args=args, kwargs=kwargs, **extra)
    if request:
        return request.build_absolute_uri(url)
    return url

 

第1.2.1步: 認證相關

class APIView(View):
    def perform_authentication(self, request):
     #1.2.1.1
        request.user

第1.2.1.1步:

class Request(object):    
    @property
    def user(self):       #此時的self 是 rest_framework的request對象
        if not hasattr(self, '_user'):
            with wrap_attributeerrors():
          #1.2.1.1.1 self._authenticate()
return self._user

1.2.1.1.1步:

class Request(object):    
    def _authenticate(self):
        #此時的self 是 rest_framework的request對象
for authenticator in self.authenticators: #self.authenticators = 一個多個對象的列表 try:
          #執行每一個對象的authenticate方法 user_auth_tuple
= authenticator.authenticate(self)   #從變量的名不難看出 返回了一個元組 except exceptions.APIException: self._not_authenticated() raise if user_auth_tuple is not None: self._authenticator = authenticator
          #賦值, request.user和request.auth 並返回 self.user, self.auth
= user_auth_tuple return self._not_authenticated()

第1.3步反射執行get等方法

 

咱們能夠自定義一個簡單的用戶認證

class MyAuth(object):
    def authenticate(self,request):
        return "1111","222"
    
class Host(APIView):
    authentication_classes=[MyAuth]
    def get(self,request):
        print(request.user)    #1111
        print(request.auth)   #222
        return HttpResponse("666")

認證

- 認證
                - 局部  : 只是一個類內的一些接口用
                    class MyAuthentication(BaseAuthentication):

                        def authenticate(self, request):
                 '''
                 有三種返回值: None 表示我無論,交給下一個進行認證, 元組表示認證成功, 拋出異常
                 '''
# return None ,我無論,交給下一個進行認證 token = request.query_params.get('token') obj = models.UserInfo.objects.filter(token=token).first() if obj: return (obj.username,obj) raise APIException('用戶認證失敗') #  認證失敗時 須要註冊restframework 進行友好的展現錯誤信息 class AuthView(APIView): authentication_classes=[MyAuthentication,]
               ....

- 全局 : 多個類都須要用到認證的時候 就須要在配置文件中配置了 REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, "DEFAULT_AUTHENTICATION_CLASSES": [ "app02.utils.MyAuthentication", ], } class HostView(APIView):
               #authentication_classes=[] #若是在類中寫了authentication_classes 等於一個空列表,那就表示 這個類內的接口不須要認證
def get(self,request,*args,**kwargs): return HttpResponse('主機列表')

         - 類的繼承:
          ********utils.py

          from rest_framework.authentication import BaseAuthentication
          from rest_framework import exceptions

          class LuffyTokenAuthentication(BaseAuthentication):
              keyword = 'Token'

              def authenticate(self, request):
                  """
                  Authenticate the request and return a two-tuple of (user, token).
                  """

                  token = request.query_params.get('token')
                  if not token:
                      raise exceptions.AuthenticationFailed('驗證失敗')
                  return self.authenticate_credentials(token)

              def authenticate_credentials(self, token):
                  from luffy.models import UserAuthToken
                  try:
                      token_obj = UserAuthToken.objects.select_related('user').get(token=token)
                  except Exception as e:
                      raise exceptions.AuthenticationFailed(_('Invalid token.'))

                  return (token_obj.user, token_obj)

         class AuthAPIView(object):
             authentication_classes = [LuffyTokenAuthentication,]
    
         *******views.py
    
          from utils import AuthAPIView

          class ShoppingCarView(AuthAPIView,APIView):   #注意繼承的順序

              def get(self,request,*args,**kwargs):
                  pass

 第1.2.2步: 權限相關

class APIView(View):
    def check_permissions(self, request):
     #1.2.2.1 permission是每個權限類的對象
for permission in self.get_permissions():
       #1.2.2.2
if not permission.has_permission(request, self): #我猜has_permission方法返回的值是 True/False,True表明有權限
          # 1.2.2.3 若是沒有權限執行  self.permission_denied( request, message
=getattr(permission, 'message', None) #根據這句話能夠發現,能夠在自定義類中寫 message='無權訪問' )

第1.2.2.1步

class APIView(View):
    def get_permissions(self):
        return [permission() for permission in self.permission_classes]

第1.2.2.2步: 咱們能夠在自定義類中寫這個方法,經過一些邏輯判斷後讓它返回True或False

第1.2.2.3步

class APIView(View):
    def permission_denied(self, request, message=None):
        # request.authenticators = 一個對象列表
        # 1.2.2.3.1  若是認證成功,不執行此步
        if request.authenticators and not request.successful_authenticator:
            raise exceptions.NotAuthenticated()   #拋出 未進行認證的異常,這裏能夠傳錯誤信息 detail='xxx'
     # 1.2.2.3.2 拋出異常
raise exceptions.PermissionDenied(detail=message)

第1.2.2.3.1步

class Request(object):        
    @property
    def successful_authenticator(self):
        #self._authenticator  是 最後的那個認證類的對象
        return self._authenticator

第1.2.2.3.2步

沒有init方法,執行父類的        
class PermissionDenied(APIException):
    default_detail = _('You do not have permission to perform this action.')
    
class APIException(Exception):
    default_detail = _('A server error occurred.')

    def __init__(self, detail=None, code=None):
        if detail is None:
            detail = self.default_detail
        
        self.detail = _get_error_details(detail, code)

    def __str__(self):
        return six.text_type(self.detail)

 

認證和權限聯合使用:

class MyAuthentication(BaseAuthentication):

    def authenticate(self, request):
        token = request.query_params.get('token')
        obj = models.UserInfo.objects.filter(token=token).first()
        if obj:
            return (obj.username,obj)
        return None

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        # return 'Basic realm="api"'
        pass

class MyPermission(object):
    message = "無權訪問"
    def has_permission(self,request,view):
        if request.user:
            return True
        return False

class AdminPermission(object):
    message = "無權訪問"
    def has_permission(self,request,view):
        if request.user == 'alex':
            return True
        return False

class HostView(APIView):
    """
    匿名用戶和用戶都能訪問
    """
    authentication_classes = [MyAuthentication,]
    permission_classes = []
    def get(self,request,*args,**kwargs):

        return Response('主機列表')

class UserView(APIView):
    """
    用戶能訪問
    """
    authentication_classes = [MyAuthentication, ]
    permission_classes = [MyPermission,]
    def get(self,request,*args,**kwargs):
        return Response('用戶列表')

class SalaryView(APIView):
    """
    管理員能訪問
    """
    authentication_classes = [MyAuthentication, ]
    permission_classes = [MyPermission,AdminPermission,]
    def get(self,request,*args,**kwargs):

        return Response('薪資列表')

    #自定義未認證的錯誤信息
    def permission_denied(self, request, message=None):
        if request.authenticators and not request.successful_authenticator:
            raise exceptions.NotAuthenticated(detail='xxxxxxxx')
        raise exceptions.PermissionDenied(detail=message)

 在全局內使用權限需配置:

                    REST_FRAMEWORK = {
                            "DEFAULT_PERMISSION_CLASSES": [
                                 "app02.utils.MyPermission",
                            ],
                    }

 

第1.2.3步: 訪問頻率相關  

class APIView(View):
    def check_throttles(self, request):
     #1.2.3.1
for throttle in self.get_throttles(): if not throttle.allow_request(request, self): #從這句代碼能夠看出,自定義的限流類 能夠 寫allow_request 方法,返回值應該是 True 表示通行或 False 表示限制
          # 1.2.3.2 限制的狀況下執行 self.throttled(request, throttle.wait()) #從這句代碼能夠看出,自定義的限流類 中 要寫 wait 方法,並且返回值必須是數字類型或者 None

第1.2.3.1步:

class APIView(View):
    def get_throttles(self):
        return [throttle() for throttle in self.throttle_classes]   #返回一個 限流 類的對象列表

第1.2.3.2步:

class APIView(View):
    def throttled(self, request, wait):
        raise exceptions.Throttled(wait)  #類的實例化  ,拋出異常是一個對象,那在打印的時候必定調用了 __str__方法
class Throttled(APIException):
    default_detail = _('Request was throttled.')
    extra_detail_plural = 'Expected available in {wait} seconds.'
    def __init__(self, wait=None, detail=None, code=None):
        if detail is None:
            detail = force_text(self.default_detail)
        if wait is not None:
            wait = math.ceil(wait)
            detail = ' '.join((              # 把 wait方法的返回值和 detail 放到了一塊兒,做爲新的參數 傳給了父類進行初始化
                detail,
                force_text(ungettext(self.extra_detail_singular.format(wait=wait),
                                     self.extra_detail_plural.format(wait=wait),
                                     wait))))
        self.wait = wait
        super(Throttled, self).__init__(detail, code)    #傳給了父類進行初始化
class APIException(Exception):
    default_detail = _('A server error occurred.')
    default_code = 'error'

    def __init__(self, detail=None, code=None):
        if detail is None:
            detail = self.default_detail
        if code is None:
            code = self.default_code
     #把 傳過來的 錯誤信息 detail 賦值給了 self.detail
        self.detail = _get_error_details(detail, code)   

    def __str__(self):
        return six.text_type(self.detail)   #打印錯誤信息

 

自定義的訪問頻率限制

class MyThrottle(BaseThrottle):
    def allow_request(self,request,view):
        return False
        
    def wait(self):
        return 22     #表示還需22秒才能訪問
    
class User(APIView):
    throttle_classes=[MyThrottle,]
    
    def get(self,request,*args,**kwargs):
        return Response('333333')

自定義一個對匿名用戶的限流

RECORD={}

class MyThrottle(BaseThrottle):
    
    def allow_request(self,request,view):
        """
        返回False,限制
        返回True,通行
        """
         a. 對匿名用戶進行限制:每一個用戶1分鐘容許訪問10次
            - 獲取用戶IP request 1.1.1
        """
        import time
        ctime = time.time()
        ip = "1.1.1"
        if ip not in RECORD:
            RECORD[ip] = [ctime,]
        else:
            # [4507862389234,3507862389234,2507862389234,1507862389234,]
            time_list = RECORD[ip]
            while True:
                val = time_list[-1]
                if (ctime-60) > val:
                    time_list.pop()
                else:
                    break
            if len(time_list) > 10:
                return False
            time_list.insert(0,ctime)
        return True
    def wait(self):
        import time
        ctime = time.time()
        first_in_time = RECORD["1.1.1"][-1]
        wt = 60 - (ctime - first_in_time)
        return wt
class User(APIView):

    throttle_classes=[MyThrottle]
    
    def get(self,request,*args,**kwargs):

        return Response('333333')
View Code

可是這樣寫以爲很麻煩,故有更加簡單的寫法:以下 繼承SimpleRateThrottle

class MySimpleRateThrottle(SimpleRateThrottle):
    scope = "wdp"

    def get_cache_key(self, request, view):
        return self.get_ident(request)
     # 能夠返回 None 表示 不限流
class LimitView(APIView): authentication_classes = [] permission_classes = [] throttle_classes=[MySimpleRateThrottle,] def get(self,request,*args,**kwargs): return Response('控制訪問頻率示例') def throttled(self, request, wait): #自定義錯誤信息 class MyThrottled(exceptions.Throttled): default_detail = '請求被限制.' extra_detail_plural = '還須要再等待{wait}' raise MyThrottled(wait) 須要在配置文件中設置: REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES':{ 'wdp':'5/minute', } }

這種簡單寫法的源碼流程:  首先也是要執行  allow_request 方法 ,它本身類中沒有就去基類中找

class SimpleRateThrottle(BaseThrottle):
    def allow_request(self, request, view):
        #簡1   (不執行)
        if self.rate is None:
            return True
        #簡2   若是你不重寫.get_cache_key() 方法 就拋出異常
        self.key = self.get_cache_key(request, view)
        if self.key is None:
            return True

        self.history = self.cache.get(self.key, [])   #經過惟一標識 到 cache中取至關於剛纔匿名用戶的 一個ip的訪問記錄列表
                # cache 能夠放在本地,也能夠放在緩存中 等
self.now = self.timer() #若是 記錄列表有值而且列表最後面的值 小於當前時間減去限流的週期 就說明這條記錄過時了 while self.history and self.history[-1] <= self.now - self.duration: self.history.pop() #pop掉 #判斷 訪問的次數大不大於 限流的次數 if len(self.history) >= self.num_requests: #大於, return False return self.throttle_failure() #簡3 return self.throttle_success()

第簡1步:  self.rate

class SimpleRateThrottle(BaseThrottle):
    timer = time.time
    scope = None
    THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES

    def __init__(self):
        if not getattr(self, 'rate', None):   #自定義類中沒寫 rate字段,執行 簡1.1 步
        # 簡1.1 self.rate
= self.get_rate()
       # 簡1.2 self.num_requests, self.duration
= self.parse_rate(self.rate)

第簡1.1步:

class SimpleRateThrottle(BaseThrottle):
    def get_rate(self):
        #若是 自定義中沒有定義 scope 字段 ,拋出異常
        if not getattr(self, 'scope', None):
            msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
                   self.__class__.__name__)
            raise ImproperlyConfigured(msg)
        
        try:
            #簡1.1.1
            return self.THROTTLE_RATES[self.scope]      # 就是去配置文件中取值  '5/minute'
                       #THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES 
        except KeyError:
            msg = "No default throttle rate set for '%s' scope" % self.scope
            raise ImproperlyConfigured(msg)

第簡1.2 步:

class SimpleRateThrottle(BaseThrottle):
    def parse_rate(self, rate):
        num, period = rate.split('/')    #  '5/minute'
        num_requests = int(num)          #  限流的次數
        duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]    #限流的週期
        return (num_requests, duration)

第簡2 步:

class SimpleRateThrottle(BaseThrottle):
    def get_cache_key(self, request, view):
        raise NotImplementedError('.get_cache_key() must be overridden')    #拋出異常  .get_cache_key() 方法 必須被重寫
     
#因此咱們要在自定義類中 重寫 .get_cache_key()方法
class MySimpleRateThrottle(SimpleRateThrottle):
  def
get_cache_key(self, request, view):
       #簡2.1   
return self.get_ident(request)

第簡2.1步: 說白了就是 去request 中獲取 惟一標識

class BaseThrottle(object):
    def get_ident(self, request):
        
        xff = request.META.get('HTTP_X_FORWARDED_FOR')
        remote_addr = request.META.get('REMOTE_ADDR')
        num_proxies = api_settings.NUM_PROXIES

        if num_proxies is not None:
            if num_proxies == 0 or xff is None:
                return remote_addr
                   addrs = xff.split(',')
        client_addr = addrs[-min(num_proxies, len(addrs))]
        return client_addr.strip()

        return ''.join(xff.split()) if xff else remote_addr

 第簡3步:

class SimpleRateThrottle(BaseThrottle):
    def throttle_success(self):
        #向記錄列表中的第一個位置插入數據,再給self.key 輔助
        self.history.insert(0, self.now)
        self.cache.set(self.key, self.history, self.duration)
        return True

 

接着咱們再看看 wait 方法

class SimpleRateThrottle(BaseThrottle):
    def wait(self):
     # 若是 記錄列表有值
        if self.history:
       # 剩餘的週期是 限流週期減去當前時間減去記錄列表最後的一個值的時間 remaining_duration
= self.duration - (self.now - self.history[-1]) else:
       # 若是列表沒有值, 說明是第一次訪問,剩餘週期等於限流的週期 remaining_duration
= self.duration      #可訪問的次數 等於 限流的次數減去 記錄列表的長度加上本次訪問 available_requests = self.num_requests - len(self.history) + 1 if available_requests <= 0: #若是沒有次數了 返回None return None return remaining_duration / float(available_requests)

 

全局使用訪問頻率限制 的配置

REST_FRAMEWORK = {
            "DEFAULT_THROTTLE_CLASSES":[
                "app02.utils.AnonThrottle",
            ],
            'DEFAULT_THROTTLE_RATES':{
                'wdp_anon':'5/minute',
                'wdp_user':'10/minute',
            }
        }

 

--------------------------------------------------------------------------------------------------------------------

認證+權限+限流 一塊兒使用的代碼:   對匿名用戶進行限制 每一個用戶1分鐘容許訪問5次,登陸用戶1分鐘容許訪問10次

     一個是經過ip(若是客戶端使用代理就很差限流了),另一個是經過登陸用戶的用戶名

from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle
from rest_framework.authentication import BaseAuthentication
from app02 import models

class MyAuthentication(BaseAuthentication):
    def authenticate(self, request):
        token = request.query_params.get('token')
        obj = models.UserInfo.objects.filter(token=token).first()
        if obj:
            return (obj.username,obj)
        return None

    def authenticate_header(self, request):
        pass

class MyPermission(object):
    message = "無權訪問"
    def has_permission(self,request,view):
        if request.user:
            return True
        return False

class AdminPermission(object):
    message = "無權訪問"
    def has_permission(self,request,view):
        if request.user == 'alex':
            return True
        return False


class AnonThrottle(SimpleRateThrottle):
    scope = "wdp_anon"

    def get_cache_key(self, request, view):
        # 返回None,表示我不限制
        # 登陸用戶我無論
        if request.user:
            return None
        # 匿名用戶
        return self.get_ident(request)

class UserThrottle(SimpleRateThrottle):
    scope = "wdp_user"

    def get_cache_key(self, request, view):
        # 登陸用戶
        if request.user:
            return request.user
        # 匿名用戶我無論
        return None


# 無需登陸就能夠訪問
class IndexView(APIView):
    authentication_classes = [MyAuthentication,]
    permission_classes = []
    throttle_classes=[AnonThrottle,UserThrottle,]
    def get(self,request,*args,**kwargs):
  
        return Response('訪問首頁')

# 需登陸就能夠訪問
class ManageView(APIView):
    authentication_classes = [MyAuthentication,]
    permission_classes = [MyPermission,]
    throttle_classes=[AnonThrottle,UserThrottle,]
    def get(self,request,*args,**kwargs):

        return Response('訪問首頁')
View Code
REST_FRAMEWORK = {
    'UNAUTHENTICATED_USER': None,
    'UNAUTHENTICATED_TOKEN': None,
    "DEFAULT_AUTHENTICATION_CLASSES": [
        
    ],
    'DEFAULT_PERMISSION_CLASSES':[

    ],
    'DEFAULT_THROTTLE_RATES':{
        'wdp_anon':'5/minute',
        'wdp_user':'10/minute',

    }
}
配置文件
相關文章
相關標籤/搜索