RestFramework自定製之認證和權限、限制訪問頻率

認證和權限

  所謂認證就是檢測用戶登錄與否,一般與權限對應使用。網站中都是經過用戶登陸後由該用戶相應的角色認證以給予對應的權限。數據庫

  權限是對用戶對網站進行操做的限制,只有在擁有相應權限時纔可對網站中某個功能進行操做。權限老是與認證相輔相成。wdjango

自定製認證規則的重點是繼承內置的BaseAuthentication類,重寫其authenticate()方法。api

自定製認證方式一:經過url傳參進行認證緩存

from django.conf.urls import url, include
from app01.views import TestView

urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
ulrs.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework.request import Request
from rest_framework import exceptions

######僞造的數據庫中存有的token########
token_list = [
    'sfsfss123kuf3j123',
    'asijnfowerkkf9812',
]

######自定製的認證規則的類,必須繼承BaseAuthentication#####
class TestAuthentication(BaseAuthentication):
    def authenticate(self, request):
        """
        用戶認證,若是驗證成功後返回元組: (用戶,用戶Token)
        :param request: 
        :return: 
            None,表示跳過該驗證;
                若是跳過了全部認證,默認用戶和Token和使用配置文件進行設置
                self._authenticator = None
                if api_settings.UNAUTHENTICATED_USER:
                    self.user = api_settings.UNAUTHENTICATED_USER()
                else:
                    self.user = None
        
                if api_settings.UNAUTHENTICATED_TOKEN:
                    self.auth = api_settings.UNAUTHENTICATED_TOKEN()
                else:
                    self.auth = None
            (user,token)表示驗證經過並設置用戶名和Token;
            AuthenticationFailed異常
        """
        val = request.query_params.get('token')
        if val not in token_list:
            raise exceptions.AuthenticationFailed("用戶認證失敗")

        return ('登陸用戶', '用戶token')

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        # 驗證失敗時,返回的響應頭WWW-Authenticate對應的值
        pass

#####視圖函數,必須繼承APIView#####
class TestView(APIView):
    authentication_classes = [TestAuthentication, ]#中括號中寫入定義了認證規則的類
    permission_classes = []#這是權限規則,下文會進行詳述

#只有經過了上述的規則,才能如下執行視圖函數
    def get(self, request, *args, **kwargs):
        print(request.user)
        print(request.auth)
        return Response('GET請求,響應內容')

    def post(self, request, *args, **kwargs):
        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        return Response('PUT請求,響應內容')
views.py

 

自定製認證方式二:經過請求頭認證安全

from django.conf.urls import url, include
from app01.views import TestView

urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
ulrs.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework.request import Request
from rest_framework import exceptions


######自定製的認證規則的類,必須繼承BaseAuthentication#####
class TestAuthentication(BaseAuthentication):
    def authenticate(self, request):
        """
        用戶認證,若是驗證成功後返回元組: (用戶,用戶Token)
        :param request: 
        :return: 
            None,表示跳過該驗證;
                若是跳過了全部認證,默認用戶和Token和使用配置文件進行設置
                self._authenticator = None
                if api_settings.UNAUTHENTICATED_USER:
                    self.user = api_settings.UNAUTHENTICATED_USER()
                else:
                    self.user = None
        
                if api_settings.UNAUTHENTICATED_TOKEN:
                    self.auth = api_settings.UNAUTHENTICATED_TOKEN()
                else:
                    self.auth = None
            (user,token)表示驗證經過並設置用戶名和Token;
            AuthenticationFailed異常
        """
        import base64
        auth = request.META.get('HTTP_AUTHORIZATION', b'')#獲取請求頭
        if auth:
            auth = auth.encode('utf-8')#將bytes類型編碼成utf-8
        auth = auth.split()
        if not auth or auth[0].lower() != b'basic':
            raise exceptions.AuthenticationFailed('驗證失敗')
        if len(auth) != 2:
            raise exceptions.AuthenticationFailed('驗證失敗')
        username, part, password = base64.b64decode(auth[1]).decode('utf-8').partition(':')
        if username == 'Damon' and password == '123':
            return ('登陸用戶', '用戶token')
        else:
            raise exceptions.AuthenticationFailed('用戶名或密碼錯誤')

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        return 'Basic realm=api'

#####視圖函數,必須繼承APIView#####
class TestView(APIView):
    authentication_classes = [TestAuthentication, ]#中括號中寫入定義了認證規則的類,可放入多個
    permission_classes = []#這是權限規則,下文會進行詳述
