rest-framework框架——認證、權限、頻率組件

1、rest-framework登陸驗證

一、models.py添加User和Token模型

class User(models.Model):
    name = models.CharField(max_length=32)
    pwd = models.CharField(max_length=32)


class Token(models.Model):
    user = models.OneToOneField("User", on_delete=models.CASCADE)
    token = models.CharField(max_length=128)

    def __str__(self):
        return self.token

  添加後執行數據庫遷移,添加app01_user和app01_token表。python

二、給login配置url

urlpatterns = [
    ...
    re_path(r'^login/$', views.LoginView.as_view(), name="login"),
]

三、配置視圖

def get_random_str(user):
    """ 生成隨機字符串 """
    import hashlib, time
    ctime = str(time.time())
    md5 = hashlib.md5(bytes(user, encoding='utf-8'))
    md5.update(bytes(ctime, encoding="utf-8"))
    return md5.hexdigest()


class LoginView(APIView):
    def post(self, request):
        # 驗證邏輯:獲取用戶名密碼與數據庫比對
        name = request.data.get("name")
        pwd = request.data.get("pwd")
        user = User.objects.filter(name=name, pwd=pwd).first()
        res = {"state_code": 1000, "msg": None}   # 成功或失敗須要返回的字典標識此次的狀態
        if user:
            # 經過校驗  拿到隨機字符串交給這我的
            random_str = get_random_str(user.name)   # 獲取隨機字符串
            token = Token.objects.update_or_create(user=user, defaults={"token": random_str})
            res["token"] = str(token)   # json不能序列化對象,所以轉爲字符串
        else:
            # 校驗失敗
            res["state_code"]=1001   # 錯誤狀態碼
            res["msg"] = "用戶名或密碼錯誤"

        return Response(json.dumps(res))

  注意:數據庫

(1)登陸驗證邏輯

class LoginView(APIView):
    def post(self, request):
        # 驗證邏輯:獲取用戶名密碼與數據庫比對
        name = request.data.get("name")
        pwd = request.data.get("pwd")
        user = User.objects.filter(name=name, pwd=pwd).first()
        if user:
            # 經過校驗  拿到隨機字符串交給這我的
            pass
        else:
            # 校驗失敗
            pass

        return Response("login....")

(2)生成隨機字符串

def get_random_str(user):
    """ 生成隨機字符串 """
    import hashlib, time
    ctime = str(time.time())
    md5 = hashlib.md5(bytes(user, encoding='utf-8'))
    md5.update(bytes(ctime, encoding="utf-8"))
    return md5.hexdigest()

   注意:ctime=str(time.time()) ,世界上一直在變化的就是時間變量,所以ctime這個變量每一個都是不一樣的。django

  hashlib.md5()構建md5對象。實例化md5時傳遞參數叫作加鹽json

md5 = hashlib.md5(bytes(user, encoding='utf-8'))

   md5.update()寫入要加密的字節:(這裏的md5是實例化出來的對象)api

md5.update(bytes(ctime, encoding="utf-8"))

   md5_obj.hexdigest():獲取密文數組

return md5.hexdigest()

  加鹽以後,即便要加密的數據徹底同樣,可是用戶名確定不同,所以產生的密文必定惟一。瀏覽器

(3)update_or_create(self, defaults=None, **kwargs)方法

Token.objects.update_or_create(user=user,defaults={"token":random_str})

  用給定的**kwargs值查找對象(這裏是user=user),若是defaults不爲None則用defaults的值{"token":random_str}更新對象;若是爲None則建立一個新對象。服務器

class QuerySet:
    def update_or_create(self, defaults=None, **kwargs):
        """
        Look up an object with the given kwargs, updating one with defaults
        if it exists, otherwise create a new one.
        Return a tuple (object, created), where created is a boolean
        specifying whether an object was created.
        """
        defaults = defaults or {}

  若是是create操做返回值是添加的數據,若是是update操做返回值是更新的條數。app

2、局部視圖認證

