高併發情境下首先考慮到的第一層優化方案就是增長緩存,尤爲是經過Redis將本來在數據庫中的數據複製一份放到內存中,能夠減小對數據庫的讀操做,數據庫的壓力下降,同時也會加快系統的響應速度,可是一樣的也會帶來其餘的問題,好比須要考慮數據的一致性、還須要預防可能的緩存擊穿、穿透和雪崩問題等等。java
先查詢緩存中有沒有要的數據,若是有,就直接返回緩存中的數據。若是緩存中沒有要的數據,纔去查詢數據庫,將獲得數據更新到緩存再返回,若是數據庫中也沒有就能夠返回空。redis
考慮數據一致性,緩存處的代碼邏輯都較爲標準化,首先取Redis,擊中則返回,未擊中則經過數據庫來進行查詢和同步。算法
public Result query(String id) {
數據庫
Result result = null;
緩存
//1.從Redis緩存中取數據
併發
result = (Result)redisTemplate.opsForValue().get(id);
異步
if (null != result){
分佈式
System.out.println("緩存中獲得數據");
高併發
return result;
優化
}
//2.經過DB查詢,有則同步更新redis,不然返回空
System.out.println("數據庫中獲得數據");
result = Dao.query(id);
if (null != result){
redisTemplate.opsForValue().set(id,result);
redisTemplate.expire(id,20000, TimeUnit.MILLISECONDS);
}
return result;
}
其餘的新增、刪除和更新操做,能夠直接採用先清空該Key下的緩存值再進行DB操做,這樣邏輯清晰簡單,維護的複雜度會下降,而付出代價就是多查詢一次。
public void update(Entity entity) {
redisTemplate.delete(entity.getId());
Dao.update(entity);
return entity;
}
public Entity add(Entity entity) {
redisTemplate.delete(entity.getId());
Dao.insert(entity);
return entity;
}
適用於作緩存的場景通常都是:訪問頻繁、讀場景較多而寫場景少、對數據一致性要求不高。若是上面三個條件都不符合,那維護一套緩存數據的意義並不大了,實際應用中一般都須要針對業務場景來選擇合適的緩存方案,下面給出了四種緩存策略,由上到下就是按照一致性由強到弱的順序。
更新策略 | 特色 | 適用場景 |
---|---|---|
實時更新 | 同步更新保證強一致性,與業務強侵入強耦合 | 金融轉帳業務等 |
弱實時 | 異步更新(MQ/發佈訂閱/觀察者模式),業務解耦,弱一致性存在延遲 | 不適合寫頻繁場景 |
失效機制 | 設置緩存失效,有必定延遲,可能存在雪崩 | 適用讀多寫少,能接受必定的延時 |
任務調度 | 經過定時任務進行全量更新 | 統計類業務,訪問頻繁且按期更新 |
緩存雪崩是指在咱們設置緩存時採用了相同的過時時間,致使緩存在某一時刻同時失效,請求所有轉發到DB,DB瞬時壓力太重雪崩。和緩存擊穿不一樣的是,緩存擊穿指併發查同一條數據,緩存雪崩是不一樣數據都過時了,不少數據都查不到從而查數據庫。
將緩存失效時間分散開,好比咱們能夠在原有的失效時間基礎上增長一個隨機值,好比1-5分鐘隨機,這樣每個緩存的過時時間的重複率就會下降,就很難引起集體失效的事件。
用加鎖或者隊列的方式保證緩存的單線程(進程)寫,從而避免失效時大量的併發請求落到底層存儲系統上。
第一種方案比較容易實現,第二種的思路主要是從加阻塞式的排它鎖來實現,在緩存查詢不到的狀況下,每此只容許一個線程去查詢DB,這樣可避免同一個ID的大量併發請求都落到數據庫中。
public Result query(String id) {
// 1.從緩存中取數據
Result result = null;
result = (Result)redisTemplate.opsForValue().get(id);
if (result ! = null) {
logger.info("緩存中獲得數據");
return result;
}
//2.加鎖排隊,阻塞式鎖
doLock(id);//多少個id就可能有多少把鎖
try{
//一次只有一個線程
//雙重校驗,第一次獲取到後面的均可以從緩存中直接擊中
result = (Result)redisTemplate.opsForValue().get(id);
if (result != null) {
logger.info("緩存中獲得數據");
return result;//第二個線程,這裏返回
}
result = dao.query(id);
// 3.從數據庫查詢的結果不爲空,則把數據放入緩存中,方便下次查詢
if (null != result) {
redisTemplate.opsForValue().set(id,result);
redisTemplate.expire(id,20000, TimeUnit.MILLISECONDS);
}
return provinces;
} catch(Exception e) {
return null;
} finally {
//4.解鎖
releaseLock(provinceid);
}
}
private void releaseLock(String userCode) {
ReentrantLock oldLock = (ReentrantLock) locks.get(userCode);
if(oldLock !=null && oldLock.isHeldByCurrentThread()){
oldLock.unlock();
}
}
private void doLock(String lockcode) {
//id有不一樣的值
//id相同的,加一個鎖,不是同一個key,不能用同一個鎖
ReentrantLock newLock = new ReentrantLock();//建立一個鎖
//若已存在,則newLock直接丟棄
Lock oldLock = locks.putIfAbsent(lockcode, newLock);
if(oldLock == null){
newLock.lock();
}else{
oldLock.lock();
}
}
注意:加鎖排隊的解決方式在處理分佈式環境的併發問題,有可能還要解決分佈式鎖的問題;線程還會被阻塞,用戶體驗不好!所以,在真正的高併發場景下不多使用!
一個存在的key,在緩存過時的一刻,同時有大量的請求,這些請求都會擊穿到DB,形成瞬時DB請求量大、壓力驟增。
在訪問key以前,採用SETNX(set if not exists)來設置另外一個短時間key來鎖住當前key的訪問,訪問結束再刪除該短時間key。
緩存穿透是指緩存和數據庫中都沒有的數據,而用戶不斷髮起請求,如發起爲id爲「-1」的數據或id爲特別大不存在的數據。這時的用戶極可能是攻擊者,攻擊會致使數據庫壓力過大。
布隆過濾器的使用方法,相似java的SET集合,用來判斷某個元素(key)是否在某個集合中。和通常的hash set不一樣的是,這個算法無需存儲key的值,對於每一個key,只須要k個比特位,每一個存儲一個標誌,用來判斷key是否在集合中。
使用步驟:
將List數據裝載入布隆過濾器中
private BloomFilter<String> bf =null;
//PostConstruct註解對象建立後,自動調用本方法
@PostConstruct
public void init(){
//在bean初始化完成後,實例化bloomFilter,並加載數據
List<Entity> entities= initList();
//初始化布隆過濾器
bf = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), entities.size());
for (Entity entity : entities) {
bf.put(entity.getId());
}
}
訪問通過布隆過濾器,存在才能夠往db中查詢
public Provinces query(String id) {
//先判斷布隆過濾器中是否存在該值,值存在才容許訪問緩存和數據庫
if(!bf.mightContain(id)) {
Log.info("非法訪問"+System.currentTimeMillis());
return null;
}
Log.info("數據庫中獲得數據"+System.currentTimeMillis());
Entity entity= super.query(id);
return entity;
}
這樣當外界有惡意攻擊時,不存在的數據請求就能夠直接攔截在過濾器層,而不會影響到底層數據庫系統。