#只有經過了上述的規則,才能如下執行視圖函數
    def get(self, request, *args, **kwargs):
        print(request.user)
        print(request.auth)
        return Response('GET請求,響應內容')

    def post(self, request, *args, **kwargs):
        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        return Response('PUT請求,響應內容')
views.py

 

使用自定製認證和自定製權限app

from django.db import models

class UserInfo(models.Model):
    username = models.CharField(max_length=32)
    password = models.CharField(max_length=64)
    token = models.CharField(max_length=64,null=True)
models.py
from django.conf.urls import url
from django.contrib import admin
from app02 import views as app02_view

urlpatterns = [
    # url(r'^admin/', admin.site.urls),

    url(r'^auth/', app02_view.AuthView.as_view()),
    url(r'^hosts/', app02_view.HostView.as_view()),
    url(r'^users/', app02_view.UserView.as_view()),
    url(r'^salary/', app02_view.SalaryView.as_view()),

]
View Code
from django.views import View

from rest_framework.views import APIView
from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions
from rest_framework.response import Response

from app02 import models
import hashlib
import time

#####自定製認證規則的類#####
class MyAuthentication(BaseAuthentication):

    def authenticate(self, request):
        token = request.query_params.get('token')
        obj = models.UserInfo.objects.filter(token=token).first()
        if obj:
            return (obj.username,obj)
        return None

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        # return 'Basic realm="api"'
        pass
#####自定製權限的類#####
class MyPermission(object):
    message = "無權訪問"
    def has_permission(self,request,view):
        if request.user:
            return True
        return False
#####自定製管理員權限的類#####
class AdminPermission(object):
    message = "無權訪問"
    def has_permission(self,request,view):
        if request.user == 'Damon':#管理員
            return True
        return False

#####視圖函數#####
class HostView(APIView):
    """
    匿名用戶和用戶都能訪問
    """
    authentication_classes = [MyAuthentication,]
    permission_classes = []
    def get(self,request,*args,**kwargs):
        # 原來request對象,django.core.handlers.wsgi.WSGIRequest
        # 如今的request對象,rest_framework.request.Request\
        self.dispatch
        print(request.user)
        # print(request.user)
        # print(request.auth)
        return Response('主機列表')

class UserView(APIView):
    """
    普通用戶能訪問
    """
    authentication_classes = [MyAuthentication, ]
    permission_classes = [MyPermission,]#自定製普通用戶擁有的權限
    def get(self,request,*args,**kwargs):
        return Response('用戶列表')

class SalaryView(APIView):
    """
    管理員用戶才能訪問
    """
    authentication_classes = [MyAuthentication, ]
    permission_classes = [MyPermission,AdminPermission,]#自定製普通用戶和管理員擁有的權限
    def get(self,request,*args,**kwargs):
        self.dispatch
        return Response('薪資列表')

    def permission_denied(self, request, message=None):
        #若是沒有經過認證,而且權限中return False,就會報下面的異常,detail爲自定製的錯誤信息
        if request.authenticators and not request.successful_authenticator:
            raise exceptions.NotAuthenticated(detail='無權訪問')
        raise exceptions.PermissionDenied(detail=message)
views.py

 

設置全局變量ide

  對於認證和權限,咱們可新建utils.py文件,將自定製的類寫入該py文件中,以後在settings.py中進行配置,就可快速使用。在views.py中只需寫入相應視圖函數,無需關心中括號便可實現認證和權限的配置,使得views.py文件中的代碼可讀性更高。函數

REST_FRAMEWORK = {
    'UNAUTHENTICATED_USER': None,
    'UNAUTHENTICATED_TOKEN': None,#將匿名用戶名稱設置爲None,默認爲Anonymous
    "DEFAULT_AUTHENTICATION_CLASSES": [
        "app02.utils.MyAuthentication",#配置自定製認證類的路徑
    ],
    "DEFAULT_PERMISSION_CLASSES": [
        "app02.utils.MyPermission",#配置自定製權限類的路徑
        "app02.utils.AdminPermission",#配置自定製權限類的路徑
    ],
}
settings.py

 

