問題:控制訪問頻率,在訪問的時候加上必定的次數限制python
views.pydjango
class VisitThrottle(object): def allow_request(self, request, view): return True # 能夠繼續訪問 # return False # 訪問頻率過高, 被限制 def wait(self): pass
能夠進一步的升級,限制 10s 內只能訪問3次api
import time VISIT_RECORD = {} class VisitThrottle(object): ''' 10s內只能訪問3次 ''' def allow_request(self, request, view): # 1. 獲取用戶IP remote_addr = request.META.get('REMOTE_ADDR') ctime = time.time() if remote_addr not in VISIT_RECORD: VISIT_RECORD[remote_addr] = [ctime, ] return True history = VISIT_RECORD.get(remote_addr) while history and history[-1] < ctime - 10: history.pop() if len(history) < 3: history.insert(0, ctime) return True # return True # 能夠繼續訪問 # return False # 訪問頻率過高, 被限制 def wait(self): ''' 還須要等待的時間 ''' ctime = time.time() return 60 - (ctime - self.history[-1])
和前面同樣,也是從 dispatch
開始,到 initial
緩存
def initial(self, request, *args, **kwargs): # Ensure that the incoming request is permitted self.perform_authentication(request) self.check_permissions(request) # 控制訪問頻率 self.check_throttles(request)
def check_throttles(self, request): # get_throttles 裏面是一個列表生成式 for throttle in self.get_throttles(): if not throttle.allow_request(request, self): self.throttled(request, throttle.wait())
def get_throttles(self): """ Instantiates and returns the list of throttles that this view uses. """ return [throttle() for throttle in self.throttle_classes]
throttle_classes
默認使用配置文件ide
class APIView(View): ... throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES ...
能夠添加到全局使用,首先在 utils 下新建 throttle.py,將視圖文件中的類移至 throttle.py,這裏修改了 60s內能訪問3次函數
# throttle.py import time VISIT_RECORD = {} class VisitThrottle(object): ''' 60s內只能訪問3次 ''' def __init__(self): self.history = None def allow_request(self, request, view): # 1. 獲取用戶IP remote_addr = request.META.get('REMOTE_ADDR') ctime = time.time() if remote_addr not in VISIT_RECORD: VISIT_RECORD[remote_addr] = [ctime, ] return True history = VISIT_RECORD.get(remote_addr) self.history = history while history and history[-1] < ctime - 60: history.pop() if len(history) < 3: history.insert(0, ctime) return True # return True # 能夠繼續訪問 # return False # 訪問頻率過高, 被限制 def wait(self): ''' 還須要等待的時間 ''' ctime = time.time() return 60 - (ctime - self.history[-1])
而後在配置文件 settings.py 中添加路徑post
REST_FRAMEWORK = { ... 'DEFAULT_THROTTLE_CLASSES': ['api.utils.throttle.VisitThrottle'] }
最後將視圖中的局部配置刪除便可。this
回到 check_throttles
spa
def check_throttles(self, request): for throttle in self.get_throttles(): # throttle.allow_request 爲 False,走下一步,throttled 拋出異常,表示訪問頻率過多 if not throttle.allow_request(request, self): self.throttled(request, throttle.wait())
def throttled(self, request, wait): """ If request is throttled, determine what kind of exception to raise. """ raise exceptions.Throttled(wait)
在自定義頻率的時候,爲了更加規範,須要繼承,而且父類有獲取 IP 的方法(能夠在 BaseThrottle
中查看),所以這裏直接調用父類的方法便可rest
from rest_framework.throttling import BaseThrottle import time VISIT_RECORD = {} class VisitThrottle(BaseThrottle): ''' 60s內只能訪問3次 ''' def __init__(self): self.history = None def allow_request(self, request, view): # 1. 獲取用戶IP,調用父類的方法 remote_addr = self.get_ident(request) ctime = time.time() if remote_addr not in VISIT_RECORD: VISIT_RECORD[remote_addr] = [ctime, ] return True history = VISIT_RECORD.get(remote_addr) self.history = history while history and history[-1] < ctime - 60: history.pop() if len(history) < 3: history.insert(0, ctime) return True # return True # 能夠繼續訪問 # return False # 訪問頻率過高, 被限制 def wait(self): ''' 還須要等待的時間 ''' ctime = time.time() return 60 - (ctime - self.history[-1])
進入 BaseThrottle
,發如今其下方有個 SimpleRateThrottle
,也是繼承 BaseThrottle
。首先看 SimpleRateThrottle
的 __init__
方法
class SimpleRateThrottle(BaseThrottle): ... # 省略的內容 scope = None THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES def __init__(self): if not getattr(self, 'rate', None): # 這裏執行了 get_rate 方法 self.rate = self.get_rate() self.num_requests, self.duration = self.parse_rate(self.rate)
def get_rate(self): """ Determine the string representation of the allowed request rate. """ if not getattr(self, 'scope', None): msg = ("You must set either `.scope` or `.rate` for '%s' throttle" % self.__class__.__name__) raise ImproperlyConfigured(msg) try: # scope其實是一個字典的 key,這裏在 THROTTLE_RATES 中取值 # 在上面的代碼中看到 THROTTLE_RATES 是一個配置項,獲取用戶自定義的配置 return self.THROTTLE_RATES[self.scope] except KeyError: msg = "No default throttle rate set for '%s' scope" % self.scope raise ImproperlyConfigured(msg)
至此,就能夠在配置文件中寫一個 60s內能訪問3次 的程序,讓它自動完成,無需自定義寫
throttle.py
class VisitThrottle(SimpleRateThrottle): scope = "xi" # scope做爲key使用
settings.py
REST_FRAMEWORK = { ... # 省略 'DEFAULT_THROTTLE_CLASSES': ['api.utils.throttle.VisitThrottle'], 'DEFAULT_THROTTLE_RATES' : { 'xi': '3/m' # m是分鐘,每分鐘訪問3次 } }
這時,配置了訪問次數,就會在 return self.THROTTLE_RATES[self.scope]
中獲取到,返回給 get_rate
方法,而後 __init__
中的 rate
拿到的就是 3/m
class SimpleRateThrottle(BaseThrottle): ... # 省略的內容 scope = None THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES def __init__(self): if not getattr(self, 'rate', None): # '3/m' self.rate = self.get_rate() # 將字符串 '3/m' 當作參數傳遞給 parse_rate # 走完 parse_rate,num_requests表明3次,duration表明60s self.num_requests, self.duration = self.parse_rate(self.rate) .... # 省略 def parse_rate(self, rate): """ Given the request rate string, return a two tuple of: <allowed number of requests>, <period of time in seconds> """ # rate就是 '3/m' if rate is None: return (None, None) num, period = rate.split('/') num_requests = int(num) duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]] return (num_requests, duration)
此時,構造函數走完,接着查看 allow_request
def allow_request(self, request, view): if self.rate is None: return True # 內置提供的訪問記錄放在了緩存中,經過 get_cache_key 實現 self.key = self.get_cache_key(request, view) if self.key is None: return True self.history = self.cache.get(self.key, []) self.now = self.timer()
# 來到 get_cache_key,源碼上並無寫什麼,這表示是讓咱們本身寫的 def get_cache_key(self, request, view): raise NotImplementedError('.get_cache_key() must be overridden')
# get_cache_key 其實是表示可以惟一標識的方法,因此返回值能夠是獲取IP,用來表示誰的訪問記錄 # throttle.py class VisitThrottle(SimpleRateThrottle): scope = "xi" def get_cache_key(self, request, view): return self.get_ident(request) # 獲取IP
回到 allow_request
def allow_request(self, request, view): if self.rate is None: return True # 內置提供的訪問記錄放在了緩存中,經過 get_cache_key 實現 self.key = self.get_cache_key(request, view) if self.key is None: return True # 去緩存中取出全部記錄 # cache = default_cache,是django內置的緩存 self.history = self.cache.get(self.key, []) self.now = self.timer() # timer() = time.time(),獲取當前時間 # Drop any requests from the history which have now passed the # throttle duration # 這裏與上面自定義的相同 while self.history and self.history[-1] <= self.now - self.duration: self.history.pop() if len(self.history) >= self.num_requests: return self.throttle_failure() return self.throttle_success() def throttle_success(self): """ Inserts the current request's timestamp along with the key into the cache. """ # 若是成功,加到歷史記錄中 self.history.insert(0, self.now) self.cache.set(self.key, self.history, self.duration) return True def throttle_failure(self): """ Called when a request to the API has failed due to throttling. """ return False def wait(self): """ Returns the recommended next request time in seconds. """ if self.history: remaining_duration = self.duration - (self.now - self.history[-1]) else: remaining_duration = self.duration available_requests = self.num_requests - len(self.history) + 1 if available_requests <= 0: return None return remaining_duration / float(available_requests)
照樣是前三次能夠訪問,後面再訪問須要等一分鐘,這是對匿名用戶的控制
也能夠對登陸的用戶進行控制,但在全局的設置中,不能既有匿名的,還有登陸的。這時,就能夠將登陸用戶的訪問控制設爲全局,匿名用戶使用局部的設置。
settings.py
REST_FRAMEWORK = { 'DEFAULT_AUTHENTICATION_CLASSES': ['api.utils.auth.FirstAuthentication', 'api.utils.auth.Authentication'], # 'DEFAULT_AUTHENTICATION_CLASSES': ['api.utils.auth.FirstAuthentication', ], 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, 'DEFAULT_PERMISSION_CLASSES': ['api.utils.permission.SVIPPermission'], 'DEFAULT_THROTTLE_CLASSES': ['api.utils.throttle.UserThrottle'], # 登陸用戶 'DEFAULT_THROTTLE_RATES' : { 'xi': '3/m', 'xiUser': '10/m' } }
throttle.py
# 匿名用戶 class VisitThrottle(SimpleRateThrottle): scope = "xi" def get_cache_key(self, request, view): return self.get_ident(request) # 登陸用戶 class UserThrottle(SimpleRateThrottle): scope = "xiUser" def get_cache_key(self, request, view): return request.user.username
views.py
from django.shortcuts import render, HttpResponse from django.http import JsonResponse from rest_framework.views import APIView from api import models from api.utils.permission import SVIPPermission, MyPermission from api.utils.throttle import VisitThrottle ORDER_DICT = { 1: { 'name': 'qiu', 'age': 18, 'gender': '男', 'content': '...' }, 2: { 'name': 'xi', 'age': 19, 'gender': '男', 'content': '.....' } } def md5(user): import hashlib import time ctime = str(time.time()) m = hashlib.md5(bytes(user, encoding='utf-8')) m.update(bytes(ctime, encoding='utf-8')) return m.hexdigest() class AuthView(APIView): authentication_classes = [] permission_classes = [] throttle_classes = [VisitThrottle] # 爲匿名用戶設置頻率控制 def post(self, request, *args, **kwargs): ret = {'code': 1000, 'msg': None} try: user = request._request.POST.get('username') pwd = request._request.POST.get('password') obj = models.UerInfo.objects.filter(username=user, password=pwd).first() if not obj: ret['code'] = 1001 ret['msg'] = '用戶名或密碼錯誤' # 爲登陸用戶建立token else: token = md5(user) # 存在就更新, 不存在就建立 models.UserToken.objects.update_or_create(user=obj, defaults={'token': token}) ret['token'] = token except Exception as e: ret['code'] = 1002 ret['msg'] = '請求異常' return JsonResponse(ret) class OrderView(APIView): ''' 訂單相關業務(只有SVIP用戶有權限) ''' def get(self, request, *args, **kwargs): ret = {'code': 1000, 'msg': None, 'data': None} try: ret['data'] = ORDER_DICT except Exception as e: pass return JsonResponse(ret) class UserInfoView(APIView): ''' 用戶中心(普通用戶、VIP有權限) ''' permission_classes = [MyPermission] def get(self, request, *args, **kwargs): return HttpResponse('用戶信息')
類,繼承 BaseThrottle
,實現 allow_request
、wait
類,繼承 SimpleRateThrottle
,實現 get_cache_key
、scope = "xi"(配置文件中的key)
局部:throttle_classes = [VisitThrottle]
全局:配置 settings.py