複習
"""
頻率組件:限制接口的訪問頻率
源碼分析:初始化方法、判斷是否有權限方法、計數等待時間方法
自定義頻率組件:
class MyThrottle(SimpleRateThrottle):
scope = 'sms'
def get_cache_key(self, request, view):
# 從request的 query_params、data、META 及 view 中 獲取限制的條件
return '與認證信息有關的動態字符串'
settings文件中要有scope對應的rate配置 {'sms': '3/min'}
jwt認證:
1)session存儲token,須要數據庫參與,耗服務器資源、低效
2)緩存存token,須要緩存參與,高效,不易集羣
3)客戶端存token,服務器存簽發與交易token的算法,高效,易集羣
drf-jwt使用:
安裝:djangorestframework-jwt
視圖類簽發token - username,password => token
認證類校驗token - token => user
jwt格式:基本信息(頭base64).用戶信息過時時間(載荷base64).全部信息+祕鑰(簽名HS256)
"""
今日
"""
一、drf-jwt手動簽發與校驗
二、drf小組件:過濾、篩選、排序、分頁 => 針對與羣查接口
"""
簽發token
源碼入口
# 前提:給一個局部禁用了全部 認證與權限 的視圖類發送用戶信息獲得token,其實就是登陸接口
# 1)rest_framework_jwt.views.ObtainJSONWebToken 的 父類 JSONWebTokenAPIView 的 post 方法
# 接受有username、password的post請求
# 2)post方法將請求數據交給 rest_framework_jwt.serializer.JSONWebTokenSerializer 處理
# 完成數據的校驗,會走序列化類的 全局鉤子校驗規則,校驗獲得登陸用戶並簽發token存儲在序列化對象中
核心源碼:rest_framework_jwt.serializer.JSONWebTokenSerializer的validate(self, attrs)方法
def validate(self, attrs):
# 帳號密碼字典
credentials = {
self.username_field: attrs.get(self.username_field),
'password': attrs.get('password')
}
if all(credentials.values()):
# 簽發token第1步:用帳號密碼獲得user對象
user = authenticate(**credentials)
if user:
if not user.is_active:
msg = _('User account is disabled.')
raise serializers.ValidationError(msg)
# 簽發token第2步:經過user獲得payload,payload包含着用戶信息與過時時間
payload = jwt_payload_handler(user)
# 在視圖類中,能夠經過 序列化對象.object.get('user'或者'token') 拿到user和token
return {
# 簽發token第3步:經過payload簽發出token
'token': jwt_encode_handler(payload),
'user': user
}
else:
msg = _('Unable to log in with provided credentials.')
raise serializers.ValidationError(msg)
else:
msg = _('Must include "{username_field}" and "password".')
msg = msg.format(username_field=self.username_field)
raise serializers.ValidationError(msg)
手動簽發token邏輯
# 1)經過username、password獲得user對象
# 2)經過user對象生成payload:jwt_payload_handler(user) => payload
# from rest_framework_jwt.serializers import jwt_payload_handler
# 3)經過payload簽發token:jwt_encode_handler(payload) => token
# from rest_framework_jwt.serializers import jwt_encode_handler
校驗token
源碼入口
# 前提:訪問一個配置了jwt認證規則的視圖類,就須要提交認證字符串token,在認證類中完成token的校驗
# 1)rest_framework_jwt.authentication.JSONWebTokenAuthentication 的 父類 BaseJSONWebTokenAuthentication 的 authenticate 方法
# 請求頭拿認證信息jwt-token => 經過反爬小規則肯定有用的token => payload => user
核心源碼:rest_framework_jwt.authentication.BaseJSONWebTokenAuthentication的authenticate(self, request)方法
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
# 帶有反爬小規則的獲取token:前臺必須按 "jwt token字符串" 方式提交
# 校驗user第1步:從請求頭 HTTP_AUTHORIZATION 中拿token,並提取
jwt_value = self.get_jwt_value(request)
# 遊客
if jwt_value is None:
return None
# 校驗
try:
# 校驗user第2步:token => payload
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
# 校驗user第3步:token => payload
user = self.authenticate_credentials(payload)
return (user, jwt_value)
手動校驗token邏輯
# 1)從請求頭中獲取token
# 2)根據token解析出payload:jwt_decode_handler(token) => payloay
# from rest_framework_jwt.authentication import jwt_decode_handler
# 3)根據payload解析出user:self.authenticate_credentials(payload) => user
# 繼承drf-jwt的BaseJSONWebTokenAuthentication,拿到父級的authenticate_credentials方法
案例:實現多方式登錄簽發token
models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
class User(AbstractUser):
mobile = models.CharField(max_length=11, unique=True)
class Meta:
db_table = 'api_user'
verbose_name = '用戶表'
verbose_name_plural = verbose_name
def __str__(self):
return self.username
serializers.py
from rest_framework import serializers
from . import models
import re
# 拿到前臺token的兩個函數: user => payload => token
# from rest_framework_jwt.settings import api_settings
# jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
# jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
# 1) 前臺提交多種登陸信息都採用一個key,因此後臺能夠自定義反序列化字段進行對應
# 2) 序列化類要處理序列化與反序列化,要在fields中設置model綁定的Model類全部使用到的字段
# 3) 區分序列化字段與反序列化字段 read_only | write_only
# 4) 在自定義校驗規則中(局部鉤子、全局鉤子)校驗數據是否合法、肯定登陸的用戶、根據用戶簽發token
# 5) 將登陸的用戶與簽發的token保存在序列化類對象中
class UserModelSerializer(serializers.ModelSerializer):
# 自定義反序列字段:必定要設置write_only,只參與反序列化,不會與model類字段映射
usr = serializers.CharField(write_only=True)
pwd = serializers.CharField(write_only=True)
class Meta:
model = models.User
fields = ['usr', 'pwd', 'username', 'mobile', 'email']
# 系統校驗規則
extra_kwargs = {
'username': {
'read_only': True
},
'mobile': {
'read_only': True
},
'email': {
'read_only': True
},
}
def validate(self, attrs):
usr = attrs.get('usr')
pwd = attrs.get('pwd')
# 多方式登陸:各分支處理獲得該方式下對應的用戶
if re.match(r'.+@.+', usr):
user_query = models.User.objects.filter(email=usr)
elif re.match(r'1[3-9][0-9]{9}', usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
# 簽發:獲得登陸用戶,簽發token並存儲在實例化對象中
if user_obj and user_obj.check_password(pwd):
# 簽發token,將token存放到 實例化類對象的token 名字中
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
# 將當前用戶與簽發的token都保存在序列化對象中
self.user = user_obj
self.token = token
return attrs
raise serializers.ValidationError({'data': '數據有誤'})
views.py
#實現多方式登錄簽發token:帳號、手機號、郵箱等登錄
# 1) 禁用認證與權限組件
# 2) 拿到前臺登陸信息,交給序列化類
# 3) 序列化類校驗獲得登陸用戶與token存放在序列化對象中
# 4) 取出登陸用戶與token返回給前臺
import re
from . import serializers, models
from utils.response import APIResponse
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
class LoginAPIView(APIView):
# 1) 禁用認證與權限組件
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs):
# 2) 拿到前臺登陸信息,交給序列化類,規則:帳號用usr傳,密碼用pwd傳
user_ser = serializers.UserModelSerializer(data=request.data)
# 3) 序列化類校驗獲得登陸用戶與token存放在序列化對象中
user_ser.is_valid(raise_exception=True)
# 4) 取出登陸用戶與token返回給前臺
return APIResponse(token=user_ser.token, results=serializers.UserModelSerializer(user_ser.user).data)
# "一根筋" 思考方式:全部邏輯都在視圖類中處理
def my_post(self, request, *args, **kwargs):
usr = request.data.get('usr')
pwd = request.data.get('pwd')
if re.match(r'.+@.+', usr):
user_query = models.User.objects.filter(email=usr)
elif re.match(r'1[3-9][0-9]{9}', usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
if user_obj and user_obj.check_password(pwd):
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
return APIResponse(results={'username': user_obj.username}, token=token)
return APIResponse(data_msg='不可控錯誤')
案例:自定義認證反爬規則的認證類
authentications.py
import jwt
from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
from rest_framework_jwt.authentication import jwt_decode_handler
from rest_framework.exceptions import AuthenticationFailed
class JWTAuthentication(BaseJSONWebTokenAuthentication):
def authenticate(self, request):
jwt_token = request.META.get('HTTP_AUTHORIZATION')
# 自定義校驗規則:auth token jwt
token = self.parse_jwt_token(jwt_token)
if token is None:
return None
try:
# token => payload
payload = jwt_decode_handler(token)
except jwt.ExpiredSignature:
raise AuthenticationFailed('token已過時')
except:
raise AuthenticationFailed('非法用戶')
# payload => user
user = self.authenticate_credentials(payload)
return (user, token)
# 自定義校驗規則:auth token jwt,auth爲前鹽,jwt爲後鹽
def parse_jwt_token(self, jwt_token):
tokens = jwt_token.split()
if len(tokens) != 3 or tokens[0].lower() != 'auth' or tokens[2].lower() != 'jwt':
return None
return tokens[1]
views.py
from rest_framework.views import APIView
from utils.response import APIResponse
# 必須登陸後才能訪問 - 經過了認證權限組件
from rest_framework.permissions import IsAuthenticated
# 自定義jwt校驗規則
from .authentications import JWTAuthentication
class UserDetail(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
def get(self, request, *args, **kwargs):
return APIResponse(results={'username': request.user.username})
admin使用自定義User表:新增用戶密碼密文
from django.contrib import admin
from . import models
# 自定義User表,admin後臺管理,採用密文密碼
from django.contrib.auth.admin import UserAdmin
class MyUserAdmin(UserAdmin):
add_fieldsets = (
(None, {
'classes': ('wide',),
'fields': ('username', 'password1', 'password2', 'mobile', 'email'),
}),
)
admin.site.register(models.User, MyUserAdmin)
羣查接口各類篩選組件數據準備
models.py
class Car(models.Model):
name = models.CharField(max_length=16, unique=True, verbose_name='車名')
price = models.DecimalField(max_digits=10, decimal_places=2, verbose_name='價格')
brand = models.CharField(max_length=16, verbose_name='品牌')
class Meta:
db_table = 'api_car'
verbose_name = '汽車表'
verbose_name_plural = verbose_name
def __str__(self):
return self.name
admin.py
admin.site.register(models.Car)
serializers.py
class CarModelSerializer(serializers.ModelSerializer):
class Meta:
model = models.Car
fields = ['name', 'price', 'brand']
views.py
# Car的羣查接口
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
urls.py
url(r'^cars/$', views.CarListAPIView.as_view()),
drf搜索過濾組件
views.py
from rest_framework.generics import ListAPIView
# 第一步:drf的SearchFilter - 搜索過濾
from rest_framework.filters import SearchFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 過濾類 們(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [SearchFilter]
# 第三步:SearchFilter過濾類依賴的過濾條件 => 接口:/cars/?search=...
search_fields = ['name', 'price']
# eg:/cars/?search=1,name和price中包含1的數據都會被查詢出
drf排序過濾組件
views.py
from rest_framework.generics import ListAPIView
# 第一步:drf的OrderingFilter - 排序過濾
from rest_framework.filters import OrderingFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 過濾類 們(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [OrderingFilter]
# 第三步:OrderingFilter過濾類依賴的過濾條件 => 接口:/cars/?ordering=...
ordering_fields = ['pk', 'price']
# eg:/cars/?ordering=-price,pk,先按price降序,若是出現price相同,再按pk升序
drf基礎分頁組件
pahenations.py
from rest_framework.pagination import PageNumberPagination
class MyPageNumberPagination(PageNumberPagination):
# ?page=頁碼
page_query_param = 'page'
# ?page=頁面 下默認一頁顯示的條數
page_size = 3
# ?page=頁面&page_size=條數 用戶自定義一頁顯示的條數
page_size_query_param = 'page_size'
# 用戶自定義一頁顯示的條數最大限制:數值超過5也只顯示5條
max_page_size = 5
views.py
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
# 若是queryset沒有過濾條件,就必須 .all(),否則分頁會出問題
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 分頁組件 - 給視圖類配置分頁類便可 - 分頁類須要自定義,繼承drf提供的分頁類便可
pagination_class = pagenations.MyPageNumberPagination
複習
"""
頻率組件:限制接口的訪問頻率
源碼分析:初始化方法、判斷是否有權限方法、計數等待時間方法
自定義頻率組件:
class MyThrottle(SimpleRateThrottle):
scope = 'sms'
def get_cache_key(self, request, view):
# 從request的 query_params、data、META 及 view 中 獲取限制的條件
return '與認證信息有關的動態字符串'
settings文件中要有scope對應的rate配置 {'sms': '3/min'}
jwt認證:
1)session存儲token,須要數據庫參與,耗服務器資源、低效
2)緩存存token,須要緩存參與,高效,不易集羣
3)客戶端存token,服務器存簽發與交易token的算法,高效,易集羣
drf-jwt使用:
安裝:djangorestframework-jwt
視圖類簽發token - username,password => token
認證類校驗token - token => user
jwt格式:基本信息(頭base64).用戶信息過時時間(載荷base64).全部信息+祕鑰(簽名HS256)
"""
今日
"""
一、drf-jwt手動簽發與校驗
二、drf小組件:過濾、篩選、排序、分頁 => 針對與羣查接口
"""
簽發token
源碼入口
# 前提:給一個局部禁用了全部 認證與權限 的視圖類發送用戶信息獲得token,其實就是登陸接口
# 1)rest_framework_jwt.views.ObtainJSONWebToken 的 父類 JSONWebTokenAPIView 的 post 方法
# 接受有username、password的post請求
# 2)post方法將請求數據交給 rest_framework_jwt.serializer.JSONWebTokenSerializer 處理
# 完成數據的校驗,會走序列化類的 全局鉤子校驗規則,校驗獲得登陸用戶並簽發token存儲在序列化對象中
核心源碼:rest_framework_jwt.serializer.JSONWebTokenSerializer的validate(self, attrs)方法
def validate(self, attrs):
# 帳號密碼字典
credentials = {
self.username_field: attrs.get(self.username_field),
'password': attrs.get('password')
}
if all(credentials.values()):
# 簽發token第1步:用帳號密碼獲得user對象
user = authenticate(**credentials)
if user:
if not user.is_active:
msg = _('User account is disabled.')
raise serializers.ValidationError(msg)
# 簽發token第2步:經過user獲得payload,payload包含着用戶信息與過時時間
payload = jwt_payload_handler(user)
# 在視圖類中,能夠經過 序列化對象.object.get('user'或者'token') 拿到user和token
return {
# 簽發token第3步:經過payload簽發出token
'token': jwt_encode_handler(payload),
'user': user
}
else:
msg = _('Unable to log in with provided credentials.')
raise serializers.ValidationError(msg)
else:
msg = _('Must include "{username_field}" and "password".')
msg = msg.format(username_field=self.username_field)
raise serializers.ValidationError(msg)
手動簽發token邏輯
# 1)經過username、password獲得user對象
# 2)經過user對象生成payload:jwt_payload_handler(user) => payload
# from rest_framework_jwt.serializers import jwt_payload_handler
# 3)經過payload簽發token:jwt_encode_handler(payload) => token
# from rest_framework_jwt.serializers import jwt_encode_handler
校驗token
源碼入口
# 前提:訪問一個配置了jwt認證規則的視圖類,就須要提交認證字符串token,在認證類中完成token的校驗
# 1)rest_framework_jwt.authentication.JSONWebTokenAuthentication 的 父類 BaseJSONWebTokenAuthentication 的 authenticate 方法
# 請求頭拿認證信息jwt-token => 經過反爬小規則肯定有用的token => payload => user
核心源碼:rest_framework_jwt.authentication.BaseJSONWebTokenAuthentication的authenticate(self, request)方法
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
# 帶有反爬小規則的獲取token:前臺必須按 "jwt token字符串" 方式提交
# 校驗user第1步:從請求頭 HTTP_AUTHORIZATION 中拿token,並提取
jwt_value = self.get_jwt_value(request)
# 遊客
if jwt_value is None:
return None
# 校驗
try:
# 校驗user第2步:token => payload
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
# 校驗user第3步:token => payload
user = self.authenticate_credentials(payload)
return (user, jwt_value)
手動校驗token邏輯
# 1)從請求頭中獲取token
# 2)根據token解析出payload:jwt_decode_handler(token) => payloay
# from rest_framework_jwt.authentication import jwt_decode_handler
# 3)根據payload解析出user:self.authenticate_credentials(payload) => user
# 繼承drf-jwt的BaseJSONWebTokenAuthentication,拿到父級的authenticate_credentials方法
案例:實現多方式登錄簽發token
models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
class User(AbstractUser):
mobile = models.CharField(max_length=11, unique=True)
class Meta:
db_table = 'api_user'
verbose_name = '用戶表'
verbose_name_plural = verbose_name
def __str__(self):
return self.username
serializers.py
from rest_framework import serializers
from . import models
import re
# 拿到前臺token的兩個函數: user => payload => token
# from rest_framework_jwt.settings import api_settings
# jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
# jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
# 1) 前臺提交多種登陸信息都採用一個key,因此後臺能夠自定義反序列化字段進行對應
# 2) 序列化類要處理序列化與反序列化,要在fields中設置model綁定的Model類全部使用到的字段
# 3) 區分序列化字段與反序列化字段 read_only | write_only
# 4) 在自定義校驗規則中(局部鉤子、全局鉤子)校驗數據是否合法、肯定登陸的用戶、根據用戶簽發token
# 5) 將登陸的用戶與簽發的token保存在序列化類對象中
class UserModelSerializer(serializers.ModelSerializer):
# 自定義反序列字段:必定要設置write_only,只參與反序列化,不會與model類字段映射
usr = serializers.CharField(write_only=True)
pwd = serializers.CharField(write_only=True)
class Meta:
model = models.User
fields = ['usr', 'pwd', 'username', 'mobile', 'email']
# 系統校驗規則
extra_kwargs = {
'username': {
'read_only': True
},
'mobile': {
'read_only': True
},
'email': {
'read_only': True
},
}
def validate(self, attrs):
usr = attrs.get('usr')
pwd = attrs.get('pwd')
# 多方式登陸:各分支處理獲得該方式下對應的用戶
if re.match(r'.+@.+', usr):
user_query = models.User.objects.filter(email=usr)
elif re.match(r'1[3-9][0-9]{9}', usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
# 簽發:獲得登陸用戶,簽發token並存儲在實例化對象中
if user_obj and user_obj.check_password(pwd):
# 簽發token,將token存放到 實例化類對象的token 名字中
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
# 將當前用戶與簽發的token都保存在序列化對象中
self.user = user_obj
self.token = token
return attrs
raise serializers.ValidationError({'data': '數據有誤'})
views.py
#實現多方式登錄簽發token:帳號、手機號、郵箱等登錄
# 1) 禁用認證與權限組件
# 2) 拿到前臺登陸信息,交給序列化類
# 3) 序列化類校驗獲得登陸用戶與token存放在序列化對象中
# 4) 取出登陸用戶與token返回給前臺
import re
from . import serializers, models
from utils.response import APIResponse
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
class LoginAPIView(APIView):
# 1) 禁用認證與權限組件
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs):
# 2) 拿到前臺登陸信息,交給序列化類,規則:帳號用usr傳,密碼用pwd傳
user_ser = serializers.UserModelSerializer(data=request.data)
# 3) 序列化類校驗獲得登陸用戶與token存放在序列化對象中
user_ser.is_valid(raise_exception=True)
# 4) 取出登陸用戶與token返回給前臺
return APIResponse(token=user_ser.token, results=serializers.UserModelSerializer(user_ser.user).data)
# "一根筋" 思考方式:全部邏輯都在視圖類中處理
def my_post(self, request, *args, **kwargs):
usr = request.data.get('usr')
pwd = request.data.get('pwd')
if re.match(r'.+@.+', usr):
user_query = models.User.objects.filter(email=usr)
elif re.match(r'1[3-9][0-9]{9}', usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
if user_obj and user_obj.check_password(pwd):
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
return APIResponse(results={'username': user_obj.username}, token=token)
return APIResponse(data_msg='不可控錯誤')
案例:自定義認證反爬規則的認證類
authentications.py
import jwt
from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
from rest_framework_jwt.authentication import jwt_decode_handler
from rest_framework.exceptions import AuthenticationFailed
class JWTAuthentication(BaseJSONWebTokenAuthentication):
def authenticate(self, request):
jwt_token = request.META.get('HTTP_AUTHORIZATION')
# 自定義校驗規則:auth token jwt
token = self.parse_jwt_token(jwt_token)
if token is None:
return None
try:
# token => payload
payload = jwt_decode_handler(token)
except jwt.ExpiredSignature:
raise AuthenticationFailed('token已過時')
except:
raise AuthenticationFailed('非法用戶')
# payload => user
user = self.authenticate_credentials(payload)
return (user, token)
# 自定義校驗規則:auth token jwt,auth爲前鹽,jwt爲後鹽
def parse_jwt_token(self, jwt_token):
tokens = jwt_token.split()
if len(tokens) != 3 or tokens[0].lower() != 'auth' or tokens[2].lower() != 'jwt':
return None
return tokens[1]
views.py
from rest_framework.views import APIView
from utils.response import APIResponse
# 必須登陸後才能訪問 - 經過了認證權限組件
from rest_framework.permissions import IsAuthenticated
# 自定義jwt校驗規則
from .authentications import JWTAuthentication
class UserDetail(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
def get(self, request, *args, **kwargs):
return APIResponse(results={'username': request.user.username})
admin使用自定義User表:新增用戶密碼密文
from django.contrib import admin
from . import models
# 自定義User表,admin後臺管理,採用密文密碼
from django.contrib.auth.admin import UserAdmin
class MyUserAdmin(UserAdmin):
add_fieldsets = (
(None, {
'classes': ('wide',),
'fields': ('username', 'password1', 'password2', 'mobile', 'email'),
}),
)
admin.site.register(models.User, MyUserAdmin)
羣查接口各類篩選組件數據準備
models.py
class Car(models.Model):
name = models.CharField(max_length=16, unique=True, verbose_name='車名')
price = models.DecimalField(max_digits=10, decimal_places=2, verbose_name='價格')
brand = models.CharField(max_length=16, verbose_name='品牌')
class Meta:
db_table = 'api_car'
verbose_name = '汽車表'
verbose_name_plural = verbose_name
def __str__(self):
return self.name
admin.py
admin.site.register(models.Car)
serializers.py
class CarModelSerializer(serializers.ModelSerializer):
class Meta:
model = models.Car
fields = ['name', 'price', 'brand']
views.py
# Car的羣查接口
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
urls.py
url(r'^cars/$', views.CarListAPIView.as_view()),
drf搜索過濾組件
views.py
from rest_framework.generics import ListAPIView
# 第一步:drf的SearchFilter - 搜索過濾
from rest_framework.filters import SearchFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 過濾類 們(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [SearchFilter]
# 第三步:SearchFilter過濾類依賴的過濾條件 => 接口:/cars/?search=...
search_fields = ['name', 'price']
# eg:/cars/?search=1,name和price中包含1的數據都會被查詢出
drf排序過濾組件
views.py
from rest_framework.generics import ListAPIView
# 第一步:drf的OrderingFilter - 排序過濾
from rest_framework.filters import OrderingFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 過濾類 們(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [OrderingFilter]
# 第三步:OrderingFilter過濾類依賴的過濾條件 => 接口:/cars/?ordering=...
ordering_fields = ['pk', 'price']
# eg:/cars/?ordering=-price,pk,先按price降序,若是出現price相同,再按pk升序
drf基礎分頁組件
pahenations.py
from rest_framework.pagination import PageNumberPagination
class MyPageNumberPagination(PageNumberPagination):
# ?page=頁碼
page_query_param = 'page'
# ?page=頁面 下默認一頁顯示的條數
page_size = 3
# ?page=頁面&page_size=條數 用戶自定義一頁顯示的條數
page_size_query_param = 'page_size'
# 用戶自定義一頁顯示的條數最大限制:數值超過5也只顯示5條
max_page_size = 5
views.py
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
# 若是queryset沒有過濾條件,就必須 .all(),否則分頁會出問題
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 分頁組件 - 給視圖類配置分頁類便可 - 分頁類須要自定義,繼承drf提供的分頁類便可
pagination_class = pagenations.MyPageNumberPagination
複習
"""
頻率組件:限制接口的訪問頻率
源碼分析:初始化方法、判斷是否有權限方法、計數等待時間方法
自定義頻率組件:
class MyThrottle(SimpleRateThrottle):
scope = 'sms'
def get_cache_key(self, request, view):
# 從request的 query_params、data、META 及 view 中 獲取限制的條件
return '與認證信息有關的動態字符串'
settings文件中要有scope對應的rate配置 {'sms': '3/min'}
jwt認證:
1)session存儲token,須要數據庫參與,耗服務器資源、低效
2)緩存存token,須要緩存參與,高效,不易集羣
3)客戶端存token,服務器存簽發與交易token的算法,高效,易集羣
drf-jwt使用:
安裝:djangorestframework-jwt
視圖類簽發token - username,password => token
認證類校驗token - token => user
jwt格式:基本信息(頭base64).用戶信息過時時間(載荷base64).全部信息+祕鑰(簽名HS256)
"""
今日
"""
一、drf-jwt手動簽發與校驗
二、drf小組件:過濾、篩選、排序、分頁 => 針對與羣查接口
"""
簽發token
源碼入口
# 前提:給一個局部禁用了全部 認證與權限 的視圖類發送用戶信息獲得token,其實就是登陸接口
# 1)rest_framework_jwt.views.ObtainJSONWebToken 的 父類 JSONWebTokenAPIView 的 post 方法
# 接受有username、password的post請求
# 2)post方法將請求數據交給 rest_framework_jwt.serializer.JSONWebTokenSerializer 處理
# 完成數據的校驗,會走序列化類的 全局鉤子校驗規則,校驗獲得登陸用戶並簽發token存儲在序列化對象中
核心源碼:rest_framework_jwt.serializer.JSONWebTokenSerializer的validate(self, attrs)方法
def validate(self, attrs):
# 帳號密碼字典
credentials = {
self.username_field: attrs.get(self.username_field),
'password': attrs.get('password')
}
if all(credentials.values()):
# 簽發token第1步:用帳號密碼獲得user對象
user = authenticate(**credentials)
if user:
if not user.is_active:
msg = _('User account is disabled.')
raise serializers.ValidationError(msg)
# 簽發token第2步:經過user獲得payload,payload包含着用戶信息與過時時間
payload = jwt_payload_handler(user)
# 在視圖類中,能夠經過 序列化對象.object.get('user'或者'token') 拿到user和token
return {
# 簽發token第3步:經過payload簽發出token
'token': jwt_encode_handler(payload),
'user': user
}
else:
msg = _('Unable to log in with provided credentials.')
raise serializers.ValidationError(msg)
else:
msg = _('Must include "{username_field}" and "password".')
msg = msg.format(username_field=self.username_field)
raise serializers.ValidationError(msg)
手動簽發token邏輯
# 1)經過username、password獲得user對象
# 2)經過user對象生成payload:jwt_payload_handler(user) => payload
# from rest_framework_jwt.serializers import jwt_payload_handler
# 3)經過payload簽發token:jwt_encode_handler(payload) => token
# from rest_framework_jwt.serializers import jwt_encode_handler
校驗token
源碼入口
# 前提:訪問一個配置了jwt認證規則的視圖類,就須要提交認證字符串token,在認證類中完成token的校驗
# 1)rest_framework_jwt.authentication.JSONWebTokenAuthentication 的 父類 BaseJSONWebTokenAuthentication 的 authenticate 方法
# 請求頭拿認證信息jwt-token => 經過反爬小規則肯定有用的token => payload => user
核心源碼:rest_framework_jwt.authentication.BaseJSONWebTokenAuthentication的authenticate(self, request)方法
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
# 帶有反爬小規則的獲取token:前臺必須按 "jwt token字符串" 方式提交
# 校驗user第1步:從請求頭 HTTP_AUTHORIZATION 中拿token,並提取
jwt_value = self.get_jwt_value(request)
# 遊客
if jwt_value is None:
return None
# 校驗
try:
# 校驗user第2步:token => payload
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
# 校驗user第3步:token => payload
user = self.authenticate_credentials(payload)
return (user, jwt_value)
手動校驗token邏輯
# 1)從請求頭中獲取token
# 2)根據token解析出payload:jwt_decode_handler(token) => payloay
# from rest_framework_jwt.authentication import jwt_decode_handler
# 3)根據payload解析出user:self.authenticate_credentials(payload) => user
# 繼承drf-jwt的BaseJSONWebTokenAuthentication,拿到父級的authenticate_credentials方法
案例:實現多方式登錄簽發token
models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
class User(AbstractUser):
mobile = models.CharField(max_length=11, unique=True)
class Meta:
db_table = 'api_user'
verbose_name = '用戶表'
verbose_name_plural = verbose_name
def __str__(self):
return self.username
serializers.py
from rest_framework import serializers
from . import models
import re
# 拿到前臺token的兩個函數: user => payload => token
# from rest_framework_jwt.settings import api_settings
# jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
# jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
# 1) 前臺提交多種登陸信息都採用一個key,因此後臺能夠自定義反序列化字段進行對應
# 2) 序列化類要處理序列化與反序列化,要在fields中設置model綁定的Model類全部使用到的字段
# 3) 區分序列化字段與反序列化字段 read_only | write_only
# 4) 在自定義校驗規則中(局部鉤子、全局鉤子)校驗數據是否合法、肯定登陸的用戶、根據用戶簽發token
# 5) 將登陸的用戶與簽發的token保存在序列化類對象中
class UserModelSerializer(serializers.ModelSerializer):
# 自定義反序列字段:必定要設置write_only,只參與反序列化,不會與model類字段映射
usr = serializers.CharField(write_only=True)
pwd = serializers.CharField(write_only=True)
class Meta:
model = models.User
fields = ['usr', 'pwd', 'username', 'mobile', 'email']
# 系統校驗規則
extra_kwargs = {
'username': {
'read_only': True
},
'mobile': {
'read_only': True
},
'email': {
'read_only': True
},
}
def validate(self, attrs):
usr = attrs.get('usr')
pwd = attrs.get('pwd')
# 多方式登陸:各分支處理獲得該方式下對應的用戶
if re.match(r'.+@.+', usr):
user_query = models.User.objects.filter(email=usr)
elif re.match(r'1[3-9][0-9]{9}', usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
# 簽發:獲得登陸用戶,簽發token並存儲在實例化對象中
if user_obj and user_obj.check_password(pwd):
# 簽發token,將token存放到 實例化類對象的token 名字中
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
# 將當前用戶與簽發的token都保存在序列化對象中
self.user = user_obj
self.token = token
return attrs
raise serializers.ValidationError({'data': '數據有誤'})
views.py
#實現多方式登錄簽發token:帳號、手機號、郵箱等登錄
# 1) 禁用認證與權限組件
# 2) 拿到前臺登陸信息,交給序列化類
# 3) 序列化類校驗獲得登陸用戶與token存放在序列化對象中
# 4) 取出登陸用戶與token返回給前臺
import re
from . import serializers, models
from utils.response import APIResponse
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
class LoginAPIView(APIView):
# 1) 禁用認證與權限組件
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs):
# 2) 拿到前臺登陸信息,交給序列化類,規則:帳號用usr傳,密碼用pwd傳
user_ser = serializers.UserModelSerializer(data=request.data)
# 3) 序列化類校驗獲得登陸用戶與token存放在序列化對象中
user_ser.is_valid(raise_exception=True)
# 4) 取出登陸用戶與token返回給前臺
return APIResponse(token=user_ser.token, results=serializers.UserModelSerializer(user_ser.user).data)
# "一根筋" 思考方式:全部邏輯都在視圖類中處理
def my_post(self, request, *args, **kwargs):
usr = request.data.get('usr')
pwd = request.data.get('pwd')
if re.match(r'.+@.+', usr):
user_query = models.User.objects.filter(email=usr)
elif re.match(r'1[3-9][0-9]{9}', usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
if user_obj and user_obj.check_password(pwd):
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
return APIResponse(results={'username': user_obj.username}, token=token)
return APIResponse(data_msg='不可控錯誤')
案例:自定義認證反爬規則的認證類
authentications.py
import jwt
from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
from rest_framework_jwt.authentication import jwt_decode_handler
from rest_framework.exceptions import AuthenticationFailed
class JWTAuthentication(BaseJSONWebTokenAuthentication):
def authenticate(self, request):
jwt_token = request.META.get('HTTP_AUTHORIZATION')
# 自定義校驗規則:auth token jwt
token = self.parse_jwt_token(jwt_token)
if token is None:
return None
try:
# token => payload
payload = jwt_decode_handler(token)
except jwt.ExpiredSignature:
raise AuthenticationFailed('token已過時')
except:
raise AuthenticationFailed('非法用戶')
# payload => user
user = self.authenticate_credentials(payload)
return (user, token)
# 自定義校驗規則:auth token jwt,auth爲前鹽,jwt爲後鹽
def parse_jwt_token(self, jwt_token):
tokens = jwt_token.split()
if len(tokens) != 3 or tokens[0].lower() != 'auth' or tokens[2].lower() != 'jwt':
return None
return tokens[1]
views.py
from rest_framework.views import APIView
from utils.response import APIResponse
# 必須登陸後才能訪問 - 經過了認證權限組件
from rest_framework.permissions import IsAuthenticated
# 自定義jwt校驗規則
from .authentications import JWTAuthentication
class UserDetail(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
def get(self, request, *args, **kwargs):
return APIResponse(results={'username': request.user.username})
admin使用自定義User表:新增用戶密碼密文
from django.contrib import admin
from . import models
# 自定義User表,admin後臺管理,採用密文密碼
from django.contrib.auth.admin import UserAdmin
class MyUserAdmin(UserAdmin):
add_fieldsets = (
(None, {
'classes': ('wide',),
'fields': ('username', 'password1', 'password2', 'mobile', 'email'),
}),
)
admin.site.register(models.User, MyUserAdmin)
羣查接口各類篩選組件數據準備
models.py
class Car(models.Model):
name = models.CharField(max_length=16, unique=True, verbose_name='車名')
price = models.DecimalField(max_digits=10, decimal_places=2, verbose_name='價格')
brand = models.CharField(max_length=16, verbose_name='品牌')
class Meta:
db_table = 'api_car'
verbose_name = '汽車表'
verbose_name_plural = verbose_name
def __str__(self):
return self.name
admin.py
admin.site.register(models.Car)
serializers.py
class CarModelSerializer(serializers.ModelSerializer):
class Meta:
model = models.Car
fields = ['name', 'price', 'brand']
views.py
# Car的羣查接口
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
urls.py
url(r'^cars/$', views.CarListAPIView.as_view()),
drf搜索過濾組件
views.py
from rest_framework.generics import ListAPIView
# 第一步:drf的SearchFilter - 搜索過濾
from rest_framework.filters import SearchFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 過濾類 們(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [SearchFilter]
# 第三步:SearchFilter過濾類依賴的過濾條件 => 接口:/cars/?search=...
search_fields = ['name', 'price']
# eg:/cars/?search=1,name和price中包含1的數據都會被查詢出
drf排序過濾組件
views.py
from rest_framework.generics import ListAPIView
# 第一步:drf的OrderingFilter - 排序過濾
from rest_framework.filters import OrderingFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 過濾類 們(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [OrderingFilter]
# 第三步:OrderingFilter過濾類依賴的過濾條件 => 接口:/cars/?ordering=...
ordering_fields = ['pk', 'price']
# eg:/cars/?ordering=-price,pk,先按price降序,若是出現price相同,再按pk升序
drf基礎分頁組件
pahenations.py
from rest_framework.pagination import PageNumberPagination
class MyPageNumberPagination(PageNumberPagination):
# ?page=頁碼
page_query_param = 'page'
# ?page=頁面 下默認一頁顯示的條數
page_size = 3
# ?page=頁面&page_size=條數 用戶自定義一頁顯示的條數
page_size_query_param = 'page_size'
# 用戶自定義一頁顯示的條數最大限制:數值超過5也只顯示5條
max_page_size = 5
views.py
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
# 若是queryset沒有過濾條件,就必須 .all(),否則分頁會出問題
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 分頁組件 - 給視圖類配置分頁類便可 - 分頁類須要自定義,繼承drf提供的分頁類便可
pagination_class = pagenations.MyPageNumberPagination
複習
"""
頻率組件:限制接口的訪問頻率
源碼分析:初始化方法、判斷是否有權限方法、計數等待時間方法
自定義頻率組件:
class MyThrottle(SimpleRateThrottle):
scope = 'sms'
def get_cache_key(self, request, view):
# 從request的 query_params、data、META 及 view 中 獲取限制的條件
return '與認證信息有關的動態字符串'
settings文件中要有scope對應的rate配置 {'sms': '3/min'}
jwt認證:
1)session存儲token,須要數據庫參與,耗服務器資源、低效
2)緩存存token,須要緩存參與,高效,不易集羣
3)客戶端存token,服務器存簽發與交易token的算法,高效,易集羣
drf-jwt使用:
安裝:djangorestframework-jwt
視圖類簽發token - username,password => token
認證類校驗token - token => user
jwt格式:基本信息(頭base64).用戶信息過時時間(載荷base64).全部信息+祕鑰(簽名HS256)
"""
今日
"""
一、drf-jwt手動簽發與校驗
二、drf小組件:過濾、篩選、排序、分頁 => 針對與羣查接口
"""
簽發token
源碼入口
# 前提:給一個局部禁用了全部 認證與權限 的視圖類發送用戶信息獲得token,其實就是登陸接口
# 1)rest_framework_jwt.views.ObtainJSONWebToken 的 父類 JSONWebTokenAPIView 的 post 方法
# 接受有username、password的post請求
# 2)post方法將請求數據交給 rest_framework_jwt.serializer.JSONWebTokenSerializer 處理
# 完成數據的校驗,會走序列化類的 全局鉤子校驗規則,校驗獲得登陸用戶並簽發token存儲在序列化對象中
核心源碼:rest_framework_jwt.serializer.JSONWebTokenSerializer的validate(self, attrs)方法
def validate(self, attrs):
# 帳號密碼字典
credentials = {
self.username_field: attrs.get(self.username_field),
'password': attrs.get('password')
}
if all(credentials.values()):
# 簽發token第1步:用帳號密碼獲得user對象
user = authenticate(**credentials)
if user:
if not user.is_active:
msg = _('User account is disabled.')
raise serializers.ValidationError(msg)
# 簽發token第2步:經過user獲得payload,payload包含着用戶信息與過時時間
payload = jwt_payload_handler(user)
# 在視圖類中,能夠經過 序列化對象.object.get('user'或者'token') 拿到user和token
return {
# 簽發token第3步:經過payload簽發出token
'token': jwt_encode_handler(payload),
'user': user
}
else:
msg = _('Unable to log in with provided credentials.')
raise serializers.ValidationError(msg)
else:
msg = _('Must include "{username_field}" and "password".')
msg = msg.format(username_field=self.username_field)
raise serializers.ValidationError(msg)
手動簽發token邏輯
# 1)經過username、password獲得user對象
# 2)經過user對象生成payload:jwt_payload_handler(user) => payload
# from rest_framework_jwt.serializers import jwt_payload_handler
# 3)經過payload簽發token:jwt_encode_handler(payload) => token
# from rest_framework_jwt.serializers import jwt_encode_handler
校驗token
源碼入口
# 前提:訪問一個配置了jwt認證規則的視圖類,就須要提交認證字符串token,在認證類中完成token的校驗
# 1)rest_framework_jwt.authentication.JSONWebTokenAuthentication 的 父類 BaseJSONWebTokenAuthentication 的 authenticate 方法
# 請求頭拿認證信息jwt-token => 經過反爬小規則肯定有用的token => payload => user
核心源碼:rest_framework_jwt.authentication.BaseJSONWebTokenAuthentication的authenticate(self, request)方法
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
# 帶有反爬小規則的獲取token:前臺必須按 "jwt token字符串" 方式提交
# 校驗user第1步:從請求頭 HTTP_AUTHORIZATION 中拿token,並提取
jwt_value = self.get_jwt_value(request)
# 遊客
if jwt_value is None:
return None
# 校驗
try:
# 校驗user第2步:token => payload
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
# 校驗user第3步:token => payload
user = self.authenticate_credentials(payload)
return (user, jwt_value)
手動校驗token邏輯
# 1)從請求頭中獲取token
# 2)根據token解析出payload:jwt_decode_handler(token) => payloay
# from rest_framework_jwt.authentication import jwt_decode_handler
# 3)根據payload解析出user:self.authenticate_credentials(payload) => user
# 繼承drf-jwt的BaseJSONWebTokenAuthentication,拿到父級的authenticate_credentials方法
案例:實現多方式登錄簽發token
models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
class User(AbstractUser):
mobile = models.CharField(max_length=11, unique=True)
class Meta:
db_table = 'api_user'
verbose_name = '用戶表'
verbose_name_plural = verbose_name
def __str__(self):
return self.username
serializers.py
from rest_framework import serializers
from . import models
import re
# 拿到前臺token的兩個函數: user => payload => token
# from rest_framework_jwt.settings import api_settings
# jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
# jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
# 1) 前臺提交多種登陸信息都採用一個key,因此後臺能夠自定義反序列化字段進行對應
# 2) 序列化類要處理序列化與反序列化,要在fields中設置model綁定的Model類全部使用到的字段
# 3) 區分序列化字段與反序列化字段 read_only | write_only
# 4) 在自定義校驗規則中(局部鉤子、全局鉤子)校驗數據是否合法、肯定登陸的用戶、根據用戶簽發token
# 5) 將登陸的用戶與簽發的token保存在序列化類對象中
class UserModelSerializer(serializers.ModelSerializer):
# 自定義反序列字段:必定要設置write_only,只參與反序列化,不會與model類字段映射
usr = serializers.CharField(write_only=True)
pwd = serializers.CharField(write_only=True)
class Meta:
model = models.User
fields = ['usr', 'pwd', 'username', 'mobile', 'email']
# 系統校驗規則
extra_kwargs = {
'username': {
'read_only': True
},
'mobile': {
'read_only': True
},
'email': {
'read_only': True
},
}
def validate(self, attrs):
usr = attrs.get('usr')
pwd = attrs.get('pwd')
# 多方式登陸:各分支處理獲得該方式下對應的用戶
if re.match(r'.+@.+', usr):
user_query = models.User.objects.filter(email=usr)
elif re.match(r'1[3-9][0-9]{9}', usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
# 簽發:獲得登陸用戶,簽發token並存儲在實例化對象中
if user_obj and user_obj.check_password(pwd):
# 簽發token,將token存放到 實例化類對象的token 名字中
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
# 將當前用戶與簽發的token都保存在序列化對象中
self.user = user_obj
self.token = token
return attrs
raise serializers.ValidationError({'data': '數據有誤'})
views.py
#實現多方式登錄簽發token:帳號、手機號、郵箱等登錄
# 1) 禁用認證與權限組件
# 2) 拿到前臺登陸信息,交給序列化類
# 3) 序列化類校驗獲得登陸用戶與token存放在序列化對象中
# 4) 取出登陸用戶與token返回給前臺
import re
from . import serializers, models
from utils.response import APIResponse
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
class LoginAPIView(APIView):
# 1) 禁用認證與權限組件
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs):
# 2) 拿到前臺登陸信息,交給序列化類,規則:帳號用usr傳,密碼用pwd傳
user_ser = serializers.UserModelSerializer(data=request.data)
# 3) 序列化類校驗獲得登陸用戶與token存放在序列化對象中
user_ser.is_valid(raise_exception=True)
# 4) 取出登陸用戶與token返回給前臺
return APIResponse(token=user_ser.token, results=serializers.UserModelSerializer(user_ser.user).data)
# "一根筋" 思考方式:全部邏輯都在視圖類中處理
def my_post(self, request, *args, **kwargs):
usr = request.data.get('usr')
pwd = request.data.get('pwd')
if re.match(r'.+@.+', usr):
user_query = models.User.objects.filter(email=usr)
elif re.match(r'1[3-9][0-9]{9}', usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
if user_obj and user_obj.check_password(pwd):
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
return APIResponse(results={'username': user_obj.username}, token=token)
return APIResponse(data_msg='不可控錯誤')
案例:自定義認證反爬規則的認證類
authentications.py
import jwt
from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
from rest_framework_jwt.authentication import jwt_decode_handler
from rest_framework.exceptions import AuthenticationFailed
class JWTAuthentication(BaseJSONWebTokenAuthentication):
def authenticate(self, request):
jwt_token = request.META.get('HTTP_AUTHORIZATION')
# 自定義校驗規則:auth token jwt
token = self.parse_jwt_token(jwt_token)
if token is None:
return None
try:
# token => payload
payload = jwt_decode_handler(token)
except jwt.ExpiredSignature:
raise AuthenticationFailed('token已過時')
except:
raise AuthenticationFailed('非法用戶')
# payload => user
user = self.authenticate_credentials(payload)
return (user, token)
# 自定義校驗規則:auth token jwt,auth爲前鹽,jwt爲後鹽
def parse_jwt_token(self, jwt_token):
tokens = jwt_token.split()
if len(tokens) != 3 or tokens[0].lower() != 'auth' or tokens[2].lower() != 'jwt':
return None
return tokens[1]
views.py
from rest_framework.views import APIView
from utils.response import APIResponse
# 必須登陸後才能訪問 - 經過了認證權限組件
from rest_framework.permissions import IsAuthenticated
# 自定義jwt校驗規則
from .authentications import JWTAuthentication
class UserDetail(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
def get(self, request, *args, **kwargs):
return APIResponse(results={'username': request.user.username})
admin使用自定義User表:新增用戶密碼密文
from django.contrib import admin
from . import models
# 自定義User表,admin後臺管理,採用密文密碼
from django.contrib.auth.admin import UserAdmin
class MyUserAdmin(UserAdmin):
add_fieldsets = (
(None, {
'classes': ('wide',),
'fields': ('username', 'password1', 'password2', 'mobile', 'email'),
}),
)
admin.site.register(models.User, MyUserAdmin)
羣查接口各類篩選組件數據準備
models.py
class Car(models.Model):
name = models.CharField(max_length=16, unique=True, verbose_name='車名')
price = models.DecimalField(max_digits=10, decimal_places=2, verbose_name='價格')
brand = models.CharField(max_length=16, verbose_name='品牌')
class Meta:
db_table = 'api_car'
verbose_name = '汽車表'
verbose_name_plural = verbose_name
def __str__(self):
return self.name
admin.py
admin.site.register(models.Car)
serializers.py
class CarModelSerializer(serializers.ModelSerializer):
class Meta:
model = models.Car
fields = ['name', 'price', 'brand']
views.py
# Car的羣查接口
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
urls.py
url(r'^cars/$', views.CarListAPIView.as_view()),
drf搜索過濾組件
views.py
from rest_framework.generics import ListAPIView
# 第一步:drf的SearchFilter - 搜索過濾
from rest_framework.filters import SearchFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 過濾類 們(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [SearchFilter]
# 第三步:SearchFilter過濾類依賴的過濾條件 => 接口:/cars/?search=...
search_fields = ['name', 'price']
# eg:/cars/?search=1,name和price中包含1的數據都會被查詢出
drf排序過濾組件
views.py
from rest_framework.generics import ListAPIView
# 第一步:drf的OrderingFilter - 排序過濾
from rest_framework.filters import OrderingFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 過濾類 們(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [OrderingFilter]
# 第三步:OrderingFilter過濾類依賴的過濾條件 => 接口:/cars/?ordering=...
ordering_fields = ['pk', 'price']
# eg:/cars/?ordering=-price,pk,先按price降序,若是出現price相同,再按pk升序
drf基礎分頁組件
pahenations.py
from rest_framework.pagination import PageNumberPagination
class MyPageNumberPagination(PageNumberPagination):
# ?page=頁碼
page_query_param = 'page'
# ?page=頁面 下默認一頁顯示的條數
page_size = 3
# ?page=頁面&page_size=條數 用戶自定義一頁顯示的條數
page_size_query_param = 'page_size'
# 用戶自定義一頁顯示的條數最大限制:數值超過5也只顯示5條
max_page_size = 5
views.py
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
# 若是queryset沒有過濾條件,就必須 .all(),否則分頁會出問題
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 分頁組件 - 給視圖類配置分頁類便可 - 分頁類須要自定義,繼承drf提供的分頁類便可
pagination_class = pagenations.MyPageNumberPagination
複習
"""
頻率組件:限制接口的訪問頻率
源碼分析:初始化方法、判斷是否有權限方法、計數等待時間方法
自定義頻率組件:
class MyThrottle(SimpleRateThrottle):
scope = 'sms'
def get_cache_key(self, request, view):
# 從request的 query_params、data、META 及 view 中 獲取限制的條件
return '與認證信息有關的動態字符串'
settings文件中要有scope對應的rate配置 {'sms': '3/min'}
jwt認證:
1)session存儲token,須要數據庫參與,耗服務器資源、低效
2)緩存存token,須要緩存參與,高效,不易集羣
3)客戶端存token,服務器存簽發與交易token的算法,高效,易集羣
drf-jwt使用:
安裝:djangorestframework-jwt
視圖類簽發token - username,password => token
認證類校驗token - token => user
jwt格式:基本信息(頭base64).用戶信息過時時間(載荷base64).全部信息+祕鑰(簽名HS256)
"""
今日
"""
一、drf-jwt手動簽發與校驗
二、drf小組件:過濾、篩選、排序、分頁 => 針對與羣查接口
"""
簽發token
源碼入口
# 前提:給一個局部禁用了全部 認證與權限 的視圖類發送用戶信息獲得token,其實就是登陸接口
# 1)rest_framework_jwt.views.ObtainJSONWebToken 的 父類 JSONWebTokenAPIView 的 post 方法
# 接受有username、password的post請求
# 2)post方法將請求數據交給 rest_framework_jwt.serializer.JSONWebTokenSerializer 處理
# 完成數據的校驗,會走序列化類的 全局鉤子校驗規則,校驗獲得登陸用戶並簽發token存儲在序列化對象中
核心源碼:rest_framework_jwt.serializer.JSONWebTokenSerializer的validate(self, attrs)方法
def validate(self, attrs):
# 帳號密碼字典
credentials = {
self.username_field: attrs.get(self.username_field),
'password': attrs.get('password')
}
if all(credentials.values()):
# 簽發token第1步:用帳號密碼獲得user對象
user = authenticate(**credentials)
if user:
if not user.is_active:
msg = _('User account is disabled.')
raise serializers.ValidationError(msg)
# 簽發token第2步:經過user獲得payload,payload包含着用戶信息與過時時間
payload = jwt_payload_handler(user)
# 在視圖類中,能夠經過 序列化對象.object.get('user'或者'token') 拿到user和token
return {
# 簽發token第3步:經過payload簽發出token
'token': jwt_encode_handler(payload),
'user': user
}
else:
msg = _('Unable to log in with provided credentials.')
raise serializers.ValidationError(msg)
else:
msg = _('Must include "{username_field}" and "password".')
msg = msg.format(username_field=self.username_field)
raise serializers.ValidationError(msg)
手動簽發token邏輯
# 1)經過username、password獲得user對象
# 2)經過user對象生成payload:jwt_payload_handler(user) => payload
# from rest_framework_jwt.serializers import jwt_payload_handler
# 3)經過payload簽發token:jwt_encode_handler(payload) => token
# from rest_framework_jwt.serializers import jwt_encode_handler
校驗token
源碼入口
# 前提:訪問一個配置了jwt認證規則的視圖類,就須要提交認證字符串token,在認證類中完成token的校驗
# 1)rest_framework_jwt.authentication.JSONWebTokenAuthentication 的 父類 BaseJSONWebTokenAuthentication 的 authenticate 方法
# 請求頭拿認證信息jwt-token => 經過反爬小規則肯定有用的token => payload => user
核心源碼:rest_framework_jwt.authentication.BaseJSONWebTokenAuthentication的authenticate(self, request)方法
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
# 帶有反爬小規則的獲取token:前臺必須按 "jwt token字符串" 方式提交
# 校驗user第1步:從請求頭 HTTP_AUTHORIZATION 中拿token,並提取
jwt_value = self.get_jwt_value(request)
# 遊客
if jwt_value is None:
return None
# 校驗
try:
# 校驗user第2步:token => payload
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
# 校驗user第3步:token => payload
user = self.authenticate_credentials(payload)
return (user, jwt_value)
手動校驗token邏輯
# 1)從請求頭中獲取token
# 2)根據token解析出payload:jwt_decode_handler(token) => payloay
# from rest_framework_jwt.authentication import jwt_decode_handler
# 3)根據payload解析出user:self.authenticate_credentials(payload) => user
# 繼承drf-jwt的BaseJSONWebTokenAuthentication,拿到父級的authenticate_credentials方法
案例:實現多方式登錄簽發token
models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
class User(AbstractUser):
mobile = models.CharField(max_length=11, unique=True)
class Meta:
db_table = 'api_user'
verbose_name = '用戶表'
verbose_name_plural = verbose_name
def __str__(self):
return self.username
serializers.py
from rest_framework import serializers
from . import models
import re
# 拿到前臺token的兩個函數: user => payload => token
# from rest_framework_jwt.settings import api_settings
# jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
# jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
# 1) 前臺提交多種登陸信息都採用一個key,因此後臺能夠自定義反序列化字段進行對應
# 2) 序列化類要處理序列化與反序列化,要在fields中設置model綁定的Model類全部使用到的字段
# 3) 區分序列化字段與反序列化字段 read_only | write_only
# 4) 在自定義校驗規則中(局部鉤子、全局鉤子)校驗數據是否合法、肯定登陸的用戶、根據用戶簽發token
# 5) 將登陸的用戶與簽發的token保存在序列化類對象中
class UserModelSerializer(serializers.ModelSerializer):
# 自定義反序列字段:必定要設置write_only,只參與反序列化,不會與model類字段映射
usr = serializers.CharField(write_only=True)
pwd = serializers.CharField(write_only=True)
class Meta:
model = models.User
fields = ['usr', 'pwd', 'username', 'mobile', 'email']
# 系統校驗規則
extra_kwargs = {
'username': {
'read_only': True
},
'mobile': {
'read_only': True
},
'email': {
'read_only': True
},
}
def validate(self, attrs):
usr = attrs.get('usr')
pwd = attrs.get('pwd')
# 多方式登陸:各分支處理獲得該方式下對應的用戶
if re.match(r'.+@.+', usr):
user_query = models.User.objects.filter(email=usr)
elif re.match(r'1[3-9][0-9]{9}', usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
# 簽發:獲得登陸用戶,簽發token並存儲在實例化對象中
if user_obj and user_obj.check_password(pwd):
# 簽發token,將token存放到 實例化類對象的token 名字中
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
# 將當前用戶與簽發的token都保存在序列化對象中
self.user = user_obj
self.token = token
return attrs
raise serializers.ValidationError({'data': '數據有誤'})
views.py
#實現多方式登錄簽發token:帳號、手機號、郵箱等登錄
# 1) 禁用認證與權限組件
# 2) 拿到前臺登陸信息,交給序列化類
# 3) 序列化類校驗獲得登陸用戶與token存放在序列化對象中
# 4) 取出登陸用戶與token返回給前臺
import re
from . import serializers, models
from utils.response import APIResponse
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
class LoginAPIView(APIView):
# 1) 禁用認證與權限組件
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs):
# 2) 拿到前臺登陸信息,交給序列化類,規則:帳號用usr傳,密碼用pwd傳
user_ser = serializers.UserModelSerializer(data=request.data)
# 3) 序列化類校驗獲得登陸用戶與token存放在序列化對象中
user_ser.is_valid(raise_exception=True)
# 4) 取出登陸用戶與token返回給前臺
return APIResponse(token=user_ser.token, results=serializers.UserModelSerializer(user_ser.user).data)
# "一根筋" 思考方式:全部邏輯都在視圖類中處理
def my_post(self, request, *args, **kwargs):
usr = request.data.get('usr')
pwd = request.data.get('pwd')
if re.match(r'.+@.+', usr):
user_query = models.User.objects.filter(email=usr)
elif re.match(r'1[3-9][0-9]{9}', usr):
user_query = models.User.objects.filter(mobile=usr)
else:
user_query = models.User.objects.filter(username=usr)
user_obj = user_query.first()
if user_obj and user_obj.check_password(pwd):
payload = jwt_payload_handler(user_obj)
token = jwt_encode_handler(payload)
return APIResponse(results={'username': user_obj.username}, token=token)
return APIResponse(data_msg='不可控錯誤')
案例:自定義認證反爬規則的認證類
authentications.py
import jwt
from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
from rest_framework_jwt.authentication import jwt_decode_handler
from rest_framework.exceptions import AuthenticationFailed
class JWTAuthentication(BaseJSONWebTokenAuthentication):
def authenticate(self, request):
jwt_token = request.META.get('HTTP_AUTHORIZATION')
# 自定義校驗規則:auth token jwt
token = self.parse_jwt_token(jwt_token)
if token is None:
return None
try:
# token => payload
payload = jwt_decode_handler(token)
except jwt.ExpiredSignature:
raise AuthenticationFailed('token已過時')
except:
raise AuthenticationFailed('非法用戶')
# payload => user
user = self.authenticate_credentials(payload)
return (user, token)
# 自定義校驗規則:auth token jwt,auth爲前鹽,jwt爲後鹽
def parse_jwt_token(self, jwt_token):
tokens = jwt_token.split()
if len(tokens) != 3 or tokens[0].lower() != 'auth' or tokens[2].lower() != 'jwt':
return None
return tokens[1]
views.py
from rest_framework.views import APIView
from utils.response import APIResponse
# 必須登陸後才能訪問 - 經過了認證權限組件
from rest_framework.permissions import IsAuthenticated
# 自定義jwt校驗規則
from .authentications import JWTAuthentication
class UserDetail(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
def get(self, request, *args, **kwargs):
return APIResponse(results={'username': request.user.username})
admin使用自定義User表:新增用戶密碼密文
from django.contrib import admin
from . import models
# 自定義User表,admin後臺管理,採用密文密碼
from django.contrib.auth.admin import UserAdmin
class MyUserAdmin(UserAdmin):
add_fieldsets = (
(None, {
'classes': ('wide',),
'fields': ('username', 'password1', 'password2', 'mobile', 'email'),
}),
)
admin.site.register(models.User, MyUserAdmin)
羣查接口各類篩選組件數據準備
models.py
class Car(models.Model):
name = models.CharField(max_length=16, unique=True, verbose_name='車名')
price = models.DecimalField(max_digits=10, decimal_places=2, verbose_name='價格')
brand = models.CharField(max_length=16, verbose_name='品牌')
class Meta:
db_table = 'api_car'
verbose_name = '汽車表'
verbose_name_plural = verbose_name
def __str__(self):
return self.name
admin.py
admin.site.register(models.Car)
serializers.py
class CarModelSerializer(serializers.ModelSerializer):
class Meta:
model = models.Car
fields = ['name', 'price', 'brand']
views.py
# Car的羣查接口
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
urls.py
url(r'^cars/$', views.CarListAPIView.as_view()),
drf搜索過濾組件
views.py
from rest_framework.generics import ListAPIView
# 第一步:drf的SearchFilter - 搜索過濾
from rest_framework.filters import SearchFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 過濾類 們(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [SearchFilter]
# 第三步:SearchFilter過濾類依賴的過濾條件 => 接口:/cars/?search=...
search_fields = ['name', 'price']
# eg:/cars/?search=1,name和price中包含1的數據都會被查詢出
drf排序過濾組件
views.py
from rest_framework.generics import ListAPIView
# 第一步:drf的OrderingFilter - 排序過濾
from rest_framework.filters import OrderingFilter
class CarListAPIView(ListAPIView):
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 第二步:局部配置 過濾類 們(全局配置用DEFAULT_FILTER_BACKENDS)
filter_backends = [OrderingFilter]
# 第三步:OrderingFilter過濾類依賴的過濾條件 => 接口:/cars/?ordering=...
ordering_fields = ['pk', 'price']
# eg:/cars/?ordering=-price,pk,先按price降序,若是出現price相同,再按pk升序
drf基礎分頁組件
pahenations.py
from rest_framework.pagination import PageNumberPagination
class MyPageNumberPagination(PageNumberPagination):
# ?page=頁碼
page_query_param = 'page'
# ?page=頁面 下默認一頁顯示的條數
page_size = 3
# ?page=頁面&page_size=條數 用戶自定義一頁顯示的條數
page_size_query_param = 'page_size'
# 用戶自定義一頁顯示的條數最大限制:數值超過5也只顯示5條
max_page_size = 5
views.py
from rest_framework.generics import ListAPIView
class CarListAPIView(ListAPIView):
# 若是queryset沒有過濾條件,就必須 .all(),否則分頁會出問題
queryset = models.Car.objects.all()
serializer_class = serializers.CarModelSerializer
# 分頁組件 - 給視圖類配置分頁類便可 - 分頁類須要自定義,繼承drf提供的分頁類便可
pagination_class = pagenations.MyPageNumberPagination