1、什麼是RESTfulpython
2、認證補充:web
#!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.authentication import BaseAuthentication from rest_framework.permissions import BasePermission from rest_framework.request import Request from rest_framework import exceptions token_list = [ 'sfsfss123kuf3j123', 'asijnfowerkkf9812', ] class TestAuthentication(BaseAuthentication): def authenticate(self, request): """ 用戶認證,若是驗證成功後返回元組: (用戶,用戶Token) :param request: :return: None,表示跳過該驗證; 若是跳過了全部認證,默認用戶和Token和使用配置文件進行設置 self._authenticator = None if api_settings.UNAUTHENTICATED_USER: self.user = api_settings.UNAUTHENTICATED_USER() # 默認值爲:匿名用戶 else: self.user = None if api_settings.UNAUTHENTICATED_TOKEN: self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默認值爲:None else: self.auth = None (user,token)表示驗證經過並設置用戶名和Token; AuthenticationFailed異常 """ val = request.query_params.get('token') if val not in token_list: raise exceptions.AuthenticationFailed("用戶認證失敗") return ('登陸用戶', '用戶token') def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """ pass class TestPermission(BasePermission): message = "權限驗證失敗" def has_permission(self, request, view): """ 判斷是否有權限訪問當前請求 Return `True` if permission is granted, `False` otherwise. :param request: :param view: :return: True有權限;False無權限 """ if request.user == "管理員": return True # GenericAPIView中get_object時調用 def has_object_permission(self, request, view, obj): """ 視圖繼承GenericAPIView,並在其中使用get_object時獲取對象時,觸發單獨對象權限驗證 Return `True` if permission is granted, `False` otherwise. :param request: :param view: :param obj: :return: True有權限;False無權限 """ if request.user == "管理員": return True class TestView(APIView): # 認證的動做是由request.user觸發 authentication_classes = [TestAuthentication, ] # 權限 # 循環執行全部的權限 permission_classes = [TestPermission, ] def get(self, request, *args, **kwargs): # self.dispatch print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容') views.py
# 2 class MyAuthtication(BasicAuthentication): 3 def authenticate(self, request): 4 token = request.query_params.get('token') #注意是沒有GET的,用query_params表示 5 if token == 'zxxzzxzc': 6 return ('uuuuuu','afsdsgdf') #返回user,auth 7 # raise AuthenticationFailed('認證錯誤') #只要拋出認證錯誤這樣的異常就會去執行下面的函數 8 raise APIException('認證錯誤') 9 def authenticate_header(self, request): #認證不成功的時候執行 10 return 'Basic reala="api"' 11 12 class UserView(APIView): 13 authentication_classes = [MyAuthtication,] 14 def get(self,request,*args,**kwargs): 15 print(request.user) 16 print(request.auth) 17 return Response('用戶列表')
django的中間件比rest_framework執行的早
認證的功能放到中間件也是能夠作的django
認證通常作,檢查用戶是否存在,若是存在request.user/request.auth;不存在request.user/request.auth=Noneapi
認證小總結:緩存
---類:authenticate/authenticate_header網絡
---返回值:None,元組(user,auth),異常架構
---配置:app
---視圖: ide
class IndexView(APIView):
authentication_classes = [MyAuthentication,]函數
---全局
REST_FRAMEWORK = {
'UNAUTHENTICATED_USER': None,
'UNAUTHENTICATED_TOKEN': None,
"DEFAULT_AUTHENTICATION_CLASSES": [
# "app02.utils.MyAuthentication",
],
}
3、權限
權限的應用:
from django.conf.urls import url from django.contrib import admin from app01 import views from app02 import views as app02_view from app03 import views as app03_view from app04 import views as app04_view urlpatterns = [ django rest framework url(r'^auth/', app02_view.AuthView.as_view()), url(r'^hosts/', app02_view.HostView.as_view()), url(r'^users/', app02_view.UserView.as_view()), url(r'^salary/', app02_view.SalaryView.as_view()), ]
from django.views import View from rest_framework.views import APIView from rest_framework.authentication import SessionAuthentication from rest_framework.authentication import BasicAuthentication from rest_framework.authentication import BaseAuthentication from rest_framework.permissions import AllowAny from rest_framework.renderers import JSONRenderer,BrowsableAPIRenderer from rest_framework.request import Request from rest_framework.exceptions import APIException,AuthenticationFailed from rest_framework import exceptions from rest_framework.response import Response from app02 import models import hashlib import time class AuthView(APIView): authentication_classes=[] def get(self,request): """ 接收用戶名和密碼 :param request: :return: """ ret = {'code':1000,'msg':None} user = request.query_params.get('user') pwd = request.query_params.get('pwd') obj = models.UserInfo.objects.filter(username=user,password=pwd).first() if not obj: ret['code'] = 1001 ret['msg'] = "用戶名或密碼錯誤" return Response(ret) # 建立隨機字符串 ctime = time.time() key = "%s|%s" %(user,ctime) m = hashlib.md5() m.update(key.encode('utf-8')) token = m.hexdigest() # 保存到數據 obj.token = token obj.save() ret['token'] = token return Response(ret) class MyAuthentication(BaseAuthentication): def authenticate(self, request): token = request.query_params.get('token') obj = models.UserInfo.objects.filter(token=token).first() if obj: return (obj.username,obj) return None def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """ # return 'Basic realm="api"' pass class MyPermission(object): message = "無權訪問" def has_permission(self,request,view): if request.user: return True return False class AdminPermission(object): message = "無權訪問" def has_permission(self,request,view): if request.user == 'alex': return True return False class HostView(APIView): """ 匿名用戶和用戶都能訪問 """ authentication_classes = [MyAuthentication,] permission_classes = [] def get(self,request,*args,**kwargs): # 原來request對象,django.core.handlers.wsgi.WSGIRequest # 如今的request對象,rest_framework.request.Request\ self.dispatch print(request.user) # print(request.user) # print(request.auth) return Response('主機列表') class UserView(APIView): """ 用戶能訪問 """ authentication_classes = [MyAuthentication, ] permission_classes = [MyPermission,] def get(self,request,*args,**kwargs): return Response('用戶列表') class SalaryView(APIView): """ 用戶能訪問 """ authentication_classes = [MyAuthentication, ] permission_classes = [MyPermission,AdminPermission,] def get(self,request,*args,**kwargs): self.dispatch return Response('薪資列表') def permission_denied(self, request, message=None): """ If request is not permitted, determine what kind of exception to raise. """ if request.authenticators and not request.successful_authenticator: raise exceptions.NotAuthenticated(detail='xxxxxxxx') raise exceptions.PermissionDenied(detail=message)
REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, "DEFAULT_AUTHENTICATION_CLASSES": [ # "app02.utils.MyAuthentication", ], 'DEFAULT_PERMISSION_CLASSES':[ ], 'DEFAULT_THROTTLE_RATES':{ 'wdp_anon':'5/minute', 'wdp_user':'10/minute', } }
權限才真正的作request.user/request.auth拿到它們作是否訪問的判斷
權限小總結:
---類: has_permission/has_object_permission
---返回值:True、False、exceptions.PermissionDenied(detail="錯誤信息")
---配置:
---視圖:
class IndexView(APIView):
permission_classes = [MyPermission,]
---全局:
REST_FRAMEWORK = {
"DEFAULT_PERMISSION_CLASSES": [
# "app02.utils.MyAuthentication",
],
}
4、限制訪問的頻率
限制訪問頻率的應用:
a、對匿名用戶進行限制,每一個用戶1分鐘容許訪問10次
在這裏用惟一標識:self.get_ident()
from django.conf.urls import url from django.contrib import admin from app01 import views from app02 import views as app02_view from app03 import views as app03_view from app04 import views as app04_view urlpatterns = [ # url(r'^limit/', app03_view.LimitView.as_view()), ]
from django.shortcuts import render from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.throttling import BaseThrottle,SimpleRateThrottle from rest_framework import exceptions RECORD = { } class MyThrottle(BaseThrottle): def allow_request(self,request,view): """ # 返回False,限制 # 返回True,通行 :param request: :param view: :return: """ """ a. 對匿名用戶進行限制:每一個用戶1分鐘容許訪問10次 - 獲取用戶IP request 1.1.1.1 """ import time ctime = time.time() ip = self.get_ident() if ip not in RECORD: RECORD[ip] = [ctime,] else: # [4507862389234,3507862389234,2507862389234,1507862389234,] time_list = RECORD[ip] while True: val = time_list[-1] if (ctime-60) > val: time_list.pop() else: break if len(time_list) > 10: return False time_list.insert(0,ctime) return True def wait(self): import time ctime = time.time() first_in_time = RECORD[self.get_ident()][-1] wt = 60 - (ctime - first_in_time) return wt class MySimpleRateThrottle(SimpleRateThrottle): scope = "wdp" def get_cache_key(self, request, view): return self.get_ident(request) class LimitView(APIView): authentication_classes = [] permission_classes = [] throttle_classes=[MySimpleRateThrottle,] def get(self,request,*args,**kwargs): self.dispatch return Response('控制訪問頻率示例') def throttled(self, request, wait): """ If request is throttled, determine what kind of exception to raise. """ class MyThrottled(exceptions.Throttled): default_detail = '請求被限制.' extra_detail_singular = 'Expected available in {wait} second.' extra_detail_plural = '還須要再等待{wait}' raise MyThrottled(wait)
REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, "DEFAULT_AUTHENTICATION_CLASSES": [ # "app02.utils.MyAuthentication", ], 'DEFAULT_PERMISSION_CLASSES':[ ], 'DEFAULT_THROTTLE_RATES':{ 'wdp_anon':'5/minute', 'wdp_user':'10/minute', } }
b、對匿名用戶進行限制:每一個用戶1分鐘 容許訪問5次,登陸用戶1分鐘容許訪問10次,VIP1分鐘容許訪問20次
from django.conf.urls import url from django.contrib import admin from app01 import views from app02 import views as app02_view from app03 import views as app03_view from app04 import views as app04_view urlpatterns = [ # url(r'^limit/', app03_view.LimitView.as_view()), url(r'^index/', app04_view.IndexView.as_view()), url(r'^manage/', app04_view.ManageView.as_view()), ]
from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.throttling import BaseThrottle,SimpleRateThrottle from rest_framework.authentication import BaseAuthentication from app02 import models class MyAuthentication(BaseAuthentication): def authenticate(self, request): token = request.query_params.get('token') obj = models.UserInfo.objects.filter(token=token).first() if obj: return (obj.username,obj) return None def authenticate_header(self, request): pass class MyPermission(object): message = "無權訪問" def has_permission(self,request,view): if request.user: return True return False class AdminPermission(object): message = "無權訪問" def has_permission(self,request,view): if request.user == 'alex': return True return False class AnonThrottle(SimpleRateThrottle): scope = "wdp_anon" def get_cache_key(self, request, view): # 返回None,表示我不限制 # 登陸用戶我無論 if request.user: return None # 匿名用戶 return self.get_ident(request) class UserThrottle(SimpleRateThrottle): scope = "wdp_user" def get_cache_key(self, request, view): # 登陸用戶 if request.user: return request.user # 匿名用戶我無論 return None # 無需登陸就能夠訪問 class IndexView(APIView): authentication_classes = [MyAuthentication,] permission_classes = [] throttle_classes=[AnonThrottle,UserThrottle,] def get(self,request,*args,**kwargs): self.dispatch return Response('訪問首頁') # 需登陸就能夠訪問 class ManageView(APIView): authentication_classes = [MyAuthentication,] permission_classes = [MyPermission,] throttle_classes=[AnonThrottle,UserThrottle,] def get(self,request,*args,**kwargs): self.dispatch return Response('訪問首頁')
REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, "DEFAULT_AUTHENTICATION_CLASSES": [ # "app02.utils.MyAuthentication", ], 'DEFAULT_PERMISSION_CLASSES':[ ], 'DEFAULT_THROTTLE_RATES':{ 'wdp_anon':'5/minute', 'wdp_user':'10/minute', } } CACHES = { 'default': { 'BACKEND': 'django.core.cache.backends.filebased.FileBasedCache', 'LOCATION': 'cache', } }
c、基於用戶IP限制訪問頻率
from django.conf.urls import url, include from web.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]
import time from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import exceptions from rest_framework.throttling import BaseThrottle from rest_framework.settings import api_settings # 保存訪問記錄 RECORD = { '用戶IP': [12312139, 12312135, 12312133, ] } class TestThrottle(BaseThrottle): ctime = time.time def get_ident(self, request): """ 根據用戶IP和代理IP,當作請求者的惟一IP Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR if present and number of proxies is > 0. If not use all of HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR. """ xff = request.META.get('HTTP_X_FORWARDED_FOR') remote_addr = request.META.get('REMOTE_ADDR') num_proxies = api_settings.NUM_PROXIES if num_proxies is not None: if num_proxies == 0 or xff is None: return remote_addr addrs = xff.split(',') client_addr = addrs[-min(num_proxies, len(addrs))] return client_addr.strip() return ''.join(xff.split()) if xff else remote_addr def allow_request(self, request, view): """ 是否仍然在容許範圍內 Return `True` if the request should be allowed, `False` otherwise. :param request: :param view: :return: True,表示能夠經過;False表示已超過限制,不容許訪問 """ # 獲取用戶惟一標識(如:IP) # 容許一分鐘訪問10次 num_request = 10 time_request = 60 now = self.ctime() ident = self.get_ident(request) self.ident = ident if ident not in RECORD: RECORD[ident] = [now, ] return True history = RECORD[ident] while history and history[-1] <= now - time_request: history.pop() if len(history) < num_request: history.insert(0, now) return True def wait(self): """ 多少秒後能夠容許繼續訪問 Optionally, return a recommended number of seconds to wait before the next request. """ last_time = RECORD[self.ident][0] now = self.ctime() return int(60 + last_time - now) class TestView(APIView): throttle_classes = [TestThrottle, ] def get(self, request, *args, **kwargs): # self.dispatch print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容') def throttled(self, request, wait): """ 訪問次數被限制時,定製錯誤信息 """ class Throttled(exceptions.Throttled): default_detail = '請求被限制.' extra_detail_singular = '請 {wait} 秒以後再重試.' extra_detail_plural = '請 {wait} 秒以後再重試.' raise Throttled(wait)
d. 基於用戶IP顯示訪問頻率(利於Django緩存)
REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES': { 'test_scope': '10/m', }, }
from django.conf.urls import url, include from web.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]
#!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import exceptions from rest_framework.throttling import SimpleRateThrottle class TestThrottle(SimpleRateThrottle): # 配置文件定義的顯示頻率的Key scope = "test_scope" def get_cache_key(self, request, view): """ Should return a unique cache-key which can be used for throttling. Must be overridden. May return `None` if the request should not be throttled. """ if not request.user: ident = self.get_ident(request) else: ident = request.user return self.cache_format % { 'scope': self.scope, 'ident': ident } class TestView(APIView): throttle_classes = [TestThrottle, ] def get(self, request, *args, **kwargs): # self.dispatch print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容') def throttled(self, request, wait): """ 訪問次數被限制時,定製錯誤信息 """ class Throttled(exceptions.Throttled): default_detail = '請求被限制.' extra_detail_singular = '請 {wait} 秒以後再重試.' extra_detail_plural = '請 {wait} 秒以後再重試.' raise Throttled(wait)
e. view中限制請求頻率
REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES': { 'xxxxxx': '10/m', }, }
REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES': { 'xxxxxx': '10/m', }, }
#!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import exceptions from rest_framework.throttling import ScopedRateThrottle # 繼承 ScopedRateThrottle class TestThrottle(ScopedRateThrottle): def get_cache_key(self, request, view): """ Should return a unique cache-key which can be used for throttling. Must be overridden. May return `None` if the request should not be throttled. """ if not request.user: ident = self.get_ident(request) else: ident = request.user return self.cache_format % { 'scope': self.scope, 'ident': ident } class TestView(APIView): throttle_classes = [TestThrottle, ] # 在settings中獲取 xxxxxx 對應的頻率限制值 throttle_scope = "xxxxxx" def get(self, request, *args, **kwargs): # self.dispatch print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容') def throttled(self, request, wait): """ 訪問次數被限制時,定製錯誤信息 """ class Throttled(exceptions.Throttled): default_detail = '請求被限制.' extra_detail_singular = '請 {wait} 秒以後再重試.' extra_detail_plural = '請 {wait} 秒以後再重試.' raise Throttled(wait)
f. 匿名時用IP限制+登陸時用Token限制
REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, 'DEFAULT_THROTTLE_RATES': { 'luffy_anon': '10/m', 'luffy_user': '20/m', }, }
from django.conf.urls import url, include from web.views.s3_throttling import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]
#!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.throttling import SimpleRateThrottle class LuffyAnonRateThrottle(SimpleRateThrottle): """ 匿名用戶,根據IP進行限制 """ scope = "luffy_anon" def get_cache_key(self, request, view): # 用戶已登陸,則跳過 匿名頻率限制 if request.user: return None return self.cache_format % { 'scope': self.scope, 'ident': self.get_ident(request) } class LuffyUserRateThrottle(SimpleRateThrottle): """ 登陸用戶,根據用戶token限制 """ scope = "luffy_user" def get_ident(self, request): """ 認證成功時:request.user是用戶對象;request.auth是token對象 :param request: :return: """ # return request.auth.token return "user_token" def get_cache_key(self, request, view): """ 獲取緩存key :param request: :param view: :return: """ # 未登陸用戶,則跳過 Token限制 if not request.user: return None return self.cache_format % { 'scope': self.scope, 'ident': self.get_ident(request) } class TestView(APIView): throttle_classes = [LuffyUserRateThrottle, LuffyAnonRateThrottle, ] def get(self, request, *args, **kwargs): # self.dispatch print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容')
限制訪問的頻率小總結:
---類: allow_request/wait PS: scope = "wdp_user"
---返回值:True、False
---配置:
---視圖:
class IndexView(APIView):
throttle_classes=[AnonThrottle,UserThrottle,]
def get(self,request,*args,**kwargs):
self.dispatch
return Response('訪問首頁')
---全局:
REST_FRAMEWORK = {
"DEFAULT_THROTTLE_CLASSES":[
],
'DEFAULT_THROTTLE_RATES':{
'wdp_anon':'5/minute',
'wdp_user':'10/minute',
}
}
5、認證、權限、限制訪問頻率返回結果的比較:
認證、權限、限制訪問頻率這三個返回的結果:
認證返回的三種結果:
a、None
b、元組(user,auth)
c、異常 raise APIException(...)
權限的返回值三種結果:
a、True 有權限
b、False 沒權限
c、異常
限制訪問的頻率返回值的兩種結果:
a、True
b、False