使用singleflight解決緩存擊穿(python版本)

緩存擊穿

在使用緩存的時候,咱們每每會先從緩存中獲取數據,若是獲取不到,就去數據源加載數據,而後寫入緩存python

data = cache.get(key)  #step1
if data is None:
  data = load_data()   #step2
  cache.set(key,data)	 #step3
return data
複製代碼

這基本上也是後端開發再熟悉不過的一種模式,但這種模式,在高併發的場景下,頗有可能會產生緩存擊穿的問題。舉個例子,在系統運行的高峯期,某個熱點key過時了,大量的併發請求在step1階段獲取不到數據,就會進入到step2階段,這時候就會有大量的請求直接打到數據庫上,可能直接就把數據庫給打死了。redis

解決方案

緩存擊穿產生主要緣由,就是緩存中獲取不到數據時,大量併發請求同時去數據庫加載數據,形成數據庫壓力太大。分析一下這個問題,咱們不難知道,假如同時有1000個併發請求,其實加載數據、更新緩存的操做,作1次就行了,剩下的999次請求作的徹底是重複操做。數據庫

加鎖

那怎麼避免加載數據、更新緩存的重複操做呢?首先想到的一個辦法就是加鎖:獲取到鎖的請求,去加載數據,沒有獲取到鎖的請求,就先等待。後端

加鎖方案1:(錯誤方案)緩存

data = cache.get(key)  #step1
if data is None:
  lock.lock()  #阻塞
  data = load_data()   #step2
  cache.set(key,data)	 #step3
  lock.unlock()
return data
複製代碼

這種加鎖方案是不行的。這種方案下,雖熱避免了併發地加載數據,保證數據庫不會被打死,但依然沒有解決重複操做的問題。並且這種方案下,大量請求串行化操做,會加大系統延遲。bash

加鎖方案2:try-lock模式架構

data = cache.get(key)  #step1
if data is None:
  locked = lock.try_lock()  #非阻塞
  if locked:
    data = load_data()   #step2
    cache.set(key,data)	 #step3
    lock.unlock()
  else:
    time.sleep(1)
    data = cache.get(key)
return data
複製代碼

這種方案,基本上已經可以知足咱們的需求,只有獲取到鎖的線程會去加載數據,其餘線程會休眠一段時間後,再次嘗試從緩存中獲取數據。但這裏仍是有兩個細節須要注意,一是線程休眠是時間,過短的話可能新的數據還沒加載到緩存,太長的話會影響性能。二是鎖的粒度,鎖的粒度若是太大,好比獲取某條訂單數據的時候,若是全部的訂單都用同一把鎖,仍是會影響系統的性能,比較合理地方式應該是爲每一個訂單建立一把鎖。這裏能夠考慮使用redis實現分佈式鎖,用訂單號做爲分佈式鎖的key。併發

singleflight

singleflight是groupcache項目下的一個庫,感興趣的同窗能夠研究一下groupcache的源碼。簡單來講,singleflight 可以使多個併發請求所觸發的回源操做裏,只有第一個回源被執行,其他請求阻塞等待第一個被執行的那個回源操做完成後,直接取其結果,以此保證同一時刻只有一個回源操做在執行,以達到防止擊穿的效果。app

groupcache是memcache做者用go語言開發的一個緩存系統,我參考其中源碼,實現了一個singlecache的Python版本,下面直接上代碼:dom

from threading import Condition, Lock

class Caller(object):
    def __init__(self):
        self._cond = Condition()
        self.val = None
        self.err = None
        self.done = False

    def result(self):
        with self._cond: # 阻塞等待執行結果
            while not self.done:
                self._cond.wait()

        if self.err:
            raise self.err

        return self.val

    def notify(self):
        with self._cond:
            self.done = True
            self._cond.notify_all()#通知全部阻塞線程,執行已經完成


class SingleFlight(object):
    def __init__(self):
        self.map = {}
        self.lock = Lock()

    def do(self, key, fn, *args, **kwargs):
        self.lock.acquire()
        if key in self.map:
           #已經存在對該key的請求,則新線程不會重複處理key的請求因此釋放鎖,而後阻塞等待請求獲得的結果
            caller = self.map[key]
            self.lock.release()
            return caller.result()

        caller = Caller()
        self.map[key] = caller
        self.lock.release()
				
        # 執行真正的請求函數,獲得對該 key 請求的結果
        try:
            caller.val = fn(*args, **kwargs)
        except Exception as e:
            caller.err = e
        finally:
            caller.notify()#通知該key下的阻塞線程,執行已經完成,能夠獲取結果了
            
				# 執行已經完成,刪除對應的key,下次再有一樣的key,還會再次執行
        self.lock.acquire()
        del self.map[key]
        self.lock.release()
				
        #返回執行結果
        return caller.result()

複製代碼

讓咱們來簡單模型一些併發操做,看看使用SingleFlight的執行效果:

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=20)
    single_flight = SingleFlight()

    def long_task(delay=1):
        print("run long task")
        time.sleep(delay)
        return random.uniform(1, 100)

    def run_in_single_flight():
        return single_flight.do('long_task', long_task, delay=0.1)

    tasks = []
    for i in range(10):
        task = executor.submit(run_in_single_flight)
        tasks.append(task)
    for task in as_completed(tasks):
        data = task.result()
        print(data)

複製代碼

執行後的輸出結果以下:能夠看到,咱們啓動了10個線程併發的執行long_task,每一個線程都拿到了執行結果,但long_task實際上只執行了一次。

run long task
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
複製代碼

回到緩存擊穿的問題上,使用SingleFlight解決緩存擊穿的代碼也很簡單,其實關鍵就在於SingleFlight可以保證同一時刻只有一個load_data_and_set_cache操做在執行,執行完畢會馬上通知其餘阻塞的線程,並且經過設置不一樣的key,能有效控制阻塞的粒度。

single_flight = SingleFlight()

def load_data_and_set_cache(key):
  data = load_data()   #step2
  cache.set(key,data)	 #step3


def request_handler():
  data = cache.get(key)  #step1
  if data is None:
    data = single_flight.do(load_data_and_set_cache,key)
  return data
複製代碼

總結

singleflight能夠看作是一種併發控制機制,這種機制能夠有不一樣的實現,單價版的、分佈式版的,其實大部分狀況下,單機版的就能很大程度上控制併發訪問,也就夠用了。這種控制機制,不僅是在緩存擊穿問題上,在微服務架構中,爲了不給下游服務形成太大的壓力,也能夠考慮使用singleflight機制。

相關文章
相關標籤/搜索