rest framework是django中的一個組件,幫助咱們開發一些符合restful規範的api接口
---- 一切皆是資源,操做只是請求方式 ----book表增刪改查 /books/ books /books/add/ addbook /books/(\d+)/change/ changebook /books/(\d+)/delete/ delbook ----book表增刪改查 /books/ -----get books ----- 返回當前全部數據 /books/ -----post books ----- 返回提交數據 /books/(\d+)-----get bookdetail ----- 返回當前查看的單條數據 /books/(\d+)-----put bookdetail ----- 返回更新數據 /books/(\d+)-----delete bookdetail ----- 返回空 class Books(View): def get(self,request): pass # 查看全部書籍 def post(self,request): pass # 添加書籍 class BooksDetail(View): def get(self,request,id): pass # 查看具體書籍 def put(self,request,id): pass # 更新某本書籍 def delete(self,request,id): pass # 刪除某本書籍
models.py
from django.db import models


# A book: title, integer price, publication date, one publisher, many authors.
class Book(models.Model):
    title = models.CharField(max_length=32)
    price = models.IntegerField()
    pub_date = models.DateField()
    # on_delete=CASCADE: deleting the referenced Publish row deletes its books.
    publish = models.ForeignKey("Publish", on_delete=models.CASCADE)
    authors = models.ManyToManyField("Author")

    def __str__(self):
        return self.title


# A publisher, identified by name and contact e-mail.
class Publish(models.Model):
    name = models.CharField(max_length=32)
    email = models.EmailField()

    def __str__(self):
        return self.name


# An author, with a name and an age.
class Author(models.Model):
    name = models.CharField(max_length=32)
    age = models.IntegerField()

    def __str__(self):
        return self.name
from django.contrib import admin
from django.urls import path
from app01 import views

# Route table: the admin site plus the publisher collection endpoint.
urlpatterns = [
    path('admin/', admin.site.urls),
    path('publishes/', views.PublishView.as_view()),
]
from rest_framework import serializers
from app01.models import *


# Serializers for querysets and model instances.
# PublishSerializers declares its fields by hand.
class PublishSerializers(serializers.Serializer):
    name = serializers.CharField()
    email = serializers.CharField()


# PublishModelSerializers takes its fields from the Publish model
# ("__all__" selects every model field).
class PublishModelSerializers(serializers.ModelSerializer):
    class Meta:
        model = Publish
        fields = "__all__"
from rest_framework.response import Response
from rest_framework.views import APIView
from app01.models import *
from app01.serilizer import PublishModelSerializers, BookModelSerializers


# Collection endpoint for publishers: GET lists them, POST creates one.
class PublishView(APIView):

    def get(self, request):
        """Return every Publish row, serialized with PublishModelSerializers."""
        # What the REST framework request exposes (kept for reference):
        # print("request.data", request.data)
        # print("request.data type", type(request.data))
        # print(request._request.GET)
        # print(request.GET)

        # Alternative ways to serialize (kept for reference):
        # Option 1:
        # publish_list = list(Publish.objects.all().values("name","email"))
        # return HttpResponse(json.dumps(publish_list,ensure_ascii=False))

        # Option 2:
        # from django.forms.models import model_to_dict
        # publish_list = Publish.objects.all()
        # temp = []
        # for obj in publish_list:
        #     temp.append(model_to_dict(obj))
        # return HttpResponse(json.dumps(temp, ensure_ascii=False))

        # Option 3:
        # from django.core import serializers
        # publish_list = Publish.objects.all()
        # ret = serializers.serialize("json",publish_list)
        # return HttpResponse(ret)

        # Serializer component (the approach actually used):
        publish_list = Publish.objects.all()
        ps = PublishModelSerializers(publish_list, many=True)
        return Response(ps.data)

    def post(self, request):
        """
        Create a Publish row from the submitted data.

        :return: the serialized new object on success; the serializer's
            error dict with HTTP 400 when validation fails.
        """
        # What the native Django request supports (kept for reference):
        # print("POST",request.POST)
        # print("body",request.body)
        # # print(request)
        # print(type(request))
        # from django.core.handlers.wsgi import WSGIRequest

        # What the REST framework request supports (kept for reference):
        # print("request.data",request.data)
        # print("request.data type",type(request.data))

        ps = PublishModelSerializers(data=request.data)
        if not ps.is_valid():
            # Invalid input is a client error; without an explicit status the
            # error payload would be sent as a 200 OK.
            return Response(ps.errors, status=400)
        ps.save()  # calls the serializer's create()
        return Response(ps.data)
Django的原生request: 瀏覽器 ------------- 服務器 "GET url?a=1&b=2 http/1.1\r\nuser_agent:Google\r\ncontentType:urlencoded\r\n\r\n" "POST url http/1.1\r\nuser_agent:Google\r\ncontentType:urlencoded\r\n\r\na=1&b=2" request.body: a=1&b=2 request.POST: if contentType:urlencoded: # 只有在contentType==urlencoded的時候,request.POST纔有數據,數據爲字典(將請求體的字節轉化爲字典) a=1&b=2----->{"a":"1","b":"2"}
request._request 原生request
request.data POST數據
request._request.GET GET數據
request.GET GET數據
執行流程:首先進行路由匹配,匹配到指定的視圖類,執行self.dispatch方法,將請求相關信息封裝,並通過版本、認證、權限、頻率組件以後,將request傳入視圖函數,並執行視圖函數,進行解析,序列化,和分頁, 返回response,通過渲染器以後返回頁面
urls.py
from django.conf.urls import url, include
from web.views.api import TestView

# Single route: any path starting with "test/" is handled by TestView.
urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
views.py
from rest_framework.views import APIView
from rest_framework.response import Response


