上篇博文介紹了常見須要進行請求重試的場景,本篇博文試着剖析有名的python第三方庫retrying源碼。python
在剖析其源碼以前,有必要講一下retrying的用法,方便理解。app
安裝:dom
pip install retryingide
或者函數
easy_install retryingthis
一些用法實例以下:spa
#example 1 from retrying import retry @retry def never_give_up_never_surrender(): print "一直重試且兩次重試之間無需等待"
#example 2 from retrying import retry @retry(stop_max_attempt_number=7) def stop_after_7_attempts(): print "重試七次後中止"
#example 3 from retrying import retry @retry(stop_max_delay=10000) def stop_after_10_s(): print "十秒以後中止重試"
#example 4 from retrying import retry @retry(wait_fixed=2000) def wait_2_s(): print "每次重試間隔兩秒"
#example 5 from retrying import retry @retry(wait_random_min=1000, wait_random_max=2000) def wait_random_1_to_2_s(): print "每次重試隨機等待1到2秒"
#example 6 from retrying import retry @retry(wait_exponential_multiplier=1000, wait_exponential_max=10000) def wait_exponential_1000(): print "指數退避,每次重試等待 2^x * 1000 毫秒,上限是10秒,達到上限後每次都等待10秒"
#example 7 def retry_if_io_error(exception): """Return True if we should retry (in this case when it's an IOError), False otherwise""" return isinstance(exception, IOError) @retry(retry_on_exception=retry_if_io_error) def might_io_error(): print "IO異常則重試,而且將其它異常拋出" @retry(retry_on_exception=retry_if_io_error, wrap_exception=True) def only_raise_retry_error_when_not_io_error(): print "IO異常則重試,而且將其它異經常使用RetryError對象包裹"
#exampe 8,根據返回結果判斷是否重試 def retry_if_result_none(result): """Return True if we should retry (in this case when result is None), False otherwise""" return result is None @retry(retry_on_result=retry_if_result_none) def might_return_none(): print "若返回結果爲None則重試"
上面八個例子是retrying的用法,只需在要重試的方法上加上@retry註解,並以相應的條件爲參數便可,那麼@retry背後究竟是如何實現的呢?下面給出@retry註解實現的方法。code
1 #裝飾器模式,對須要重試的函數,利用retry註解返回 2 def retry(*dargs, **dkw): 3 """ 4 Decorator function that instantiates the Retrying object 5 @param *dargs: positional arguments passed to Retrying object 6 @param **dkw: keyword arguments passed to the Retrying object 7 """ 8 # support both @retry and @retry() as valid syntax 9 #當用法爲@retry不帶括號時走這條路徑,dargs[0]爲retry註解的函數,返回函數對象wrapped_f 10 if len(dargs) == 1 and callable(dargs[0]): 11 def wrap_simple(f): 12 13 @six.wraps(f)#註解用於將函數f的簽名複製到新函數wrapped_f 14 def wrapped_f(*args, **kw): 15 return Retrying().call(f, *args, **kw) 16 17 return wrapped_f 18 19 return wrap_simple(dargs[0]) 20 21 else:#當用法爲@retry()帶括號時走這條路徑,返回函數對象wrapped_f 22 def wrap(f): 23 24 @six.wraps(f)#註解用於將函數f的簽名複製到新函數wrapped_f 25 def wrapped_f(*args, **kw): 26 return Retrying(*dargs, **dkw).call(f, *args, **kw) 27 28 return wrapped_f 29 30 return wrap
當用@retry標記函數時,例如實例1,其實執行了orm
never_give_up_never_surrender = retry(never_give_up_never_surrender)
此時的never_give_up_never_surrender函數其實是10-19行返回的wrapped_f函數,後續對never_give_up_never_surrender函數的調用都是調用的14行的wrapped_f函數。對象
當使用@retry()或者帶參數的@retry(params)時,如實例2,實際執行了:
stop_after_7_attempts = retry(stop_max_attempt_number)(stop_after_7_attempts)
此時的stop_after_7_attempts函數其實是22-29行的wrapped_f函數,後續對stop_after_7_attempts函數的調用都是對25行的wrapped_f函數調用。
能夠看到實際上@retry將對須要重試的函數調用轉化爲對Retrying類中call函數的調用,重試邏輯也在這個函數實現,實現對邏輯代碼的無侵入,代碼以下:
1 def call(self, fn, *args, **kwargs): 2 start_time = int(round(time.time() * 1000)) 3 attempt_number = 1 4 while True: 5 #_before_attempts爲@retry傳進來的before_attempts,在每次調用函數前執行一些操做 6 if self._before_attempts: 7 self._before_attempts(attempt_number) 8 9 try:#Attempt將函數執行結果或者異常信息以及執行次數做爲內部狀態,用True或False標記是內部存的值正常執行結果仍是異常 10 attempt = Attempt(fn(*args, **kwargs), attempt_number, False) 11 except: 12 tb = sys.exc_info()#獲取異常堆棧信息,sys.exc_info()返回type(異常類型), value(異常說明), traceback(traceback對象,包含更豐富的信息) 13 attempt = Attempt(tb, attempt_number, True) 14 15 if not self.should_reject(attempt):#根據本次執行結果或異常類型判斷是否應該中止 16 return attempt.get(self._wrap_exception) 17 18 if self._after_attempts:#_after_attempts爲@retry傳進來的after_attempts,在每次調用函數後執行一些操做 19 self._after_attempts(attempt_number) 20 21 delay_since_first_attempt_ms = int(round(time.time() * 1000)) - start_time 22 if self.stop(attempt_number, delay_since_first_attempt_ms):#根據重試次數和延遲判斷是否應該中止 23 if not self._wrap_exception and attempt.has_exception: 24 # get() on an attempt with an exception should cause it to be raised, but raise just in case 25 raise attempt.get() 26 else: 27 raise RetryError(attempt) 28 else:#不中止則等待必定時間,延遲時間根據wait函數返回值和_wait_jitter_max計算 29 sleep = self.wait(attempt_number, delay_since_first_attempt_ms) 30 if self._wait_jitter_max: 31 jitter = random.random() * self._wait_jitter_max 32 sleep = sleep + max(0, jitter) 33 time.sleep(sleep / 1000.0) 34 35 attempt_number += 1 #進行下一輪重試
9-13行將函數執行返回結果或異常存入Attempt對象attempt中,Attempt類以下:
class Attempt(object): """ An Attempt encapsulates a call to a target function that may end as a normal return value from the function or an Exception depending on what occurred during the execution. """ #value值爲函數返回結果或異常,根據has_exception判斷 def __init__(self, value, attempt_number, has_exception): self.value = value self.attempt_number = attempt_number self.has_exception = has_exception #返回函數執行結果或異常,並根據wrap_exception參數對異經常使用RetryError包裹 def get(self, wrap_exception=False): """ Return the return value of this Attempt instance or raise an Exception. If wrap_exception is true, this Attempt is wrapped inside of a RetryError before being raised. """ if self.has_exception: if wrap_exception: raise RetryError(self) else:#從新構造原異常拋出 six.reraise(self.value[0], self.value[1], self.value[2]) else: return self.value def __repr__(self): if self.has_exception: return "Attempts: {0}, Error:\n{1}".format(self.attempt_number, "".join(traceback.format_tb(self.value[2]))) else: return "Attempts: {0}, Value: {1}".format(self.attempt_number, self.value)
15行根據should_reject函數的返回值判斷是否中止重試,代碼以下:
def should_reject(self, attempt): reject = False #假如異常在retry_on_exception參數中返回True,則重試,默認不傳異常參數時,發生異常一直重試 if attempt.has_exception: reject |= self._retry_on_exception(attempt.value[1]) else:#假如函數返回結果在retry_on_result參數函數中爲True,則重試 reject |= self._retry_on_result(attempt.value) return reject
22行根據重試次數和延遲判斷是否應該中止重試,self.stop的賦值代碼在構造函數中,代碼片斷以下:
stop_funcs = [] if stop_max_attempt_number is not None: stop_funcs.append(self.stop_after_attempt) if stop_max_delay is not None: stop_funcs.append(self.stop_after_delay) if stop_func is not None: self.stop = stop_func elif stop is None:#執行次數和延遲任何一個達到限制則中止 self.stop = lambda attempts, delay: any(f(attempts, delay) for f in stop_funcs) else: self.stop = getattr(self, stop)
def stop_after_attempt(self, previous_attempt_number, delay_since_first_attempt_ms): """Stop after the previous attempt >= stop_max_attempt_number.""" return previous_attempt_number >= self._stop_max_attempt_number def stop_after_delay(self, previous_attempt_number, delay_since_first_attempt_ms): """Stop after the time from the first attempt >= stop_max_delay.""" return delay_since_first_attempt_ms >= self._stop_max_delay
29-33行等待一段時間再次重試,其中延遲時間重點是根據29行的wait函數計算,wait函數在構造函數中賦值,代碼片斷以下:
wait_funcs = [lambda *args, **kwargs: 0] if wait_fixed is not None: wait_funcs.append(self.fixed_sleep) if wait_random_min is not None or wait_random_max is not None: wait_funcs.append(self.random_sleep) if wait_incrementing_start is not None or wait_incrementing_increment is not None: wait_funcs.append(self.incrementing_sleep) if wait_exponential_multiplier is not None or wait_exponential_max is not None: wait_funcs.append(self.exponential_sleep) if wait_func is not None: self.wait = wait_func elif wait is None:#返回幾個函數的最大值,做爲等待時間 self.wait = lambda attempts, delay: max(f(attempts, delay) for f in wait_funcs) else: self.wait = getattr(self, wait)
其中最值得研究的是指數退避延遲時間計算方法,函數爲exponential_sleep,代碼以下:
def exponential_sleep(self, previous_attempt_number, delay_since_first_attempt_ms): exp = 2 ** previous_attempt_number result = self._wait_exponential_multiplier * exp #延遲時間爲_wait_exponential_multiplier*2^x if result > self._wait_exponential_max:#假如大於退避上限_wait_exponential_max,則result爲上限值 result = self._wait_exponential_max if result < 0: result = 0 return result