併發中如何保證緩存DB雙寫一致性(JAVA栗子)

  併發場景中大部分處理的是先更新DB,再(刪緩、更新)緩存的處理方式,可是在實際場景中有可能DB更新成功了,可是緩存設置失敗了,就形成了緩存與DB數據不一致的問題,下面就以實際狀況說下怎麼解決此類問題。java

  名詞 Cache:本文內指redis,ReadRequest:請求從Cache、Db中拿去數據,WriteRequest:數據寫入DB並刪除緩存redis

  若要保證數據庫與緩存一直,咱們須要採用先刪緩存,在更新DB的狀況,這時候有的同窗可能會問,若是緩存刪除成功了,而DB更新失敗了怎麼辦,其實仔細考慮一下,DB雖然失敗了,那真正是不會產生數據影響的,而當下次一次請求進來的時候,咱們從新把DB中未更新的數據從新塞入緩存,從結果上來看是沒有影響的。咱們把請求分爲ReadRequest 、WriteRequest,大部分同窗都知道咱們在使用Cache時 首先都會去Cache內查一下,若是Cache中沒有拿到數據咱們在從數據庫中去獲取數據,這個時候在高併發的場景的踩過坑的同窗都知道恰巧在這時候有更新請求把緩存刪除了,這時候大量請求進來,Cache內沒有此項數據,請求就會直接落在DB上,就很容易形成緩存雪崩,數據庫極可能瞬時就掛掉了,因此處理方案就是咱們須要對查詢寫入的緩存進行排隊處理,而正確從cache內獲取的姿式:數據庫

  一、每次查詢數據的時候咱們吧請求數據放入隊列,由隊列消費者去檢查一下cache是否存在,不存在則進行插入,存在就跳過緩存

  二、當前readRequest就自循環,咱們不斷嘗試從cache內去獲取數據,拿到數據或超時當前線程當即退出安全

  三、若是拿到數據了就返回結果,沒有拿到數據咱們就從DB去查多線程

  而WriteRequest 的處理相對就簡單多了咱們直接刪除緩存後,更新DB便可,下面上代碼說明:併發

  消息隊列這裏咱們基於jdk併發包內的BlockingQueue進行實現,使用MQ(Rabbit,Kafka等)的話思想差很少,只是須要交互一次mq的服務端。首先項目啓動時咱們在程序後臺開闢監聽線程,從數據共享緩衝區(ArrayBlockingQueue)內監聽消息ide

 

public class BlockQueueThreadPool { /** * 核心線程數 */
    private Integer corePoolSize = 10; /** * 線程池最大線程數 */
    private Integer maximumPoolSize = 20; /** * 線程最大存活時間 */
    private Long keepAliveTime = 60L; private ExecutorService threadPool = new ThreadPoolExecutor(this.corePoolSize, this.maximumPoolSize, this.keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue(this.corePoolSize)); public BlockQueueThreadPool() { RequestQueue requestQueue = RequestQueue.getInstance(); BlockingQueue<RequestAction> queue = new ArrayBlockingQueue<>(this.corePoolSize); requestQueue.add(queue); this.threadPool.submit(new JobThread(queue)); } }

   PS:ArrayBlockingQueue中很好的利用了Condition中的等待和通知功能,這裏咱們就能實現對共享通道隊列的事件監聽了。高併發

public class JobThread implements Callable<Boolean> { private BlockingQueue<RequestAction> queue; public JobThread(BlockingQueue<RequestAction> queue) { this.queue = queue; } @Override public Boolean call() throws Exception { try { while (true) { // ArrayBlockingQueue take方法 獲取隊列排在首位的對象,若是隊列爲空或者隊列滿了,則會被阻塞住
                RequestAction request = this.queue.take(); RequestQueue requestQueue = RequestQueue.getInstance(); Map<String, Boolean> tagMap = requestQueue.getTagMap(); if (request instanceof ReadRequest) { Boolean tag = tagMap.get(request.getIdentity()); if (null == tag) { tagMap.put(request.getIdentity(), Boolean.FALSE); } if (tag != null && tag) { tagMap.put(request.getIdentity(), Boolean.FALSE); } if (tag != null && !tag) { return Boolean.TRUE; } } else if (request instanceof WriteRequest) { // 若是是更新數據庫的操做
 tagMap.put(request.getIdentity(), Boolean.TRUE); } // 執行請求處理
                log.info("緩存隊列執行+++++++++++++++++,{}", request.getIdentity()); request.process(); } } catch (Exception e) { e.printStackTrace(); } return Boolean.TRUE; } }

  接下來就要定義咱們的WriteRequest、ReadRequest了優化