from rest_framework import exceptions

class TokenAuth(object):   # 這個類名能夠任意取
    def authenticate(self, request):   # 這個方法名不可變更
        token = request.GET.get("token")
        token_obj = Token.objects.filter(token=token).first()
        if not token_obj:
            raise exceptions.AuthenticationFailed("驗證失敗!")
        # 若是有值  return兩個值中間加逗號,就構成了一個元組
        return token_obj.user.name, token_obj.token  # 元組:(關聯用戶對象的名字,當前登陸對象token)

    def authenticate_header(self, request):   # 不加會報錯,要求要傳入兩個參數
        pass

class BookView(APIView):
    authentication_classes = [TokenAuth, ]

    def get(self, request):
        book_list = Book.objects.all()
        bs = BookModelSerializers(book_list, many=True, context={"request": request})  # 序列化結果
        # return HttpResponse(bs.data)
        return Response(bs.data)

    def post(self, request):
        # POST請求的數據
        bs = BookModelSerializers(data=request.data)
        if bs.is_valid():  # 驗證數據是否合格
            print(bs.validated_data)
            bs.save()  # create方法
            return Response(bs.data)  # 當前添加的數據
        else:
            return Response(bs.errors)

一、分析源碼

  每次請求都要執行dispatch.dom

(1)dispatch分發前的認證權限驗證

  在用戶訪問時執行APIView的dispatch方法,在dispatch進行分發操做前,須要先執行self.initial(request, *args, **kwargs),執行認證、權限、頻率操做。

def dispatch(self, request, *args, **kwargs):
    self.args = args
    self.kwargs = kwargs
    request = self.initialize_request(request, *args, **kwargs)   # 構建新request
    self.request = request
    self.headers = self.default_response_headers  # deprecate?

    try:
        self.initial(request, *args, **kwargs)    # 認證權限頻率

        # Get the appropriate handler method
        if request.method.lower() in self.http_method_names:
            handler = getattr(self, request.method.lower(),
                              self.http_method_not_allowed)
        else:
            handler = self.http_method_not_allowed

        response = handler(request, *args, **kwargs)

    except Exception as exc:
        response = self.handle_exception(exc)

    self.response = self.finalize_response(request, response, *args, **kwargs)
    return self.response

(2)initial()方法中的認證權限頻率組件

def initial(self, request, *args, **kwargs):
  """代碼省略"""
    # Ensure that the incoming request is permitted 
    # 認證組件
    self.perform_authentication(request)
    # 權限組件
    self.check_permissions(request)
    # 頻率組件
    self.check_throttles(request)

(3)查看perform_authentication()方法

def perform_authentication(self, request):
    request.user

  因爲在dispatch函數中,self.initial(request, *args, **kwargs)晚於request = self.initialize_request(request, *args, **kwargs)。所以這裏的request是新構建的request。request.user即須要去Request類中尋找user靜態方法。

  這個新request經過initialize_request方法返回Request類對象:

def initialize_request(self, request, *args, **kwargs):
    parser_context = self.get_parser_context(request)

    return Request(
        request,
        parsers=self.get_parsers(),
        authenticators=self.get_authenticators(),
        negotiator=self.get_content_negotiator(),
        parser_context=parser_context
    )

(4)Request類中有方法user

@property
def user(self):
    """
    Returns the user associated with the current request, as authenticated
    by the authentication classes provided to the request.
  當request已經經過認證類提供的認證,返回當前請求關聯的用戶 """ if not hasattr(self, '_user'): with wrap_attributeerrors(): self._authenticate() return self._user

  注意:@property裝飾器就是負責把一個方法變成屬性調用。未經過驗證的須要用self._authenticate()方法來進行校驗。

(5)_authenticate方法分析(認證核心)