限制訪問頻率

  建網站的宗旨是爲人民服務,供人民訪問。但總有刁民想害朕,如利用機器人爬蟲肆意爬取數據、侵佔流量緩存、洪水攻擊等等。因此咱們須要對能夠訪問網站的用戶進行相應的訪問頻率的限制,以保護咱們網站的安全。源碼分析

  訪問的用戶有兩種——登陸用戶、匿名用戶。對於登陸用戶咱們可採用惟一標識進行標記,而匿名用戶咱們一般採用IP對其進行標記。post

a、基於用戶IP限制訪問頻率

流程分析:

  • 首先獲取用戶信息,若是是匿名用戶,獲取IP。若是不是匿名用戶獲取其用戶名。
  • 在request裏面獲取匿名用戶IP(也有多是代理的IP),如IP= 127.1.1.1。
  • 將獲取到的IP添加到到recode字典裏面,須要在添加以前先限制一下。
  • 若是時間間隔大於60秒(可自定製時長),說明村吃的該用戶最先一次訪問的時間久遠、已失效,將該次訪問的時間pop。在timelist列表裏如今留的是有效的訪問時間段。
  • 判斷該IP訪問次數是否超過10次,若是超過return False予以限制。

具體實現:重點是繼承BaseThrottle類,重寫其allow_request()和wait()方法

from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle

from rest_framework import exceptions

RECORD = {

}
#####自定製對訪問頻率限制的類#####
class MyThrottle(BaseThrottle):
    def allow_request(self,request,view):
        """
        # 返回False,限制
        # 返回True,通行
        :param request: 
        :param view: 
        :return: 
        """
        """
         對匿名用戶進行限制:每一個用戶1分鐘容許訪問10次
            - 獲取用戶IP request 1.1.1.1
        """
        import time
        ctime = time.time()
        ip = self.get_ident()
        if ip not in RECORD:
            RECORD[ip] = [ctime,]#該IP是首次訪問,存儲當前訪問時間
        else:
            # [4507862389234,3507862389234,2507862389234,1507862389234,],原先存有的訪問時間
            time_list = RECORD[ip]
            while True:
                val = time_list[-1]#取出最先訪問的時間
                if (ctime-60) > val:#檢測此次訪問時間與最先訪問時間間隔是否超過一分鐘
                    time_list.pop()#是,則更新列表中最先的訪問時間(確保列表內的訪問時間與這次訪問的時間間隔在一分鐘內)
                else:
                    break#否,則表示在一分鐘內有屢次訪問
            if len(time_list) > 10:#檢測一分鐘訪問次數是否超過十次則限制
                return False#是則限制
            time_list.insert(0,ctime)#不超過十次則將當前時間存入列表
        return True
    def wait(self):
        import time
        ctime = time.time()
        first_in_time = RECORD[self.get_ident()][-1]
        wt = 60 - (ctime - first_in_time)#動態顯示須要等待的時間60-(當前時間 - 最近一次訪問時間)
        return wt


#####視圖函數#####
class LimitView(APIView):
    authentication_classes = []
    permission_classes = []
    throttle_classes=[MyThrottle,]
    def get(self,request,*args,**kwargs):
        self.dispatch
        return Response('控制訪問頻率示例')

    def throttled(self, request, wait):
        """
        If request is throttled, determine what kind of exception to raise.
        """
        #可自定製該方法設置中文的錯誤提示信息
        class MyThrottled(exceptions.Throttled):
            default_detail = '請求被限制.'
            extra_detail_singular = 'Expected available in {wait} second.'
            extra_detail_plural = '還須要再等待{wait}秒'

        raise MyThrottled(wait)
views.py

 

b. 使用配置文件,基於用戶IP限制訪問頻率(利於Django緩存)

源碼分析