@Slf4j public class ReadRequest<TResult> extends BaseRequest { public ReadRequest(String cacheKey, GetDataSourceInterface action) { super(cacheKey, action); } @Override public void process() { TResult result = (TResult) action.exec(); if (Objects.isNull(result)) { //防止緩存擊穿
            redis.set(cacheKey, "", 10000); } else { redis.set(cacheKey, result, 10000); } } }
public class WriteRequest<TResult> extends BaseRequest { public WriteRequest(String cacheKey, GetDataSourceInterface action) { super(cacheKey, action); } @Override public void process() { redis.del(cacheKey); action.exec(); } }

  這裏咱們須要坐下判斷,在數據庫內查詢數據爲空後把「」寫入了緩存,這樣子是避免有人惡意請求不存在的數據時形成緩存擊穿。接下來就是咱們針對各項業務場景中須要獲取與更新緩存的路由端了

@UtilityClass public class RouteUtils { public static void route(RequestAction requestAction) { try { BlockingQueue<RequestAction> queue = RequestQueue.getInstance().getQueue(0); queue.put(requestAction); } catch (Exception e) { e.printStackTrace(); } } }
public class RequestQueue { private RequestQueue() { } private List<BlockingQueue<RequestAction>> queues = new ArrayList<>(); private Map<String, Boolean> tagMap = new ConcurrentHashMap<>(1); private static class Singleton { private static RequestQueue queue; static { queue = new RequestQueue(); } private static RequestQueue getInstance() { return queue; } } public static RequestQueue getInstance() { return Singleton.getInstance(); } public void add(BlockingQueue<RequestAction> queue) { this.queues.add(queue); } public BlockingQueue<RequestAction> getQueue(int index) { return this.queues.get(index); } public int size() { return this.queues.size(); } public Map<String, Boolean> getTagMap() { return this.tagMap; } }

  這裏有一個小的知識點,不少時候咱們在保證線程安全的時候多數會使用DSL雙鎖模型,可是我始終以爲這類代碼不夠美觀,因此咱們能夠利用JVM的類加載原則,使用靜態類包裹初始化類,這樣子也必定能保證單例模型,而且代碼也更美觀了。接下來就能夠看下Service的代碼

@Service public class StudentService { public Student getStudent(String name) { ReadRequest<Student> readRequest = new ReadRequest<>(name, () -> Student.builder().name(name).age(3).build()); return CacheProcessor.builder().build().getData(readRequest); } public void update(Student student) { WriteRequest<Student> writeRequest = new WriteRequest<>(student.getName(), () -> student); CacheProcessor.builder().build().setData(writeRequest); } }

Service內直接調用了Cachce的處理者,咱們經過處理者來獲取緩存與更新緩存

@Builder public class CacheProcessor { public <TResult> TResult getData(ReadRequest readRequest) { try { RouteUtils.route(readRequest); long startTime = System.currentTimeMillis(); long waitTime = 0L; while (true) { if (waitTime > 3000) { break; } TResult result = (TResult) readRequest.redis.get(readRequest.getIdentity()); if (!Objects.isNull(result)) { return result; } else { Thread.sleep(20); waitTime = System.currentTimeMillis() - startTime; } } return (TResult) readRequest.get(); } catch (Exception e) { return null; } } public void setData(WriteRequest writeRequest){ RouteUtils.route(writeRequest); } }

  這裏咱們就先把請求數據發送到數據共享渠道,消費者端與當前的ReadRequest線程同步執行,拿到數據後ReadRequest就立馬退出,超時後咱們就從數據庫中獲取數據。這裏面我使用了java8 @FunctionalInterface 標記接口,對各個業務中須要用到緩存的地方統一進行封裝方便調用,以上的代碼就已經基本說明併發中Db和Cache雙休一致性的解決思路,聰明的小夥伴確定能看出其實還有不少優化的地方,好比說咱們栗子中是單線程吞吐量不高,採用多線程與多消費者端的時候咱們還須要保證商品的更新和讀取請求須要落在同一個消費者端等等問題。或者在使用外部MQ時,咱們除了要考慮以上同一商品的讀寫保證落在一個消費節點上,還須要考慮隊列內有插入緩存請求的時候須要跳過的處理等等,更多狀況還須要根據實際狀況你們本身去發現咯

 

參考:中華石杉的教程

相關文章
相關標籤/搜索