restful知識點之三restframework認證-->權限-->頻率

 

認證、權限、頻率是層層遞進的關係

  權限業務時認證+權限前端

  頻率業務時:認證+權限+頻率數據庫

局部認證方式

 

from django.conf.urls import url,include
from django.contrib import admin
from api import views

urlpatterns = [
    # url(r'^admin/', admin.site.urls),
    url(r'^auth/', views.AuthView.as_view()),
    url(r'^books/', views.booksView.as_view()),
    url(r'^books_detail/(\d+)/$', views.book_detail.as_view()),
    url(r'^oderDetial/$', views.OderDetialView.as_view()),

]
urls.py

 

from rest_framework.views import APIView
from api.models import *
import uuid
from django.http import JsonResponse
from rest_framework.response import Response#渲染器
from api.util.Myserializer import BookSerializers
from rest_framework import exceptions#拋異常
from rest_framework.generics import GenericAPIView
from rest_framework.viewsets import GenericViewSet
from rest_framework.authentication import BaseAuthentication

# Create your views here.


ORDER_DICT = {
    1:{
        'name': "媳婦",
        'age':18,
        'gender':'',
        'content':'...'
    },
    2:{
        'name': "老狗",
        'age':19,
        'gender':'',
        'content':'...。。'
    },
}

class AuthView(APIView):
    def post(self,request):

        ret={'code':1000,'msg':None}
        # 從前端獲取用戶名密碼
        try:
            user=request._request.POST.get('username')
            pwd=request._request.POST.get('password')
        #     取數據庫校驗
            obj=User.objects.filter(name=user,pwd=pwd).first()

            if not obj:
                ret['code']=1001
                ret['msg']='用戶名密碼錯誤'
        #     登陸成功生成token寫入token表(若是有則更新,沒有則建立)
            token=str(uuid.uuid4())
            Token.objects.update_or_create(user=obj,defaults={'token':token})
            ret['token']=token
        except Exception as e:
            ret['code']=1002
            ret['msg']='請求異常'
        return JsonResponse(ret)

class Authtication(object):
    def authenticate(self,request):
        # 接收來自前端發來的token值
        token = request._request.GET.get('token')
#         從數據庫中查找
        token_obj=Token.objects.filter(token=token).first()
        if not token_obj:
            raise exceptions.AuthenticationFailed('用戶認證失敗')
        # 在rest_framework內部會將兩個字段賦值給request,以供後續使用
        return (token_obj.user,token_obj)
    def authenticate_header(self,request):
        pass





class OderDetialView(APIView):
   authentication_classes = [Authtication,]
   def get(self,request):
       ret={'code':1000,'msg':None,'data':None}

       ret['data']=ORDER_DICT
       return JsonResponse(ret)
views.py

 

from django.db import models

# Create your models here.
from django.db import models

# Create your models here.

class User(models.Model):
    name=models.CharField(max_length=32)
    pwd=models.CharField(max_length=32)
    type_choices=((1,"普通用戶"),(2,"VIP"),(3,"SVIP"))
    user_type=models.IntegerField(choices=type_choices,default=1)


class Token(models.Model):
    user=models.OneToOneField("User")
    token = models.CharField(max_length=128)

    def __str__(self):
        return self.token
modesl.py

 

 

 

 

 

postman執行:django

  

 

全局認證方式

settings.py配置以下:api

REST_FRAMEWORK={
'DEFAULT_AUTHENTICATION_CLASSES':['app02.service.auth.Authtication',
]
}
from app02.models import *
from rest_framework import exceptions

#不繼承BaseAuthentication也能夠
class Authtication(object):
    def authenticate(self,request):
        token=request._request.GET.get('token')
        token_obj=UserToken.objects.filter(token=token).first()
        if not token_obj:
            raise  exceptions.AuthenticationFailed('用戶認證失敗')
       #rest framework內部會將這兩個字段賦值給request,以供後續操做使用
        return (token_obj.user,token_obj)

    def authenticate_header(self, request):
            pass
app02.service.auth.py

 

 認證源碼分析流程:

 權限

class permission(object):
    def has_permission(self,request,view):
        if request.user.user_type !=2:
            return True
        return False
    
    
class OderDetialView(APIView):
   # authentication_classes = [Authtication,]
   permission_classes = [permission,]
   def get(self,request):
       ret={'code':1000,'msg':None,'data':None}
       print(request.user.user_type,
    'user表中填入的user_type類型,
    權限認證時從新封裝了新的request.user(user是數據庫關聯字段)
) if request.user.user_type==2: ret['data']=ORDER_DICT return JsonResponse(ret)

 頻率

import time
VISIT_RECORD = {}   # 格式是{id:[time2]}

# 訪問頻率類
class VisitThrottle(object):
    """60秒內只能訪問3次"""
    def __init__(self):
        self.history = None

    def allow_request(self, request, view):
        # 獲取用戶IP
        remote_addr = request.META.get('REMOTE_ADDR')
        ctime = time.time()
        print(remote_addr)
        if remote_addr not in VISIT_RECORD:  # 若是是第一次訪問,就存放訪問時間以及IP地址
            VISIT_RECORD[remote_addr] = [ctime,]   # 添加到VISIT_RECORD中
            return True
        history = VISIT_RECORD.get(remote_addr)  # 不是第一次訪問,先獲取記錄
        self.history = history
        print("111:",history)
        while history and history[-1] < ctime - 60:   # 若是最先一次訪問時間超過一分鐘,就刪掉 去掉history and  後把while改爲if,能夠實現同樣的功能
# 上一行代碼中while循環一直循環,若是列表history爲空,循環的時候都會報錯,由於找不到history[-1]這個值,因此要加上history,用來跳出循環,防止代碼出錯
            history.pop()

        if len(history) < 3:  # 不用寫else,若是不小於3,會有錯誤處理機制,直接拒絕訪問。
            history.insert(0, ctime)  # 按照索引插入元素
            return True
    def wait(self):
        ctime = time.time()
        return 60 - (ctime - self.history[-1])
相關文章
相關標籤/搜索