(1)視圖django
APIView的dispath(self, request, *args, **kwargs)
(2)任務分發api
dispath方法內 self.initial(request, *args, **kwargs) 進入三大認證
(3)認證組件app
self.perform_authentication(request)
(4)認證功能介紹ide
(1)其主要用來對用戶進行認證源碼分析
(2)檢驗用戶 ----> 普通遊客 | 合法用戶 | 非法用戶post
(3)遊客:表明認證經過 直接進入下一步(權限校驗)測試
(2)合法用戶:攜帶驗證信息進行驗證 驗證經過將用戶存儲進入request.user中 再次進行下一步校驗url
(3)非法用戶:攜帶驗證信息進行驗證 驗證失敗直接拋出異常 返回403權限問題spa
(1)第一步APIView/dispatch/initialrest
def initial(self, request, *args, **kwargs):# Ensure that the incoming request is permitted self.perform_authentication(request) # 認證功能 self.check_permissions(request) # 訪問權限功能 self.check_throttles(request) # 頻率功能
(2)第二步進入認證功能
def perform_authentication(self, request): request.user # 獲得一個方法屬性 get方法
PS: 若是是複製屬性 應該是request.user = XXX
(3)第三步進入request/user
@property def user(self): if not hasattr(self, '_user'): with wrap_attributeerrors(): self._authenticate() # 沒有用戶認證處用戶 return self._user # 若是有直接返回用戶
(4)第四步_authenticate
def _authenticate(self): # self.authenticators:一堆認證類產生序列化認證對象的集合體(列表) for authenticator in self.authenticators: # 循環遍歷集合體拿到一個個認證對象 try: ''' authenticator.authenticate:認證對象調用認證功能 user_auth_tuple:拿到登陸的用戶與用戶信息的返回值 調用改方法傳入兩個參數 self,request self:當前認證類對象 request:當前請求用戶 ''' user_auth_tuple = authenticator.authenticate(self) except exceptions.APIException: # 拋出異常表明認證失敗 self._not_authenticated() raise if user_auth_tuple is not None: self._authenticator = authenticator # 若是有返回值 將登陸用戶 與 驗證信息 保存在 request.user ,request.author self.user, self.auth = user_auth_tuple return # 若是返回值爲空 表明認證經過 可是沒有登陸用戶 與驗證信息 表示當前用戶爲遊客用戶 self._not_authenticated()
(1)全局settings文件配置
REST_FRAMEWORK = { # 認證類配置 'DEFAULT_AUTHENTICATION_CLASSES': [ 'api.authentications.MyAuthentication', # 自定義認證組件 ], }
(2)全局settings默認配置
# 認證類配置 'DEFAULT_AUTHENTICATION_CLASSES': [ 'rest_framework.authentication.SessionAuthentication', 'rest_framework.authentication.BasicAuthentication', ],
(3)自定義文件配置api/Authentication
from rest_framework.exceptions import AuthenticationFailed from rest_framework.authentication import BaseAuthentication from . import models class MyAuthentication(BaseAuthentication): def authenticate(self, request): ''' 前臺在請求頭攜帶認證信息, 且默認規範用 Authorization 字段攜帶認證信息, 後臺固定在請求對象的META字段中 HTTP_AUTHORIZATION 獲取 ''' auth = request.META.get('HTTP_AUTHORIZATION',None) # 能夠不攜帶認證信息 if auth is None: # 若是爲空則代表遊客 return None # 沒有登陸用戶 與 返回信息 auth_list = auth.split() # 設置認證小規則(兩段式) # 判斷用戶是否合法 if not (len(auth_list) == 2 and auth_list[0].lower() == 'auth'): raise AuthenticationFailed('認證信息有誤 非法用戶') # 非法的請求 直接拋出異常 # 解析合法的用戶 if not auth_list[1] == 'qwe.asd.zxc': # 假設經過解析'qwe.asd.zxc' 能夠解析處合法用戶 raise AuthenticationFailed('用戶校驗失敗 非法用戶') user_obj = models.User.objects.filter(username='admin').first() if not user_obj: raise AuthenticationFailed('用戶數據有誤 非法用戶') return (user_obj,None) # 合法將用戶返回 驗證信息不返回
(4)視圖層
from rest_framework.views import APIView from rest_framework.response import Response class TestAPIView(APIView): def get(self,request,*args,**kwargs): ''' 測試一:前段不輸入校驗信息 獲取用戶屬於匿名用戶 測試二:前段加入驗證信息 獲取用戶屬於解析用戶 admin ''' print(request.user) return Response( { 'status':0, 'msg':'測試成功' } )
(1)視圖
APIView的dispath(self, request, *args, **kwargs)
(2)任務分發
dispath方法內 self.initial(request, *args, **kwargs) 進入三大認證
(3)權限組件
self.check_permissions(request)
(1)源碼分析
def check_permissions(self, request): ''' get_permissions:產生權限對象的類 參數: self:產生權限的類的對象 request:用戶請求 self:views實例化產生對象 ''' for permission in self.get_permissions(): # 循環遍歷 ''' has_permission:返回值布爾值 ''' if not permission.has_permission(request, self): # 判斷布爾值 若是爲真則獲取全部權限 反之則爲假 self.permission_denied( request, message=getattr(permission, 'message', None) ) # PS:默認登陸用戶 與遊客有所有權限
(1)AllowAny:遊客與登陸用戶都含有權限
class AllowAny(BasePermission): def has_permission(self, request, view): return True
(2)IsAuthenticated:登陸用戶且驗證經過的用戶
class IsAuthenticated(BasePermission): def has_permission(self, request, view): return bool(request.user and request.user.is_authenticated)
(3)IsAdminUser:登陸用戶 且必須是後臺管理員
class IsAdminUser(BasePermission): def has_permission(self, request, view): return bool(request.user and request.user.is_staff)
(4)IsAuthenticatedOrReadOnly:只讀請求 或者合法用戶
(1)遊客:只讀
(2)登陸用戶:含有所有權限
class IsAuthenticatedOrReadOnly(BasePermission): def has_permission(self, request, view): return bool( request.method in SAFE_METHODS or request.user and request.user.is_authenticated )
(1)全局settings
REST_FRAMEWORK = { # 權限類配置 'DEFAULT_PERMISSION_CLASSES': [ 'rest_framework.permissions.AllowAny', ], }
(2)視圖層
from rest_framework.permissions import IsAuthenticated # 只有登陸用戶纔有權限 class TestAuthenticatedAPIView(APIView): permission_classes = [IsAuthenticated] # 使用 IsAuthenticated 不使用全局AllowAny def get(self, request, *args, **kwargs): return Response( { 'status': 0, 'msg': "登陸且合法用戶能夠訪問" } ) # 遊客只讀,登陸無限制 from rest_framework.permissions import IsAuthenticatedOrReadOnly class TestAuthenticatedOrReadOnlyAPIView(APIView): permission_classes = [IsAuthenticatedOrReadOnly] def get(self, request, *args, **kwargs): return Response({ 'status': 0, 'msg': '遊客只讀 登陸用戶無限制' }) def post(self, request, *args, **kwargs): return Response({ 'status': 0, 'msg': '遊客只讀 登陸用戶無限制' })
(1)api/permissions
from django.contrib.auth.models import Group from rest_framework.permissions import BasePermission from rest_framework.exceptions import AuthenticationFailed class MyPermission(BasePermission): def has_permission(self, request, view): print(self) # <api.permissions.MyPermission object at 0x0000000005673C50> 當前權限對象 print(request) # <api.permissions.MyPermission object at 0x0000000005673C50> 請求對象 print(view) # <api.views.TestAdminOrReadOnlyAPIView object at 0x0000000005673518> view對象 # 只讀接口 read_only = request.method in ('GET', 'HEAD', 'OPTIONS') # group爲有權限的分組 group = Group.objects.filter(name='管理員').first() # groups爲當前用戶所屬的全部分組 groups = request.user.groups.all() data1 = group and groups # group groups 必須有值 data2 = group in groups # 有權限分組 必須在全部分組中 # 讀接口你們都有權限,寫接口必須爲指定分組下的登錄用戶 return read_only or (data1 and data2)
(2)視圖層
from .permissions import MyPermission class TestAdminOrReadOnlyAPIView(APIView): permission_classes = [MyPermission] def get(self, request, *args, **kwargs): return Response( { 'status':0, 'msg':"遊客能夠讀取數據 可是不能寫數據" } ) def post(self, request, *args, **kwargs): return Response( { "status":0, 'msg':'管理員能夠讀寫' } )
(1)視圖
APIView的dispath(self, request, *args, **kwargs)
(2)任務分發
dispath方法內 self.initial(request, *args, **kwargs) 進入三大認證
(3)權限組件
self.check_throttles(request) # 頻率組件
for throttle in self.get_throttles(): if not throttle.allow_request(request, self): # 只要頻率限制了,allow_request 返回False了,纔會調用wait throttle_durations.append(throttle.wait()) if throttle_durations: # Filter out `None` values which may happen in case of config / rate # changes, see #1438 durations = [ duration for duration in throttle_durations if duration is not None ] duration = max(durations, default=None) self.throttled(request, duration)
PS:
(1)遍歷配置的頻率認證類,初始化獲得一個個頻率認證類對象(會調用頻率認證類的 __init__() 方法)
(2)頻率認證類對象調用 allow_request 方法,判斷是否限次(沒有限次可訪問,限次不可訪問)
(3)頻率認證類對象在限次後,調用 wait 方法,獲取還需等待多長時間能夠進行下一次訪問
注:頻率認證類都是繼承 SimpleRateThrottle 類
(1)全局settings文件配置
# 頻率限制條件配置 'DEFAULT_THROTTLE_RATES': { 'send_msg': '1/min' },
(2)api/
from rest_framework.throttling import SimpleRateThrottle class SendMsgRateThrottle(SimpleRateThrottle): scope = 'send_msg' # 自定義一個字符串 def get_cache_key(self, request, view): phone = request.query_params.get('phone') # 獲取前段get拼接值 if not phone: # 若是 沒有值 說明無任何訪問限制 return None # 按照源碼格式來 固定返回None # 按照源碼格式 此處以手機號做爲身份標識 return 'throttle_%(scope)s_%(ident)s' % {'scope': self.scope, 'ident': phone}
(3)路由層
url(r'^send_msg/$', views.TestThrottles.as_view()),
(4)視圖層
from .throttles import SendMsgRateThrottle # 導入自定義的頻率類 class TestThrottles(APIView): throttle_classes = [SendMsgRateThrottle] # 局部配置 def get(self, request, *args, **kwargs): return Response({ 'status': 0, 'msg': '訪問頻率get方式測試成功' }) def post(self, request, *args, **kwargs): return Response({ 'status': 0, 'msg': '訪問頻率post方式測試成功' })
PS:
(1)對api/send_msg/?phone = xxx 有限制
# 例如 http://127.0.0.1:8000/api/send_msg/?phone=1234654646
(2)對 /api/send_msg/ 或其餘接口發送無限制 (3)對數據包提交phone的/api/phone/接口無限制 (4)對不是phone(如mobile)字段提交的電話接口無限制