Django Restful Framework【第三篇】認證、權限、限制訪問頻率

1、認證

認證請求頭python

views.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework.permissions import BasePermission

from rest_framework.request import Request
from rest_framework import exceptions

token_list = [
    'sfsfss123kuf3j123',
    'asijnfowerkkf9812',
]


class TestAuthentication(BaseAuthentication):
    def authenticate(self, request):
        """
        用戶認證,若是驗證成功後返回元組: (用戶,用戶Token)
        :param request: 
        :return: 
            None,表示跳過該驗證;
                若是跳過了全部認證,默認用戶和Token和使用配置文件進行設置
                self._authenticator = None
                if api_settings.UNAUTHENTICATED_USER:
                    self.user = api_settings.UNAUTHENTICATED_USER() # 默認值爲:匿名用戶
                else:
                    self.user = None
        
                if api_settings.UNAUTHENTICATED_TOKEN:
                    self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默認值爲:None
                else:
                    self.auth = None
            (user,token)表示驗證經過並設置用戶名和Token;
            AuthenticationFailed異常
        """
        val = request.query_params.get('token')
        if val not in token_list:
            raise exceptions.AuthenticationFailed("用戶認證失敗")

        return ('登陸用戶', '用戶token')

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        pass


class TestPermission(BasePermission):
    message = "權限驗證失敗"

    def has_permission(self, request, view):
        """
        判斷是否有權限訪問當前請求
        Return `True` if permission is granted, `False` otherwise.
        :param request: 
        :param view: 
        :return: True有權限;False無權限
        """
        if request.user == "管理員":
            return True

    # GenericAPIView中get_object時調用
    def has_object_permission(self, request, view, obj):
        """
        視圖繼承GenericAPIView,並在其中使用get_object時獲取對象時,觸發單獨對象權限驗證
        Return `True` if permission is granted, `False` otherwise.
        :param request: 
        :param view: 
        :param obj: 
        :return: True有權限;False無權限
        """
        if request.user == "管理員":
            return True


class TestView(APIView):
    # 認證的動做是由request.user觸發
    authentication_classes = [TestAuthentication, ]

    # 權限
    # 循環執行全部的權限
    permission_classes = [TestPermission, ]

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET請求,響應內容')

    def post(self, request, *args, **kwargs):
        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        return Response('PUT請求,響應內容')

  

自定義認證功能
class MyAuthtication(BasicAuthentication):
    def authenticate(self, request):
        token = request.query_params.get('token')  #注意是沒有GET的,用query_params表示
        if token == 'zxxzzxzc':
            return ('uuuuuu','afsdsgdf') #返回user,auth
        # raise AuthenticationFailed('認證錯誤')  #只要拋出認證錯誤這樣的異常就會去執行下面的函數
        raise APIException('認證錯誤')
    def authenticate_header(self, request):  #認證不成功的時候執行
        return 'Basic reala="api"'

class UserView(APIView):
    authentication_classes = [MyAuthtication,]
    def get(self,request,*args,**kwargs):
        print(request.user)
        print(request.auth)
        return Response('用戶列表')

 

 

2、權限

一、需求:Host是匿名用戶和用戶都能訪問  #匿名用戶的request.user = none;User只有註冊用戶能訪問django

urls.py
from app03 import views
from django.conf.urls import url
urlpatterns = [
    # django rest framework
    url('^auth/', views.AuthView.as_view()),
    url(r'^hosts/', views.HostView.as_view()),
    url(r'^users/', views.UsersView.as_view()),
    url(r'^salary/', views.SalaryView.as_view()),
]

  

views.py
from django.shortcuts import render
from rest_framework.views import APIView  #繼承的view
from rest_framework.response import  Response #友好的返回
from rest_framework.authentication import BaseAuthentication   #認證的類
from rest_framework.authentication import BasicAuthentication
from app01 import models
from rest_framework import  exceptions
from rest_framework.permissions import AllowAny   #權限在這個類裏面
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle
# Create your views here.
# +++++++++++++++認證類和權限類========================
class MyAuthentication(BaseAuthentication):
    def authenticate(self, request):
        token = request.query_params.get('token')
        obj = models.UserInfo.objects.filter(token=token).first()
        if obj :  #若是認證成功,返回用戶名和auth
            return (obj.username,obj)
        return None  #若是沒有認證成功就不處理,進行下一步

    def authenticate_header(self, request):
        pass

class MyPermission(object):
    message = '無權訪問'
    def has_permission(self,request,view):  #has_permission裏面的self是view視圖對象
        if request.user:
            return True  #若是不是匿名用戶就說明有權限
        return False  #不然無權限