class Request(object):
    def __init__(self, request, parsers=None, authenticators=None,
                 negotiator=None, parser_context=None):
        self._request = request
        self.parsers = parsers or ()
        self.authenticators = authenticators or ()   # 若是是None返回空元組,若是有值返回authenticators
        """
        print(3 and 0)   # 0
        print(0 and 2)   # 0
        print(0 or 1)    # 1  
        print(4 or 1)    # 4
        """

    def _authenticate(self):
        """
        Attempt to authenticate the request using each authentication instance
        in turn.
        """
        for authenticator in self.authenticators:
            try:
                user_auth_tuple = authenticator.authenticate(self)
            except exceptions.APIException:
                self._not_authenticated()
                raise

            if user_auth_tuple is not None:
                self._authenticator = authenticator
                self.user, self.auth = user_auth_tuple
                return

        self._not_authenticated()
request.py

  確認authenticators的來源是Request實例化時傳入的,

(6)追溯self.authenticators

  找到Request實例化的方法:initialize_request

class APIView(View):
    def initialize_request(self, request, *args, **kwargs):
        return Request(
            authenticators=self.get_authenticators(),
        )

  再由此找到get_authenticators方法:

class APIView(View):
    def get_authenticators(self):
        """
        Instantiates and returns the list of authenticators that this view can use.
        """
        return [auth() for auth in self.authentication_classes] 

  authentication_classes就是咱們在視圖函數中定義的列表:

class BookView(APIView):
    authentication_classes = [TokenAuth, ]
    def get(self, request):...

    def post(self, request):...

   [auth() for auth in self.authentication_classes]這個語句的含義:循環每個認證類並進行實例化,放在數組中。以[TokenAuth, ]爲例返回值是[TokenAuth(), ]。 

  所以回傳回去authenticators=[TokenAuth(), ]。

(7)_authenticate方法處理

class Request(object):
    def __init__(self, request, parsers=None, authenticators=None,
                 negotiator=None, parser_context=None):
        self._request = request
        self.parsers = parsers or ()
        self.authenticators = authenticators or ()   # 若是是None返回空元組,若是有值返回authenticators
        """
        print(3 and 0)   # 0
        print(0 and 2)   # 0
        print(0 or 1)    # 1  
        print(4 or 1)    # 4
        """

    def _authenticate(self):
        """
        Attempt to authenticate the request using each authentication instance
        in turn.
        """
        for authenticator in self.authenticators:   # [TokenAuth(), ],  authenticator:TokenAuth()
            try:
                user_auth_tuple = authenticator.authenticate(self)  # TokenAuth必須有authenticate方法
                # authenticator.authenticate(self):是一個實例對象調用本身的實例方法,本不須要傳self,這裏必定是傳的一個形參。
                # 這個方法是在Request類中,追溯調用關係可知,這裏的self是一個新request對象
            except exceptions.APIException:  # 拋出錯誤
                self._not_authenticated()   # 沒有驗證成功
                raise

            if user_auth_tuple is not None:   # 若是user_auth_tuple有值
                self._authenticator = authenticator
                self.user, self.auth = user_auth_tuple  # 將元組的值賦給self.user和self.auth
                return

        self._not_authenticated()   # 沒有驗證成功

  authenticator.authenticate(self):是一個實例對象調用本身的實例方法,本不須要傳self,這裏必定是傳的一個形參。這個方法是在Request類中,追溯調用關係可知,這裏的self是一個新request對象。所以在構建自定義的TokenAuth時必定要在def authenticate(self, request):  添加request參數。

二、測試驗證

  

  訪問時添加數據庫查到的token信息,驗證經過:

  

   打印以前在_authenticate將元組的值賦給self.user和self.auth的值:

class BookView(APIView):
    authentication_classes = [TokenAuth, ]

    def get(self, request):
        print(request.user)   # egon
        print(request.auth)   # 02aee930be6011068e24f68935d52b02

  能夠看到正好對應authoerticate函數的返回值:return token_obj.user.name, token_obj.token.

三、BaseAuthentication模塊引入規範簡化認證類編寫

from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication

class TokenAuth(BaseAuthentication):
    def authenticate(self, request):
        token = request.GET.get("token")
        token_obj = Token.objects.filter(token=token).first()
        if not token_obj:
            raise exceptions.AuthenticationFailed("驗證失敗!")
        # 若是有值  return兩個值中間加逗號,就構成了一個元組
        return token_obj.user.name, token_obj.token  # 元組:(關聯用戶對象的名字,當前登陸對象token)

    # def authenticate_header(self, request):   # 不加會報錯,且要求要傳入兩個參數
    #     pass

  BaseAuthentication包含authenticate和authenticate_header函數。默認用來被覆蓋。

3、全局視圖認證

 一、源碼分析

  若是沒有在局部定義authentication_classes=[TokenAuth, ]。回溯查找默認的authentication_classes。

(1)APIView有變量authentication_classes

class APIView(View):
    authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES

  api_settings.DEFAULT_AUTHENTICATION_CLASSES是類的實例對象.屬性的模式。當調用不存在的屬性時,Python會試圖調用__getattr__(self,attr)來獲取屬性,而且返回attr。

(2)查看api_settings

  該語句在rest_framework/settings.py中。發現api_settings是一個實例對象。實例化時執行相應的init方法。

api_settings = APISettings(None, DEFAULTS, IMPORT_STRINGS)

  DEFAULTS這個值是一個字典,每個鍵後面都是跟着一個元組,保存了關於rest_framework全部默認配置。也定義在rest_framework/settings.py文件中。

  

(3)查看APISettings類__init__方法

class APISettings(object):
    def __init__(self, user_settings=None, defaults=None, import_strings=None):
        if user_settings:
            self._user_settings = self.__check_user_settings(user_settings)
        self.defaults = defaults or DEFAULTS
        self.import_strings = import_strings or IMPORT_STRINGS
        self._cached_attrs = set()

  user_settings默認爲None,若是有值拿到user_settings.

  self.defaults = defaults or DEFAULTS  拿到DEFAULTS字典值。

(4)APISettings類的def __getattr__(self, attr)方法

class APISettings(object):
    def __getattr__(self, attr):
        if attr not in self.defaults:
            raise AttributeError("Invalid API setting: '%s'" % attr)

        try:
            # Check if present in user settings
            val = self.user_settings[attr]
        except KeyError:   # 當self.user_settings的值是一個空字典,取值報錯KeyError
            # Fall back to defaults
            val = self.defaults[attr]  # 異常處理去取默認的DEFAULT值

        # Coerce import strings into classes
        if attr in self.import_strings:
            val = perform_import(val, attr)

        # Cache the result
        self._cached_attrs.add(attr)
        setattr(self, attr, val)
        return val

  __getattr__是python裏的一個內建函數,能夠很方便地動態返回一個屬性;當調用不存在的屬性時,Python會試圖調用__getattr__(self,item)來獲取屬性,而且返回item;

class Person(object):
    def __init__(self, name):
        self.name = name

    def __getattr__(self, item):
        print("item", item)

    def dream(self):
        print("dreaming。。。。。")
        
alex = Person("alex")
alex.yuan   # 打印:item yuan

  val = self.user_settings[attr]:user_settings執行的返回值後面加上[attr]

(5)APISettings類的user_settings方法執行

class APISettings(object):
    @property
    def user_settings(self):  # 靜態方法
        if not hasattr(self, '_user_settings'):
            self._user_settings = getattr(settings, 'REST_FRAMEWORK', {})
        return self._user_settings

  這裏的settings指的是restDemo項目中的restDemo/settings.py。

  所以getattr(settings, 'REST_FRAMEWORK', {})  表明的意思是:去settings.py中去拿REST_FRAMEWORK變量,若是拿不到則取一個空字典。所以self._user_settings必定是一個字典。

  所以self.user_settings[attr]是在字典中取鍵attr的值.當字典爲空時,取不到值會拋出Keyerror錯誤,進行異常處理去取默認的DEFAULT字典中的值:

