在使用緩存的時候,咱們每每會先從緩存中獲取數據,若是獲取不到,就去數據源加載數據,而後寫入緩存python
data = cache.get(key) #step1
if data is None:
data = load_data() #step2
cache.set(key,data) #step3
return data
複製代碼
這基本上也是後端開發再熟悉不過的一種模式,但這種模式,在高併發的場景下,頗有可能會產生緩存擊穿的問題。舉個例子,在系統運行的高峯期,某個熱點key過時了,大量的併發請求在step1階段獲取不到數據,就會進入到step2階段,這時候就會有大量的請求直接打到數據庫上,可能直接就把數據庫給打死了。redis
緩存擊穿產生主要緣由,就是緩存中獲取不到數據時,大量併發請求同時去數據庫加載數據,形成數據庫壓力太大。分析一下這個問題,咱們不難知道,假如同時有1000個併發請求,其實加載數據、更新緩存的操做,作1次就行了,剩下的999次請求作的徹底是重複操做。數據庫
那怎麼避免加載數據、更新緩存的重複操做呢?首先想到的一個辦法就是加鎖:獲取到鎖的請求,去加載數據,沒有獲取到鎖的請求,就先等待。後端
加鎖方案1:(錯誤方案)緩存
data = cache.get(key) #step1
if data is None:
lock.lock() #阻塞
data = load_data() #step2
cache.set(key,data) #step3
lock.unlock()
return data
複製代碼
這種加鎖方案是不行的。這種方案下,雖熱避免了併發地加載數據,保證數據庫不會被打死,但依然沒有解決重複操做的問題。並且這種方案下,大量請求串行化操做,會加大系統延遲。bash
加鎖方案2:try-lock模式架構
data = cache.get(key) #step1
if data is None:
locked = lock.try_lock() #非阻塞
if locked:
data = load_data() #step2
cache.set(key,data) #step3
lock.unlock()
else:
time.sleep(1)
data = cache.get(key)
return data
複製代碼
這種方案,基本上已經可以知足咱們的需求,只有獲取到鎖的線程會去加載數據,其餘線程會休眠一段時間後,再次嘗試從緩存中獲取數據。但這裏仍是有兩個細節須要注意,一是線程休眠是時間,過短的話可能新的數據還沒加載到緩存,太長的話會影響性能。二是鎖的粒度,鎖的粒度若是太大,好比獲取某條訂單數據的時候,若是全部的訂單都用同一把鎖,仍是會影響系統的性能,比較合理地方式應該是爲每一個訂單建立一把鎖。這裏能夠考慮使用redis實現分佈式鎖,用訂單號做爲分佈式鎖的key。併發
singleflight是groupcache項目下的一個庫,感興趣的同窗能夠研究一下groupcache的源碼。簡單來講,singleflight 可以使多個併發請求所觸發的回源操做裏,只有第一個回源被執行,其他請求阻塞等待第一個被執行的那個回源操做完成後,直接取其結果,以此保證同一時刻只有一個回源操做在執行,以達到防止擊穿的效果。app
groupcache是memcache做者用go語言開發的一個緩存系統,我參考其中源碼,實現了一個singlecache的Python版本,下面直接上代碼:dom
from threading import Condition, Lock
class Caller(object):
def __init__(self):
self._cond = Condition()
self.val = None
self.err = None
self.done = False
def result(self):
with self._cond: # 阻塞等待執行結果
while not self.done:
self._cond.wait()
if self.err:
raise self.err
return self.val
def notify(self):
with self._cond:
self.done = True
self._cond.notify_all()#通知全部阻塞線程,執行已經完成
class SingleFlight(object):
def __init__(self):
self.map = {}
self.lock = Lock()
def do(self, key, fn, *args, **kwargs):
self.lock.acquire()
if key in self.map:
#已經存在對該key的請求,則新線程不會重複處理key的請求因此釋放鎖,而後阻塞等待請求獲得的結果
caller = self.map[key]
self.lock.release()
return caller.result()
caller = Caller()
self.map[key] = caller
self.lock.release()
# 執行真正的請求函數,獲得對該 key 請求的結果
try:
caller.val = fn(*args, **kwargs)
except Exception as e:
caller.err = e
finally:
caller.notify()#通知該key下的阻塞線程,執行已經完成,能夠獲取結果了
# 執行已經完成,刪除對應的key,下次再有一樣的key,還會再次執行
self.lock.acquire()
del self.map[key]
self.lock.release()
#返回執行結果
return caller.result()
複製代碼
讓咱們來簡單模型一些併發操做,看看使用SingleFlight的執行效果:
if __name__ == '__main__':
executor = ThreadPoolExecutor(max_workers=20)
single_flight = SingleFlight()
def long_task(delay=1):
print("run long task")
time.sleep(delay)
return random.uniform(1, 100)
def run_in_single_flight():
return single_flight.do('long_task', long_task, delay=0.1)
tasks = []
for i in range(10):
task = executor.submit(run_in_single_flight)
tasks.append(task)
for task in as_completed(tasks):
data = task.result()
print(data)
複製代碼
執行後的輸出結果以下:能夠看到,咱們啓動了10個線程併發的執行long_task,每一個線程都拿到了執行結果,但long_task實際上只執行了一次。
run long task
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
61.1450551376543
複製代碼
回到緩存擊穿的問題上,使用SingleFlight解決緩存擊穿的代碼也很簡單,其實關鍵就在於SingleFlight可以保證同一時刻只有一個load_data_and_set_cache操做在執行,執行完畢會馬上通知其餘阻塞的線程,並且經過設置不一樣的key,能有效控制阻塞的粒度。
single_flight = SingleFlight()
def load_data_and_set_cache(key):
data = load_data() #step2
cache.set(key,data) #step3
def request_handler():
data = cache.get(key) #step1
if data is None:
data = single_flight.do(load_data_and_set_cache,key)
return data
複製代碼
singleflight能夠看作是一種併發控制機制,這種機制能夠有不一樣的實現,單價版的、分佈式版的,其實大部分狀況下,單機版的就能很大程度上控制併發訪問,也就夠用了。這種控制機制,不僅是在緩存擊穿問題上,在微服務架構中,爲了不給下游服務形成太大的壓力,也能夠考慮使用singleflight機制。