django的rest framework框架——認證、權限、節流控制

1、登陸認證示例git

模擬用戶登陸,獲取token,當用戶訪問訂單或用戶中心時,判斷用戶攜帶正確的token,則容許查看訂單和用戶信息,不然拋出異常:數據庫

from django.conf.urls import url
from django.contrib import admin
from api import views

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^api/v1/auth/$', views.AuthView.as_view()),
    url(r'^api/v1/order/$', views.OrderView.as_view()),
    url(r'^api/v1/userInfo/$', views.UserInfoView.as_view()),
]
urls.py
from django.db import models


class UserInfo(models.Model):
    user_type_choices = (
        (1, "普通用戶"),
        (2, "vip"),
        (3, "svip"),
    )
    user_type = models.IntegerField(choices=user_type_choices)
    username = models.CharField(max_length=32, unique=True)
    password = models.CharField(max_length=64)


class UserToken(models.Model):
    user = models.OneToOneField(to="UserInfo")
    token = models.CharField(max_length=64)


class Order(models.Model):
    id = models.AutoField(primary_key=True)
    name = models.CharField(max_length=32)
    price = models.DecimalField(max_digits=5, decimal_places=2)
    create_time = models.DateTimeField(auto_now_add=True)
    user = models.ForeignKey(to="UserInfo", on_delete=models.CASCADE, null=True, blank=True)
models.py
import hashlib
import time

from django.http import JsonResponse
from rest_framework.views import APIView
from rest_framework import exceptions

from api import models


def md5(user):
    """生成token"""
    ctime = str(time.time())  # 當前時間
    m = hashlib.md5(bytes(user, encoding="utf-8"))
    m.update(bytes(ctime, encoding="utf-8"))
    return m.hexdigest()


class Authtication(object):
    """認證"""
    def authenticate(self, request):
        token = request._request.GET.get("token")
        token_obj = models.UserToken.objects.filter(token=token).first()
        if not token_obj:
            raise exceptions.AuthenticationFailed("用戶認證失敗")
        return (token_obj.user, token_obj)  # rest framework會將這兩個字段賦值給request,以供後續操做使用

    def authenticate_header(self, request):
        pass


class AuthView(APIView):
    """登陸"""
    def post(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None}
        try:
            # 從請求中獲取用戶登陸信息
            user = request._request.POST.get("username")
            pwd = request._request.POST.get("password")
            # 到數據庫獲取用戶信息
            user_obj = models.UserInfo.objects.filter(username=user, password=pwd).first()
            # 若是獲取到對象則說明認證成功,爲登陸用戶建立token,若是認證失敗則返回錯誤信息
            if not user_obj:
                res["code"] = 1001
                res["msg"] = "用戶名或密碼錯誤"
            else:
                token = md5(user)
                # 將token存入數據庫:若是數據庫存在token就更新,不存在就建立
                models.UserToken.objects.update_or_create(user=user_obj, defaults={"token": token})
                res["token"] = token
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)


class OrderView(APIView):
    """訂單"""
    authentication_classes = [Authtication]

    def get(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None, "data": None}
        try:
            orders = models.Order.objects.filter(user=request.user).values("id", "name", "price", "create_time", "user__username")
            res["data"] = list(orders)
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)


class UserInfoView(APIView):
    """用戶中心"""
    authentication_classes = [Authtication]

    def get(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None, "data": None}
        try:
            # print(request.user)  # 用戶對象
            # print(request.auth)  # 認證對象
            user = models.UserInfo.objects.filter(id=request.auth.user_id).values("id", "username", "password")
            res["data"] = list(user)
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)
views.py

2、rest framework認證流程源碼django

rest framework的request.py:api

_not_authenticated()方法的處理流程:瀏覽器

3、rest framework配置緩存

一、如何將以前寫在視圖裏面的 authentication_classes 寫入rest framework的配置文件中:ide

rest framework的配置信息在rest framework的settings.py裏面:post

rest framework的views.py:加密

新建一個utils文件,新建auth.py文件,將自定義的認證類寫到這個文件裏面:url

代碼:

from rest_framework import exceptions

from api import models


class Authtication(object):
    """認證"""
    def authenticate(self, request):
        token = request._request.GET.get("token")
        token_obj = models.UserToken.objects.filter(token=token).first()
        if not token_obj:
            raise exceptions.AuthenticationFailed("用戶認證失敗")
        return (token_obj.user, token_obj)  # rest framework會將這兩個字段賦值給request,以供後續操做使用

    def authenticate_header(self, request):
        pass
auth.py

再去項目的settings裏面設置這個認證類的路徑:

REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": ["api.utils.auth.Authtication",]
}
settings.py

 這樣至關於作了一個全局配置,就不用在每一個視圖裏面再去設置認證類了:

import hashlib
import time

from django.http import JsonResponse
from rest_framework.views import APIView
# from rest_framework import exceptions

from api import models


def md5(user):
    """生成token"""
    ctime = str(time.time())  # 當前時間
    m = hashlib.md5(bytes(user, encoding="utf-8"))
    m.update(bytes(ctime, encoding="utf-8"))
    return m.hexdigest()


class AuthView(APIView):
    """登陸"""
    authentication_classes = []
    def post(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None}
        try:
            # 從請求中獲取用戶登陸信息
            user = request._request.POST.get("username")
            pwd = request._request.POST.get("password")
            # 到數據庫獲取用戶信息
            user_obj = models.UserInfo.objects.filter(username=user, password=pwd).first()
            # 若是獲取到對象則說明認證成功,爲登陸用戶建立token,若是認證失敗則返回錯誤信息
            if not user_obj:
                res["code"] = 1001
                res["msg"] = "用戶名或密碼錯誤"
            else:
                token = md5(user)
                # 將token存入數據庫:若是數據庫存在token就更新,不存在就建立
                models.UserToken.objects.update_or_create(user=user_obj, defaults={"token": token})
                res["token"] = token
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)


class OrderView(APIView):
    """訂單"""
    # authentication_classes = [Authtication]

    def get(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None, "data": None}
        try:
            orders = models.Order.objects.filter(user=request.user).values("id", "name", "price", "create_time", "user__username")
            res["data"] = list(orders)
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)


class UserInfoView(APIView):
    """用戶中心"""
    # authentication_classes = [Authtication]

    def get(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None, "data": None}
        try:
            # print(request.user)  # 用戶對象
            # print(request.auth)  # 認證對象
            user = models.UserInfo.objects.filter(id=request.auth.user_id).values("id", "username", "password")
            res["data"] = list(user)
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)
views.py

二、匿名用戶配置

看源碼咱們知道,當認證方法返回None時,rest framework默認是從它的配置文件中讀取匿名用戶來賦值給self.user:

當讀取到「UNAUTHENTICATED_USER」這個值時,就會使用這個值,全部咱們能夠在項目的配置文件中對這個值進行設置:

這樣,當認證方法返回None時,self.user="匿名用戶"

那個UNAUTHENTICATED_TOKEN也是一樣的設置方法。

4、rest framework內置的認證類

在rest framework的authentication.py中:

from rest_framework.authentication import BaseAuthentication


class Authtication(BaseAuthentication):
    """自定製認證"""
    def authenticate(self, request):
        ......

    def authenticate_header(self, request):
        ......
View Code

BasicAuthentication認證類:是採用瀏覽器對用戶名和密碼進行base64加密,而後經過請求頭髮送給服務端,服務端獲取請求頭,對以前加密的用戶名和密碼進行解密,再到數據庫進行校驗。

5、rest framework權限使用

需求:給不一樣的視圖設置不一樣的訪問權限,如設置svip用戶能夠查看全部訂單,普通用戶和vip用戶能夠查看全部用戶信息

一、源碼實現流程

若是has_permission()返回True,則表示有權訪問,不然無權訪問。

 

二、權限控制的實現(局部)

建立權限類,在視圖中使用

新建permission.py文件,寫相關的權限控制類:

class MyPermissionSvip(object):
    """svip 訪問權限控制"""
    message = "只有svip用戶才能訪問"  # 設置無權訪問消息內容
    def has_permission(self, request, view):
        if request.user.user_type != 3:  # 若是用戶類型不是svip 則拒絕訪問
            return False
        return True


class MyPermissionOrdinaryAndVip(object):
    """普通用戶和vip 訪問權限控制"""
    def has_permission(self, request, view):
        if request.user.user_type == 3:  # 若是用戶類型是svip 則拒絕訪問
            return False
        return True
permission.py
import hashlib
import time

from django.http import JsonResponse
from rest_framework.views import APIView

from api import models
from api.utils.permission import MyPermissionSvip, MyPermissionOrdinaryAndVip


def md5(user):
    """生成token"""
    ctime = str(time.time())  # 當前時間
    m = hashlib.md5(bytes(user, encoding="utf-8"))
    m.update(bytes(ctime, encoding="utf-8"))
    return m.hexdigest()


class AuthView(APIView):
    """登陸"""
    authentication_classes = []

    def post(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None}
        try:
            # 從請求中獲取用戶登陸信息
            user = request._request.POST.get("username")
            pwd = request._request.POST.get("password")
            # 到數據庫獲取用戶信息
            user_obj = models.UserInfo.objects.filter(username=user, password=pwd).first()
            # 若是獲取到對象則說明認證成功,爲登陸用戶建立token,若是認證失敗則返回錯誤信息
            if not user_obj:
                res["code"] = 1001
                res["msg"] = "用戶名或密碼錯誤"
            else:
                token = md5(user)
                # 將token存入數據庫:若是數據庫存在token就更新,不存在就建立
                models.UserToken.objects.update_or_create(user=user_obj, defaults={"token": token})
                res["token"] = token
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)