class AdminPermission(object):
    message = '無權訪問'
    def has_permission(self, request, view):  # has_permission裏面的self是view視圖對象
        if request.user=='haiyun':
            return True  # 返回True表示有權限
        return False #返回False表示無權限

# +++++++++++++++++++++++++++
class AuthView(APIView):
    authentication_classes = []  #認證頁面不須要認證

    def get(self,request):
        self.dispatch
        return '認證列表'

class HostView(APIView):
    '''需求:
          Host是匿名用戶和用戶都能訪問  #匿名用戶的request.user = none
          User只有註冊用戶能訪問
    '''
    authentication_classes = [MyAuthentication,]
    permission_classes = []  #都能訪問就不必設置權限了
    def get(self,request):
        print(request.user)
        print(request.auth)
        return Response('主機列表')

class UsersView(APIView):
    '''用戶能訪問,request.user裏面有值'''
    authentication_classes = [MyAuthentication,]
    permission_classes = [MyPermission,]
    def get(self,request):
        print(request.user,'111111111')
        return Response('用戶列表')

    def permission_denied(self, request, message=None):
        """
        If request is not permitted, determine what kind of exception to raise.
        """
        if request.authenticators and not request.successful_authenticator:
            '''若是沒有經過認證,而且權限中return False了,就會報下面的這個異常了'''
            raise exceptions.NotAuthenticated(detail='無權訪問')
        raise exceptions.PermissionDenied(detail=message)

  

認證和權限配合使用
class SalaryView(APIView):
    '''用戶能訪問'''
    message ='無權訪問'
    authentication_classes = [MyAuthentication,]  #驗證是否是用戶
    permission_classes = [MyPermission,AdminPermission,] #再看用戶有沒有權限,若是有權限在判斷有沒有管理員的權限
    def get(self,request):
        return Response('薪資列表')

    def permission_denied(self, request, message=None):
        """
        If request is not permitted, determine what kind of exception to raise.
        """
        if request.authenticators and not request.successful_authenticator:
            '''若是沒有經過認證,而且權限中return False了,就會報下面的這個異常了'''
            raise exceptions.NotAuthenticated(detail='無權訪問')
        raise exceptions.PermissionDenied(detail=message)

  

若是趕上這樣的,還能夠自定製,參考源碼api

    def check_permissions(self, request):
        """
        Check if the request should be permitted.
        Raises an appropriate exception if the request is not permitted.
        """
        for permission in self.get_permissions():
            #循環每個permission對象,調用has_permission
            #若是False,則拋出異常
            #True 說明有權訪問
            if not permission.has_permission(request, self):
                self.permission_denied(
                    request, message=getattr(permission, 'message', None)
                )
    def permission_denied(self, request, message=None):
        """
        If request is not permitted, determine what kind of exception to raise.
        """
        if request.authenticators and not request.successful_authenticator:
            '''若是沒有經過認證,而且權限中return False了,就會報下面的這個異常了'''
            raise exceptions.NotAuthenticated()
        raise exceptions.PermissionDenied(detail=message)

那麼咱們能夠重寫permission_denied這個方法,以下:緩存

views.py
class UsersView(APIView):
    '''用戶能訪問,request.user裏面有值'''
    authentication_classes = [MyAuthentication,]
    permission_classes = [MyPermission,]
    def get(self,request):
        return Response('用戶列表')

    def permission_denied(self, request, message=None):
        """
        If request is not permitted, determine what kind of exception to raise.
        """
        if request.authenticators and not request.successful_authenticator:
            '''若是沒有經過認證,而且權限中return False了,就會報下面的這個異常了'''
            raise exceptions.NotAuthenticated(detail='無權訪問')
        raise exceptions.PermissionDenied(detail=message)

  

2. 全局使用app

上述操做中均是對單獨視圖進行特殊配置,若是想要對全局進行配置,則須要再配置文件中寫入便可。ide

settings.py
REST_FRAMEWORK = {
    'UNAUTHENTICATED_USER': None,
    'UNAUTHENTICATED_TOKEN': None,  #將匿名用戶設置爲None
    "DEFAULT_AUTHENTICATION_CLASSES": [
        "app01.utils.MyAuthentication",
    ],
    'DEFAULT_PERMISSION_CLASSES':[
        "app03.utils.MyPermission",#設置路徑,
    ]
}

  

Views.py
class AuthView(APIView):
    authentication_classes = []  #認證頁面不須要認證

    def get(self,request):
        self.dispatch
        return '認證列表'

