做用:校驗是否登陸html
from rest_framework.authentication import BaseAuthentication class myAuthen(BaseAuthentication): def authenticate(self, request): token = request.query_params.get('token') ret = models.UserToken.objects.filter(token=token).first() if ret: # return ret.user, ret # 要寫多個認證類,這個地返回None # 最後一個認證類,返回這倆值 return ret.user, ret else: raise AuthenticationFailed('您沒有登錄')
authentication_classes = [myAuthen, ]
REST_FRAMEWORK={ "DEFAULT_AUTHENTICATION_CLASSES":["app01.service.auth.Authentication",] }
認證類使用順序:先用視圖類中的驗證類,再用settings裏配置的驗證類,最後用默認的驗證類python
class User(models.Model): username=models.CharField(max_length=32) password=models.CharField(max_length=32) user_type=models.IntegerField(choices=((1,'超級用戶'),(2,'普通用戶'),(3,'二筆用戶'))) class UserToken(models.Model): user=models.OneToOneField(to='User') token=models.CharField(max_length=64)
from rest_framework.authentication import BaseAuthentication class TokenAuth(): def authenticate(self, request): token = request.GET.get('token') token_obj = models.UserToken.objects.filter(token=token).first() if token_obj: return else: raise AuthenticationFailed('認證失敗') def authenticate_header(self,request): pass
(1) 登陸..... def get_token(username): import hashlib import time md = hashlib.md5() # update內必須傳bytes格式 md.update(username.encode('utf-8')) md.update(str(time.time()).encode('utf-8')) return md.hexdigest() #認證 class Login(APIView): authentication_classes = [ ] def post(self, request): response = MyResponse() name = request.data.get('name') pwd = request.data.get('pwd') user = models.User.objects.filter(username=name, password=pwd).first() if user: response.msg = '登錄成功' # 登錄成功,返回一個隨機字符串,之後在發請求,都攜帶這個字符串 token = get_token(name) response.token = token # 把隨機字符串保存到數據庫,有就更新,沒有就建立 # ret=models.UserToken.objects.update_or_create(user_id=user.id,kwargs={'token':token}) ret = models.UserToken.objects.update_or_create(user=user, defaults={'token': token}) else: response.msg = '用戶名或密碼錯誤' response.status = 101 return JsonResponse(response.get_dic,safe=False) (2) 登陸後 攜帶token 認證查詢數據.... from app01.myauth import myAuthen class Stus(APIView): authentication_classes = [myAuthen,] def get(self,request): response = MyResponse() #認證成功以後,能夠去到用戶名稱,及token print(request.user.username) print(request.auth.token) stus = models.Student.objects.all() ret = myserial.StrSer(instance=stus,many=True) response.msg = '查詢成功' response.data = ret.data return JsonResponse(response.get_dic,safe=False)
from rest_framework.authentication import BaseAuthentication #拋出異常,捕捉 from rest_framework.exceptions import AuthenticationFailed from app01 import models class myAuthen(BaseAuthentication): def authenticate(self, request): token = request.query_params.get('token') ret = models.UserToken.objects.filter(token=token).first() if ret: # return ret.user, ret # 要寫多個認證類,這個地返回None # 最後一個認證類,返回這倆值 return ret.user, ret else: raise AuthenticationFailed('您沒有登錄')
附:不存數據庫的token驗證 就是經過某種的加密校驗規則來驗證數據庫
def get_token(id,salt='123'): import hashlib md=hashlib.md5() md.update(bytes(str(id),encoding='utf-8')) md.update(bytes(salt,encoding='utf-8')) return md.hexdigest()+'|'+str(id) def check_token(token,salt='123'): ll=token.split('|') import hashlib md=hashlib.md5() md.update(bytes(ll[-1],encoding='utf-8')) md.update(bytes(salt,encoding='utf-8')) if ll[0]==md.hexdigest(): return True else: return False class TokenAuth(): def authenticate(self, request): token = request.GET.get('token') success=check_token(token) if success: return else: raise AuthenticationFailed('認證失敗') def authenticate_header(self,request): pass class Login(APIView): def post(self,reuquest): back_msg={'status':1001,'msg':None} try: name=reuquest.data.get('name') pwd=reuquest.data.get('pwd') user=models.User.objects.filter(username=name,password=pwd).first() if user: token=get_token(user.pk) # models.UserToken.objects.update_or_create(user=user,defaults={'token':token}) back_msg['status']='1000' back_msg['msg']='登陸成功' back_msg['token']=token else: back_msg['msg'] = '用戶名或密碼錯誤' except Exception as e: back_msg['msg']=str(e) return Response(back_msg) from rest_framework.authentication import BaseAuthentication class TokenAuth(): def authenticate(self, request): token = request.GET.get('token') token_obj = models.UserToken.objects.filter(token=token).first() if token_obj: return else: raise AuthenticationFailed('認證失敗') def authenticate_header(self,request): pass class Course(APIView): authentication_classes = [TokenAuth, ] def get(self, request): return HttpResponse('get') def post(self, request): return HttpResponse('post')
全局使用 在setting中添加app
REST_FRAMEWORK={ "DEFAULT_AUTHENTICATION_CLASSES":["app01.service.auth.Authentication",] }
#Request對象的user方法 @property def user(self): the authentication classes provided to the request. if not hasattr(self, '_user'): with wrap_attributeerrors(): self._authenticate() return self._user def _authenticate(self): for authenticator in self.authenticators: try: user_auth_tuple = authenticator.authenticate(self) except exceptions.APIException: self._not_authenticated() raise #認證成功,能夠返回一個元組,但必須是最後一個驗證類才能返回 if user_auth_tuple is not None: self._authenticator = authenticator self.user, self.auth = user_auth_tuple return self._not_authenticated()
self.authenticatorside
def get_authenticators(self): return [auth() for auth in self.authentication_classes]
連接源碼分析