# Minimal view that answers GET, POST and PUT with a fixed string.
class TestView(APIView):

    def dispatch(self, request, *args, **kwargs):
        """
        Every incoming request runs dispatch, which triggers get/post/put
        etc. according to the HTTP method.

        Note: APIView's dispatch does a great deal more than routing. This
        override only delegates to it unchanged.
        """
        return super().dispatch(request, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        return Response('GET請求,響應內容')

    def post(self, request, *args, **kwargs):
        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        return Response('PUT請求,響應內容')
上述是rest framework框架基本流程,重要的功能是在APIView的dispatch中觸發。
def dispatch(self, request, *args, **kwargs): """ `.dispatch()` is pretty much the same as Django's regular dispatch, but with extra hooks for startup, finalize, and exception handling. """ # 通過路由和視圖以後來到dispatch方法 self.args = args self.kwargs = kwargs # 對request進行封裝, 讀取解析器對請求數據進行解析 request = self.initialize_request(request, *args, **kwargs) self.request = request self.headers = self.default_response_headers # deprecate? # self.request._request # self.request.GET # self.request.data try: # 版本組件,認證組件,權限組件,頻率組件 self.initial(request, *args, **kwargs) # Get the appropriate handler method # 分發, 執行請求方法對應的視圖函數 if request.method.lower() in self.http_method_names: handler = getattr(self, request.method.lower(), self.http_method_not_allowed) else: handler = self.http_method_not_allowed # 分頁器、序列器:執行視圖函數,對返回結果進行分頁,並序列化 response = handler(request, *args, **kwargs) except Exception as exc: response = self.handle_exception(exc) # 渲染器: 對response進行渲染 self.response = self.finalize_response(request, response, *args, **kwargs) return self.response def initial(self, request, *args, **kwargs): """ Runs anything that needs to occur prior to calling the method handler. """ self.format_kwarg = self.get_format_suffix(**kwargs) # Perform content negotiation and store the accepted info on the request neg = self.perform_content_negotiation(request) request.accepted_renderer, request.accepted_media_type = neg # Determine the API version, if versioning is in use. # 版本組件 version, scheme = self.determine_version(request, *args, **kwargs) request.version, request.versioning_scheme = version, scheme # Ensure that the incoming request is permitted # 認證組件 self.perform_authentication(request) # 權限組件 self.check_permissions(request) # 頻率組件 self.check_throttles(request)
http://127.0.0.1:8000/authors/?token=95631b025a67a0d640c33862d1788293
from django.contrib import admin
from django.urls import path, re_path
from app01 import views

urlpatterns = [
    path('admin/', admin.site.urls),
    # Collection route: GET maps to "list", POST maps to "create".
    re_path(r'^authors/$', views.AuthorModelView.as_view({"get": "list", "post": "create"}), name="author"),
    # Detail route keyed by a numeric pk: GET/PUT/DELETE map to
    # retrieve/update/destroy.
    re_path(r'^authors/(?P<pk>\d+)/$', views.AuthorModelView.as_view({"get": "retrieve", "put": "update", "delete": "destroy"}), name="detailauthor"),
    re_path(r'^login/$', views.LoginView.as_view(), name="login"),
]
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from api import models


# Token authentication: the token travels in the `token` query parameter.
class LuffyAuth(BaseAuthentication):

    def authenticate(self, request):
        """
        Authenticate the request from its ``token`` query parameter.

        :param request: the request being authenticated.
        :return: ``(username, token_record)`` for a known token.
        :raises AuthenticationFailed: when no UserAuthToken row matches.
        """
        # Equivalent to request._request.GET.get('token')
        supplied = request.query_params.get('token')
        matches = models.UserAuthToken.objects.filter(token=supplied)
        token_record = matches.first()
        if not token_record:
            raise AuthenticationFailed({'code':1001,'error':'認證失敗'})
        # request.user = obj.user.username
        request.auth = token_record
        username = token_record.user.username
        return username, token_record
# CRUD endpoints for Author, guarded by token authentication.
class AuthorModelView(viewsets.ModelViewSet):
    authentication_classes = [TokenAuth, ]
    permission_classes = []
    # Original note: limit each IP to at most 20 requests per minute.
    # No throttle class is listed here yet.
    throttle_classes = []

    queryset = Author.objects.all()
    serializer_class = AuthorModelSerializers


def get_random_str(user):
    """
    Return a fresh 32-character hexadecimal login token.

    The token comes from the OS CSPRNG. An MD5 of the username and the
    current time can be guessed by anyone who knows roughly when the user
    logged in.

    :param user: the username; kept for call compatibility and not used to
        derive the token.
    :return: 32 lowercase hex characters, the same length as an MD5 hexdigest.
    """
    import secrets

    return secrets.token_hex(16)


from app01.models import User


# Login endpoint: exchanges a name/password pair for a token.
class LoginView(APIView):

    def post(self, request):
        """
        Check the submitted ``name``/``pwd`` and issue a new token.

        :return: a Response carrying ``state_code`` (1000 on success, 1001
            on failure), ``msg`` and, on success, ``token``.
        """
        name = request.data.get("name")
        pwd = request.data.get("pwd")
        # NOTE(review): this matches the password as stored, which implies
        # plain-text storage. Confirm, and switch to hashed passwords.
        user = User.objects.filter(name=name, pwd=pwd).first()
        res = {"state_code": 1000, "msg": None}
        if user:
            random_str = get_random_str(user.name)
            # One token row per user: replaced on every successful login.
            Token.objects.update_or_create(user=user, defaults={"token": random_str})
            res["token"] = random_str
        else:
            res["state_code"] = 1001  # error status code
            res["msg"] = "用戶名或者密碼錯誤"

        return Response(res)
        # return Response(json.dumps(res, ensure_ascii=False))
from django.conf.urls import url, include
from web.views import TestView

# Single route: any path starting with "test/" is handled by TestView.
urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework.request import Request
from rest_framework import exceptions

token_list = [
    'sfsfss123kuf3j123',
    'asijnfowerkkf9812',
]


# HTTP Basic authentication demo with a single hard-coded account.
class TestAuthentication(BaseAuthentication):

    def authenticate(self, request):
        """
        Authenticate the request from its ``Authorization: Basic`` header.

        :param request: the request being authenticated.
        :return:
            ``(user, token)`` when authentication succeeds.

            Returning ``None`` would mean "skip this authenticator" (this
            class never does). If every authenticator is skipped, the user
            and token fall back to the configured defaults::

                self._authenticator = None
                if api_settings.UNAUTHENTICATED_USER:
                    self.user = api_settings.UNAUTHENTICATED_USER()
                else:
                    self.user = None
                if api_settings.UNAUTHENTICATED_TOKEN:
                    self.auth = api_settings.UNAUTHENTICATED_TOKEN()
                else:
                    self.auth = None
        :raises AuthenticationFailed: when the header is missing, is not a
            well-formed Basic credential, or carries the wrong
            username/password.
        """
        import base64
        import binascii

        auth = request.META.get('HTTP_AUTHORIZATION', b'')
        if auth:
            auth = auth.encode('utf-8')
        auth = auth.split()

        if not auth or auth[0].lower() != b'basic':
            raise exceptions.AuthenticationFailed('驗證失敗')
        if len(auth) != 2:
            raise exceptions.AuthenticationFailed('驗證失敗')
        try:
            decoded = base64.b64decode(auth[1]).decode('utf-8')
        except (TypeError, UnicodeDecodeError, binascii.Error):
            # Garbage credentials are an authentication failure, not a
            # server error.
            raise exceptions.AuthenticationFailed('驗證失敗')
        username, part, password = decoded.partition(':')
        # Demo only: a single hard-coded account.
        if username == 'alex' and password == '123':
            return ('登陸用戶', '用戶token')
        else:
            raise exceptions.AuthenticationFailed('用戶名或密碼錯誤')

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        return 'Basic realm=api'


class TestView(APIView):
    authentication_classes = [TestAuthentication, ]
    permission_classes = []

    def get(self, request, *args, **kwargs):
        print(request.user)
        print(request.auth)
        return Response('GET請求,響應內容')

    def post(self, request, *args, **kwargs):
        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        return Response('PUT請求,響應內容')
from django.conf.urls import url, include
from web.views.s2_auth import TestView

# Single route: any path starting with "test/" is handled by TestView.
urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework.request import Request
from rest_framework import exceptions

token_list = [
    'sfsfss123kuf3j123',
    'asijnfowerkkf9812',
]


# HTTP Basic authenticator that steps aside when no Authorization header
# is sent, so the next authenticator in the list gets a turn.
class Test1Authentication(BaseAuthentication):

    def authenticate(self, request):
        """
        Authenticate the request from its ``Authorization: Basic`` header.

        :param request: the request being authenticated.
        :return:
            ``None`` when no Authorization header is present, meaning "skip
            this authenticator". If every authenticator is skipped, the
            user and token fall back to the configured defaults::

                self._authenticator = None
                if api_settings.UNAUTHENTICATED_USER:
                    self.user = api_settings.UNAUTHENTICATED_USER()  # default: anonymous user
                else:
                    self.user = None
                if api_settings.UNAUTHENTICATED_TOKEN:
                    self.auth = api_settings.UNAUTHENTICATED_TOKEN()  # default: None
                else:
                    self.auth = None

            ``(user, token)`` when authentication succeeds.
        :raises AuthenticationFailed: when the header is not a well-formed
            Basic credential or carries the wrong username/password.
        """
        import base64
        import binascii

        auth = request.META.get('HTTP_AUTHORIZATION', b'')
        if auth:
            auth = auth.encode('utf-8')
        else:
            return None
        auth = auth.split()
        if not auth or auth[0].lower() != b'basic':
            raise exceptions.AuthenticationFailed('驗證失敗')
        if len(auth) != 2:
            raise exceptions.AuthenticationFailed('驗證失敗')
        try:
            decoded = base64.b64decode(auth[1]).decode('utf-8')
        except (TypeError, UnicodeDecodeError, binascii.Error):
            # Garbage credentials are an authentication failure, not a
            # server error.
            raise exceptions.AuthenticationFailed('驗證失敗')
        username, part, password = decoded.partition(':')
        # Demo only: a single hard-coded account.
        if username == 'alex' and password == '123':
            return ('登陸用戶', '用戶token')
        else:
            raise exceptions.AuthenticationFailed('用戶名或密碼錯誤')

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        # return 'Basic realm=api'
        pass


# Query-string token authenticator checked against the in-module token_list.
class Test2Authentication(BaseAuthentication):

    def authenticate(self, request):
        """
        Authenticate the request from its ``token`` query parameter.

        :param request: the request being authenticated.
        :return: ``(user, token)`` when the token is listed in ``token_list``.
        :raises AuthenticationFailed: when the token is missing or unknown.
        """
        val = request.query_params.get('token')
        if val not in token_list:
            raise exceptions.AuthenticationFailed("用戶認證失敗")

        return ('登陸用戶', '用戶token')

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        pass


class TestView(APIView):
    # Authenticators are listed in order: Basic first, then query token.
    authentication_classes = [Test1Authentication, Test2Authentication]
    permission_classes = []

    def get(self, request, *args, **kwargs):
        print(request.user)
        print(request.auth)
        return Response('GET請求,響應內容')

    def post(self, request, *args, **kwargs):
        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        return Response('PUT請求,響應內容')
request.META {'#ENVTSLOGSHELLEXT4044': '436098784', 'ALLUSERSPROFILE': 'C:\\ProgramData', 'APPDATA': 'C:\\Users\\fei\\AppData\\Roaming', 'COMMONPROGRAMFILES': 'C:\\Program Files\\Common Files', 'COMMONPROGRAMFILES(X86)': 'C:\\Program Files (x86)\\Common Files', 'COMMONPROGRAMW6432': 'C:\\Program Files\\Common Files', 'COMPUTERNAME': 'HOME-FEI', 'COMSPEC': 'C:\\WINDOWS\\system32\\cmd.exe', 'DJANGO_SETTINGS_MODULE': 'djngo_rest_demo.settings', 'DOCKER_TOOLBOX_INSTALL_PATH': 'D:\\Docker Toolbox', 'FP_NO_HOST_CHECK': 'NO', 'HOMEDRIVE': 'C:', 'HOMEPATH': '\\Users\\fei', 'JAVA_HOME': 'D:\\java', 'LANG': 'zh_CN', 'LOCALAPPDATA': 'C:\\Users\\fei\\AppData\\Local', 'LOGONSERVER': '\\\\MicrosoftAccount', 'NODE_PATH': 'D:\\nodes\\node_global\\node_modules', 'NUMBER_OF_PROCESSORS': '4', 'OS': 'Windows_NT', 'PATH': 'C:\\Program Files (x86)\\Common Files\\Oracle\\Java\\javapath;C:\\Program Files\\MySQL\\MySQL Server 5.7\\bin;D:\\app\\fei\\product\\11.2.0\\dbhome_1\\bin;;:\\Program Files (x86)\\Intel\\iCLS Client\\;C:\\Program Files\\Intel\\iCLS Client\\;C:\\WINDOWS\\system32;C:\\WINDOWS;C:\\WINDOWS\\System32\\Wbem;C:\\WINDOWS\\System32\\WindowsPowerShell\\v1.0\\;C:\\Program Files\\Intel\\Intel(R) Management Engine Components\\DAL;C:\\Program Files (x86)\\Intel\\Intel(R) Management Engine Components\\DAL;C:\\Program Files\\Intel\\Intel(R) Management Engine Components\\IPT;C:\\Program Files (x86)\\Intel\\Intel(R) Management Engine Components\\IPT;C:\\Program Files\\Intel\\WiFi\\bin\\;C:\\Program Files\\Common Files\\Intel\\WirelessCommon\\;C:\\Program Files\\MySQL\\MySQL Utilities 1.6\\;C:\\Program Files\\MySQL\\MySQL Server 5.7\\bin;D:\\Git\\cmd;D:\\postresql\\pg96\\bin;C:\\salt;D:\\Microsoft VS Code\\bin;D:\\nodes\\;D:\\cmd_markdown_win64;D:\\FastStone Capture;D:\\Anaconda\\Library\\mingw-w64\\bin;D:\\Anaconda\\Library\\usr\\bin;D:\\Anaconda\\Library\\bin;D:\\pytho3.6\\Scripts\\;D:\\pytho3.6\\;D:\\Ruby25-x64\\bin;D:\\python3\\;D:\\java\\bin;D:\\java\\jre\\bin;C:\\Program 
Files\\MySQL\\MySQL Server 5.7\\bin;D:\\python;D:\\Sublime Text 3;D:\\Vim\\vim81;D:\\Redis;D:\\MINGW\\bin;D:\\MongoDb\\bin;D:\\Ruby25-x64\\bin;D:\\mitmiproxy\\bin;D:\\Docker Toolbox;D:\\postresql\\pg96\\bin;D:\\cmder;D:\\python3\\Scripts;D:\\python\\Scripts;D:\\adb;D:\\Graphviz\\bin;D:\\Vim\\vim74;D:\\nodes\\node_cache;D:\\nodes\\node_cache\\node_modules;D:\\nodes\\node_global;D:\\pytho3.6\\lib\\site-packages\\pywin32_system32;D:\\pytho3.6\\lib\\site-packages\\pywin32_system32', 'PATHEXT': '.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC;.RB;.RBW', 'PROCESSOR_ARCHITECTURE': 'AMD64', 'PROCESSOR_IDENTIFIER': 'Intel64 Family 6 Model 69 Stepping 1, GenuineIntel', 'PROCESSOR_LEVEL': '6', 'PROCESSOR_REVISION': '4501', 'PROGRAMDATA': 'C:\\ProgramData', 'PROGRAMFILES': 'C:\\Program Files', 'PROGRAMFILES(X86)': 'C:\\Program Files (x86)', 'PROGRAMW6432': 'C:\\Program Files', 'PSMODULEPATH': 'C:\\WINDOWS\\system32\\WindowsPowerShell\\v1.0\\Modules\\', 'PUBLIC': 'C:\\Users\\Public', 'PYCHARM_DJANGO_MANAGE_MODULE': 'manage', 'PYCHARM_TRACK_FILES_PATTERN': 'migrations', 'PYTHONDONTWRITEBYTECODE': '1', 'PYTHONIOENCODING': 'UTF-8', 'PYTHONPATH': 'D:\\PyCharm 2018.1.1\\helpers\\pycharm;E:\\Web框架\\Django\\djngo_rest_demo;E:/Web框架/Django/djngo_rest_demo;D:\\pytho3.6', 'PYTHONUNBUFFERED': '1', 'SESSIONNAME': 'Console', 'SYSTEMDRIVE': 'C:', 'SYSTEMROOT': 'C:\\WINDOWS', 'TEMP': 'C:\\Users\\fei\\AppData\\Local\\Temp;D:\\python', 'TMP': 'C:\\Users\\fei\\AppData\\Local\\Temp', 'USERDOMAIN': 'HOME-FEI', 'USERDOMAIN_ROAMINGPROFILE': 'HOME-FEI', 'USERNAME': 'fei', 'USERPROFILE': 'C:\\Users\\fei', 'WINDIR': 'C:\\WINDOWS', 'RUN_MAIN': 'true', 'SERVER_NAME': 'home-fei', 'GATEWAY_INTERFACE': 'CGI/1.1', 'SERVER_PORT': '8000', 'REMOTE_HOST': '', 'CONTENT_LENGTH': '', 'SCRIPT_NAME': '', 'SERVER_PROTOCOL': 'HTTP/1.1', 'SERVER_SOFTWARE': 'WSGIServer/0.2', 'REQUEST_METHOD': 'GET', 'PATH_INFO': '/authors/', 'QUERY_STRING': 'token=95631b025a67a0d640c33862d1788293', 'REMOTE_ADDR': '127.0.0.1', 
'CONTENT_TYPE': 'application/json', 'HTTP_CACHE_CONTROL': 'no-cache', 'HTTP_POSTMAN_TOKEN': 'da403618-3002-4256-bfc4-1f2c98c8795e', 'HTTP_USER_AGENT': 'PostmanRuntime/7.1.1', 'HTTP_ACCEPT': '*/*', 'HTTP_HOST': '127.0.0.1:8000', 'HTTP_ACCEPT_ENCODING': 'gzip, deflate', 'HTTP_CONNECTION': 'keep-alive', 'wsgi.input': <_io.BufferedReader name=444>, 'wsgi.errors': <_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, 'wsgi.version': (1, 0), 'wsgi.run_once': False, 'wsgi.url_scheme': 'http', 'wsgi.multithread': True, 'wsgi.multiprocess': False, 'wsgi.file_wrapper': <class 'wsgiref.util.FileWrapper'> }
from django.conf.urls import url, include
from web.views import TestView

# Single route: any path starting with "test/" is handled by TestView.
urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework.permissions import BasePermission
from rest_framework.request import Request
from rest_framework import exceptions

token_list = [
    'sfsfss123kuf3j123',
    'asijnfowerkkf9812',
]


# Query-string token authenticator checked against the in-module token_list.
class TestAuthentication(BaseAuthentication):

    def authenticate(self, request):
        """
        Authenticate the request from its ``token`` query parameter.

        :param request: the request being authenticated.
        :return:
            ``(user, token)`` when the token is listed in ``token_list``.

            Returning ``None`` would mean "skip this authenticator" (this
            class never does). If every authenticator is skipped, the user
            and token fall back to the configured defaults::

                self._authenticator = None
                if api_settings.UNAUTHENTICATED_USER:
                    self.user = api_settings.UNAUTHENTICATED_USER()  # default: anonymous user
                else:
                    self.user = None
                if api_settings.UNAUTHENTICATED_TOKEN:
                    self.auth = api_settings.UNAUTHENTICATED_TOKEN()  # default: None
                else:
                    self.auth = None
        :raises AuthenticationFailed: when the token is missing or unknown.
        """
        val = request.query_params.get('token')
        if val not in token_list:
            raise exceptions.AuthenticationFailed("用戶認證失敗")

        return ('登陸用戶', '用戶token')

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        pass


# Grants access only when the authenticated user is "管理員".
class TestPermission(BasePermission):
    message = "權限驗證失敗"

    def has_permission(self, request, view):
        """
        Decide whether the current request may access the view.

        :param request: the request being checked.
        :param view: the view being accessed.
        :return: ``True`` when permission is granted, ``False`` otherwise.
        """
        # Always an explicit bool, never an implicit None.
        return request.user == "管理員"

    # Called from GenericAPIView.get_object.
    def has_object_permission(self, request, view, obj):
        """
        Per-object check, triggered when a view inheriting from
        GenericAPIView fetches an object through ``get_object``.

        :param request: the request being checked.
        :param view: the view being accessed.
        :param obj: the object being accessed.
        :return: ``True`` when permission is granted, ``False`` otherwise.
        """
        return request.user == "管理員"


class TestView(APIView):
    # Authentication is triggered by accessing request.user.
    authentication_classes = [TestAuthentication, ]

    # Permissions: every listed permission class is checked in turn.
    permission_classes = [TestPermission, ]

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET請求,響應內容')

    def post(self, request, *args, **kwargs):
        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        return Response('PUT請求,響應內容')
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication
from app01 import models


# Token authentication: the token travels in the `token` query parameter.
class TokenAuth(BaseAuthentication):

    def authenticate(self, request):
        """
        Look up the ``token`` query parameter in the Token table.

        :return: ``(user name, token string)`` for a known token.
        :raises AuthenticationFailed: when no Token row matches.
        """
        supplied = request.GET.get("token")
        record = models.Token.objects.filter(token=supplied).first()
        if record:
            return record.user.name, record.token
        raise exceptions.AuthenticationFailed("驗證失敗123!")
# Project-wide Django REST framework defaults.
REST_FRAMEWORK = {
    # Values used for request.user / request.auth when no authenticator
    # identifies the caller.
    'UNAUTHENTICATED_USER': None,
    'UNAUTHENTICATED_TOKEN': None,
    "DEFAULT_PERMISSION_CLASSES": ["app01.utils.TestPermission", ],
    # The last path component must match the class name exactly; the
    # authenticator class is spelled `TokenAuth`.
    "DEFAULT_AUTHENTICATION_CLASSES": ["app01.auth.TokenAuth", ],
}
# CRUD endpoints for Author. No authentication/permission/throttle classes
# are set on the view itself.
class AuthorModelView(viewsets.ModelViewSet):
    queryset = Author.objects.all()
    serializer_class = AuthorModelSerializers
實現原理
以ip或用戶名(惟一標示)爲key,每次訪問的時間戳做爲key對應的列表中的元素,好比我規定每一個ip一分鐘之內最多訪問3次, 則某ip發送請求過來,我首先找到對應的key值,將當前時間減去60秒,將ip對應的列表中小於減去60秒以後的時間的時間戳刪除, 若個數等於3則禁止訪問,若小於3,則將當前時間戳添加至列表中頭部。 匿名用戶:沒法控制,由於用戶能夠換代理IP { 192.168.1.1:[1521223123.232, 1521223122.232, 1521223121.232], 192.168.1.2:[1521223123.232, 1521223122.232, 1521223121.232], 192.168.1.3:[1521223123.232, 1521223122.232, 1521223121.232], 192.168.1.4:[1521223123.232, 1521223122.232, 1521223121.232], 192.168.1.5:[1521223123.232, 1521223122.232, 1521223121.232], 192.168.1.6:[1521223123.232, 1521223122.232, 1521223121.232], } 登陸用戶:若是有不少帳號,也沒法限制 { alex:[1521223123.232, 1521223122.232, 1521223121.232], eric:[1521223123.232, 1521223122.232, 1521223121.232], } 參考源碼:from rest_framework.throttling import SimpleRateThrottle
""" Provides various throttling policies. """ from __future__ import unicode_literals import time from django.core.cache import cache as default_cache from django.core.exceptions import ImproperlyConfigured from rest_framework.settings import api_settings class BaseThrottle(object): """ Rate throttling of requests. """ def allow_request(self, request, view): """ Return `True` if the request should be allowed, `False` otherwise. """ raise NotImplementedError('.allow_request() must be overridden') def get_ident(self, request): """ Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR if present and number of proxies is > 0. If not use all of HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR. """ xff = request.META.get('HTTP_X_FORWARDED_FOR') remote_addr = request.META.get('REMOTE_ADDR') num_proxies = api_settings.NUM_PROXIES if num_proxies is not None: if num_proxies == 0 or xff is None: return remote_addr addrs = xff.split(',') client_addr = addrs[-min(num_proxies, len(addrs))] return client_addr.strip() return ''.join(xff.split()) if xff else remote_addr def wait(self): """ Optionally, return a recommended number of seconds to wait before the next request. """ return None class SimpleRateThrottle(BaseThrottle): """ A simple cache implementation, that only requires `.get_cache_key()` to be overridden. The rate (requests / seconds) is set by a `rate` attribute on the View class. The attribute is a string of the form 'number_of_requests/period'. Period should be one of: ('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day') Previous request information used for throttling is stored in the cache. 
""" cache = default_cache timer = time.time cache_format = 'throttle_%(scope)s_%(ident)s' scope = None THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES def __init__(self): if not getattr(self, 'rate', None): self.rate = self.get_rate() self.num_requests, self.duration = self.parse_rate(self.rate) def get_cache_key(self, request, view): """ Should return a unique cache-key which can be used for throttling. Must be overridden. May return `None` if the request should not be throttled. """ raise NotImplementedError('.get_cache_key() must be overridden') def get_rate(self): """ Determine the string representation of the allowed request rate. """ if not getattr(self, 'scope', None): msg = ("You must set either `.scope` or `.rate` for '%s' throttle" % self.__class__.__name__) raise ImproperlyConfigured(msg) try: return self.THROTTLE_RATES[self.scope] except KeyError: msg = "No default throttle rate set for '%s' scope" % self.scope raise ImproperlyConfigured(msg) def parse_rate(self, rate): """ Given the request rate string, return a two tuple of: <allowed number of requests>, <period of time in seconds> """ if rate is None: return (None, None) num, period = rate.split('/') num_requests = int(num) duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]] return (num_requests, duration) def allow_request(self, request, view): """ Implement the check to see if the request should be throttled. On success calls `throttle_success`. On failure calls `throttle_failure`. 
""" if self.rate is None: return True self.key = self.get_cache_key(request, view) if self.key is None: return True self.history = self.cache.get(self.key, []) self.now = self.timer() # Drop any requests from the history which have now passed the # throttle duration while self.history and self.history[-1] <= self.now - self.duration: self.history.pop() if len(self.history) >= self.num_requests: return self.throttle_failure() return self.throttle_success() def throttle_success(self): """ Inserts the current request's timestamp along with the key into the cache. """ self.history.insert(0, self.now) self.cache.set(self.key, self.history, self.duration) return True def throttle_failure(self): """ Called when a request to the API has failed due to throttling. """ return False def wait(self): """ Returns the recommended next request time in seconds. """ if self.history: remaining_duration = self.duration - (self.now - self.history[-1]) else: remaining_duration = self.duration available_requests = self.num_requests - len(self.history) + 1 if available_requests <= 0: return None return remaining_duration / float(available_requests) class AnonRateThrottle(SimpleRateThrottle): """ Limits the rate of API calls that may be made by a anonymous users. The IP address of the request will be used as the unique cache key. """ scope = 'anon' def get_cache_key(self, request, view): if request.user.is_authenticated: return None # Only throttle unauthenticated requests. return self.cache_format % { 'scope': self.scope, 'ident': self.get_ident(request) } class UserRateThrottle(SimpleRateThrottle): """ Limits the rate of API calls that may be made by a given user. The user id will be used as a unique cache key if the user is authenticated. For anonymous requests, the IP address of the request will be used. 
""" scope = 'user' def get_cache_key(self, request, view): if request.user.is_authenticated: ident = request.user.pk else: ident = self.get_ident(request) return self.cache_format % { 'scope': self.scope, 'ident': ident } class ScopedRateThrottle(SimpleRateThrottle): """ Limits the rate of API calls by different amounts for various parts of the API. Any view that has the `throttle_scope` property set will be throttled. The unique cache key will be generated by concatenating the user id of the request, and the scope of the view being accessed. """ scope_attr = 'throttle_scope' def __init__(self): # Override the usual SimpleRateThrottle, because we can't determine # the rate until called by the view. pass def allow_request(self, request, view): # We can only determine the scope once we're called by the view. self.scope = getattr(view, self.scope_attr, None) # If a view does not have a `throttle_scope` always allow the request if not self.scope: return True # Determine the allowed request rate as we normally would during # the `__init__` call. self.rate = self.get_rate() self.num_requests, self.duration = self.parse_rate(self.rate) # We can now proceed as normal. return super(ScopedRateThrottle, self).allow_request(request, view) def get_cache_key(self, request, view): """ If `view.throttle_scope` is not set, don't apply this throttle. Otherwise generate the unique cache key by concatenating the user id with the '.throttle_scope` property of the view. """ if request.user.is_authenticated: ident = request.user.pk else: ident = self.get_ident(request) return self.cache_format % { 'scope': self.scope, 'ident': ident }
1.基於用戶IP限制訪問頻率
from django.conf.urls import url, include
from web.views import TestView

# Single route: any path starting with "test/" is handled by TestView.
urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from rest_framework.views import APIView
from rest_framework.response import Response

from rest_framework import exceptions
from rest_framework.throttling import BaseThrottle
from rest_framework.settings import api_settings

# Access log: requester ident -> request timestamps, newest first.
RECORD = {
    '用戶IP': [12312139, 12312135, 12312133, ]
}


# In-memory sliding-window throttle keyed by the requester's IP.
class TestThrottle(BaseThrottle):
    ctime = time.time
    # Allow at most `num_request` requests per `time_request` seconds.
    num_request = 10
    time_request = 60

    def get_ident(self, request):
        """
        Derive the requester's identifying IP from the client and proxy
        addresses.

        Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
        if present and number of proxies is > 0. If not use all of
        HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
        """
        xff = request.META.get('HTTP_X_FORWARDED_FOR')
        remote_addr = request.META.get('REMOTE_ADDR')
        num_proxies = api_settings.NUM_PROXIES

        if num_proxies is not None:
            if num_proxies == 0 or xff is None:
                return remote_addr
            addrs = xff.split(',')
            client_addr = addrs[-min(num_proxies, len(addrs))]
            return client_addr.strip()

        return ''.join(xff.split()) if xff else remote_addr

    def allow_request(self, request, view):
        """
        Decide whether the requester is still within the allowed rate.

        :param request: the incoming request.
        :param view: the view being accessed.
        :return: ``True`` when the request may proceed; ``False`` when the
            limit has been reached.
        """
        now = self.ctime()
        ident = self.get_ident(request)
        self.ident = ident
        if ident not in RECORD:
            RECORD[ident] = [now, ]
            return True
        history = RECORD[ident]
        # Drop timestamps that have fallen out of the window; the oldest
        # entries sit at the tail.
        while history and history[-1] <= now - self.time_request:
            history.pop()
        if len(history) < self.num_request:
            history.insert(0, now)
            return True
        return False

    def wait(self):
        """
        Return how many seconds remain until another request is allowed.

        A slot frees up when the oldest recorded request leaves the window,
        so the wait is measured from the tail of the history, not from the
        most recent request.

        :return: whole seconds to wait, or ``None`` when nothing is recorded.
        """
        history = RECORD[self.ident]
        if not history:
            return None
        oldest = history[-1]
        now = self.ctime()
        return int(self.time_request + oldest - now)


class TestView(APIView):
    throttle_classes = [TestThrottle, ]

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET請求,響應內容')

    def post(self, request, *args, **kwargs):
        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        return Response('PUT請求,響應內容')

    def throttled(self, request, wait):
        """
        Raise a Throttled error with customized messages when the request
        has been rate-limited.
        """
        class Throttled(exceptions.Throttled):
            default_detail = '請求被限制.'
            extra_detail_singular = '請 {wait} 秒以後再重試.'
            extra_detail_plural = '請 {wait} 秒以後再重試.'

        raise Throttled(wait)
2.基於用戶IP限制訪問頻率(利用Django緩存)
REST_FRAMEWORK = {
    # Throttle scope name -> rate string.
    'DEFAULT_THROTTLE_RATES': {
        'test_scope': '10/m',
    },
}
from django.conf.urls import url, include from web.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import exceptions
from rest_framework.throttling import SimpleRateThrottle


class TestThrottle(SimpleRateThrottle):
    """Rate throttle whose limit is looked up under ``scope`` in settings."""

    # Key of the rate entry in the DEFAULT_THROTTLE_RATES setting.
    scope = "test_scope"

    def get_cache_key(self, request, view):
        """
        Build the cache key under which this client's history is stored.

        Uses ``request.user`` when it is truthy, otherwise the identifier
        returned by ``get_ident``.
        """
        ident = request.user if request.user else self.get_ident(request)
        key_parts = {'scope': self.scope, 'ident': ident}
        return self.cache_format % key_parts


class TestView(APIView):
    throttle_classes = [TestThrottle, ]

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET請求,響應內容')

    def post(self, request, *args, **kwargs):
        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        return Response('PUT請求,響應內容')

    def throttled(self, request, wait):
        """
        Raise a customised error when the request has been throttled.
        """

        class Throttled(exceptions.Throttled):
            default_detail = '請求被限制.'
            extra_detail_singular = '請 {wait} 秒以後再重試.'
            extra_detail_plural = '請 {wait} 秒以後再重試.'

        raise Throttled(wait)
3.view中限制請求頻率
REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES': { 'xxxxxx': '10/m', }, }
from django.conf.urls import url, include from web.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import exceptions
from rest_framework.throttling import ScopedRateThrottle


class TestThrottle(ScopedRateThrottle):
    """Scoped throttle with a cache key based on the user or the client ident."""

    def get_cache_key(self, request, view):
        """
        Build the cache key under which this client's history is stored.

        Uses ``request.user`` when it is truthy, otherwise the identifier
        returned by ``get_ident``.
        """
        ident = request.user if request.user else self.get_ident(request)
        key_parts = {'scope': self.scope, 'ident': ident}
        return self.cache_format % key_parts


class TestView(APIView):
    throttle_classes = [TestThrottle, ]

    # Name of the rate entry in settings that applies to this view.
    throttle_scope = "xxxxxx"

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET請求,響應內容')

    def post(self, request, *args, **kwargs):
        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        return Response('PUT請求,響應內容')

    def throttled(self, request, wait):
        """
        Raise a customised error when the request has been throttled.
        """

        class Throttled(exceptions.Throttled):
            default_detail = '請求被限制.'
            extra_detail_singular = '請 {wait} 秒以後再重試.'
            extra_detail_plural = '請 {wait} 秒以後再重試.'

        raise Throttled(wait)
4. 匿名時用IP限制+登陸時用Token限制
REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, 'DEFAULT_THROTTLE_RATES': { 'luffy_anon': '10/m', 'luffy_user': '20/m', }, }
from django.conf.urls import url, include from web.views.s3_throttling import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.throttling import SimpleRateThrottle


class LuffyAnonRateThrottle(SimpleRateThrottle):
    """
    Throttle for anonymous users, keyed by client IP.
    """
    scope = "luffy_anon"

    def get_cache_key(self, request, view):
        # A truthy request.user means the anonymous limit does not apply.
        if request.user:
            return None
        key_parts = {'scope': self.scope, 'ident': self.get_ident(request)}
        return self.cache_format % key_parts


class LuffyUserRateThrottle(SimpleRateThrottle):
    """
    Throttle for logged-in users, keyed by the value from ``get_ident``.
    """
    scope = "luffy_user"

    def get_ident(self, request):
        """
        Return the identifier used in the cache key.

        :param request: the incoming request
        :return: a constant placeholder string
        """
        # NOTE(review): this constant puts every logged-in user in one shared
        # bucket; the original left `request.auth.token` commented out here -
        # confirm the intended identifier.
        # return request.auth.token
        return "user_token"

    def get_cache_key(self, request, view):
        """
        Build the cache key, or return None to skip throttling.

        :param request: the incoming request
        :param view: the view being accessed (unused)
        :return: cache key string, or None when request.user is falsy
        """
        if request.user:
            key_parts = {'scope': self.scope, 'ident': self.get_ident(request)}
            return self.cache_format % key_parts
        # Falsy request.user: the per-user limit does not apply.
        return None


class TestView(APIView):
    throttle_classes = [LuffyUserRateThrottle, LuffyAnonRateThrottle, ]

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET請求,響應內容')

    def post(self, request, *args, **kwargs):
        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        return Response('PUT請求,響應內容')
5.全局使用
REST_FRAMEWORK = { 'DEFAULT_THROTTLE_CLASSES': [ 'api.utils.throttles.throttles.LuffyAnonRateThrottle', 'api.utils.throttles.throttles.LuffyUserRateThrottle', ], 'DEFAULT_THROTTLE_RATES': { 'anon': '10/day', 'user': '10/day', 'luffy_anon': '10/m', 'luffy_user': '20/m', }, }
序列化組件用於對用戶請求數據進行驗證,以及對數據進行序列化。
class PublishSerializers(serializers.Serializer):
    """Hand-written serializer: read-only use, it defines no create()/update()."""
    name = serializers.CharField()
    email = serializers.CharField()


class PublishModelSerializers(serializers.ModelSerializer):
    """Model-backed serializer: fields and create()/update() come from Publish."""

    class Meta:
        model = Publish
        fields = "__all__"


# Serialization: queryset or model instance -> JSON-ready data
ps = PublishSerializers(queryset, many=True)
ps.data  # [{}, {}, {}]

ps = PublishSerializers(model_obj, many=False)
ps.data  # {}

# Deserialization: JSON data -> database record.
# save() needs create()/update(); the plain Serializer above does not define
# them, so the ModelSerializer is used for writes.

# Create
ps = PublishModelSerializers(data=request.data)
if ps.is_valid():
    ps.save()  # calls create()

# Update
ps = PublishModelSerializers(model_obj, data=request.data)
if ps.is_valid():
    ps.save()  # calls update()
1.自定義字段
from django.conf.urls import url, include from web.views.serializers import TestView urlpatterns = [ url(r'test/', TestView.as_view(), name='test'), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import serializers
from .. import models


class PasswordValidator(object):
    """Field validator that accepts only one exact value."""

    def __init__(self, base):
        self.base = base

    def __call__(self, value):
        if value == self.base:
            return
        message = 'This field must be %s.' % self.base
        raise serializers.ValidationError(message)

    def set_context(self, serializer_field):
        """
        This hook is called by the serializer instance,
        prior to the validation call being made.
        """
        # serializer_field is the field object being validated.
        pass


class UserSerializer(serializers.Serializer):
    # Follows the `ut` relation and exposes its `title`.
    ut_title = serializers.CharField(source='ut.title')
    user = serializers.CharField(min_length=6)
    pwd = serializers.CharField(error_messages={'required': '密碼不能爲空'},
                                validators=[PasswordValidator('666')])


class TestView(APIView):
    def get(self, request, *args, **kwargs):
        # Serialize all UserInfo rows.
        users = models.UserInfo.objects.all()
        serializer = UserSerializer(instance=users, many=True)
        # Single-object variant:
        # obj = models.UserInfo.objects.all().first()
        # ser = UserSerializer(instance=obj, many=False)
        return Response(serializer.data)

    def post(self, request, *args, **kwargs):
        # Validate the submitted data and print the outcome.
        serializer = UserSerializer(data=request.data)
        if not serializer.is_valid():
            print(serializer.errors)
        else:
            print(serializer.validated_data)

        return Response('POST請求,響應內容')
2.基於model自動生成字段
from django.conf.urls import url, include from web.views.serializers import TestView urlpatterns = [ url(r'test/', TestView.as_view(), name='test'), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import serializers
from .. import models


class PasswordValidator(object):
    """Field validator that accepts only one exact value (compared as str)."""

    def __init__(self, base):
        self.base = str(base)

    def __call__(self, value):
        if value != self.base:
            message = 'This field must be %s.' % self.base
            raise serializers.ValidationError(message)

    def set_context(self, serializer_field):
        """
        This hook is called by the serializer instance,
        prior to the validation call being made.
        """
        # serializer_field is the field object being validated.
        pass


class ModelUserSerializer(serializers.ModelSerializer):
    """Serializer generated from UserInfo, with an explicit `user` field."""

    # extra_kwargs is ignored for explicitly declared fields, so the
    # min_length constraint has to live on the declaration itself.
    user = serializers.CharField(min_length=6, max_length=32)

    class Meta:
        model = models.UserInfo
        fields = "__all__"
        # fields = ['user', 'pwd', 'ut']
        depth = 2
        extra_kwargs = {'pwd': {'validators': [PasswordValidator(666), ]}}
        # read_only_fields = ['user']


class TestView(APIView):
    def get(self, request, *args, **kwargs):
        # Serialize all UserInfo rows.
        data_list = models.UserInfo.objects.all()
        ser = ModelUserSerializer(instance=data_list, many=True)
        # Single-object variant:
        # obj = models.UserInfo.objects.all().first()
        # ser = UserSerializer(instance=obj, many=False)
        return Response(ser.data)

    def post(self, request, *args, **kwargs):
        # Validate the submitted data and print the outcome.
        print(request.data)
        ser = ModelUserSerializer(data=request.data)
        if ser.is_valid():
            print(ser.validated_data)
        else:
            print(ser.errors)

        return Response('POST請求,響應內容')
實例:基於model生成字段+自定義one2one/fk/choice/m2m字段
知識點:
url(r'^course/$', course.CourseView.as_view({'get':'list'})), url(r'^course/(?P<pk>\d+)$', course.CourseView.as_view({'get':'retrieve'})),
# -*- coding: utf-8 -*-
"""
@Datetime: 2019/1/16
@Author: Zhang Yafei
"""
from rest_framework.views import APIView
from rest_framework.response import Response
from django.http import HttpResponse
from rest_framework.renderers import JSONRenderer,BrowsableAPIRenderer,AdminRenderer
from rest_framework.versioning import QueryParameterVersioning,URLPathVersioning
from api import models
from rest_framework import serializers


class CourseSerializer(serializers.ModelSerializer):
    """List representation of a course."""

    # Human-readable label of the `level` choice field.
    level = serializers.CharField(source='get_level_display')

    class Meta:
        model = models.Course
        fields = ['id','title', 'course_img', 'level']


class CourseDetailSerializer(serializers.ModelSerializer):
    """Detail representation, pulling several fields from the related course."""

    # one2one / fk / choice: follow the relation through `source`.
    title = serializers.CharField(source='course.title')
    img = serializers.CharField(source='course.course_img')
    level = serializers.CharField(source='course.get_level_display')

    # m2m / reverse fk: computed by the get_<field> methods below.
    recommends = serializers.SerializerMethodField()
    chapters = serializers.SerializerMethodField()

    class Meta:
        model = models.CourseDetail
        fields = ['course', 'title', 'img', 'level','slogon', 'why', 'recommends','chapters']

    def get_recommends(self, obj):
        """Return id/title pairs of all recommended courses."""
        queryset = obj.recommend_courses.all()
        return [{'id':row.id,'title':row.title} for row in queryset]

    def get_chapters(self, obj):
        """Return id/name pairs of all chapters of the course."""
        queryset = obj.course.chapters_set.all()
        return [{'id':row.id,'name':row.name} for row in queryset]


from rest_framework.viewsets import GenericViewSet,ViewSetMixin


class CourseView(ViewSetMixin, APIView):

    def list(self, request, *args, **kwargs):
        """
        Course list endpoint.

        :return: Response with {'code', 'data'}; code 1001 plus 'error'
                 when building the list fails.
        """
        ret = {'code':1000, 'data':None}
        try:
            queryset = models.Course.objects.all()
            ser = CourseSerializer(instance=queryset, many=True)
            ret['data'] = ser.data
        except Exception:
            ret['code'] = 1001
            ret['error'] = '獲取課程失敗'
        return Response(ret)

    def retrieve(self, request, *args, **kwargs):
        """
        Course detail endpoint.

        :param kwargs: must carry the course id under 'pk'
        :return: Response with {'code', 'data'}; code 1001 plus 'error'
                 when the course does not exist or serialization fails.
        """
        ret = {'code':1000, 'data':None}
        try:
            pk = kwargs.get('pk')
            obj = models.CourseDetail.objects.filter(course_id=pk).first()
            if obj is None:
                # Serializing None would yield blank initial data with a
                # success code; report the missing course instead.
                ret['code'] = 1001
                ret['error'] = '獲取課程失敗'
            else:
                ser = CourseDetailSerializer(instance=obj, many=False)
                ret['data'] = ser.data
        except Exception:
            ret['code'] = 1001
            ret['error'] = '獲取課程失敗'
        return Response(ret)
3.生成url
from django.conf.urls import url, include from web.views.serializers import TestView urlpatterns = [ url(r'test/', TestView.as_view(), name='test'), url(r'detail/(?P<pk>\d+)/', TestView.as_view(), name='detail'), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import serializers
from .. import models


class PasswordValidator(object):
    """Field validator that accepts only one exact value (compared as str)."""

    def __init__(self, base):
        self.base = str(base)

    def __call__(self, value):
        if value == self.base:
            return
        message = 'This field must be %s.' % self.base
        raise serializers.ValidationError(message)

    def set_context(self, serializer_field):
        """
        This hook is called by the serializer instance,
        prior to the validation call being made.
        """
        # serializer_field is the field object being validated.
        pass


class ModelUserSerializer(serializers.ModelSerializer):
    # Rendered as a URL resolved from the route named 'detail'.
    ut = serializers.HyperlinkedIdentityField(view_name='detail')

    class Meta:
        model = models.UserInfo
        fields = "__all__"

        extra_kwargs = {
            'user': {'min_length': 6},
            'pwd': {'validators': [PasswordValidator(666),]},
        }


class TestView(APIView):
    def get(self, request, *args, **kwargs):
        # Serialize all UserInfo rows; the request is passed in the context.
        users = models.UserInfo.objects.all()
        serializer = ModelUserSerializer(instance=users, many=True, context={'request': request})
        # Single-object variant:
        # obj = models.UserInfo.objects.all().first()
        # ser = UserSerializer(instance=obj, many=False)
        return Response(serializer.data)

    def post(self, request, *args, **kwargs):
        # Validate the submitted data and print the outcome.
        print(request.data)
        serializer = ModelUserSerializer(data=request.data)
        if not serializer.is_valid():
            print(serializer.errors)
        else:
            print(serializer.validated_data)

        return Response('POST請求,響應內容')
4.自動生成url
from django.conf.urls import url, include from web.views.serializers import TestView urlpatterns = [ url(r'test/', TestView.as_view(), name='test'), url(r'detail/(?P<pk>\d+)/', TestView.as_view(), name='xxxx'), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import serializers
from .. import models


class PasswordValidator(object):
    """Field validator that accepts only one exact value (compared as str)."""

    def __init__(self, base):
        self.base = str(base)

    def __call__(self, value):
        if value == self.base:
            return
        message = 'This field must be %s.' % self.base
        raise serializers.ValidationError(message)

    def set_context(self, serializer_field):
        """
        This hook is called by the serializer instance,
        prior to the validation call being made.
        """
        # serializer_field is the field object being validated.
        pass


class ModelUserSerializer(serializers.HyperlinkedModelSerializer):
    # Extra hyperlink field resolved from the route named 'xxxx'.
    ll = serializers.HyperlinkedIdentityField(view_name='xxxx')
    tt = serializers.CharField(required=False)

    class Meta:
        model = models.UserInfo
        fields = "__all__"
        list_serializer_class = serializers.ListSerializer

        extra_kwargs = {
            'user': {'min_length': 6},
            'pwd': {'validators': [PasswordValidator(666), ]},
            # Route name used for the generated hyperlink fields.
            'url': {'view_name': 'xxxx'},
            'ut': {'view_name': 'xxxx'},
        }


class TestView(APIView):
    def get(self, request, *args, **kwargs):
        # Serialize all UserInfo rows; the request is passed in the context.
        users = models.UserInfo.objects.all()
        serializer = ModelUserSerializer(instance=users, many=True, context={'request': request})
        # Single-object variant:
        # obj = models.UserInfo.objects.all().first()
        # ser = UserSerializer(instance=obj, many=False)
        return Response(serializer.data)

    def post(self, request, *args, **kwargs):
        # Validate the submitted data and print the outcome.
        print(request.data)
        serializer = ModelUserSerializer(data=request.data)
        if not serializer.is_valid():
            print(serializer.errors)
        else:
            print(serializer.validated_data)

        return Response('POST請求,響應內容')
1.自定義路由
from django.conf.urls import url, include from web.views import render urlpatterns = [ url(r'^test/$', render.TestView.as_view()), url(r'^test\.(?P<format>[a-z0-9]+)$', render.TestView.as_view()), url(r'^test/(?P<pk>[^/.]+)/$', render.TestView.as_view()), url(r'^test/(?P<pk>[^/.]+)\.(?P<format>[a-z0-9]+)$', render.TestView.as_view()) ]
from rest_framework.views import APIView
from rest_framework.response import Response
from .. import models


class TestView(APIView):
    """Debug view: prints the URL kwargs and the active renderer classes."""

    def get(self, request, *args, **kwargs):
        for item in (kwargs, self.renderer_classes):
            print(item)
        return Response('...')
2.半自動路由
from django.conf.urls import url, include from web.views import generic urlpatterns = [ url(r'^test/$', generic.UserViewSet.as_view({'get': 'list', 'post': 'create'})), url(r'^test/(?P<pk>\d+)/$', generic.UserViewSet.as_view( {'get': 'retrieve', 'put': 'update', 'patch': 'partial_update', 'delete': 'destroy'})), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.viewsets import ModelViewSet
from rest_framework import serializers
from .. import models


class UserSerializer(serializers.ModelSerializer):
    """Serializer exposing every field of UserInfo."""

    class Meta:
        model = models.UserInfo
        fields = "__all__"


class UserViewSet(ModelViewSet):
    """Viewset over UserInfo; actions are bound to HTTP methods in the URLconf."""

    serializer_class = UserSerializer
    queryset = models.UserInfo.objects.all()
3.全自動路由
from django.conf.urls import url, include from rest_framework import routers from web.views import generic router = routers.DefaultRouter() router.register(r'users', generic.UserViewSet) urlpatterns = [ url(r'^', include(router.urls)), ]
from rest_framework.viewsets import ModelViewSet
from rest_framework import serializers
from .. import models


class UserSerializer(serializers.ModelSerializer):
    """Serializer exposing every field of UserInfo."""

    class Meta:
        model = models.UserInfo
        fields = "__all__"


class UserViewSet(ModelViewSet):
    """Viewset over UserInfo, registered with a router in the URLconf."""

    serializer_class = UserSerializer
    queryset = models.UserInfo.objects.all()
視圖類的繼承
class View(object): class APIView(View): #什麼都本身寫 class GenericAPIView(views.APIView): #許多默認方法 class GenericViewSet(ViewSetMixin, generics.GenericAPIView): # as_view能夠加參數 class ModelViewSet(mixins.CreateModelMixin, mixins.RetrieveModelMixin, mixins.UpdateModelMixin, mixins.DestroyModelMixin, mixins.ListModelMixin, GenericViewSet): # 自帶有增刪改查方法,但只能對單表操做
# Book table
class BookView(APIView):
    """List all books and create new ones."""

    def get(self, request):
        books = Book.objects.all()
        serializer = BookModelSerializers(books, many=True, context={'request': request})
        return Response(serializer.data)

    def post(self, request):
        # Build the serializer from the submitted data.
        serializer = BookModelSerializers(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors)
        print(serializer.validated_data)
        serializer.save()  # calls create()
        return Response(serializer.data)


class BookDetailView(APIView):
    """Read, update or delete the book with the given primary key."""

    def get(self, request, id):
        target = Book.objects.filter(pk=id).first()
        serializer = BookModelSerializers(target, context={'request': request})
        return Response(serializer.data)

    def put(self, request, id):
        target = Book.objects.filter(pk=id).first()
        serializer = BookModelSerializers(target, data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors)
        serializer.save()
        return Response(serializer.data)

    def delete(self, request, id):
        Book.objects.filter(pk=id).delete()
        return Response()
from rest_framework import mixins
from rest_framework import generics


class AuthorView(mixins.ListModelMixin,mixins.CreateModelMixin,generics.GenericAPIView):
    """List all authors (GET) and create a new one (POST)."""

    queryset=Author.objects.all()
    serializer_class =AuthorModelSerializers

    def get(self,request, *args, **kwargs):
        return self.list(request, *args, **kwargs)

    def post(self,request, *args, **kwargs):
        return self.create(request, *args, **kwargs)


class AuthorDetailView(mixins.RetrieveModelMixin,mixins.DestroyModelMixin,mixins.UpdateModelMixin,generics.GenericAPIView):
    """Retrieve (GET), update (PUT) or delete (DELETE) a single author."""

    queryset = Author.objects.all()
    serializer_class = AuthorModelSerializers

    def get(self,request,*args, **kwargs):
        return self.retrieve(request,*args, **kwargs)

    def delete(self,request,*args, **kwargs):
        return self.destroy(request,*args, **kwargs)

    def put(self,request,*args, **kwargs):
        # PUT must write the change via UpdateModelMixin.update();
        # delegating to retrieve() only echoed the unchanged record.
        return self.update(request,*args, **kwargs)
from rest_framework import mixins
from rest_framework import generics


class AuthorView(generics.ListCreateAPIView):
    """List/create endpoint for authors, built from the generic view."""

    serializer_class = AuthorModelSerializers
    queryset = Author.objects.all()


class AuthorDetailView(generics.RetrieveUpdateDestroyAPIView):
    """Retrieve/update/destroy endpoint for one author."""

    serializer_class = AuthorModelSerializers
    queryset = Author.objects.all()
class AuthorModelView(viewsets.ModelViewSet): queryset = Author.objects.all() serializer_class = AuthorModelSerializers url(r'^authors/$', views.AuthorModelView.as_view({"get":"list","post":"create"}),name="author"), url(r'^authors/(?P<pk>\d+)/$', views.AuthorModelView.as_view({"get":"retrieve","put" 流程: url(r'^authors/$', views.AuthorModelView.as_view({"get":"list","post":"create"}),name="author"), url(r'^authors/$', ViewSetMixin.as_view({"get":"list","post":"create"}),name="author"), url(r'^authors/$', ViewSetMixin類下的view), 一旦訪問 /authors/: ViewSetMixin def view(): for method, action in actions.items(): # {"get":"list","post":"create"} handler = getattr(self, action) # self.list self.create setattr(self, method, handler) self.dispatch(request, *args, **kwargs) APIView類下的self.dispatch # 分發 if request.method.lower() in self.http_method_names: handler = getattr(self,request.method.lower(), self.http_method_not_allowed) response = handler(request, *args, **kwargs) # self.list() return response
1. 使用mixins和generics編寫視圖
from rest_framework import mixins
from rest_framework import generics


class BookViewSet(mixins.ListModelMixin,
                  mixins.CreateModelMixin,
                  generics.GenericAPIView):
    """List all books (GET) and create a new one (POST)."""

    serializer_class = BookSerializers
    queryset = Book.objects.all()

    def get(self, request, *args, **kwargs):
        response = self.list(request, *args, **kwargs)
        return response

    def post(self, request, *args, **kwargs):
        response = self.create(request, *args, **kwargs)
        return response


class BookDetailViewSet(mixins.RetrieveModelMixin,
                        mixins.UpdateModelMixin,
                        mixins.DestroyModelMixin,
                        generics.GenericAPIView):
    """Retrieve (GET), update (PUT) or delete (DELETE) a single book."""

    serializer_class = BookSerializers
    queryset = Book.objects.all()

    def get(self, request, *args, **kwargs):
        response = self.retrieve(request, *args, **kwargs)
        return response

    def put(self, request, *args, **kwargs):
        response = self.update(request, *args, **kwargs)
        return response

    def delete(self, request, *args, **kwargs):
        response = self.destroy(request, *args, **kwargs)
        return response
2.使用通用的基於類的視圖
經過使用mixin類,咱們使用更少的代碼重寫了這些視圖,但咱們還能夠再進一步。REST框架提供了一組已經混合好(mixed-in)的通用視圖,咱們可以使用它們來簡化咱們的views.py
模塊。
from rest_framework import mixins
from rest_framework import generics


class BookViewSet(generics.ListCreateAPIView):
    """List/create endpoint for books."""

    serializer_class = BookSerializers
    queryset = Book.objects.all()


class BookDetailViewSet(generics.RetrieveUpdateDestroyAPIView):
    """Retrieve/update/destroy endpoint for one book."""

    serializer_class = BookSerializers
    queryset = Book.objects.all()


class PublishViewSet(generics.ListCreateAPIView):
    """List/create endpoint for publishers."""

    serializer_class = PublshSerializers
    queryset = Publish.objects.all()


class PublishDetailViewSet(generics.RetrieveUpdateDestroyAPIView):
    """Retrieve/update/destroy endpoint for one publisher."""

    serializer_class = PublshSerializers
    queryset = Publish.objects.all()
3. GenericViewSet
from django.conf.urls import url, include from web.views.viewset import TestView urlpatterns = [ url(r'test/', TestView.as_view({'get':'list'}), name='test'), url(r'detail/(?P<pk>\d+)/', TestView.as_view({'get':'list'}), name='xxxx'), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework import viewsets
from rest_framework.response import Response


class TestView(viewsets.GenericViewSet):
    """Viewset skeleton: only `list` returns a response, the rest are stubs."""

    def list(self, request, *args, **kwargs):
        return Response('...')

    def add(self, request, *args, **kwargs):
        return None

    def delete(self, request, *args, **kwargs):
        return None

    def edit(self, request, *args, **kwargs):
        return None
4. ModelViewSet(自定義URL)
from django.conf.urls import url, include from web.views import generic urlpatterns = [ url(r'^test/$', generic.UserViewSet.as_view({'get': 'list', 'post': 'create'})), url(r'^test/(?P<pk>\d+)/$', generic.UserViewSet.as_view( {'get': 'retrieve', 'put': 'update', 'patch': 'partial_update', 'delete': 'destroy'})), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.viewsets import ModelViewSet
from rest_framework import serializers
from .. import models


class UserSerializer(serializers.ModelSerializer):
    """Serializer exposing every field of UserInfo."""

    class Meta:
        model = models.UserInfo
        fields = "__all__"


class UserViewSet(ModelViewSet):
    """Viewset over UserInfo; actions are bound to HTTP methods in the URLconf."""

    serializer_class = UserSerializer
    queryset = models.UserInfo.objects.all()
5. ModelViewSet(rest framework路由)
from django.conf.urls import url, include from rest_framework import routers from app01 import views router = routers.DefaultRouter() router.register(r'users', views.UserViewSet) router.register(r'groups', views.GroupViewSet) # Wire up our API using automatic URL routing. # Additionally, we include login URLs for the browsable API. urlpatterns = [ url(r'^', include(router.urls)), ]
# The snippet referenced `models.User`, `User` and `Group` without importing
# any of them; the `date_joined` ordering suggests Django's auth models.
# NOTE(review): confirm these are the intended models.
from django.contrib.auth.models import User, Group
from rest_framework import viewsets
from rest_framework import serializers


class UserSerializer(serializers.HyperlinkedModelSerializer):
    """Hyperlinked representation of a user."""

    class Meta:
        model = User
        fields = ('url', 'username', 'email', 'groups')


class GroupSerializer(serializers.HyperlinkedModelSerializer):
    """Hyperlinked representation of a group."""

    class Meta:
        model = Group
        fields = ('url', 'name')


class UserViewSet(viewsets.ModelViewSet):
    """
    API endpoint that allows users to be viewed or edited.
    """
    queryset = User.objects.all().order_by('-date_joined')
    serializer_class = UserSerializer


class GroupViewSet(viewsets.ModelViewSet):
    """
    API endpoint that allows groups to be viewed or edited.
    """
    queryset = Group.objects.all()
    serializer_class = GroupSerializer
a. 根據頁碼進行分頁
from django.conf.urls import url, include from rest_framework import routers from web.views import s9_pagination urlpatterns = [ url(r'^test/', s9_pagination.UserViewSet.as_view()), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework import serializers
from .. import models

from rest_framework.pagination import PageNumberPagination


class StandardResultsSetPagination(PageNumberPagination):
    """Page-number pagination configured through class attributes."""

    # Default number of items per page.
    page_size = 1
    # Query parameter through which the client sets the page size.
    page_size_query_param = 'page_size'
    # Query parameter carrying the page number.
    page_query_param = 'page'
    # Upper bound on the page size a client may request.
    max_page_size = 1


class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = models.UserInfo
        fields = "__all__"


class UserViewSet(APIView):
    def get(self, request, *args, **kwargs):
        queryset = models.UserInfo.objects.all().order_by('-id')

        # Slice the queryset down to the requested page.
        paginator = StandardResultsSetPagination()
        page = paginator.paginate_queryset(queryset, self.request, view=self)

        # Serialize the page and let the paginator build the response.
        page_data = UserSerializer(page, many=True).data
        return paginator.get_paginated_response(page_data)
b. 位置和個數進行分頁
from django.conf.urls import url, include from web.views import s9_pagination urlpatterns = [ url(r'^test/', s9_pagination.UserViewSet.as_view()), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework import serializers
from .. import models

from rest_framework.pagination import PageNumberPagination,LimitOffsetPagination


class StandardResultsSetPagination(LimitOffsetPagination):
    """Limit/offset pagination configured through class attributes."""

    # Number of items returned when the client gives no limit.
    default_limit = 10
    # Query parameter carrying the number of items to return.
    limit_query_param = 'limit'
    # Query parameter carrying the starting position.
    offset_query_param = 'offset'
    # Upper bound on the limit; None means no cap.
    max_limit = None


class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = models.UserInfo
        fields = "__all__"


class UserViewSet(APIView):
    def get(self, request, *args, **kwargs):
        queryset = models.UserInfo.objects.all().order_by('-id')

        # Slice the queryset down to the requested window.
        paginator = StandardResultsSetPagination()
        page = paginator.paginate_queryset(queryset, self.request, view=self)

        # Serialize the window and let the paginator build the response.
        page_data = UserSerializer(page, many=True).data
        return paginator.get_paginated_response(page_data)
c. 遊標分頁
from django.conf.urls import url, include from web.views import s9_pagination urlpatterns = [ url(r'^test/', s9_pagination.UserViewSet.as_view()), ]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework import serializers
from .. import models

from rest_framework.pagination import PageNumberPagination, LimitOffsetPagination, CursorPagination


class StandardResultsSetPagination(CursorPagination):
    """Cursor pagination over UserInfo, newest id first."""

    # Query parameter carrying the cursor.
    cursor_query_param = 'cursor'

    # Default number of items per page.
    page_size = 2
    # Query parameter through which the client sets the page size.
    page_size_query_param = 'page_size'
    # Upper bound on the page size a client may request.
    max_page_size = 1000

    # Descending id, matching the view's order_by('-id'). The previous value
    # "id" sorted ascending, contradicting the stated descending intent.
    ordering = "-id"


class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = models.UserInfo
        fields = "__all__"


class UserViewSet(APIView):
    def get(self, request, *args, **kwargs):
        user_list = models.UserInfo.objects.all().order_by('-id')

        # Slice the queryset down to the page addressed by the cursor.
        paginator = StandardResultsSetPagination()
        page_user_list = paginator.paginate_queryset(user_list, self.request, view=self)

        # Serialize the page.
        serializer = UserSerializer(page_user_list, many=True)

        # Let the paginator wrap the data with the next/previous links.
        response = paginator.get_paginated_response(serializer.data)
        return response
d.局部分頁
# Example request:
# http://127.0.0.1:8000/authors/?token=95631b025a67a0d640c33862d1788293&page=1&size=2
from rest_framework.pagination import PageNumberPagination,LimitOffsetPagination


class MyPageNumberPagination(PageNumberPagination):
    """Page-number pagination: `page` selects the page, `size` its length."""

    page_query_param = 'page'
    page_size_query_param = "size"
    page_size = 1
    max_page_size = 2


class AuthorModelView(viewsets.ModelViewSet):
    """Author viewset with per-view throttling and pagination."""

    # authentication_classes = [TokenAuth, ]
    # permission_classes = []
    # Per the original note: cap each IP at 20 requests per minute.
    throttle_classes = [VisitRateThrottle, ]
    # parser_classes = []
    pagination_class = MyPageNumberPagination

    queryset = Author.objects.all()
    serializer_class = AuthorModelSerializers
e.全局分頁
REST_FRAMEWORK = {
    # Must be ONE dotted path string. A list is imported as a list of
    # classes, and the generic views then fail when they try to call
    # `pagination_class()`.
    "DEFAULT_PAGINATION_CLASS": 'app01.utils.MyPageNumberPagination',
}
根據請求頭 content-type 選擇對應的解析器對請求體內容進行處理。
"""
Parsers are used to parse the content of incoming HTTP requests.

They give us a generic way of being able to handle various media types
on the request, such as form content or json encoded data.
"""
from __future__ import unicode_literals

import codecs

from django.conf import settings
from django.core.files.uploadhandler import StopFutureHandlers
from django.http import QueryDict
from django.http.multipartparser import ChunkIter
from django.http.multipartparser import \
    MultiPartParser as DjangoMultiPartParser
from django.http.multipartparser import MultiPartParserError, parse_header
from django.utils import six
from django.utils.encoding import force_text
from django.utils.six.moves.urllib import parse as urlparse

from rest_framework import renderers
from rest_framework.exceptions import ParseError
from rest_framework.settings import api_settings
from rest_framework.utils import json


class DataAndFiles(object):
    # Simple pair holding parsed form data and parsed files together.
    def __init__(self, data, files):
        self.data = data
        self.files = files


class BaseParser(object):
    """
    All parsers should extend `BaseParser`, specifying a `media_type`
    attribute, and overriding the `.parse()` method.
    """
    media_type = None

    def parse(self, stream, media_type=None, parser_context=None):
        """
        Given a stream to read from, return the parsed representation.
        Should return parsed data, or a `DataAndFiles` object consisting of the
        parsed data and files.
        """
        raise NotImplementedError(".parse() must be overridden.")


class JSONParser(BaseParser):
    """
    Parses JSON-serialized data.
    """
    media_type = 'application/json'
    renderer_class = renderers.JSONRenderer
    strict = api_settings.STRICT_JSON

    def parse(self, stream, media_type=None, parser_context=None):
        """
        Parses the incoming bytestream as JSON and returns the resulting data.
        """
        parser_context = parser_context or {}
        encoding = parser_context.get('encoding', settings.DEFAULT_CHARSET)

        try:
            # Wrap the byte stream in a decoding reader for the chosen encoding.
            decoded_stream = codecs.getreader(encoding)(stream)
            parse_constant = json.strict_constant if self.strict else None
            return json.load(decoded_stream, parse_constant=parse_constant)
        except ValueError as exc:
            # Re-raised as ParseError, carrying the original message text.
            raise ParseError('JSON parse error - %s' % six.text_type(exc))


class FormParser(BaseParser):
    """
    Parser for form data.
    """
    media_type = 'application/x-www-form-urlencoded'

    def parse(self, stream, media_type=None, parser_context=None):
        """
        Parses the incoming bytestream as a URL encoded form,
        and returns the resulting QueryDict.
        """
        parser_context = parser_context or {}
        encoding = parser_context.get('encoding', settings.DEFAULT_CHARSET)
        data = QueryDict(stream.read(), encoding=encoding)
        return data


class MultiPartParser(BaseParser):
    """
    Parser for multipart form data, which may include file data.
    """
    media_type = 'multipart/form-data'

    def parse(self, stream, media_type=None, parser_context=None):
        """
        Parses the incoming bytestream as a multipart encoded form,
        and returns a DataAndFiles object.

        `.data` will be a `QueryDict` containing all the form parameters.
        `.files` will be a `QueryDict` containing all the form files.
        """
        parser_context = parser_context or {}
        request = parser_context['request']
        encoding = parser_context.get('encoding', settings.DEFAULT_CHARSET)
        # Work on a copy so the request's own META is not modified.
        meta = request.META.copy()
        meta['CONTENT_TYPE'] = media_type
        upload_handlers = request.upload_handlers

        try:
            parser = DjangoMultiPartParser(meta, stream, upload_handlers, encoding)
            data, files = parser.parse()
            return DataAndFiles(data, files)
        except MultiPartParserError as exc:
            raise ParseError('Multipart form parse error - %s' % six.text_type(exc))


class FileUploadParser(BaseParser):
    """
    Parser for file upload data.
    """
    media_type = '*/*'
    errors = {
        'unhandled': 'FileUpload parse error - none of upload handlers can handle the stream',
        'no_filename': 'Missing filename. Request should include a Content-Disposition header with a filename parameter.',
    }

    def parse(self, stream, media_type=None, parser_context=None):
        """
        Treats the incoming bytestream as a raw file upload and returns
        a `DataAndFiles` object.

        `.data` will be None (we expect request body to be a file content).
        `.files` will be a `QueryDict` containing one 'file' element.
        """
        parser_context = parser_context or {}
        request = parser_context['request']
        encoding = parser_context.get('encoding', settings.DEFAULT_CHARSET)
        meta = request.META
        upload_handlers = request.upload_handlers
        filename = self.get_filename(stream, media_type, parser_context)

        if not filename:
            raise ParseError(self.errors['no_filename'])

        # Note that this code is extracted from Django's handling of
        # file uploads in MultiPartParser.
        content_type = meta.get('HTTP_CONTENT_TYPE',
                                meta.get('CONTENT_TYPE', ''))
        try:
            content_length = int(meta.get('HTTP_CONTENT_LENGTH',
                                          meta.get('CONTENT_LENGTH', 0)))
        except (ValueError, TypeError):
            # A non-numeric header value leaves the length unknown.
            content_length = None

        # See if the handler will want to take care of the parsing.
        for handler in upload_handlers:
            result = handler.handle_raw_input(stream,
                                              meta,
                                              content_length,
                                              None,
                                              encoding)
            if result is not None:
                return DataAndFiles({}, {'file': result[1]})

        # This is the standard case.
        possible_sizes = [x.chunk_size for x in upload_handlers if x.chunk_size]
        chunk_size = min([2 ** 31 - 4] + possible_sizes)
        chunks = ChunkIter(stream, chunk_size)
        # One running byte counter per upload handler.
        counters = [0] * len(upload_handlers)

        for index, handler in enumerate(upload_handlers):
            try:
                handler.new_file(None, filename, content_type,
                                 content_length, encoding)
            except StopFutureHandlers:
                # Keep only the handlers up to and including this one.
                upload_handlers = upload_handlers[:index + 1]
                break

        for chunk in chunks:
            for index, handler in enumerate(upload_handlers):
                chunk_length = len(chunk)
                # Each handler receives whatever the previous one returned.
                chunk = handler.receive_data_chunk(chunk, counters[index])
                counters[index] += chunk_length
                if chunk is None:
                    break

        # The first handler that yields a file object wins.
        for index, handler in enumerate(upload_handlers):
            file_obj = handler.file_complete(counters[index])
            if file_obj is not None:
                return DataAndFiles({}, {'file': file_obj})

        raise ParseError(self.errors['unhandled'])

    def get_filename(self, stream, media_type, parser_context):
        """
        Detects the uploaded file name. First searches a 'filename' url kwarg.
        Then tries to parse Content-Disposition header.
        """
        try:
            return parser_context['kwargs']['filename']
        except KeyError:
            pass

        try:
            meta = parser_context['request'].META
            disposition = parse_header(meta['HTTP_CONTENT_DISPOSITION'].encode('utf-8'))
            filename_parm = disposition[1]
            if 'filename*' in filename_parm:
                return self.get_encoded_filename(filename_parm)
            return force_text(filename_parm['filename'])
        except (AttributeError, KeyError, ValueError):
            # Falls through and implicitly returns None when no name is found.
            pass

    def get_encoded_filename(self, filename_parm):
        """
        Handle encoded filenames per RFC6266. See also:
        https://tools.ietf.org/html/rfc2231#section-4
        """
        encoded_filename = force_text(filename_parm['filename*'])
        try:
            # Expected form: charset'lang'percent-encoded-name
            charset, lang, filename = encoded_filename.split('\'', 2)
            filename = urlparse.unquote(filename)
        except (ValueError, LookupError):
            # Fall back to the plain `filename` parameter.
            filename = force_text(filename_parm['filename'])
        return filename
a. 僅處理請求頭content-type爲application/json的請求體
from django.conf.urls import url, include
from web.views.s5_parser import TestView

# URL configuration for the JSON-parser demo view.
urlpatterns = [
    url(r'test/', TestView.as_view(), name='test'),
]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.request import Request
from rest_framework.parsers import JSONParser


class TestView(APIView):
    """Demo view whose request bodies are parsed with JSONParser only."""

    parser_classes = [JSONParser]

    def post(self, request, *args, **kwargs):
        """Print the request's content type and parsed contents, then reply."""
        print(request.content_type)

        # Read the request payload; it is parsed by the configured parser.
        print(request.data)

        # request.POST only has values for application/x-www-form-urlencoded
        # or multipart/form-data requests.
        print(request.POST)
        print(request.FILES)

        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        """Reply to PUT requests with a fixed message."""
        return Response('PUT請求,響應內容')
b. 僅處理請求頭content-type爲application/x-www-form-urlencoded 的請求體
from django.conf.urls import url, include
from web.views import TestView

# URL configuration for the form-parser demo view.
urlpatterns = [
    url(r'test/', TestView.as_view(), name='test'),
]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.request import Request
from rest_framework.parsers import FormParser


class TestView(APIView):
    """Demo view whose request bodies are parsed with FormParser only."""

    parser_classes = [FormParser]

    def post(self, request, *args, **kwargs):
        """Print the request's content type and parsed contents, then reply."""
        print(request.content_type)

        # Read the request payload; it is parsed by the configured parser.
        print(request.data)

        # request.POST only has values for application/x-www-form-urlencoded
        # or multipart/form-data requests.
        print(request.POST)
        print(request.FILES)

        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        """Reply to PUT requests with a fixed message."""
        return Response('PUT請求,響應內容')
c. 僅處理請求頭content-type爲multipart/form-data的請求體
from django.conf.urls import url, include
from web.views import TestView

# URL configuration for the multipart-parser demo view.
urlpatterns = [
    url(r'test/', TestView.as_view(), name='test'),
]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.request import Request
from rest_framework.parsers import MultiPartParser


class TestView(APIView):
    """Demo view whose request bodies are parsed with MultiPartParser only."""

    parser_classes = [MultiPartParser]

    def post(self, request, *args, **kwargs):
        """Print the request's content type and parsed contents, then reply."""
        print(request.content_type)

        # Read the request payload; it is parsed by the configured parser.
        print(request.data)

        # request.POST only has values for application/x-www-form-urlencoded
        # or multipart/form-data requests.
        print(request.POST)
        print(request.FILES)

        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        """Reply to PUT requests with a fixed message."""
        return Response('PUT請求,響應內容')
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<!-- Submits a text field ("user") and a file ("img") as multipart/form-data to /test/. -->
<form action="http://127.0.0.1:8000/test/" method="post" enctype="multipart/form-data">
    <input type="text" name="user" />
    <input type="file" name="img">
    <input type="submit" value="提交">
</form>
</body>
</html>
d. 僅上傳文件
from django.conf.urls import url, include
from web.views import TestView

# The named group captures the path segment after "test/" and hands it to the
# view as the `filename` keyword argument.
urlpatterns = [
    url(r'test/(?P<filename>[^/]+)', TestView.as_view(), name='test'),
]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.request import Request
from rest_framework.parsers import FileUploadParser


class TestView(APIView):
    """Demo view whose request bodies are parsed with FileUploadParser only."""

    parser_classes = [FileUploadParser]

    def post(self, request, filename, *args, **kwargs):
        """Print the URL-supplied filename and the parsed request, then reply."""
        print(filename)
        print(request.content_type)

        # Read the request payload; it is parsed by the configured parser.
        print(request.data)

        # request.POST only has values for application/x-www-form-urlencoded
        # or multipart/form-data requests.
        print(request.POST)
        print(request.FILES)

        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        """Reply to PUT requests with a fixed message."""
        return Response('PUT請求,響應內容')
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<!-- Submits to /test/f1.numbers; the trailing path segment is the filename in the URL. -->
<!-- NOTE(review): this form sends multipart/form-data, while the DRF docs describe
     FileUploadParser as meant for raw-body uploads (MultiPartParser for browser
     forms) - confirm this form is the intended client for this example. -->
<form action="http://127.0.0.1:8000/test/f1.numbers" method="post" enctype="multipart/form-data">
    <input type="text" name="user" />
    <input type="file" name="img">
    <input type="submit" value="提交">
</form>
</body>
</html>
e. 同時多個Parser
當同時使用多個parser時,rest framework會根據請求頭content-type自動進行比對,並使用對應parser
from django.conf.urls import url, include
from web.views import TestView

# URL configuration for the multi-parser demo view.
urlpatterns = [
    url(r'test/', TestView.as_view(), name='test'),
]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.request import Request
from rest_framework.parsers import JSONParser, FormParser, MultiPartParser


class TestView(APIView):
    """Demo view that lists several parsers in parser_classes."""

    parser_classes = [JSONParser, FormParser, MultiPartParser]

    def post(self, request, *args, **kwargs):
        """Print the request's content type and parsed contents, then reply."""
        print(request.content_type)

        # Read the request payload; it is parsed by the matching parser.
        print(request.data)

        # request.POST only has values for application/x-www-form-urlencoded
        # or multipart/form-data requests.
        print(request.POST)
        print(request.FILES)

        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        """Reply to PUT requests with a fixed message."""
        return Response('PUT請求,響應內容')
f. 全局使用
# Project-wide default parsers for REST framework views.
REST_FRAMEWORK = {
    # Each parser path must be its own comma-separated entry: adjacent string
    # literals without commas are concatenated by Python into a single,
    # unimportable dotted path.
    'DEFAULT_PARSER_CLASSES': [
        'rest_framework.parsers.JSONParser',
        'rest_framework.parsers.FormParser',
        'rest_framework.parsers.MultiPartParser',
    ],
}
from django.conf.urls import url, include
from web.views import TestView

# URL configuration for the demo view that relies on the global parser setting.
urlpatterns = [
    url(r'test/', TestView.as_view(), name='test'),
]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response


class TestView(APIView):
    """Demo view that declares no parser_classes of its own."""

    def post(self, request, *args, **kwargs):
        """Print the request's content type and parsed contents, then reply."""
        print(request.content_type)

        # Read the request payload; it is parsed by the matching parser.
        print(request.data)

        # request.POST only has values for application/x-www-form-urlencoded
        # or multipart/form-data requests.
        print(request.POST)
        print(request.FILES)

        return Response('POST請求,響應內容')

    def put(self, request, *args, **kwargs):
        """Reply to PUT requests with a fixed message."""
        return Response('PUT請求,響應內容')
注意:個別特殊的值可以通過 Django 原生的 request 對象(即 request._request)來獲取。
根據 用戶請求URL 或 用戶可接受的類型,篩選出合適的 渲染組件。
用戶請求URL:
用戶請求頭:
1.settings添加配置:
# Project-wide default renderer list for REST framework: JSON only.
REST_FRAMEWORK = {
    'DEFAULT_RENDERER_CLASSES': [
        'rest_framework.renderers.JSONRenderer',
    ],
}
2. 設置路由
from django.contrib import admin
from django.urls import include, path

# Project-level routes: the admin site plus everything under api/, which is
# delegated to api.urls.
urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include('api.urls'), name='api'),
]
from django.conf.urls import url, include
from api.views import course

# Routes of the api app.
urlpatterns = [
    url(r'^course/$', course.CourseView.as_view()),
]
from rest_framework.views import APIView
from rest_framework.response import Response
from django.http import HttpResponse
from rest_framework.renderers import JSONRenderer, BrowsableAPIRenderer


class CourseView(APIView):
    """Minimal view used to demonstrate renderer selection."""

    # Per-view alternative to the global renderer setting:
    # renderer_classes = [JSONRenderer, BrowsableAPIRenderer]

    def get(self, request, *args, **kwargs):
        """Return a placeholder payload wrapped in a DRF Response."""
        return Response('...')
        # Alternative that returns a plain Django response instead:
        # return HttpResponse('...')
3. 訪問URL
訪問URL:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import serializers
from rest_framework.renderers import AdminRenderer
from .. import models


class TestSerializer(serializers.ModelSerializer):
    """Serializes every field of UserInfo."""

    class Meta:
        model = models.UserInfo
        fields = "__all__"


class TestView(APIView):
    """Lists all UserInfo rows, rendered with AdminRenderer."""

    renderer_classes = [AdminRenderer]

    def get(self, request, *args, **kwargs):
        """Serialize every UserInfo row and return the result."""
        users = models.UserInfo.objects.all()
        serializer = TestSerializer(instance=users, many=True)
        return Response(serializer.data)
訪問URL:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import serializers
from rest_framework.renderers import JSONRenderer
from rest_framework.renderers import AdminRenderer
from rest_framework.renderers import HTMLFormRenderer
from .. import models


class TestSerializer(serializers.ModelSerializer):
    """Serializes every field of UserInfo."""

    class Meta:
        model = models.UserInfo
        fields = "__all__"


class TestView(APIView):
    """Returns the first UserInfo row, rendered with HTMLFormRenderer."""

    renderer_classes = [HTMLFormRenderer]

    def get(self, request, *args, **kwargs):
        """Serialize the first UserInfo row and return the result."""
        first_user = models.UserInfo.objects.all().first()
        serializer = TestSerializer(instance=first_user, many=False)
        return Response(serializer.data)
訪問URL:
from django.conf.urls import url, include
from web.views import s11_render

# The second pattern captures a suffix such as "test.json" into the `format`
# keyword argument.
urlpatterns = [
    url(r'^test/$', s11_render.TestView.as_view()),
    url(r'^test\.(?P<format>[a-z0-9]+)', s11_render.TestView.as_view()),
]
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import serializers
from rest_framework.renderers import TemplateHTMLRenderer
from .. import models


class TestSerializer(serializers.ModelSerializer):
    """Serializes every field of UserInfo."""

    class Meta:
        model = models.UserInfo
        fields = "__all__"


class TestView(APIView):
    """Returns the first UserInfo row, rendered with TemplateHTMLRenderer."""

    renderer_classes = [TemplateHTMLRenderer]

    def get(self, request, *args, **kwargs):
        """Serialize the first UserInfo row and name the template to render."""
        first_user = models.UserInfo.objects.all().first()
        serializer = TestSerializer(instance=first_user, many=False)
        return Response(serializer.data, template_name='user_detail.html')
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
{# user, pwd and ut are presumably UserInfo fields supplied by the serializer data - confirm against the model. #}
{{ user }}
{{ pwd }}
{{ ut }}
</body>
</html>
訪問URL:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import serializers
from rest_framework.renderers import JSONRenderer
from rest_framework.renderers import BrowsableAPIRenderer
from .. import models


class TestSerializer(serializers.ModelSerializer):
    """Serializes every field of UserInfo."""

    class Meta:
        model = models.UserInfo
        fields = "__all__"


class CustomBrowsableAPIRenderer(BrowsableAPIRenderer):
    """Browsable API renderer whose default renderer is always JSONRenderer."""

    def get_default_renderer(self, view):
        """Return a fresh JSONRenderer regardless of the view."""
        return JSONRenderer()


class TestView(APIView):
    """Returns the first UserInfo row via CustomBrowsableAPIRenderer."""

    renderer_classes = [CustomBrowsableAPIRenderer]

    def get(self, request, *args, **kwargs):
        """Serialize the first UserInfo row and return the result."""
        first_user = models.UserInfo.objects.all().first()
        serializer = TestSerializer(instance=first_user, many=False)
        return Response(serializer.data, template_name='user_detail.html')
注意:如果同時配置了多個渲染器,rest framework 會根據 URL 後綴或請求頭自動選擇對應的渲染器。
原理
django的請求生命週期 執行遵循wsgi協議的模塊(socket服務端) 中間件(路由匹配) 視圖函數(業務處理:ORM,模板渲染) self.dispatch方法,將請求相關信息封裝,並通過版本、認證、權限、頻率組件以後,將request傳入視圖函數,並執行視圖函數,返回response def dispatch(self, request, *args, **kwargs): """ `.dispatch()` is pretty much the same as Django's regular dispatch, but with extra hooks for startup, finalize, and exception handling. """ self.args = args self.kwargs = kwargs # 對request進行封裝 request = self.initialize_request(request, *args, **kwargs) self.request = request self.headers = self.default_response_headers # deprecate? try: # 版本組件,認證組件,權限組件,頻率組件 self.initial(request, *args, **kwargs) # Get the appropriate handler method if request.method.lower() in self.http_method_names: handler = getattr(self, request.method.lower(), self.http_method_not_allowed) else: handler = self.http_method_not_allowed response = handler(request, *args, **kwargs) except Exception as exc: response = self.handle_exception(exc) self.response = self.finalize_response(request, response, *args, **kwargs) return self.response def initial(self, request, *args, **kwargs): """ Runs anything that needs to occur prior to calling the method handler. """ self.format_kwarg = self.get_format_suffix(**kwargs) # Perform content negotiation and store the accepted info on the request neg = self.perform_content_negotiation(request) request.accepted_renderer, request.accepted_media_type = neg # Determine the API version, if versioning is in use. # 版本組件 version, scheme = self.determine_version(request, *args, **kwargs) request.version, request.versioning_scheme = version, scheme # Ensure that the incoming request is permitted # 認證組件 self.perform_authentication(request) # 權限組件 self.check_permissions(request) # 頻率組件 self.check_throttles(request) 中間件 wsgi返回
五種內置的版本控制類
# Versioning schemes quoted from REST framework: one base class plus five
# concrete strategies that each read the version from a different place.
from __future__ import unicode_literals

import re

from django.utils.translation import ugettext_lazy as _

from rest_framework import exceptions
from rest_framework.compat import unicode_http_header
from rest_framework.reverse import _reverse
from rest_framework.settings import api_settings
from rest_framework.templatetags.rest_framework import replace_query_param
from rest_framework.utils.mediatypes import _MediaType


class BaseVersioning(object):
    """Common settings and helpers shared by every versioning scheme."""

    # These three values are read from the REST framework settings object.
    default_version = api_settings.DEFAULT_VERSION
    allowed_versions = api_settings.ALLOWED_VERSIONS
    version_param = api_settings.VERSION_PARAM

    def determine_version(self, request, *args, **kwargs):
        """Subclasses must override this; the base version always raises."""
        msg = '{cls}.determine_version() must be implemented.'
        raise NotImplementedError(msg.format(
            cls=self.__class__.__name__
        ))

    def reverse(self, viewname, args=None, kwargs=None, request=None, format=None, **extra):
        """Delegate to REST framework's `_reverse` without adding version info."""
        return _reverse(viewname, args, kwargs, request, format, **extra)

    def is_allowed_version(self, version):
        """Return True when `version` is acceptable.

        Everything is accepted when `allowed_versions` is empty; otherwise the
        version must be the (non-None) default or listed in `allowed_versions`.
        """
        if not self.allowed_versions:
            return True
        return ((version is not None and version == self.default_version) or
                (version in self.allowed_versions))


class AcceptHeaderVersioning(BaseVersioning):
    """
    GET /something/ HTTP/1.1
    Host: example.com
    Accept: application/json; version=1.0
    """
    invalid_version_message = _('Invalid version in "Accept" header.')

    def determine_version(self, request, *args, **kwargs):
        """Read the version from the accepted media type's parameters.

        Raises `NotAcceptable` when the version is not allowed.
        """
        media_type = _MediaType(request.accepted_media_type)
        version = media_type.params.get(self.version_param, self.default_version)
        version = unicode_http_header(version)
        if not self.is_allowed_version(version):
            raise exceptions.NotAcceptable(self.invalid_version_message)
        return version

    # We don't need to implement `reverse`, as the versioning is based
    # on the `Accept` header, not on the request URL.


class URLPathVersioning(BaseVersioning):
    """
    To the client this is the same style as `NamespaceVersioning`.
    The difference is in the backend - this implementation uses
    Django's URL keyword arguments to determine the version.

    An example URL conf for two views that accept two different versions.

    urlpatterns = [
        url(r'^(?P<version>(v1|v2))/users/$', users_list, name='users-list'),
        url(r'^(?P<version>(v1|v2))/users/(?P<pk>[0-9]+)/$', users_detail, name='users-detail')
    ]

    GET /1.0/something/ HTTP/1.1
    Host: example.com
    Accept: application/json
    """
    invalid_version_message = _('Invalid version in URL path.')

    def determine_version(self, request, *args, **kwargs):
        """Read the version from the URL keyword arguments.

        Raises `NotFound` when the version is not allowed.
        """
        version = kwargs.get(self.version_param, self.default_version)
        if not self.is_allowed_version(version):
            raise exceptions.NotFound(self.invalid_version_message)
        return version

    def reverse(self, viewname, args=None, kwargs=None, request=None, format=None, **extra):
        """Reverse a URL, injecting the request's version into the kwargs."""
        if request.version is not None:
            kwargs = {} if (kwargs is None) else kwargs
            kwargs[self.version_param] = request.version

        return super(URLPathVersioning, self).reverse(
            viewname, args, kwargs, request, format, **extra
        )


class NamespaceVersioning(BaseVersioning):
    """
    To the client this is the same style as `URLPathVersioning`.
    The difference is in the backend - this implementation uses
    Django's URL namespaces to determine the version.

    An example URL conf that is namespaced into two separate versions

    # users/urls.py
    urlpatterns = [
        url(r'^/users/$', users_list, name='users-list'),
        url(r'^/users/(?P<pk>[0-9]+)/$', users_detail, name='users-detail')
    ]

    # urls.py
    urlpatterns = [
        url(r'^v1/', include('users.urls', namespace='v1')),
        url(r'^v2/', include('users.urls', namespace='v2'))
    ]

    GET /1.0/something/ HTTP/1.1
    Host: example.com
    Accept: application/json
    """
    invalid_version_message = _('Invalid version in URL path. Does not match any version namespace.')

    def determine_version(self, request, *args, **kwargs):
        """Return the first allowed version among the resolved URL namespaces.

        Falls back to the default version when the request has no resolver
        match or no namespace; raises `NotFound` when none is allowed.
        """
        resolver_match = getattr(request, 'resolver_match', None)
        if resolver_match is None or not resolver_match.namespace:
            return self.default_version

        # Allow for possibly nested namespaces.
        possible_versions = resolver_match.namespace.split(':')
        for version in possible_versions:
            if self.is_allowed_version(version):
                return version
        raise exceptions.NotFound(self.invalid_version_message)

    def reverse(self, viewname, args=None, kwargs=None, request=None, format=None, **extra):
        """Reverse a URL, prefixing the view name with the request's version."""
        if request.version is not None:
            viewname = self.get_versioned_viewname(viewname, request)
        return super(NamespaceVersioning, self).reverse(
            viewname, args, kwargs, request, format, **extra
        )

    def get_versioned_viewname(self, viewname, request):
        """Build the namespaced view name '<version>:<viewname>'."""
        return request.version + ':' + viewname


class HostNameVersioning(BaseVersioning):
    """
    GET /something/ HTTP/1.1
    Host: v1.example.com
    Accept: application/json
    """
    # Matches exactly three dot-separated alphanumeric labels and captures the
    # first one as the version.
    hostname_regex = re.compile(r'^([a-zA-Z0-9]+)\.[a-zA-Z0-9]+\.[a-zA-Z0-9]+$')
    invalid_version_message = _('Invalid version in hostname.')

    def determine_version(self, request, *args, **kwargs):
        """Read the version from the first label of the request's host name.

        Falls back to the default version when the host does not match
        `hostname_regex`; raises `NotFound` when the version is not allowed.
        """
        # Strip an optional ":port" suffix before matching.
        hostname, separator, port = request.get_host().partition(':')
        match = self.hostname_regex.match(hostname)
        if not match:
            return self.default_version
        version = match.group(1)
        if not self.is_allowed_version(version):
            raise exceptions.NotFound(self.invalid_version_message)
        return version

    # We don't need to implement `reverse`, as the hostname will already be
    # preserved as part of the REST framework `reverse` implementation.


class QueryParameterVersioning(BaseVersioning):
    """
    GET /something/?version=0.1 HTTP/1.1
    Host: example.com
    Accept: application/json
    """
    invalid_version_message = _('Invalid version in query parameter.')

    def determine_version(self, request, *args, **kwargs):
        """Read the version from the query string.

        Raises `NotFound` when the version is not allowed.
        """
        version = request.query_params.get(self.version_param, self.default_version)
        if not self.is_allowed_version(version):
            raise exceptions.NotFound(self.invalid_version_message)
        return version

    def reverse(self, viewname, args=None, kwargs=None, request=None, format=None, **extra):
        """Reverse a URL and set the request's version as a query parameter."""
        url = super(QueryParameterVersioning, self).reverse(
            viewname, args, kwargs, request, format, **extra
        )
        if request.version is not None:
            return replace_query_param(url, self.version_param, request.version)
        return url
使用
1.添加配置
# Project-wide versioning configuration for REST framework.
REST_FRAMEWORK = {
    'DEFAULT_VERSIONING_CLASS': 'rest_framework.versioning.URLPathVersioning',
    'ALLOWED_VERSIONS': ['v1', 'v2'],  # versions that requests may use
    'VERSION_PARAM': 'version',  # name of the parameter carrying the version
    # The key must be spelled DEFAULT_VERSION; a misspelled key is not read
    # as the default version.
    'DEFAULT_VERSION': 'v1',  # version used when the request gives none
}
2.設置路由
# django_rest_framework\urls.py
urlpatterns = [
    path('admin/', admin.site.urls),
    # The version group is an alternation of the two literal versions. A
    # character class such as [v1|v2]+ would instead accept any run of the
    # characters "v", "1", "|" and "2" (for example "v", "1" or "v|2").
    re_path(r'api/(?P<version>v1|v2)/', include('api.urls'), name='api'),
]

# api/urls.py
urlpatterns = [
    url(r'^course/$', course.CourseView.as_view()),
]
3.訪問url
http://127.0.0.1:8000/api/v1/course/ # versioning_class = URLPathVersioning # http://127.0.0.1:8000/api/course/?version=v1 # versioning_class=QueryParameterVersioning
4.獲取版本
class CourseView(APIView):
    """Minimal view used to show how the request's version is read."""

    # Per-view configuration (overrides the global settings when uncommented):
    # renderer_classes = [JSONRenderer,BrowsableAPIRenderer]
    # versioning_class = QueryParameterVersioning
    # versioning_class = URLPathVersioning

    def get(self, request, *args, **kwargs):
        """Print the version attached to the request and return a placeholder."""
        print(request.version)
        return Response('...')