class HostView(APIView):
    '''需求:
          Host是匿名用戶和用戶都能訪問  #匿名用戶的request.user = none
          User只有註冊用戶能訪問
    '''
    authentication_classes = [MyAuthentication,]
    permission_classes = []  #都能訪問就不必設置權限了
    def get(self,request):
        print(request.user)
        print(request.auth)
        return Response('主機列表')

class UsersView(APIView):
    '''用戶能訪問,request.user裏面有值'''
    authentication_classes = [MyAuthentication,]
    permission_classes = [MyPermission,]
    def get(self,request):
        print(request.user,'111111111')
        return Response('用戶列表')

    def permission_denied(self, request, message=None):
        """
        If request is not permitted, determine what kind of exception to raise.
        """
        if request.authenticators and not request.successful_authenticator:
            '''若是沒有經過認證,而且權限中return False了,就會報下面的這個異常了'''
            raise exceptions.NotAuthenticated(detail='無權訪問')
        raise exceptions.PermissionDenied(detail=message)


class SalaryView(APIView):
    '''用戶能訪問'''
    message ='無權訪問'
    authentication_classes = [MyAuthentication,]  #驗證是否是用戶
    permission_classes = [MyPermission,AdminPermission,] #再看用戶有沒有權限,若是有權限在判斷有沒有管理員的權限
    def get(self,request):
        return Response('薪資列表')

    def permission_denied(self, request, message=None):
        """
        If request is not permitted, determine what kind of exception to raise.
        """
        if request.authenticators and not request.successful_authenticator:
            '''若是沒有經過認證,而且權限中return False了,就會報下面的這個異常了'''
            raise exceptions.NotAuthenticated(detail='無權訪問')
        raise exceptions.PermissionDenied(detail=message)

  

 

3、限流

一、爲何要限流呢?  函數

答:防爬源碼分析

二、限制訪問頻率源碼分析post

self.check_throttles(request)
self.check_throttles(request)
check_throttles
def check_throttles(self, request):
        """
        Check if request should be throttled.
        Raises an appropriate exception if the request is throttled.
        """
        for throttle in self.get_throttles():
            #循環每個throttle對象,執行allow_request方法
            # allow_request:
                #返回False,說明限制訪問頻率
                #返回True,說明不限制,通行
            if not throttle.allow_request(request, self):
                self.throttled(request, throttle.wait())
                #throttle.wait()表示還要等多少秒就能訪問了
get_throttles
def get_throttles(self):
        """
        Instantiates and returns the list of throttles that this view uses.
        """
        #返回對象
        return [throttle() for throttle in self.throttle_classes]
找到類,可自定製類throttle_classes
throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES
BaseThrottle
class BaseThrottle(object):
    """
    Rate throttling of requests.
    """

    def allow_request(self, request, view):
        """
        Return `True` if the request should be allowed, `False` otherwise.
        """
        raise NotImplementedError('.allow_request() must be overridden')

    def get_ident(self, request):
        """
        Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
        if present and number of proxies is > 0. If not use all of
        HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
        """
        xff = request.META.get('HTTP_X_FORWARDED_FOR')
        remote_addr = request.META.get('REMOTE_ADDR')
        num_proxies = api_settings.NUM_PROXIES

        if num_proxies is not None:
            if num_proxies == 0 or xff is None:
                return remote_addr
            addrs = xff.split(',')
            client_addr = addrs[-min(num_proxies, len(addrs))]
            return client_addr.strip()

        return ''.join(xff.split()) if xff else remote_addr

    def wait(self):
        """
        Optionally, return a recommended number of seconds to wait before
        the next request.
        """
        return None
也能夠重寫allow_request方法
 
可自定製返回的錯誤信息throttled
def throttled(self, request, wait):
        """
        If request is throttled, determine what kind of exception to raise.
        """
        raise exceptions.Throttled(wait)
raise exceptions.Throttled(wait)錯誤信息詳情
class Throttled(APIException):
    status_code = status.HTTP_429_TOO_MANY_REQUESTS
    default_detail = _('Request was throttled.')
    extra_detail_singular = 'Expected available in {wait} second.'
    extra_detail_plural = 'Expected available in {wait} seconds.'
    default_code = 'throttled'

    def __init__(self, wait=None, detail=None, code=None):
        if detail is None:
            detail = force_text(self.default_detail)
        if wait is not None:
            wait = math.ceil(wait)
            detail = ' '.join((
                detail,
                force_text(ungettext(self.extra_detail_singular.format(wait=wait),
                                     self.extra_detail_plural.format(wait=wait),
                                     wait))))
        self.wait = wait
        super(Throttled, self).__init__(detail, code)

  

 下面來看看最簡單的從源碼中分析的示例,這只是舉例說明了一下ui