class SimpleRateThrottle(BaseThrottle):
    """
    一個簡單的緩存實現,只須要` get_cache_key() `。被覆蓋。
    速率(請求/秒)是由視圖上的「速率」屬性設置的。類。該屬性是一個字符串的形式number_of_requests /期。
    週期應該是:(的),「秒」,「M」,「min」,「h」,「小時」,「D」,「一天」。
    之前用於節流的請求信息存儲在高速緩存中。
    A simple cache implementation, that only requires `.get_cache_key()`
    to be overridden.

    The rate (requests / seconds) is set by a `throttle` attribute on the View
    class.  The attribute is a string of the form 'number_of_requests/period'.

    Period should be one of: ('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day')

    Previous request information used for throttling is stored in the cache.
    """
    cache = default_cache
    timer = time.time
    cache_format = 'throttle_%(scope)s_%(ident)s'
    scope = None
    THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES

    def __init__(self):
        if not getattr(self, 'rate', None):
            self.rate = self.get_rate()#點進去看到須要些一個scope  ,2/m
        self.num_requests, self.duration = self.parse_rate(self.rate)

    def get_cache_key(self, request, view):#這個至關因而一個半成品,咱們能夠來補充它
        """
        Should return a unique cache-key which can be used for throttling.
        Must be overridden.

        May return `None` if the request should not be throttled.
        """
        raise NotImplementedError('.get_cache_key() must be overridden')

    def get_rate(self):
        """
        Determine the string representation of the allowed request rate.
        """
        if not getattr(self, 'scope', None):#檢測必須有scope,沒有就報錯了
            msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
                   self.__class__.__name__)
            raise ImproperlyConfigured(msg)

        try:
            return self.THROTTLE_RATES[self.scope]
        except KeyError:
            msg = "No default throttle rate set for '%s' scope" % self.scope
            raise ImproperlyConfigured(msg)

    def parse_rate(self, rate):
        """
        Given the request rate string, return a two tuple of:
        <allowed number of requests>, <period of time in seconds>
        """
        if rate is None:
            return (None, None)
        num, period = rate.split('/')#取配置文件並切分  'wdp':'2/minute'
        num_requests = int(num)#2
        duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]#用於配置文件的信息minute取m,即60秒
        return (num_requests, duration)

    # 二、執行完構造方法後執行,
    def allow_request(self, request, view):
        """
        Implement the check to see if the request should be throttled.

        On success calls `throttle_success`.
        On failure calls `throttle_failure`.
        """
        if self.rate is None:
            return True

        self.key = self.get_cache_key(request, view)#三、執行get_cache_key
        if self.key is None:
            return True#不限制

        self.history = self.cache.get(self.key, [])#四、獲得的key,默認是一個列表,賦值給了self.history,
                                                       # self.history能夠理解爲每個ip對應的訪問記錄
        self.now = self.timer()

        # Drop any requests from the history which have now passed the
        # throttle duration
        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()
        if len(self.history) >= self.num_requests:
            return self.throttle_failure()
        return self.throttle_success()

    def throttle_success(self):
        """
        Inserts the current request's timestamp along with the key
        into the cache.
        """
        self.history.insert(0, self.now)
        self.cache.set(self.key, self.history, self.duration)
        return True

    def throttle_failure(self):
        """
        Called when a request to the API has failed due to throttling.
        """
        return False

    def wait(self):
        """
        Returns the recommended next request time in seconds.
        """
        if self.history:
            remaining_duration = self.duration - (self.now - self.history[-1])
        else:
            remaining_duration = self.duration

        available_requests = self.num_requests - len(self.history) + 1
        if available_requests <= 0:
            return None

        return remaining_duration / float(available_requests)
源碼SimpleRateThrottle

自定製

settings.py中:

REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_RATES': {
        'test_scope': '10/m',#自定製限制的時間  一分鐘10次
    },
}
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle
from rest_framework import exceptions

class MySimpleRateThrottle(SimpleRateThrottle):
    scope = "test_scope"#scope不可改,字符串需與配置文件相同

    def get_cache_key(self, request, view):
        return self.get_ident(request)

class LimitView(APIView):
    authentication_classes = []
    permission_classes = []
    throttle_classes=[MySimpleRateThrottle,]
    def get(self,request,*args,**kwargs):
        # self.dispatch
        return Response('控制訪問頻率示例')

    def throttled(self, request, wait):
        """
        If request is throttled, determine what kind of exception to raise.
        """
        #可自定製該方法設置中文的錯誤提示信息
        class MyThrottled(exceptions.Throttled):
            default_detail = '請求被限制.'
            extra_detail_singular = 'Expected available in {wait} second.'
            extra_detail_plural = '還須要再等待{wait}秒'
views.py

 

c.對不一樣用戶進行不一樣的限流操做

  對匿名用戶每一個用戶1分鐘容許訪問5次,對於登陸的普通用戶1分鐘訪問10次,VIP用戶一分鐘訪問20次。

操做流程:

  • 首頁能夠匿名訪問
  • 先認證,只有認證了才知道是否是匿名的,
  • 權限登陸成功以後才能訪問, index頁面無需權限便可訪問
  • 限流在配置文件中

settings.py 中進行配置