class OrderView(APIView):
    """訂單"""
    # authentication_classes = [Authtication]  # 認證類(局部)
    permission_classes = [MyPermissionSvip,]  # 權限控制類(局部)

    def get(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None, "data": None}
        try:
            orders = models.Order.objects.all().values("id", "name", "price", "create_time", "user__username")
            res["data"] = list(orders)
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)


class UserInfoView(APIView):
    """用戶中心"""
    # authentication_classes = [Authtication]
    permission_classes = [MyPermissionOrdinaryAndVip]

    def get(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None, "data": None}
        try:
            # print(request.user)  # 用戶對象
            # print(request.auth)  # 認證對象
            user = models.UserInfo.objects.all().values("id", "username", "password")
            res["data"] = list(user)
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)
views.py

 三、權限控制的實現(全局)

 在settings中導入權限類的路徑來實現全局控制,這樣就不須要在每一個視圖中設置permission_classes了。

 

import hashlib
import time

from django.http import JsonResponse
from rest_framework.views import APIView

from api import models
from api.utils.permission import MyPermissionSvip, MyPermissionOrdinaryAndVip


def md5(user):
    """生成token"""
    ctime = str(time.time())  # 當前時間
    m = hashlib.md5(bytes(user, encoding="utf-8"))
    m.update(bytes(ctime, encoding="utf-8"))
    return m.hexdigest()


class AuthView(APIView):
    """登陸"""
    authentication_classes = []
    permission_classes = []

    def post(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None}
        try:
            # 從請求中獲取用戶登陸信息
            user = request._request.POST.get("username")
            pwd = request._request.POST.get("password")
            # 到數據庫獲取用戶信息
            user_obj = models.UserInfo.objects.filter(username=user, password=pwd).first()
            # 若是獲取到對象則說明認證成功,爲登陸用戶建立token,若是認證失敗則返回錯誤信息
            if not user_obj:
                res["code"] = 1001
                res["msg"] = "用戶名或密碼錯誤"
            else:
                token = md5(user)
                # 將token存入數據庫:若是數據庫存在token就更新,不存在就建立
                models.UserToken.objects.update_or_create(user=user_obj, defaults={"token": token})
                res["token"] = token
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)


class OrderView(APIView):
    """訂單"""
    # authentication_classes = [Authtication]  # 認證類(局部)
    # permission_classes = [MyPermissionSvip,]  # 權限控制類(局部)

    def get(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None, "data": None}
        try:
            orders = models.Order.objects.all().values("id", "name", "price", "create_time", "user__username")
            res["data"] = list(orders)
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)


class UserInfoView(APIView):
    """用戶中心"""
    # authentication_classes = [Authtication]
    permission_classes = [MyPermissionOrdinaryAndVip]

    def get(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None, "data": None}
        try:
            # print(request.user)  # 用戶對象
            # print(request.auth)  # 認證對象
            user = models.UserInfo.objects.all().values("id", "username", "password")
            res["data"] = list(user)
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)
views.py

 

6、rest framework內置的權限類

from rest_framework.permissions import BasePermission

在rest framework的permissions.py源碼中:

建議在自定義權限控制類時,繼承這個BasePermission類:

 

7、rest framework的訪問頻率控制

如:限制某個用戶1分鐘只能訪問多少次

一、源碼流程

若是allow_request()返回True,表示能夠訪問,不然表示頻率過高,不能訪問

二、需求:對用戶登陸進行頻率控制

新建文件:

代碼:

import time
from rest_framework.throttling import BaseThrottle


VISIT_RECORD = {}  # 存儲用戶訪問記錄


class VisitThrotlle(BaseThrottle):
    """用戶登陸訪問頻率控制"""
    def __init__(self):
        self.history = None

    def allow_request(self, request, view):
        # 獲取用戶IP
        # remote_addr = request.META.get('REMOTE_ADDR')
        remote_addr = self.get_ident(request)  # 也能夠經過繼承父類方法來獲取IP
        
        ctime = time.time()  # 用戶訪問時間
        # 判斷用戶是否能夠訪問 若是該IP尚未訪問過,直接放行;
        # 若是該IP已經存在於訪問記錄中,判斷其訪問頻率是否達到上限
        if remote_addr not in VISIT_RECORD:
            VISIT_RECORD[remote_addr] = [ctime]
            return True
        self.history = VISIT_RECORD.get(remote_addr)  # 獲取訪問歷史時間列表
        # 當前訪問時間與訪問記錄中的時間進行比較,若是當前時間是在一分鐘以後訪問的,就刪掉訪問記錄中的時間
        while self.history and self.history[-1] < ctime-60:
            self.history.pop()
        # 控制一分鐘內容許訪問3次
        if len(self.history) < 3:
            self.history.insert(0, ctime)  # 將最近的一次訪問時間插入到列表第一個位置
            return True
        return False

    def wait(self):
        # 能夠返回None,也能夠返回時間,提示用戶還要等多少秒就能夠訪問了
        ctime = time.time()
        return 60 - (ctime - self.history[-1])