urls.py
from django.conf.urls import url
from app04 import views
urlpatterns = [
    url('limit/',views.LimitView.as_view()),

]
views.py
from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import exceptions
# from rest_framewor import
# Create your views here.
class MyThrottle(object):
    def allow_request(self,request,view):
        #返回False,限制
        #返回True,不限制
        pass
    def wait(self):
        return 1000


class LimitView(APIView):
    authentication_classes = []  #不讓認證用戶
    permission_classes = []  #不讓驗證權限
    throttle_classes = [MyThrottle, ]
    def get(self,request):
        # self.dispatch
        return Response('控制訪問頻率示例')

    def throttled(self, request, wait):
        '''可定製方法設置中文錯誤'''
        # raise exceptions.Throttled(wait)
        class MyThrottle(exceptions.Throttled):
            default_detail = '請求被限制'
            extra_detail_singular = 'Expected available in {wait} second.'
            extra_detail_plural = 'Expected available in {wait} seconds.'
            default_code = '還須要再等{wait}秒'
        raise MyThrottle(wait)

  

三、需求:對匿名用戶進行限制,每一個用戶一分鐘容許訪問10次(只針對用戶來講)

a、基於用戶IP限制訪問頻率

流程分析:

  • 先獲取用戶信息,若是是匿名用戶,獲取IP。若是不是匿名用戶就能夠獲取用戶名。
  • 獲取匿名用戶IP,在request裏面獲取,好比IP= 1.1.1.1。
  • 吧獲取到的IP添加到到recode字典裏面,須要在添加以前先限制一下。
  • 若是時間間隔大於60秒,說明時間久遠了,就把那個時間給剔除 了pop。在timelist列表裏面如今留的是有效的訪問時間段。
  • 而後判斷他的訪問次數超過了10次沒有,若是超過了時間就return False。
  • 美中不足的是時間是固定的,咱們改變他爲動態的:列表裏面最開始進來的時間和當前的時間進行比較,看須要等多久。

具體實現:

views初級版本
from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import exceptions
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle  #限制訪問頻率
import time
# Create your views here.
RECORD = {}
class MyThrottle(BaseThrottle):

    def allow_request(self,request,view):
        '''對匿名用戶進行限制,每一個用戶一分鐘訪問10次 '''
        ctime = time.time()
        ip = '1.1.1.1'
        if ip not in RECORD:
            RECORD[ip] = [ctime]
        else:
            #[152042123,15204212,3152042,123152042123]
            time_list = RECORD[ip]  #獲取ip裏面的值
            while True:
                val = time_list[-1]#取出最後一個時間,也就是訪問最先的時間
                if (ctime-60)>val:  #吧時間大於60秒的給剔除了
                    time_list.pop()
                #剔除了以後timelist裏面就是有效的時間了,在進行判斷他的訪問次數是否是超過10次
                else:
                    break
            if len(time_list) >10:
                return False        # 返回False,限制
            time_list.insert(0, ctime)
        return True   #返回True,不限制

    def wait(self):
        ctime = time.time()
        first_in_time = RECORD['1.1.1.1'][-1]
        wt = 60-(ctime-first_in_time)
        return wt


class LimitView(APIView):
    authentication_classes = []  #不讓認證用戶
    permission_classes = []  #不讓驗證權限
    throttle_classes = [MyThrottle, ]
    def get(self,request):
        # self.dispatch
        return Response('控制訪問頻率示例')

    def throttled(self, request, wait):
        '''可定製方法設置中文錯誤'''
        # raise exceptions.Throttled(wait)
        class MyThrottle(exceptions.Throttled):
            default_detail = '請求被限制'
            extra_detail_singular = 'Expected available in {wait} second.'
            extra_detail_plural = 'Expected available in {wait} seconds.'
            default_code = '還須要再等{wait}秒'
        raise MyThrottle(wait)
 
