在項目開發當中,常常有這樣一種場景,對數據庫進行添加、修改、刪除操做的應用直接鏈接master庫,只對數據庫進行查詢的應用,會先創建一箇中央緩存,例如redis或者memcache,若是緩存沒有命中,那麼直接訪問slave庫。下文會介紹一下在刷新中央緩存時,若是發生主從延遲,應該如何處理。也便是,當應用System-A 把數據庫寫入master庫的時候,System-B應用在讀取slave庫的時候,master庫的數據還沒同步到slave庫,若是這個時候刷新緩存的話,會直接把舊的數據刷到緩存裏的。mysql
備註:redis
筆者在處理這個問題時,刷新中央緩存的機制是使用MQ消息進行通知的。本文也是基於MQ這種技術背景下,想到的一些解決方案。複製代碼
咱們能夠根據數據的update_time
來判斷master庫的數據是否已經同步到slave庫。假設有一個update數據庫的操做,經過update_time得知,最新的master庫的數據還未同步到slave庫,那麼咱們能夠把這條數據的主鍵存儲到本地緩存當中,例如使用LinkedBlockingQueue
這個隊列做爲本地緩存,將數據主鍵id存儲到隊列中,而後啓動一個job去掃描這個隊列,一旦發現隊列中有數據,則進行處理。在處理數據的過程當中,若是發現該條數據仍是未從master同步過來,那麼繼續把這條數據的主鍵放入隊列中,等待下一次的處理,一直到master庫的數據同步過來爲止。如若因爲數據庫緣由或者數據緣由或者代碼問題等,致使數據一直處於入隊列/出隊列的死循環當中,那麼咱們能夠爲數據設置一個出入隊列的次數,例如5次,超過五次的,則該條數據把它丟失掉。spring
下面列出一些僞代碼:sql
public class DelayQueue {
private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueue.class);
private static final int QUEUE_MAX_ELEMENT_COUNT = 20000;
private LinkedBlockingQueue<MessageElement> queue = new LinkedBlockingQueue<MessageElement>(QUEUE_MAX_ELEMENT_COUNT);
private static class SingletonHolder {
private static final DelayQueue INSTANCE = new DelayQueue ();
}
private DelayQueue (){}
public static final DelayQueue getInstance() {
return SingletonHolder.INSTANCE;
}
/**
*把元素插入隊列,若是此時隊列已滿,則丟棄掉
*/
public void offer(MessageElement messageElement){
boolean result = queue.offer(messageElement);
//隊列滿了
if (!result) {
LOGGER.warn(dataBase masterSlaveDataDelayQueue full);
}
}
/**
* 把頭部的元素出棧
*/
public MessageElement poll (){
return queue.poll();
}
}複製代碼
使用spring的定時任務註解:數據庫
/**
*該方法是單線程調度的,若是該線程未執行完,後續的調度將不會執行
*/
@Scheduled(cron="0 0/5 * * * ?")
public void handleQueueMessage(){
while(true){
String result = "true";//最好從配置文件讀取,當值爲false時,不接收消息
if (Constants.FALSE.equals(result)) {
return;
}
MessageElement messageElement = DelayQueue .getInstance().poll();
if (messageElement == null) {
break;
}
LOGGER.info("receiveMessage from delay queue"+messageElement.toString());
salesService.handleMessage(messageElement);
}
}複製代碼
public class MessageElement {
private Long id;//數據主鍵
private AtomicInteger count = new AtomicInteger();//控制出入隊列的最大次數
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public AtomicInteger getCount() {
return count;
}
public void setCount(AtomicInteger count) {
this.count = count;
}
}複製代碼
備註:緩存
注意要設置隊列的最大容量,若是隊列中的數據數量超過最大容量,能夠根據本身的業務狀況,刪除隊頭或者再也不加入數據。bash
這個方案在應用重啓的數據,本地緩存會被清理,形成數據丟失。併發
必須有一個開關,控制是否接收消息。由於一旦生產者發送的併發量太大,會引發其餘問題,這個時候,能夠經過開關控制不接收消息,以便達到降級的效果。畢竟咱們只是刷新緩存而已,大不了不刷。框架
若是MQ有以下的特性的話,也能夠嘗試使用:ui
當數據未從master同步過來時,能夠把消息的狀態設置爲later,讓消息發送者每隔一段時間再次發送,例如2s後、5s後1分鐘後,這樣不斷的發送,直到一個小時後,中止發送。複製代碼
這樣的話,應用就無需使用本地緩存了,直接利用MQ。同時當應用重啓的時候,消息也不會丟失。