throtlle.py

在views.py中引入:

import hashlib
import time

from django.http import JsonResponse
from rest_framework.views import APIView

from api import models
from api.utils.permission import MyPermissionSvip, MyPermissionOrdinaryAndVip
from api.utils.throtlle import VisitThrotlle


def md5(user):
    """生成token"""
    ctime = str(time.time())  # 當前時間
    m = hashlib.md5(bytes(user, encoding="utf-8"))
    m.update(bytes(ctime, encoding="utf-8"))
    return m.hexdigest()


class AuthView(APIView):
    """登陸"""
    authentication_classes = []
    permission_classes = []
    throttle_classes = [VisitThrotlle]  # 訪問頻率控制

    def post(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None}
        try:
            # 從請求中獲取用戶登陸信息
            user = request._request.POST.get("username")
            pwd = request._request.POST.get("password")
            # 到數據庫獲取用戶信息
            user_obj = models.UserInfo.objects.filter(username=user, password=pwd).first()
            # 若是獲取到對象則說明認證成功,爲登陸用戶建立token,若是認證失敗則返回錯誤信息
            if not user_obj:
                res["code"] = 1001
                res["msg"] = "用戶名或密碼錯誤"
            else:
                token = md5(user)
                # 將token存入數據庫:若是數據庫存在token就更新,不存在就建立
                models.UserToken.objects.update_or_create(user=user_obj, defaults={"token": token})
                res["token"] = token
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)


class OrderView(APIView):
    """訂單"""
    # authentication_classes = [Authtication]  # 認證類(局部)
    # permission_classes = [MyPermissionSvip,]  # 權限控制類(局部)

    def get(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None, "data": None}
        try:
            orders = models.Order.objects.all().values("id", "name", "price", "create_time", "user__username")
            res["data"] = list(orders)
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)


class UserInfoView(APIView):
    """用戶中心"""
    # authentication_classes = [Authtication]
    permission_classes = [MyPermissionOrdinaryAndVip]

    def get(self, request, *args, **kwargs):
        res = {"code": 1000, "msg": None, "data": None}
        try:
            # print(request.user)  # 用戶對象
            # print(request.auth)  # 認證對象
            user = models.UserInfo.objects.all().values("id", "username", "password")
            res["data"] = list(user)
        except Exception as e:
            res["code"] = 1002
            res["msg"] = e
        return JsonResponse(res)
views.py

三、頻率控制也能夠作全局設置,方法與權限控制相同

四、內置控制頻率的類

在rest_framework的throttling.py中:

一、示例1

在throtlle.py中繼承SimpleRateThrottle類:

from rest_framework.throttling import BaseThrottle, SimpleRateThrottle


class VisitThrotlle(SimpleRateThrottle):
    """用戶登陸訪問頻率控制"""
    scope = "throtlle_rate"  # 定義一個key,從配置文件中獲取訪問頻次

    def get_cache_key(self, request, view):
        # 程序回去Django的緩存中獲取key,此時咱們重寫這個方法,給他返回一個用戶IP做爲key
        return self.get_ident(request)
throtlle.py

這個實現效果和上面本身寫的同樣。

二、示例2 

對登陸用戶作頻率控制

from rest_framework.throttling import BaseThrottle, SimpleRateThrottle


class UserThrotlle(SimpleRateThrottle):
    """對已登陸用戶進行訪問頻率控制"""
    scope = "user_throtlle_rate"  # 定義一個key,從配置文件中獲取訪問頻次

    def get_cache_key(self, request, view):
        # 返回一個用戶惟一標誌,如用戶名
        return request.user.username


class VisitThrotlle(SimpleRateThrottle):
    """匿名用戶登陸訪問頻率控制"""
    scope = "throtlle_rate"  # 定義一個key,從配置文件中獲取訪問頻次

    def get_cache_key(self, request, view):
        # 程序回去Django的緩存中獲取key,此時咱們重寫這個方法,給他返回一個用戶IP做爲key
        return self.get_ident(request)
throtlle.py

setting.py:

相關文章
相關標籤/搜索