稍微作了改動
# from django.shortcuts import render
# from rest_framework.views import APIView
# from rest_framework.response import Response
# from rest_framework import exceptions
# from rest_framework.throttling import BaseThrottle,SimpleRateThrottle  #限制訪問頻率
# import time
# # Create your views here.
# RECORD = {}
# class MyThrottle(BaseThrottle):
#
#     def allow_request(self,request,view):
#         '''對匿名用戶進行限制,每一個用戶一分鐘訪問10次 '''
#         ctime = time.time()
#         ip = '1.1.1.1'
#         if ip not in RECORD:
#             RECORD[ip] = [ctime]
#         else:
#             #[152042123,15204212,3152042,123152042123]
#             time_list = RECORD[ip]  #獲取ip裏面的值
#             while True:
#                 val = time_list[-1]#取出最後一個時間,也就是訪問最先的時間
#                 if (ctime-60)>val:  #吧時間大於60秒的給剔除了
#                     time_list.pop()
#                 #剔除了以後timelist裏面就是有效的時間了,在進行判斷他的訪問次數是否是超過10次
#                 else:
#                     break
#             if len(time_list) >10:
#                 return False        # 返回False,限制
#             time_list.insert(0, ctime)
#         return True   #返回True,不限制
#
#     def wait(self):
#         ctime = time.time()
#         first_in_time = RECORD['1.1.1.1'][-1]
#         wt = 60-(ctime-first_in_time)
#         return wt
#
#
# class LimitView(APIView):
#     authentication_classes = []  #不讓認證用戶
#     permission_classes = []  #不讓驗證權限
#     throttle_classes = [MyThrottle, ]
#     def get(self,request):
#         # self.dispatch
#         return Response('控制訪問頻率示例')
#
#     def throttled(self, request, wait):
#         '''可定製方法設置中文錯誤'''
#         # raise exceptions.Throttled(wait)
#         class MyThrottle(exceptions.Throttled):
#             default_detail = '請求被限制'
#             extra_detail_singular = 'Expected available in {wait} second.'
#             extra_detail_plural = 'Expected available in {wait} seconds.'
#             default_code = '還須要再等{wait}秒'
#         raise MyThrottle(wait)



from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import exceptions
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle  #限制訪問頻率
import time
# Create your views here.
RECORD = {}
class MyThrottle(BaseThrottle):

    def allow_request(self,request,view):
        '''對匿名用戶進行限制,每一個用戶一分鐘訪問10次 '''
        ctime = time.time()
        self.ip =self.get_ident(request)
        if self.ip not in RECORD:
            RECORD[self.ip] = [ctime]
        else:
            #[152042123,15204212,3152042,123152042123]
            time_list = RECORD[self.ip]  #獲取ip裏面的值
            while True:
                val = time_list[-1]#取出最後一個時間,也就是訪問最先的時間
                if (ctime-60)>val:  #吧時間大於60秒的給剔除了
                    time_list.pop()
                #剔除了以後timelist裏面就是有效的時間了,在進行判斷他的訪問次數是否是超過10次
                else:
                    break
            if len(time_list) >10:
                return False        # 返回False,限制
            time_list.insert(0, ctime)
        return True   #返回True,不限制

    def wait(self):
        ctime = time.time()
        first_in_time = RECORD[self.ip][-1]
        wt = 60-(ctime-first_in_time)
        return wt


class LimitView(APIView):
    authentication_classes = []  #不讓認證用戶
    permission_classes = []  #不讓驗證權限
    throttle_classes = [MyThrottle, ]
    def get(self,request):
        # self.dispatch
        return Response('控制訪問頻率示例')

    def throttled(self, request, wait):
        '''可定製方法設置中文錯誤'''
        # raise exceptions.Throttled(wait)
        class MyThrottle(exceptions.Throttled):
            default_detail = '請求被限制'
            extra_detail_singular = 'Expected available in {wait} second.'
            extra_detail_plural = 'Expected available in {wait} seconds.'
            default_code = '還須要再等{wait}秒'
        raise MyThrottle(wait)

  

 

b、用resetframework內部的限制訪問頻率(利於Django緩存)

 源碼分析:

from rest_framework.throttling import BaseThrottle,SimpleRateThrottle  #限制訪問頻率
BaseThrottle至關於一個抽象類
class BaseThrottle(object):
    """
    Rate throttling of requests.
    """

    def allow_request(self, request, view):
        """
        Return `True` if the request should be allowed, `False` otherwise.
        """
        raise NotImplementedError('.allow_request() must be overridden')

    def get_ident(self, request):  #惟一標識
        """
        Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
        if present and number of proxies is > 0. If not use all of
        HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
        """
        xff = request.META.get('HTTP_X_FORWARDED_FOR')
        remote_addr = request.META.get('REMOTE_ADDR') #獲取IP等
        num_proxies = api_settings.NUM_PROXIES

        if num_proxies is not None:
            if num_proxies == 0 or xff is None:
                return remote_addr
            addrs = xff.split(',')
            client_addr = addrs[-min(num_proxies, len(addrs))]
            return client_addr.strip()

        return ''.join(xff.split()) if xff else remote_addr

    def wait(self):
        """
        Optionally, return a recommended number of seconds to wait before
        the next request.
        """
        return None

 