REST_FRAMEWORK = {
    'UNAUTHENTICATED_USER': None,
    'UNAUTHENTICATED_TOKEN': None,
    'DEFAULT_THROTTLE_RATES': {#自定製鍵值對
        'obj_anon': '10/m',#匿名用戶
        'obj_user': '20/m',#登陸用戶
        'obj_VIPuser':'20/minute',#VIP用戶
    },
}
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle
from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions
from app02 import models

class MyAuthentication(BaseAuthentication):
    #檢測用戶是否登陸
    def authenticate(self, request):
        token = request.query_params.get('token')#登陸用戶有tocken字段
        obj = models.UserInfo.objects.filter(token=token).first()
        if obj:
            return (obj.username,obj)
        return None#未登陸用戶不處理

    def authenticate_header(self, request):
        pass

class MyPermission(object):
    message = "無權訪問"
    def has_permission(self,request,view):
        if request.user:
            return True  #True表示有權限
        return False  #False表示無權限

class AdminPermission(object):
    message = "無權訪問"
    def has_permission(self,request,view):
        if request.user == 'DamonVIP':#VIP用戶
            return True
        return False

######對匿名用戶進行限流的類#####
class AnonThrottle(SimpleRateThrottle):
    scope = "obj_anon"

    def get_cache_key(self, request, view):
        # 返回None,表示我不限制
        # 登陸用戶我無論
        if request.user:
            return None
        # 匿名用戶
        return self.get_ident(request)

######對登陸用戶進行限流的類#####
class UserThrottle(SimpleRateThrottle):
    scope = "obj_user"

    def get_cache_key(self, request, view):
        # 登陸用戶
        if request.user:
            return request.user
        # 匿名用戶我無論
        return None

######對VIP用戶進行限流的類#####
class VIPUserThrottle(SimpleRateThrottle):
    scope = "obj_VIPuser"

    def get_cache_key(self, request, view):
        # VIP用戶
        if request.user=='DamonVIP':#VIP用戶
            return request.user
        # 匿名用戶我無論
        return None

#####視圖函數#####
# 首頁無需登陸就能夠訪問
class IndexView(APIView):
    authentication_classes = [MyAuthentication,]#認證判斷他是否是匿名用戶
    permission_classes = []  #主頁無需權限驗證
    throttle_classes=[AnonThrottle,UserThrottle,VIPUserThrottle]#對匿名用戶和普通用戶的訪問限制
    def get(self,request,*args,**kwargs):
        # self.dispatch
        return Response('訪問首頁')

    def throttled(self, request, wait):
        '''可定製方法設置中文錯誤'''
      # raise exceptions.Throttled(wait)
        class MyThrottle(exceptions.Throttled):

            default_detail = '請求被限制'
            extra_detail_singular = 'Expected available in {wait} second.'
            extra_detail_plural = 'Expected available in {wait} seconds.'
            default_code = '還須要再等{wait}秒'
        raise MyThrottle(wait)

# 需登陸就能夠訪問
class ManageView(APIView):
    authentication_classes = [MyAuthentication,]
    permission_classes = [MyPermission,]
    throttle_classes=[AnonThrottle,UserThrottle,VIPUserThrottle]
    def get(self,request,*args,**kwargs):
        # self.dispatch
        return Response('訪問首頁')

# 需登陸就能夠訪問
class SalaryView(APIView):
    authentication_classes = [MyAuthentication,]
    permission_classes = [MyPermission,]
    throttle_classes=[AnonThrottle,UserThrottle,VIPUserThrottle]
    def get(self,request,*args,**kwargs):
        # self.dispatch
        return Response('訪問首頁')
views.py

 

設置全局變量

  與認證和權限類似,限流操做也可新建utils.py文件,將自定製的類寫入該py文件中,以後在settings.py中進行配置,就可快速使用。在views.py中只需寫入相應視圖函數,無需關心中括號便可實現認證和權限的配置,使得views.py文件中的代碼可讀性更高。

REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_CLASSES': [
        'app04.utils.throttles.throttles.MyAnonRateThrottle',
        'app04.utils.throttles.throttles.MyUserRateThrottle',
    ],
    'DEFAULT_THROTTLE_RATES': {
        'anon': '10/day',
        'user': '10/day',
        'My_anon': '10/m',
        'My_user': '20/m',
        'My_VIPuser': '50/m',
    },
}
View Code
相關文章
相關標籤/搜索