DEFAULTS = {
    """省略代碼"""
    'DEFAULT_AUTHENTICATION_CLASSES': (
        'rest_framework.authentication.SessionAuthentication',
        'rest_framework.authentication.BasicAuthentication'
    ),
    """省略代碼"""

二、在settings.py中配置全局視圖

REST_FRAMEWORK = {
    # 仿照DEFAULT配置認證類路徑
    "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth"]
}

(1)REST_FRAMEWORK是一個字典

  鍵必須是DEFAULT_AUTHENTICATION_CLASSES

(2)值是認證類路徑

  因爲要指定認證類路徑所以要把以前寫的TokenAuth從views.py遷移到一個新文件(文件名自定義)中,這裏是:/app01/utils.py

from .models import *
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication

class TokenAuth(BaseAuthentication):
    def authenticate(self, request):
        token = request.GET.get("token")
        token_obj = Token.objects.filter(token=token).first()
        if not token_obj:
            raise exceptions.AuthenticationFailed("驗證失敗!")
        # 若是有值  return兩個值中間加逗號,就構成了一個元組
        return token_obj.user.name, token_obj.token  # 元組:(關聯用戶對象的名字,當前登陸對象token)

三、在全局認證狀況下,配置部分訪問不進行認證

class LoginView(APIView):
    authentication_classes = []    # 配置爲空,優先級高於全局認證,不用進行認證
    def post(self, request):....

4、權限組件

一、權限組件源碼分析

(1)訪問請求交給APIView類下的dispatch方法處理

class APIView(View):
    def dispatch(self, request, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
        request = self.initialize_request(request, *args, **kwargs)
        self.request = request
        self.headers = self.default_response_headers  # deprecate?

        try:
            self.initial(request, *args, **kwargs)
        """代碼省略"""

  在dispatch中使用的initial()方法,包含了權限組件:

def initial(self, request, *args, **kwargs):
  """代碼省略"""
    # Ensure that the incoming request is permitted 
    # 認證組件
    self.perform_authentication(request)
    # 權限組件
    self.check_permissions(request)
    # 頻率組件
    self.check_throttles(request)

(2)權限函數中get_permissions

class APIView(View):
    def check_permissions(self, request):
        """
        Check if the request should be permitted.
        Raises an appropriate exception if the request is not permitted.
        """
        for permission in self.get_permissions():   # 循環的是[SVIPPermission(), ]  permission是SVIPPermission()——權限實例
            if not permission.has_permission(request, self):   # 因而可知權限類必須有has_permission方法
        # 經過權限認證什麼都不用作,不經過執行如下代碼 self.permission_denied( request, message=getattr(permission, 'message', None) )

  這裏循環的是self.get_permissions():

  1)查看get_permissions方法:

class APIView(View):
    def get_permissions(self):
        """
        Instantiates and returns the list of permissions that this view requires.
        """
        return [permission() for permission in self.permission_classes]

  [permission() for permission in self.permission_classes]與認證組件徹底相似:循環每個權限類並進行實例化,放在數組中。

  2)查看確認permission_classes的值:

class APIView(View):
    # The following policies may be set at either globally, or per-view.
    authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
    permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES

  若是在本身的視圖類中定義了permission_classes(優先取當前視圖的局部配置) ,說明配置了局部視圖權限,就取本身定義的。

  若是沒有定義:

  因爲權限組件依然是用了api_settings這個APISettings類實例對象,實例化時執行__init__函數,所以也執行了user_settings靜態方法。

class APISettings(object):
    def __init__(self, user_settings=None, defaults=None, import_strings=None):
        if user_settings:
            self._user_settings = self.__check_user_settings(user_settings)
        self.defaults = defaults or DEFAULTS

    def user_settings(self):
        if not hasattr(self, '_user_settings'):
            self._user_settings = getattr(settings, 'REST_FRAMEWORK', {})  # 去settings.py中去拿REST_FRAMEWORK變量,若是拿不到則取一個空字典
        return self._user_settings

  這裏user_settings函數經過反射取settings下是否有配置‘REST_FRAMEWORK’,若是有配置即說明配置了全局視圖權限。self._user_settings取配置的值

  getattr(settings, 'REST_FRAMEWORK', {})裏的settings是從django引入過來的:

from django.conf import settings

  若是沒有配置則self._user_settings是一個空字典,須要進一步分析api_settings.DEFAULT_PERMISSION_CLASSES:

  APISettings類中包含__getattr__方法,當調用不存在的屬性時,Python會試圖調用__getattr__(self,item)來獲取屬性,而且返回item:

class APISettings(object):
    def __getattr__(self, attr):
        if attr not in self.defaults:
            raise AttributeError("Invalid API setting: '%s'" % attr)

        try:
            # Check if present in user settings
            val = self.user_settings[attr]    # 全局權限視圖未設置時self.user_settings返回值是一個空字典,設置時取到全局配置
        except KeyError:   # 空字典取值報錯拋出異常
            # Fall back to defaults
            val = self.defaults[attr]    # 獲取DEFAULT中默認值

        # Coerce import strings into classes
        if attr in self.import_strings:
            val = perform_import(val, attr)

        # Cache the result
        self._cached_attrs.add(attr)
        setattr(self, attr, val)
        return val

  因爲全局權限視圖未未設置時self.user_settings返回值是一個空字典,所以取值會失敗,經過異常處理獲取DEFAULT默認配置中的值

  3)總結

  self.get_permissions()的返回值是權限實例列表,如下面的示例爲例是[SVIPPermission(), ]。所以for循環拿到的permission是一個個權限實例:SVIPPermission()

  所以若是permission.has_permission返回值是true,直接完成權限驗證;若是返回值是False,則返回沒有權限的提示。

二、局部視圖權限

(1)前置準備

  修改models.py中user表結構:

class User(models.Model):
    name = models.CharField(max_length=32)
    pwd = models.CharField(max_length=32)
    type_choice = ((1, "普通用戶"), (2, "VIP"), (3, "SVIP"))
    user_type = models.IntegerField(choices=type_choice, default=1)

   修改後完成數據庫遷移。

(2)配置權限類

from rest_framework import viewsets

class SVIPPermission(object):
    # 超級用戶可用
    def has_permission(self, request, view):
        username = request.user   # 獲取前面認證過的信息:egon
        user_type = User.objects.filter(name=username).first().user_type
        if user_type == 3:
            # 經過權限認證
            return True
        else:
            # 未經過權限認證
            return False

class AuthorViewSet(viewsets.ModelViewSet):
    # authentication_classes = [TokenAuth, ]
    permission_classes = [SVIPPermission, ]
    queryset = Author.objects.all()  # 配置queryset:告知這個類此次處理的數據
    serializer_class = AuthorModelSerializers  # 告知處理用到的序列化組件

  顯示效果:

  

(3)用message自定義配置錯誤提示

class SVIPPermission(object):
    # 超級用戶可用
    message = "只有超級用戶才能訪問"
    def has_permission(self, request, view):
        username = request.user   # 獲取前面認證過的信息:egon
        user_type = User.objects.filter(name=username).first().user_type
        if user_type == 3:
            # 經過權限認證
            return True
        else:
            # 未經過權限認證
            return False

  顯示效果:

  

三、全局視圖權限

(1)將權限類遷移到utils.py(自定義工具包文件)

from .models import *
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication

class TokenAuth(BaseAuthentication):
    def authenticate(self, request):
        token = request.GET.get("token")
        token_obj = Token.objects.filter(token=token).first()
        if not token_obj:
            raise exceptions.AuthenticationFailed("驗證失敗!")
        # 若是有值  return兩個值中間加逗號,就構成了一個元組
        return token_obj.user.name, token_obj.token  # 元組:(關聯用戶對象的名字,當前登陸對象token)