SimpleRateThrottle
class SimpleRateThrottle(BaseThrottle):
    """
    一個簡單的緩存實現,只須要` get_cache_key() `。被覆蓋。
    速率(請求/秒)是由視圖上的「速率」屬性設置的。類。該屬性是一個字符串的形式number_of_requests /期。
    週期應該是:(的),「秒」,「M」,「min」,「h」,「小時」,「D」,「一天」。
    之前用於節流的請求信息存儲在高速緩存中。
    A simple cache implementation, that only requires `.get_cache_key()`
    to be overridden.

    The rate (requests / seconds) is set by a `rate` attribute on the View
    class.  The attribute is a string of the form 'number_of_requests/period'.

    Period should be one of: ('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day')

    Previous request information used for throttling is stored in the cache.
    """
    cache = default_cache
    timer = time.time
    cache_format = 'throttle_%(scope)s_%(ident)s'
    scope = None
    THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES

    def __init__(self):
        if not getattr(self, 'rate', None):
            self.rate = self.get_rate()
        self.num_requests, self.duration = self.parse_rate(self.rate)

    def get_cache_key(self, request, view):#這個至關因而一個半成品,咱們能夠來補充它
        """
        Should return a unique cache-key which can be used for throttling.
        Must be overridden.

        May return `None` if the request should not be throttled.
        """
        raise NotImplementedError('.get_cache_key() must be overridden')

    def get_rate(self):
        """
        Determine the string representation of the allowed request rate.
        """
        if not getattr(self, 'scope', None):
            msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
                   self.__class__.__name__)
            raise ImproperlyConfigured(msg)

        try:
            return self.THROTTLE_RATES[self.scope]
        except KeyError:
            msg = "No default throttle rate set for '%s' scope" % self.scope
            raise ImproperlyConfigured(msg)

    def parse_rate(self, rate):
        """
        Given the request rate string, return a two tuple of:
        <allowed number of requests>, <period of time in seconds>
        """
        if rate is None:
            return (None, None)
        num, period = rate.split('/')
        num_requests = int(num)
        duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
        return (num_requests, duration)

    #一、一進來會先執行他,
    def allow_request(self, request, view):
        """
        Implement the check to see if the request should be throttled.

        On success calls `throttle_success`.
        On failure calls `throttle_failure`.
        """
        if self.rate is None:
            return True

        self.key = self.get_cache_key(request, view)  #二、執行get_cache_key,這裏的self.key就至關於咱們舉例ip
        if self.key is None:
            return True

        self.history = self.cache.get(self.key, [])  #三、獲得的key,默認是一個列表,賦值給了self.history,
                                                        # 這時候self.history就是每個ip對應的訪問記錄
        self.now = self.timer()

        # Drop any requests from the history which have now passed the
        # throttle duration
        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()
        if len(self.history) >= self.num_requests:
            return self.throttle_failure()
        return self.throttle_success()

    def throttle_success(self):
        """
        Inserts the current request's timestamp along with the key
        into the cache.
        """
        self.history.insert(0, self.now)
        self.cache.set(self.key, self.history, self.duration)
        return True

    def throttle_failure(self):
        """
        Called when a request to the API has failed due to throttling.
        """
        return False

    def wait(self):
        """
        Returns the recommended next request time in seconds.
        """
        if self.history:
            remaining_duration = self.duration - (self.now - self.history[-1])
        else:
            remaining_duration = self.duration

        available_requests = self.num_requests - len(self.history) + 1
        if available_requests <= 0:
            return None

        return remaining_duration / float(available_requests)

  

請求一進來會先執行SimpleRateThrottle這個類的構造方法

__init__
def __init__(self):
        if not getattr(self, 'rate', None):
            self.rate = self.get_rate()  #點進去看到須要些一個scope  ,2/m
        self.num_requests, self.duration = self.parse_rate(self.rate)

  

get_rate
def get_rate(self):
        """
        Determine the string representation of the allowed request rate.
        """
        if not getattr(self, 'scope', None):  #檢測必須有scope,沒有就報錯了
            msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
                   self.__class__.__name__)
            raise ImproperlyConfigured(msg)

        try:
            return self.THROTTLE_RATES[self.scope]
        except KeyError:
            msg = "No default throttle rate set for '%s' scope" % self.scope
            raise ImproperlyConfigured(msg)

  

