實時刷新緩存-處理mysql主從延遲的一些設計方案

在項目開發當中,常常有這樣一種場景,對數據庫進行添加、修改、刪除操做的應用直接鏈接master庫,只對數據庫進行查詢的應用,會先創建一箇中央緩存,例如redis或者memcache,若是緩存沒有命中,那麼直接訪問slave庫。下文會介紹一下在刷新中央緩存時,若是發生主從延遲,應該如何處理。也便是,當應用System-A 把數據庫寫入master庫的時候,System-B應用在讀取slave庫的時候,master庫的數據還沒同步到slave庫,若是這個時候刷新緩存的話,會直接把舊的數據刷到緩存裏的。mysql

備註:redis

筆者在處理這個問題時,刷新中央緩存的機制是使用MQ消息進行通知的。本文也是基於MQ這種技術背景下,想到的一些解決方案。複製代碼

本地緩存框架緩存數據

咱們能夠根據數據的update_time 來判斷master庫的數據是否已經同步到slave庫。假設有一個update數據庫的操做,經過update_time得知,最新的master庫的數據還未同步到slave庫,那麼咱們能夠把這條數據的主鍵存儲到本地緩存當中,例如使用LinkedBlockingQueue 這個隊列做爲本地緩存,將數據主鍵id存儲到隊列中,而後啓動一個job去掃描這個隊列,一旦發現隊列中有數據,則進行處理。在處理數據的過程當中,若是發現該條數據仍是未從master同步過來,那麼繼續把這條數據的主鍵放入隊列中,等待下一次的處理,一直到master庫的數據同步過來爲止。如若因爲數據庫緣由或者數據緣由或者代碼問題等,致使數據一直處於入隊列/出隊列的死循環當中,那麼咱們能夠爲數據設置一個出入隊列的次數,例如5次,超過五次的,則該條數據把它丟失掉。spring

下面列出一些僞代碼:sql

隊列實現

public class DelayQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueue.class);
    private static final int QUEUE_MAX_ELEMENT_COUNT = 20000;
    private  LinkedBlockingQueue<MessageElement> queue = new  LinkedBlockingQueue<MessageElement>(QUEUE_MAX_ELEMENT_COUNT);
    private static class SingletonHolder {
        private static final DelayQueue INSTANCE = new DelayQueue ();
    }

    private DelayQueue (){}

    public static final DelayQueue getInstance() {
        return SingletonHolder.INSTANCE;
    }


    /**
     *把元素插入隊列,若是此時隊列已滿,則丟棄掉
     */
    public void offer(MessageElement messageElement){
            boolean result = queue.offer(messageElement);
            //隊列滿了
            if (!result) {
                LOGGER.warn(dataBase masterSlaveDataDelayQueue full);
            }
    }

    /**
     * 把頭部的元素出棧
     */
    public MessageElement poll (){
        return queue.poll();
    }
}複製代碼

掃描本地緩存隊列的job

使用spring的定時任務註解:數據庫

/**
     *該方法是單線程調度的,若是該線程未執行完,後續的調度將不會執行
     */

    @Scheduled(cron="0 0/5 * * * ?")
    public void handleQueueMessage(){
        while(true){
            String result = "true";//最好從配置文件讀取,當值爲false時,不接收消息
            if (Constants.FALSE.equals(result)) {
                return;
            }
            MessageElement messageElement = DelayQueue .getInstance().poll();
            if (messageElement == null) {
                break;
            }
            LOGGER.info("receiveMessage from delay queue"+messageElement.toString());
            salesService.handleMessage(messageElement);
        }
    }複製代碼

消息體

public class MessageElement {

    private Long id;//數據主鍵
    private AtomicInteger count = new AtomicInteger();//控制出入隊列的最大次數

    public long getId() {
        return id;
    }
    public void setId(long id) {
        this.id = id;
    }

    public AtomicInteger getCount() {
        return count;
    }
    public void setCount(AtomicInteger count) {
        this.count = count;
    }
}複製代碼

備註:緩存

  1. 注意要設置隊列的最大容量,若是隊列中的數據數量超過最大容量,能夠根據本身的業務狀況,刪除隊頭或者再也不加入數據。bash

  2. 這個方案在應用重啓的數據,本地緩存會被清理,形成數據丟失。併發

  3. 必須有一個開關,控制是否接收消息。由於一旦生產者發送的併發量太大,會引發其餘問題,這個時候,能夠經過開關控制不接收消息,以便達到降級的效果。畢竟咱們只是刷新緩存而已,大不了不刷。框架

    使用MQ

    若是MQ有以下的特性的話,也能夠嘗試使用:ui

當數據未從master同步過來時,能夠把消息的狀態設置爲later,讓消息發送者每隔一段時間再次發送,例如2s後、5s後1分鐘後,這樣不斷的發送,直到一個小時後,中止發送。複製代碼

這樣的話,應用就無需使用本地緩存了,直接利用MQ。同時當應用重啓的時候,消息也不會丟失。


原文連接


實時刷新緩存-處理mysql主從延遲的一些設計方案

相關文章
相關標籤/搜索