目錄python
""" 系統:session認證 rest_framework.authentication.SessionAuthentication ajax請求經過認證: cookie中要攜帶 sessionid、csrftoken,請求頭中要攜帶 x-csrftoken 第三方:jwt認證 rest_framework_jwt.authentication.JSONWebTokenAuthentication ajax請求經過認證: 請求頭中要攜帶 authorization,值爲 jwt空格token 自定義:基於jwt、其它 1)自定義認證類,繼承BaseAuthentication(或其子類),重寫authenticate 2)authenticate中完成 拿到認證標識 auth 反解析出用戶 user 前兩步操做失敗 返回None => 遊客 前兩步操做成功 返回user,auth => 登陸用戶 注:若是在某個分支拋出異常,直接定義失敗 => 非法用戶 """
from rest_framework.exceptions import AuthenticationFailed import jwt from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication from rest_framework_jwt.authentication import jwt_decode_handler class JWTAuthentication(BaseJSONWebTokenAuthentication): # 自定義認證類,重寫authenticate方法 def authenticate(self, request): # 認證經過,返回user,auth # 認證失敗,返回None auth = request.META.get('HTTP_AUTH') # 前臺用auth攜帶token if not auth: return None try: payload = jwt_decode_handler(auth) # 出現jwt解析異常,直接拋出異常,表明非法用戶,也能夠返回None,做爲遊客處理 except jwt.ExpiredSignature: raise AuthenticationFailed('token已過時') except: raise AuthenticationFailed('token非法') user = self.authenticate_credentials(payload) return (user, auth)
from rest_framework.authentication import BaseAuthentication def authenticate(self, request): auth = 從request中獲得 user = 從auth中獲得 if not user: return None return user, auth
""" 系統: 1)AllowAny:容許全部用戶,校驗方法直接返回True 2)IsAuthenticated:只容許登陸用戶 必須request.user和request.user.is_authenticated都經過 3)IsAuthenticatedOrReadOnly:遊客只讀,登陸用戶無限制 get、option、head 請求無限制 前臺請求必須校驗 request.user和request.user.is_authenticated 4)IsAdminUser:是不是後臺用戶 校驗 request.user和request.user.is_staff is_staff(能夠登陸後臺管理系統的用戶) 自定義:基於auth的Group與Permission表 1)自定義權限類,繼承BasePermission,重寫has_permission 2)has_permission中完成 拿到登陸用戶 user <= request.user 校驗user的分組或是權限 前兩步操做失敗 返回False => 無權限 前兩步操做成功 返回True => 有權限 """
from rest_framework.permissions import BasePermission class AdminPermission(BasePermission): # 繼承BasePermission,重寫has_permission def has_permission(self, request, view): # 有權限,返回True # 無權限,返回False user = request.user if not user: return False # 用戶是 管理員 分組 (管理員分組是Group表中的一條自定義記錄) if not user.groups.filter(name='管理員'): return False # 登陸的用戶必須是自定義管理員分組成員 return True
""" 系統: 1)AnonRateThrottle:對同一IP遊客的限制 2)UserRateThrottle:對同一IP登陸用戶的限制 必須在settings.py中 'DEFAULT_THROTTLE_RATES': { 'user': '10/min', # 登陸的用戶一分鐘能夠訪問10次 'anon': '3/min', # 遊客一分鐘能夠訪問3次 } 在視圖類中: class TempAPIView(APIView): ... throttle_classes = [AnonRateThrottle, UserRateThrottle] 自定義:基於auth的Group與Permission表 1)自定義頻率類,繼承SimpleRateThrottle,重寫get_cache_key,明確scope SimpleRateThrottle已經幫咱們實現了 allow_request、wait 2)scope與settings.py的DEFAULT_THROTTLE_RATES配合使用 3)get_cache_key中完成 拿到限制信息 ident <= request中獲取 沒有限制信息 返回None => 不限制 有限制信息 返回限制信息字符串 => 有限制 """
from rest_framework.throttling import SimpleRateThrottle class ThreeMinRateThrottle(SimpleRateThrottle): scope = 'sms' def get_cache_key(self, request, view): # 對手機號頻率限制 ident = request.data.get('mobile') if not ident: # 爲發現限制條件,返回None表明不進行頻率限制 return None return self.cache_format % { 'scope': self.scope, 'ident': ident } # settings.py 配置文件 'DEFAULT_THROTTLE_RATES': { 'user': '10/min', # 登陸的用戶一分鐘能夠訪問10次 'anon': '3/min', # 遊客一分鐘能夠訪問3次 'sms': '1/min', }
from rest_framework.views import APIView from django.contrib import auth # views.py # 登陸成功後產生cookie,cookie中攜帶 sessionid、csrftoken class LoginSessionAPIView(APIView): # 登陸要禁用認證與權限 authentication_classes = [] permission_classes = [] def post(self, request, *args, **kwargs): username = request.data.get('username') password = request.data.get('password') if not (username and password): return Response('信息有誤') # user = models.User.objects.filter(username=username).first() # type: models.User # user.check_password(password) user = auth.authenticate(request, username=username, password=password) if not user: return Response('登陸失敗') auth.login(request, user=user) return Response('登陸成功') # postman 請求頭中要攜帶 x-csrftoken from rest_framework.permissions import IsAuthenticatedOrReadOnly from rest_framework.authentication import SessionAuthentication class CarModelViewSet(ModelViewSet): queryset = models.Car.objects.filter(is_delete=False) serializer_class = serializers.CarModelSerializer permission_classes = [IsAuthenticatedOrReadOnly] authentication_classes = [SessionAuthentication] def create(self, request, *args, **kwargs): return super().create(request, *args, **kwargs) def destroy(self, request, *args, **kwargs): obj = self.get_object() obj.is_delete = True obj.save() return Response('刪除成功') # urls.py # 遊客只能夠查看,登陸後能夠增刪改 url(r'^cars/$', views.CarModelViewSet.as_view({ 'get': 'list', 'post': 'create' }))
import re from utils.response import APIResponse from rest_framework_jwt.serializers import jwt_encode_handler, jwt_payload_handler class LoginJWTAPIView(APIView): authentication_classes = () permission_classes = () def post(self, request, *args, **kwargs): # username可能攜帶的不止是用戶名,可能仍是用戶的其它惟一標識 手機號 郵箱 username = request.data.get('username') password = request.data.get('password') # 若是username匹配上手機號正則 => 多是手機登陸 if re.match(r'1[3-9][0-9]{9}', username): try: # 手動經過 user 簽發 jwt-token user = models.User.objects.get(mobile=username) except: return APIResponse(1, '該手機未註冊') # 郵箱登陸 等 # 帳號登陸 else: try: # 手動經過 user 簽發 jwt-token user = models.User.objects.get(username=username) except: return APIResponse(1, '該帳號未註冊') # 得到用戶後,校驗密碼並簽發token if not user.check_password(password): return APIResponse(1, '密碼錯誤') payload = jwt_payload_handler(user) token = jwt_encode_handler(payload) return APIResponse(0, 'ok', results={ 'username': user.username, 'mobile': user.mobile, 'token': token }) # 自定義認證類 JWTAuthentication from rest_framework.permissions import IsAuthenticatedOrReadOnly from api.authentications import JWTAuthentication class CarV3ModelViewSet(ModelViewSet): authentication_classes = [JWTAuthentication] permission_classes = [IsAuthenticatedOrReadOnly] queryset = models.Car.objects.filter(is_delete=False) serializer_class = serializers.CarModelSerializer def destroy(self, request, *args, **kwargs): obj = self.get_object() obj.is_delete = True obj.save() return Response('刪除成功') # urls.py # 遊客只能夠查看,登陸後能夠增刪改 url(r'^v3/cars/$', views.CarV3ModelViewSet.as_view({ 'get': 'list', 'post': 'create' }))