parse_rate
def parse_rate(self, rate):
        """
        Given the request rate string, return a two tuple of:
        <allowed number of requests>, <period of time in seconds>
        """
        if rate is None:
            return (None, None)
        num, period = rate.split('/')
        num_requests = int(num)
        duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
        return (num_requests, duration)

  

allow_request
#二、接下來會先執行他,
    def allow_request(self, request, view):
        """
        Implement the check to see if the request should be throttled.

        On success calls `throttle_success`.
        On failure calls `throttle_failure`.
        """
        if self.rate is None:
            return True

        self.key = self.get_cache_key(request, view)  #二、執行get_cache_key,這裏的self.key就至關於咱們舉例ip
        if self.key is None:
            return True  #不限制
        # [114521212,11452121211,45212121145,21212114,521212]
        self.history = self.cache.get(self.key, [])  #三、獲得的key,默認是一個列表,賦值給了self.history,
                                                        # 這時候self.history就是每個ip對應的訪問記錄
        self.now = self.timer()

        # Drop any requests from the history which have now passed the
        # throttle duration
        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()
        if len(self.history) >= self.num_requests:
            return self.throttle_failure()
        return self.throttle_success()

  

wait
def wait(self):
        """
        Returns the recommended next request time in seconds.
        """
        if self.history:
            remaining_duration = self.duration - (self.now - self.history[-1])
        else:
            remaining_duration = self.duration

        available_requests = self.num_requests - len(self.history) + 1
        if available_requests <= 0:
            return None

        return remaining_duration / float(available_requests)

  

代碼實現:

views.py
###########用resetframework內部的限制訪問頻率##############
class MySimpleRateThrottle(SimpleRateThrottle):
    scope = 'xxx'
    def get_cache_key(self, request, view):
        return self.get_ident(request)  #返回惟一標識IP

class LimitView(APIView):
    authentication_classes = []  #不讓認證用戶
    permission_classes = []  #不讓驗證權限
    throttle_classes = [MySimpleRateThrottle, ]
    def get(self,request):
        # self.dispatch
        return Response('控制訪問頻率示例')

    def throttled(self, request, wait):
        '''可定製方法設置中文錯誤'''
        # raise exceptions.Throttled(wait)
        class MyThrottle(exceptions.Throttled):
            default_detail = '請求被限制'
            extra_detail_singular = 'Expected available in {wait} second.'
            extra_detail_plural = 'Expected available in {wait} seconds.'
            default_code = '還須要再等{wait}秒'
        raise MyThrottle(wait)

  

記得在settings裏面配置

settings.py
REST_FRAMEWORK = {
    'UNAUTHENTICATED_USER': None,
    'UNAUTHENTICATED_TOKEN': None,  #將匿名用戶設置爲None
    "DEFAULT_AUTHENTICATION_CLASSES": [
        "app01.utils.MyAuthentication",
    ],
    'DEFAULT_PERMISSION_CLASSES':[
        # "app03.utils.MyPermission",#設置路徑,
    ],
    'DEFAULT_THROTTLE_RATES':{
        'xxx':'2/minute'  #2分鐘
    }
}

#緩存:放在文件
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.filebased.FileBasedCache',
        'LOCATION': 'cache',   #文件路徑
    }
}

  

四、對匿名用戶進行限制,每一個用戶1分鐘容許訪問5次,對於登陸的普通用戶1分鐘訪問10次,VIP用戶一分鐘訪問20次

  • 好比首頁能夠匿名訪問
  • #先認證,只有認證了才知道是否是匿名的,
  • #權限登陸成功以後才能訪問, ,index頁面就不須要權限了
  • If request.user  #判斷登陸了沒有
urls.py
from django.contrib import admin

from django.conf.urls import url, include
from app05 import views

urlpatterns = [
    url('index/',views.IndexView.as_view()),
    url('manage/',views.ManageView.as_view()),
]
views.py
from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication  #認證須要
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限流處理
from rest_framework.permissions import BasePermission
from rest_framework import exceptions
from app01 import models
# Create your views here.
###############3##認證#####################
class MyAuthentcate(BaseAuthentication):
    '''檢查用戶是否存在,若是存在就返回user和auth,若是沒有就返回'''
    def authenticate(self, request):
        token = request.query_params.get('token')
        obj = models.UserInfo.objects.filter(token=token).first()
        if obj:
            return (obj.username,obj.token)
        return None  #表示我不處理

##################權限#####################
class MyPermission(BasePermission):
    message='無權訪問'
    def has_permission(self, request, view):
        if request.user:
            return True  #true表示有權限
        return False  #false表示無權限

