仿源碼自定義頻率規則:api
class MyThrottles(): VISIT_RECORD = {} def __init__(self): self.history=None def allow_request(self,request, view): #(1)取出訪問者ip ip=request.META.get('REMOTE_ADDR') import time ctime=time.time() # (2)判斷當前ip不在訪問字典裏,添加進去,而且直接返回True,表示第一次訪問 if ip not in self.VISIT_RECORD: self.VISIT_RECORD[ip]=[ctime,] return True self.history=self.VISIT_RECORD.get(ip) # (3)循環判斷當前ip的列表,有值,而且當前時間減去列表的最後一個時間大於60s,把這種數據pop掉,這樣列表中只有60s之內的訪問時間, while self.history and ctime-self.history[-1]>60: self.history.pop() # (4)判斷,當列表小於3,說明一分鐘之內訪問不足三次,把當前時間插入到列表第一個位置,返回True,順利經過 # (5)當大於等於3,說明一分鐘內訪問超過三次,返回False驗證失敗 if len(self.history)<3: self.history.insert(0,ctime) return True else: return False def wait(self): import time ctime=time.time() return 60-(ctime-self.history[-1])
定義頻率類,必須繼承SimpleRateThrottle類:緩存
from rest_framework.throttling import SimpleRateThrottle class VisitThrottle(SimpleRateThrottle):
# 必須配置scope參數 經過scope參數去settings中找頻率限定的規則 scope = 'throttle'
# 必須定義 get_cache_key函數 返回用戶標識的key 這裏借用源碼中BaseThrottle類(SimpleRateThrottle的父類)中的get_ident函數返回用戶ip地址 def get_cache_key(self, request, view): return self.get_ident(request)
局部使用:ide
視圖函數中加上
throttle_classes = [VisitThrottle,]
全局使用:settings中配置函數
REST_FRAMEWORK={ 'DEFAULT_THROTTLE_CLASSES':['utils.common.VisitThrottle'],
# 局部使用也要在settings中配置上 DEFAULT_THROTTLE_RATES 經過self.scope取頻率規則 (一分鐘訪問3次) 'DEFAULT_THROTTLE_RATES':{'throttle':'3/m',} }
設置錯誤信息爲中文:源碼分析
class Course(APIView): authentication_classes = [TokenAuth, ] permission_classes = [UserPermission, ] throttle_classes = [MyThrottles,] def get(self, request): return HttpResponse('get') def post(self, request): return HttpResponse('post')
# 函數名爲throttled 重寫Throttled類中默認的錯誤信息 def throttled(self, request, wait): from rest_framework.exceptions import Throttled class MyThrottled(Throttled): default_detail = '訪問頻繁' extra_detail_singular = '等待 {wait} second.' extra_detail_plural = '等待 {wait} seconds.' raise MyThrottled(wait)
源碼查看流程:post
APIView的dispatch方法() ---- self.initial(request, *args, **kwargs) --- self.check_throttles(request)spa
SimpleRateThrottle類詳解:rest
class SimpleRateThrottle(BaseThrottle): # 字典 {ip1:[time1,time2],ip2:[time1,time2]} 放在緩存中 cache = default_cache # 獲取當前時間 timer = time.time # 作了一個字符串格式化,爲登陸用戶ip限制,配合auth模塊使用 cache_format = 'throttle_%(scope)s_%(ident)s' scope = None # 從配置文件中取DEFAULT_THROTTLE_RATES THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES def __init__(self): if not getattr(self, 'rate', None): # 從配置文件中找出scope配置的名字對應的值 self.rate = self.get_rate() self.num_requests, self.duration = self.parse_rate(self.rate) # 這個方法須要重寫,不然直接拋異常 def get_cache_key(self, request, view): """ Should return a unique cache-key which can be used for throttling. Must be overridden. May return `None` if the request should not be throttled. """ raise NotImplementedError('.get_cache_key() must be overridden') def get_rate(self): if not getattr(self, 'scope', None): msg = ("You must set either `.scope` or `.rate` for '%s' throttle" % self.__class__.__name__) raise ImproperlyConfigured(msg) try: # 獲取在setting裏配置的字典中的值 return self.THROTTLE_RATES[self.scope] except KeyError: msg = "No default throttle rate set for '%s' scope" % self.scope raise ImproperlyConfigured(msg) # 解析 3/m這種傳參 def parse_rate(self, rate): """ Given the request rate string, return a two tuple of: <allowed number of requests>, <period of time in seconds> """ if rate is None: return (None, None) num, period = rate.split('/') num_requests = int(num) # 只取了第一位,也就是 3/mimmmmmmm也是表明一分鐘 duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]] return (num_requests, duration) # 頻率驗證邏輯 def allow_request(self, request, view): """ Implement the check to see if the request should be throttled. On success calls `throttle_success`. On failure calls `throttle_failure`. """ if self.rate is None: return True self.key = self.get_cache_key(request, view) if self.key is None: return True self.history = self.cache.get(self.key, []) self.now = self.timer() # Drop any requests from the history which have now passed the # throttle duration while self.history and self.history[-1] <= self.now - self.duration: self.history.pop() if len(self.history) >= self.num_requests: return self.throttle_failure() return self.throttle_success() # 成功返回true,而且插入到緩存中 def throttle_success(self): """ Inserts the current request's timestamp along with the key into the cache. """ self.history.insert(0, self.now) self.cache.set(self.key, self.history, self.duration) return True # 失敗返回false def throttle_failure(self): """ Called when a request to the API has failed due to throttling. """ return False # 計算返回還需等待多長時間 def wait(self): """ Returns the recommended next request time in seconds. """ if self.history: remaining_duration = self.duration - (self.now - self.history[-1]) else: remaining_duration = self.duration available_requests = self.num_requests - len(self.history) + 1 if available_requests <= 0: return None return remaining_duration / float(available_requests)