class User(models.Model): name = models.CharField(max_length=32) pwd = models.CharField(max_length=32) class Token(models.Model): user = models.OneToOneField("User", on_delete=models.CASCADE) token = models.CharField(max_length=128) def __str__(self): return self.token
添加後執行數據庫遷移,添加app01_user和app01_token表。python
urlpatterns = [ ... re_path(r'^login/$', views.LoginView.as_view(), name="login"), ]
def get_random_str(user): """ 生成隨機字符串 """ import hashlib, time ctime = str(time.time()) md5 = hashlib.md5(bytes(user, encoding='utf-8')) md5.update(bytes(ctime, encoding="utf-8")) return md5.hexdigest() class LoginView(APIView): def post(self, request): # 驗證邏輯:獲取用戶名密碼與數據庫比對 name = request.data.get("name") pwd = request.data.get("pwd") user = User.objects.filter(name=name, pwd=pwd).first() res = {"state_code": 1000, "msg": None} # 成功或失敗須要返回的字典標識此次的狀態 if user: # 經過校驗 拿到隨機字符串交給這我的 random_str = get_random_str(user.name) # 獲取隨機字符串 token = Token.objects.update_or_create(user=user, defaults={"token": random_str}) res["token"] = str(token) # json不能序列化對象,所以轉爲字符串 else: # 校驗失敗 res["state_code"]=1001 # 錯誤狀態碼 res["msg"] = "用戶名或密碼錯誤" return Response(json.dumps(res))
注意:數據庫
class LoginView(APIView): def post(self, request): # 驗證邏輯:獲取用戶名密碼與數據庫比對 name = request.data.get("name") pwd = request.data.get("pwd") user = User.objects.filter(name=name, pwd=pwd).first() if user: # 經過校驗 拿到隨機字符串交給這我的 pass else: # 校驗失敗 pass return Response("login....")
def get_random_str(user): """ 生成隨機字符串 """ import hashlib, time ctime = str(time.time()) md5 = hashlib.md5(bytes(user, encoding='utf-8')) md5.update(bytes(ctime, encoding="utf-8")) return md5.hexdigest()
注意:ctime=str(time.time()) ,世界上一直在變化的就是時間變量,所以ctime這個變量每一個都是不一樣的。django
hashlib.md5()構建md5對象。實例化md5時傳遞參數叫作加鹽:json
md5 = hashlib.md5(bytes(user, encoding='utf-8'))
md5.update()寫入要加密的字節:(這裏的md5是實例化出來的對象)api
md5.update(bytes(ctime, encoding="utf-8"))
md5_obj.hexdigest():獲取密文數組
return md5.hexdigest()
加鹽以後,即便要加密的數據徹底同樣,可是用戶名確定不同,所以產生的密文必定惟一。瀏覽器
Token.objects.update_or_create(user=user,defaults={"token":random_str})
用給定的**kwargs值查找對象(這裏是user=user),若是defaults不爲None則用defaults的值{"token":random_str}更新對象;若是爲None則建立一個新對象。服務器
class QuerySet: def update_or_create(self, defaults=None, **kwargs): """ Look up an object with the given kwargs, updating one with defaults if it exists, otherwise create a new one. Return a tuple (object, created), where created is a boolean specifying whether an object was created. """ defaults = defaults or {}
若是是create操做返回值是添加的數據,若是是update操做返回值是更新的條數。app
from rest_framework import exceptions class TokenAuth(object): # 這個類名能夠任意取 def authenticate(self, request): # 這個方法名不可變更 token = request.GET.get("token") token_obj = Token.objects.filter(token=token).first() if not token_obj: raise exceptions.AuthenticationFailed("驗證失敗!") # 若是有值 return兩個值中間加逗號,就構成了一個元組 return token_obj.user.name, token_obj.token # 元組:(關聯用戶對象的名字,當前登陸對象token) def authenticate_header(self, request): # 不加會報錯,要求要傳入兩個參數 pass class BookView(APIView): authentication_classes = [TokenAuth, ] def get(self, request): book_list = Book.objects.all() bs = BookModelSerializers(book_list, many=True, context={"request": request}) # 序列化結果 # return HttpResponse(bs.data) return Response(bs.data) def post(self, request): # POST請求的數據 bs = BookModelSerializers(data=request.data) if bs.is_valid(): # 驗證數據是否合格 print(bs.validated_data) bs.save() # create方法 return Response(bs.data) # 當前添加的數據 else: return Response(bs.errors)
每次請求都要執行dispatch.dom
在用戶訪問時執行APIView的dispatch方法,在dispatch進行分發操做前,須要先執行self.initial(request, *args, **kwargs),執行認證、權限、頻率操做。
def dispatch(self, request, *args, **kwargs): self.args = args self.kwargs = kwargs request = self.initialize_request(request, *args, **kwargs) # 構建新request self.request = request self.headers = self.default_response_headers # deprecate? try: self.initial(request, *args, **kwargs) # 認證權限頻率 # Get the appropriate handler method if request.method.lower() in self.http_method_names: handler = getattr(self, request.method.lower(), self.http_method_not_allowed) else: handler = self.http_method_not_allowed response = handler(request, *args, **kwargs) except Exception as exc: response = self.handle_exception(exc) self.response = self.finalize_response(request, response, *args, **kwargs) return self.response
def initial(self, request, *args, **kwargs): """代碼省略""" # Ensure that the incoming request is permitted # 認證組件 self.perform_authentication(request) # 權限組件 self.check_permissions(request) # 頻率組件 self.check_throttles(request)
def perform_authentication(self, request): request.user
因爲在dispatch函數中,self.initial(request, *args, **kwargs)晚於request = self.initialize_request(request, *args, **kwargs)。所以這裏的request是新構建的request。request.user即須要去Request類中尋找user靜態方法。
這個新request經過initialize_request方法返回Request類對象:
def initialize_request(self, request, *args, **kwargs): parser_context = self.get_parser_context(request) return Request( request, parsers=self.get_parsers(), authenticators=self.get_authenticators(), negotiator=self.get_content_negotiator(), parser_context=parser_context )
@property def user(self): """ Returns the user associated with the current request, as authenticated by the authentication classes provided to the request.
當request已經經過認證類提供的認證,返回當前請求關聯的用戶 """ if not hasattr(self, '_user'): with wrap_attributeerrors(): self._authenticate() return self._user
注意:@property
裝飾器就是負責把一個方法變成屬性調用。未經過驗證的須要用self._authenticate()方法來進行校驗。
class Request(object): def __init__(self, request, parsers=None, authenticators=None, negotiator=None, parser_context=None): self._request = request self.parsers = parsers or () self.authenticators = authenticators or () # 若是是None返回空元組,若是有值返回authenticators """ print(3 and 0) # 0 print(0 and 2) # 0 print(0 or 1) # 1 print(4 or 1) # 4 """ def _authenticate(self): """ Attempt to authenticate the request using each authentication instance in turn. """ for authenticator in self.authenticators: try: user_auth_tuple = authenticator.authenticate(self) except exceptions.APIException: self._not_authenticated() raise if user_auth_tuple is not None: self._authenticator = authenticator self.user, self.auth = user_auth_tuple return self._not_authenticated()
確認authenticators的來源是Request實例化時傳入的,
找到Request實例化的方法:initialize_request
class APIView(View): def initialize_request(self, request, *args, **kwargs): return Request( authenticators=self.get_authenticators(), )
再由此找到get_authenticators方法:
class APIView(View): def get_authenticators(self): """ Instantiates and returns the list of authenticators that this view can use. """ return [auth() for auth in self.authentication_classes]
authentication_classes就是咱們在視圖函數中定義的列表:
class BookView(APIView): authentication_classes = [TokenAuth, ] def get(self, request):... def post(self, request):...
[auth() for auth in self.authentication_classes]這個語句的含義:循環每個認證類並進行實例化,放在數組中。以[TokenAuth, ]爲例返回值是[TokenAuth(), ]。
所以回傳回去authenticators=[TokenAuth(), ]。
class Request(object): def __init__(self, request, parsers=None, authenticators=None, negotiator=None, parser_context=None): self._request = request self.parsers = parsers or () self.authenticators = authenticators or () # 若是是None返回空元組,若是有值返回authenticators """ print(3 and 0) # 0 print(0 and 2) # 0 print(0 or 1) # 1 print(4 or 1) # 4 """ def _authenticate(self): """ Attempt to authenticate the request using each authentication instance in turn. """ for authenticator in self.authenticators: # [TokenAuth(), ], authenticator:TokenAuth() try: user_auth_tuple = authenticator.authenticate(self) # TokenAuth必須有authenticate方法 # authenticator.authenticate(self):是一個實例對象調用本身的實例方法,本不須要傳self,這裏必定是傳的一個形參。 # 這個方法是在Request類中,追溯調用關係可知,這裏的self是一個新request對象 except exceptions.APIException: # 拋出錯誤 self._not_authenticated() # 沒有驗證成功 raise if user_auth_tuple is not None: # 若是user_auth_tuple有值 self._authenticator = authenticator self.user, self.auth = user_auth_tuple # 將元組的值賦給self.user和self.auth return self._not_authenticated() # 沒有驗證成功
authenticator.authenticate(self):是一個實例對象調用本身的實例方法,本不須要傳self,這裏必定是傳的一個形參。這個方法是在Request類中,追溯調用關係可知,這裏的self是一個新request對象。所以在構建自定義的TokenAuth時必定要在def authenticate(self, request): 添加request參數。
訪問時添加數據庫查到的token信息,驗證經過:
打印以前在_authenticate將元組的值賦給self.user和self.auth的值:
class BookView(APIView): authentication_classes = [TokenAuth, ] def get(self, request): print(request.user) # egon print(request.auth) # 02aee930be6011068e24f68935d52b02
能夠看到正好對應authoerticate函數的返回值:return token_obj.user.name, token_obj.token.
from rest_framework import exceptions from rest_framework.authentication import BaseAuthentication class TokenAuth(BaseAuthentication): def authenticate(self, request): token = request.GET.get("token") token_obj = Token.objects.filter(token=token).first() if not token_obj: raise exceptions.AuthenticationFailed("驗證失敗!") # 若是有值 return兩個值中間加逗號,就構成了一個元組 return token_obj.user.name, token_obj.token # 元組:(關聯用戶對象的名字,當前登陸對象token) # def authenticate_header(self, request): # 不加會報錯,且要求要傳入兩個參數 # pass
BaseAuthentication包含authenticate和authenticate_header函數。默認用來被覆蓋。
若是沒有在局部定義authentication_classes=[TokenAuth, ]。回溯查找默認的authentication_classes。
class APIView(View): authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
api_settings.DEFAULT_AUTHENTICATION_CLASSES是類的實例對象.屬性的模式。當調用不存在的屬性時,Python會試圖調用__getattr__(self,attr)來獲取屬性,而且返回attr。
該語句在rest_framework/settings.py中。發現api_settings是一個實例對象。實例化時執行相應的init方法。
api_settings = APISettings(None, DEFAULTS, IMPORT_STRINGS)
DEFAULTS這個值是一個字典,每個鍵後面都是跟着一個元組,保存了關於rest_framework全部默認配置。也定義在rest_framework/settings.py文件中。
class APISettings(object): def __init__(self, user_settings=None, defaults=None, import_strings=None): if user_settings: self._user_settings = self.__check_user_settings(user_settings) self.defaults = defaults or DEFAULTS self.import_strings = import_strings or IMPORT_STRINGS self._cached_attrs = set()
user_settings默認爲None,若是有值拿到user_settings.
self.defaults = defaults or DEFAULTS 拿到DEFAULTS字典值。
class APISettings(object): def __getattr__(self, attr): if attr not in self.defaults: raise AttributeError("Invalid API setting: '%s'" % attr) try: # Check if present in user settings val = self.user_settings[attr] except KeyError: # 當self.user_settings的值是一個空字典,取值報錯KeyError # Fall back to defaults val = self.defaults[attr] # 異常處理去取默認的DEFAULT值 # Coerce import strings into classes if attr in self.import_strings: val = perform_import(val, attr) # Cache the result self._cached_attrs.add(attr) setattr(self, attr, val) return val
__getattr__是python裏的一個內建函數,能夠很方便地動態返回一個屬性;當調用不存在的屬性時,Python會試圖調用__getattr__(self,item)來獲取屬性,而且返回item;
class Person(object): def __init__(self, name): self.name = name def __getattr__(self, item): print("item", item) def dream(self): print("dreaming。。。。。") alex = Person("alex") alex.yuan # 打印:item yuan
val = self.user_settings[attr]:user_settings執行的返回值後面加上[attr]
class APISettings(object): @property def user_settings(self): # 靜態方法 if not hasattr(self, '_user_settings'): self._user_settings = getattr(settings, 'REST_FRAMEWORK', {}) return self._user_settings
這裏的settings指的是restDemo項目中的restDemo/settings.py。
所以getattr(settings, 'REST_FRAMEWORK', {}) 表明的意思是:去settings.py中去拿REST_FRAMEWORK變量,若是拿不到則取一個空字典。所以self._user_settings必定是一個字典。
所以self.user_settings[attr]是在字典中取鍵attr的值.當字典爲空時,取不到值會拋出Keyerror錯誤,進行異常處理去取默認的DEFAULT字典中的值:
DEFAULTS = { """省略代碼""" 'DEFAULT_AUTHENTICATION_CLASSES': ( 'rest_framework.authentication.SessionAuthentication', 'rest_framework.authentication.BasicAuthentication' ), """省略代碼"""
REST_FRAMEWORK = { # 仿照DEFAULT配置認證類路徑 "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth"] }
鍵必須是DEFAULT_AUTHENTICATION_CLASSES
因爲要指定認證類路徑所以要把以前寫的TokenAuth從views.py遷移到一個新文件(文件名自定義)中,這裏是:/app01/utils.py
from .models import * from rest_framework import exceptions from rest_framework.authentication import BaseAuthentication class TokenAuth(BaseAuthentication): def authenticate(self, request): token = request.GET.get("token") token_obj = Token.objects.filter(token=token).first() if not token_obj: raise exceptions.AuthenticationFailed("驗證失敗!") # 若是有值 return兩個值中間加逗號,就構成了一個元組 return token_obj.user.name, token_obj.token # 元組:(關聯用戶對象的名字,當前登陸對象token)
class LoginView(APIView): authentication_classes = [] # 配置爲空,優先級高於全局認證,不用進行認證 def post(self, request):....
class APIView(View): def dispatch(self, request, *args, **kwargs): self.args = args self.kwargs = kwargs request = self.initialize_request(request, *args, **kwargs) self.request = request self.headers = self.default_response_headers # deprecate? try: self.initial(request, *args, **kwargs) """代碼省略"""
在dispatch中使用的initial()方法,包含了權限組件:
def initial(self, request, *args, **kwargs): """代碼省略""" # Ensure that the incoming request is permitted # 認證組件 self.perform_authentication(request) # 權限組件 self.check_permissions(request) # 頻率組件 self.check_throttles(request)
class APIView(View): def check_permissions(self, request): """ Check if the request should be permitted. Raises an appropriate exception if the request is not permitted. """ for permission in self.get_permissions(): # 循環的是[SVIPPermission(), ] permission是SVIPPermission()——權限實例 if not permission.has_permission(request, self): # 因而可知權限類必須有has_permission方法
# 經過權限認證什麼都不用作,不經過執行如下代碼 self.permission_denied( request, message=getattr(permission, 'message', None) )
這裏循環的是self.get_permissions():
1)查看get_permissions方法:
class APIView(View): def get_permissions(self): """ Instantiates and returns the list of permissions that this view requires. """ return [permission() for permission in self.permission_classes]
[permission() for permission in self.permission_classes]與認證組件徹底相似:循環每個權限類並進行實例化,放在數組中。
2)查看確認permission_classes的值:
class APIView(View):
# The following policies may be set at either globally, or per-view.
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES
若是在本身的視圖類中定義了permission_classes(優先取當前視圖的局部配置) ,說明配置了局部視圖權限,就取本身定義的。
若是沒有定義:
因爲權限組件依然是用了api_settings這個APISettings類實例對象,實例化時執行__init__函數,所以也執行了user_settings靜態方法。
class APISettings(object): def __init__(self, user_settings=None, defaults=None, import_strings=None): if user_settings: self._user_settings = self.__check_user_settings(user_settings) self.defaults = defaults or DEFAULTS def user_settings(self): if not hasattr(self, '_user_settings'): self._user_settings = getattr(settings, 'REST_FRAMEWORK', {}) # 去settings.py中去拿REST_FRAMEWORK變量,若是拿不到則取一個空字典 return self._user_settings
這裏user_settings函數經過反射取settings下是否有配置‘REST_FRAMEWORK’,若是有配置即說明配置了全局視圖權限。self._user_settings取配置的值。
getattr(settings, 'REST_FRAMEWORK', {})裏的settings是從django引入過來的:
from django.conf import settings
若是沒有配置則self._user_settings是一個空字典,須要進一步分析api_settings.DEFAULT_PERMISSION_CLASSES:
APISettings類中包含__getattr__方法,當調用不存在的屬性時,Python會試圖調用__getattr__(self,item)來獲取屬性,而且返回item:
class APISettings(object): def __getattr__(self, attr): if attr not in self.defaults: raise AttributeError("Invalid API setting: '%s'" % attr) try: # Check if present in user settings val = self.user_settings[attr] # 全局權限視圖未設置時self.user_settings返回值是一個空字典,設置時取到全局配置 except KeyError: # 空字典取值報錯拋出異常 # Fall back to defaults val = self.defaults[attr] # 獲取DEFAULT中默認值 # Coerce import strings into classes if attr in self.import_strings: val = perform_import(val, attr) # Cache the result self._cached_attrs.add(attr) setattr(self, attr, val) return val
因爲全局權限視圖未未設置時self.user_settings返回值是一個空字典,所以取值會失敗,經過異常處理獲取DEFAULT默認配置中的值。
3)總結
self.get_permissions()的返回值是權限實例列表,如下面的示例爲例是[SVIPPermission(), ]。所以for循環拿到的permission是一個個權限實例:SVIPPermission()
所以若是permission.has_permission返回值是true,直接完成權限驗證;若是返回值是False,則返回沒有權限的提示。
修改models.py中user表結構:
class User(models.Model): name = models.CharField(max_length=32) pwd = models.CharField(max_length=32) type_choice = ((1, "普通用戶"), (2, "VIP"), (3, "SVIP")) user_type = models.IntegerField(choices=type_choice, default=1)
修改後完成數據庫遷移。
from rest_framework import viewsets class SVIPPermission(object): # 超級用戶可用 def has_permission(self, request, view): username = request.user # 獲取前面認證過的信息:egon user_type = User.objects.filter(name=username).first().user_type if user_type == 3: # 經過權限認證 return True else: # 未經過權限認證 return False class AuthorViewSet(viewsets.ModelViewSet): # authentication_classes = [TokenAuth, ] permission_classes = [SVIPPermission, ] queryset = Author.objects.all() # 配置queryset:告知這個類此次處理的數據 serializer_class = AuthorModelSerializers # 告知處理用到的序列化組件
顯示效果:
class SVIPPermission(object): # 超級用戶可用 message = "只有超級用戶才能訪問" def has_permission(self, request, view): username = request.user # 獲取前面認證過的信息:egon user_type = User.objects.filter(name=username).first().user_type if user_type == 3: # 經過權限認證 return True else: # 未經過權限認證 return False
顯示效果:
from .models import * from rest_framework import exceptions from rest_framework.authentication import BaseAuthentication class TokenAuth(BaseAuthentication): def authenticate(self, request): token = request.GET.get("token") token_obj = Token.objects.filter(token=token).first() if not token_obj: raise exceptions.AuthenticationFailed("驗證失敗!") # 若是有值 return兩個值中間加逗號,就構成了一個元組 return token_obj.user.name, token_obj.token # 元組:(關聯用戶對象的名字,當前登陸對象token) class SVIPPermission(object): # 超級用戶可用 message = "只有超級用戶才能訪問" def has_permission(self, request, view): username = request.user # 獲取前面認證過的信息:egon user_type = User.objects.filter(name=username).first().user_type if user_type == 3: # 經過權限認證 return True else: # 未經過權限認證 return False
REST_FRAMEWORK = { # 認證類路徑 "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth"], # 權限類路徑 "DEFAULT_PERMISSION_CLASSES": ["app01.utils.SVIPPermission"] }
class APIView(View): def dispatch(self, request, *args, **kwargs): try: self.initial(request, *args, **kwargs) def initial(self, request, *args, **kwargs): self.check_throttles(request) def check_throttles(self, request): """ Check if request should be throttled. Raises an appropriate exception if the request is throttled. """ for throttle in self.get_throttles(): if not throttle.allow_request(request, self): self.throttled(request, throttle.wait()) def get_throttles(self): return [throttle() for throttle in self.throttle_classes] # 頻率對象列表
直接將頻率限制類定義在utils.py中:
from rest_framework.throttling import BaseThrottle VISIT_RECORD={} class VisitThrottle(BaseThrottle): def __init__(self): self.history=None def allow_request(self,request,view): # 要求訪問站點的頻率不能超過1分鐘3次 remote_addr = request.META.get('REMOTE_ADDR') print(remote_addr) import time ctime=time.time() if remote_addr not in VISIT_RECORD: # 未訪問過 VISIT_RECORD[remote_addr]=[ctime,] return True history=VISIT_RECORD.get(remote_addr) self.history=history while history and history[-1]<ctime-60: history.pop() if len(history) < 3: # 未達到頻率限制 history.insert(0,ctime) return True else: return False def wait(self): import time ctime=time.time() return 60-(ctime-self.history[-1])
在視圖類中中配置局部頻率限制:
class AuthorViewSet(viewsets.ModelViewSet): # authentication_classes = [TokenAuth, ] # permission_classes = [SVIPPermission, ] throttle_classes = [VisitThrottle, ] queryset = Author.objects.all() # 配置queryset:告知這個類此次處理的數據 serializer_class = AuthorModelSerializers # 告知處理用到的序列化組件
注意:
request.META 是一個Python字典,包含了全部本次HTTP請求的Header信息,好比用戶IP地址和用戶Agent(一般是瀏覽器的名稱和版本號)。 注意,Header信息的完整列表取決於用戶所發送的Header信息和服務器端設置的Header信息。
在restDemo/settings.py中配置:
REST_FRAMEWORK = { # 認證類路徑 "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth"], # 權限類路徑 "DEFAULT_PERMISSION_CLASSES": ["app01.utils.SVIPPermission"], # 頻率類路徑 "DEFAULT_THROTTLE_CLASSES": ["app01.utils.VisitThrottle"] }
將頻率配置類修改成:
class VisitThrottle(SimpleRateThrottle): scope="visit_rate" def get_cache_key(self, request, view): return self.get_ident(request)
settings.py設置:
REST_FRAMEWORK = { 'DEFAULT_AUTHENTICATION_CLASSES': ['app01.utils.TokenAuth'], 'DEFAULT_PERMISSION_CLASSES': ['app01.utils.SVIPPermission'], 'DEFAULT_THROTTLE_CLASSES': ['app01.utils.VisitThrottle'], "DEFAULT_THROTTLE_RATES": { "visit_rate": "1/m", } }