class AdminPermission(BasePermission):
    message = '無權訪問'

    def has_permission(self, request, view):
        if request.user=='haiyan':
            return True  # true表示有權限
        return False  # false表示無權限

############3#####限流##################3##
class AnonThrottle(SimpleRateThrottle):
    scope = 'wdp_anon'  #至關於設置了最大的訪問次數和時間
    def get_cache_key(self, request, view):
        if request.user:
            return None  #返回None表示我不限制,登陸用戶我無論
        #匿名用戶
        return self.get_ident(request)  #返回一個惟一標識IP

class UserThrottle(SimpleRateThrottle):
    scope = 'wdp_user'
    def get_cache_key(self, request, view):
        #登陸用戶
        if request.user:
            return request.user
        return None  #返回NOne表示匿名用戶我無論


##################視圖#####################
#首頁支持匿名訪問,
#無須要登陸就能夠訪問
class IndexView(APIView):
    authentication_classes = [MyAuthentcate,]   #認證判斷他是否是匿名用戶
    permission_classes = []   #通常主頁就不須要權限驗證了
    throttle_classes = [AnonThrottle,UserThrottle,]  #對匿名用戶和普通用戶的訪問限制

    def get(self,request):
        # self.dispatch
        return Response('訪問首頁')

    def throttled(self, request, wait):
        '''可定製方法設置中文錯誤'''

        # raise exceptions.Throttled(wait)
        class MyThrottle(exceptions.Throttled):
            default_detail = '請求被限制'
            extra_detail_singular = 'Expected available in {wait} second.'
            extra_detail_plural = 'Expected available in {wait} seconds.'
            default_code = '還須要再等{wait}秒'

        raise MyThrottle(wait)

#需登陸就能夠訪問
class ManageView(APIView):
    authentication_classes = [MyAuthentcate, ]  # 認證判斷他是否是匿名用戶
    permission_classes = [MyPermission,]  # 通常主頁就不須要權限驗證了
    throttle_classes = [AnonThrottle, UserThrottle, ]  # 對匿名用戶和普通用戶的訪問限制

    def get(self, request):
        # self.dispatch
        return Response('管理人員訪問頁面')

    def throttled(self, request, wait):
        '''可定製方法設置中文錯誤'''

        # raise exceptions.Throttled(wait)
        class MyThrottle(exceptions.Throttled):
            default_detail = '請求被限制'
            extra_detail_singular = 'Expected available in {wait} second.'
            extra_detail_plural = 'Expected available in {wait} seconds.'
            default_code = '還須要再等{wait}秒'

        raise MyThrottle(wait)

 

4、總結

一、認證:就是檢查用戶是否存在;若是存在返回(request.user,request.auth);不存在request.user/request.auth=None

 二、權限:進行職責的劃分

三、限制訪問頻率

認證
    - 類:authenticate/authenticate_header ##驗證不成功的時候執行的
    - 返回值:
        - return None,
        - return (user,auth),
        - raise 異常
    - 配置:
        - 視圖:
            class IndexView(APIView):
                authentication_classes = [MyAuthentication,]
        - 全局:
            REST_FRAMEWORK = {
                    'UNAUTHENTICATED_USER': None,
                    'UNAUTHENTICATED_TOKEN': None,
                    "DEFAULT_AUTHENTICATION_CLASSES": [
                        # "app02.utils.MyAuthentication",
                    ],
            }

權限 
    - 類:has_permission/has_object_permission
    - 返回值: 
        - True、#有權限
        - False、#無權限
        - exceptions.PermissionDenied(detail="錯誤信息")  #異常本身隨意,想拋就拋,錯誤信息本身指定
    - 配置:
        - 視圖:
            class IndexView(APIView):
                permission_classes = [MyPermission,]
        - 全局:
            REST_FRAMEWORK = {
                    "DEFAULT_PERMISSION_CLASSES": [
                        # "app02.utils.MyAuthentication",
                    ],
            }
限流
    - 類:allow_request/wait PS: scope = "wdp_user"
    - 返回值:      return True、#不限制      return False  #限制
    - 配置: 
            - 視圖: 
                class IndexView(APIView):
                    
                    throttle_classes=[AnonThrottle,UserThrottle,]
                    def get(self,request,*args,**kwargs):
                        self.dispatch
                        return Response('訪問首頁')
            - 全局
                REST_FRAMEWORK = {
                    "DEFAULT_THROTTLE_CLASSES":[
                    
                    ],
                    'DEFAULT_THROTTLE_RATES':{
                        'wdp_anon':'5/minute',
                        'wdp_user':'10/minute',
                    }
                }
    
相關文章
相關標籤/搜索