class SVIPPermission(object):
    # 超級用戶可用
    message = "只有超級用戶才能訪問"
    def has_permission(self, request, view):
        username = request.user   # 獲取前面認證過的信息:egon
        user_type = User.objects.filter(name=username).first().user_type
        if user_type == 3:
            # 經過權限認證
            return True
        else:
            # 未經過權限認證
            return False

(2)在settings.py中配置全局權限

REST_FRAMEWORK = {
    # 認證類路徑
    "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth"],
    # 權限類路徑
    "DEFAULT_PERMISSION_CLASSES": ["app01.utils.SVIPPermission"]
}

5、throttle(頻率訪問)組件

一、源碼分析

class APIView(View):
    def dispatch(self, request, *args, **kwargs):
        try:
            self.initial(request, *args, **kwargs)

    def initial(self, request, *args, **kwargs):
        self.check_throttles(request)

    def check_throttles(self, request):
        """
        Check if request should be throttled.
        Raises an appropriate exception if the request is throttled.
        """
        for throttle in self.get_throttles():
            if not throttle.allow_request(request, self):
                self.throttled(request, throttle.wait())

    def get_throttles(self):
        return [throttle() for throttle in self.throttle_classes]   # 頻率對象列表

二、局部視圖throttle

  直接將頻率限制類定義在utils.py中:

from rest_framework.throttling import BaseThrottle

VISIT_RECORD={}

class VisitThrottle(BaseThrottle):
    def __init__(self):
        self.history=None

    def allow_request(self,request,view):
        # 要求訪問站點的頻率不能超過1分鐘3次
        remote_addr = request.META.get('REMOTE_ADDR')
        print(remote_addr)
        import time
        ctime=time.time()

        if remote_addr not in VISIT_RECORD:
            # 未訪問過
            VISIT_RECORD[remote_addr]=[ctime,]
            return True

        history=VISIT_RECORD.get(remote_addr)
        self.history=history

        while history and history[-1]<ctime-60:
            history.pop()

        if len(history) < 3:
            # 未達到頻率限制
            history.insert(0,ctime)
            return True
        else:
            return False

    def wait(self):
        import time
        ctime=time.time()
        return 60-(ctime-self.history[-1])

  在視圖類中中配置局部頻率限制:

class AuthorViewSet(viewsets.ModelViewSet):
    # authentication_classes = [TokenAuth, ]
    # permission_classes = [SVIPPermission, ]
    throttle_classes = [VisitThrottle, ]
    queryset = Author.objects.all()  # 配置queryset:告知這個類此次處理的數據
    serializer_class = AuthorModelSerializers  # 告知處理用到的序列化組件

  注意:

(1)request.META

  request.META 是一個Python字典,包含了全部本次HTTP請求的Header信息,好比用戶IP地址和用戶Agent(一般是瀏覽器的名稱和版本號)。 注意,Header信息的完整列表取決於用戶所發送的Header信息和服務器端設置的Header信息。

三、全局視圖throttle

  在restDemo/settings.py中配置:

REST_FRAMEWORK = {
    # 認證類路徑
    "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth"],
    # 權限類路徑
    "DEFAULT_PERMISSION_CLASSES": ["app01.utils.SVIPPermission"],
    # 頻率類路徑
    "DEFAULT_THROTTLE_CLASSES": ["app01.utils.VisitThrottle"]
}

四、內置throttle類

  將頻率配置類修改成:

class VisitThrottle(SimpleRateThrottle):

    scope="visit_rate"
    def get_cache_key(self, request, view):

        return self.get_ident(request)

  settings.py設置:

REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': ['app01.utils.TokenAuth'],
    'DEFAULT_PERMISSION_CLASSES': ['app01.utils.SVIPPermission'],
    'DEFAULT_THROTTLE_CLASSES': ['app01.utils.VisitThrottle'],
    "DEFAULT_THROTTLE_RATES": {
        "visit_rate": "1/m",
    }
}
相關文章
相關標籤/搜索