你真的瞭解Redis的發佈訂閱?(含Java版實現源碼)

Redis發佈訂閱使用場景及JAVA代碼實現(含源碼)html

導語

Redis是咱們很經常使用的一款nosql數據庫產品,咱們一般會用Redis來配合關係型數據庫一塊兒使用,彌補關係型數據庫的不足。java

其中,Redis的發佈訂閱功能也是它的一大亮點。雖然它不是一款專門作發佈訂閱的產品,但其自帶的發佈訂閱功能已經知足咱們平常需求。web

那Redis的發佈訂閱功能的原理和它均可以用在哪些場景呢?今天咱們就來探討一下這個問題。redis

什麼是發佈訂閱

所謂發佈訂閱,就是消息發佈者發佈消息及消息訂閱者接收消息,兩者經過某種媒介關聯起來。這相似之前的『訂報』,當咱們訂閱了某種報紙後(好比財經報),每當報紙有新的期刊出版後,就會有郵遞員給咱們送過來。即,只有定了這種報紙纔會收到出版社發佈的這種新報紙。sql

Redis的發佈訂閱功能也是相似,首先要有消息的發佈者,其次要有消息的訂閱者。有了消息發佈者和訂閱者以後,還缺乏什麼?數據庫

那就是上述的『某種報紙』,並非出版社出版的每一種報紙(如人民日報,財經報,體育報)都給你送過來,而是明確你要定哪種,你定了哪種纔給你送哪種。編程

回到Redis的發佈訂閱上,上述的『某種報紙』就抽象爲頻道channel,客戶端訂閱了某channel後,當發佈者經過此channel發佈消息時,全部訂閱者就會收到該頻道發佈的消息。緩存

發佈和訂閱機制

當一個客戶端經過 PUBLISH 命令向訂閱者發送信息的時候,咱們稱這個客戶端爲發佈者(publisher)。

而當一個客戶端使用 SUBSCRIBE 或者 PSUBSCRIBE命令接收信息的時候,咱們稱這個客戶端爲訂閱者(subscriber)。

爲了解耦發佈者(publisher)和訂閱者(subscriber)之間的關係,Redis 使用了 channel (頻道)做爲二者的中介 —— 發佈者將信息直接發佈給 channel ,而 channel 負責將信息發送給適當的訂閱者,發佈者和訂閱者之間沒有相互關係,也不知道對方的存在。app

如上圖所示, Redis client ARedis client B 訂閱了 channel-> Financial newspapers,當 Redis client C經過 channel-> Financial newspapers 發佈消息 Stocks are up today! 時, Redis client ARedis client B 就會收到該消息。

原理

Redis是使用C實現的,經過分析 Redis 源碼裏的 pubsub.c 文件,瞭解發佈和訂閱機制的底層實現,籍此加深對 Redis 的理解。異步

Redis 經過 PUBLISH 、SUBSCRIBE 和 PSUBSCRIBE 等命令實現發佈和訂閱功能。

經過 SUBSCRIBE 命令訂閱某頻道後,redis-server 裏維護了一個字典,字典的鍵就是一個個 channel ,而字典的值則是一個鏈表,鏈表中保存了全部訂閱這個 channel 的客戶端。SUBSCRIBE 命令的關鍵,就是將客戶端添加到給定 channel 的訂閱鏈表中。

經過 PUBLISH 命令向訂閱者發送消息,redis-server 會使用給定的頻道做爲鍵,在它所維護的 channel 字典中查找記錄了訂閱這個頻道的全部客戶端的鏈表,遍歷這個鏈表,將消息發佈給全部訂閱者。

詳細參考:Redis 發佈/訂閱機制原理分析

業務場景

明確了Redis發佈訂閱的原理和基本流程後,咱們來看一下Redis的發佈訂閱到底具體能作什麼。

一、異步消息通知

好比渠道在調支付平臺的時候,咱們能夠用回調的方式給支付平臺一個咱們的回調接口來通知咱們支付狀態,還能夠利用Redis的發佈訂閱來實現。好比咱們發起支付的同時訂閱頻道pay_notice_ + wk (假如咱們的渠道標識是wk,不能讓其餘渠道也訂閱這個頻道),當支付平臺處理完成後,支付平臺往該頻道發佈消息,告訴頻道的訂閱者該訂單的支付信息及狀態。收到消息後,根據消息內容更新訂單信息及後續操做。

當不少人都調用支付平臺時,支付時都去訂閱同一個頻道會有問題。好比用戶A支付完訂閱頻道pay_notice_wk,在支付平臺未處理完時,用戶B支付完也訂閱了pay_notice_wk,當A收到通知後,接着B的支付通知也發佈了,這時渠道收不到第二次消息發佈。由於同一個頻道收到消息後,訂閱自動取消,也就是訂閱是一次性的。

因此咱們訂閱的訂單支付狀態的頻道就得惟一,一個訂單一個頻道,咱們能夠在頻道上加上訂單號pay_notice_wk+orderNo保證頻道惟一。這樣咱們能夠把頻道號在支付時當作參數一併傳過去,支付平臺處理完就能夠用此頻道發佈消息給咱們了。(實際大多使用接口回調通知的方式,由於用Redis發佈訂閱限制條件苛刻,系統間必須共用一套Redis)

二、任務通知

好比經過跑批系統通知應用系統作一些事(跑批系統沒法拿到用戶數據,且應用系統又不能作定時任務的狀況下)。如天天凌晨3點提早加載一些用戶的用戶數據到Redis,應用系統不能作定時任務,能夠經過系統公共的Redis來由跑批系統發佈任務給應用系統,應用系統收到指令,去作相應的操做。

這裏須要注意的是在線上集羣部署的狀況下,全部服務實例都會收到通知,都要作一樣的操做嗎?徹底不必。能夠用Redis實現鎖機制,其中一臺實例拿到鎖後執行任務。另外若是任務比較耗時,能夠不用鎖,能夠考慮一下任務分片執行。固然這不在本文的討論範疇,這裏不在贅述。

三、參數刷新加載

衆所周知,咱們用Redis無非就是將系統中不怎麼變的、查詢又比較頻繁的數據緩存起來,例如咱們系統首頁的輪播圖啊,頁面的動態連接啊,一些系統參數啊,公共數據啊都加載到Redis,而後有個後臺管理系統去配置修改這些數據。

打個比方咱們首頁的輪播圖要再增長一個圖,那咱們就在後管系統加上,加上就完事了嗎?固然沒有,由於Redis裏仍是老數據。那你會說不是有過時時間嗎?是的,但有的過時時間設置的較長如24小時而且咱們想當即生效怎麼辦?這時候咱們就能夠利用Redis的發佈訂閱機制來實現數據的實時刷新。當咱們修改完數據後,點擊刷新按鈕,經過發佈訂閱機制,訂閱者接收到消息後調用從新加載的方法便可。

代碼實現

發佈訂閱的理論以及使用場景你們都已經有了大體瞭解了,可是怎麼用代碼實現發佈訂閱呢?在這裏給你們分享一下實現方式。

咱們以第三種使用場景爲例,先來看一下總體實現類圖吧。

解釋一下,這裏咱們首先定義一個統一接口ICacheUpdate,只有一個update方法,咱們令Service層實現這個方法,執行具體的更新操做。咱們再來看RedisMsgPubSub,它繼承redis.clients.jedis.JedisPubSub,主要重寫其onMessage()方法(訂閱的頻道有消息到來時會觸發這個方法),咱們在這個方法裏調用RedisMsgPubSubupdate方法執行更新操做。當咱們有多個Service實現ICacheUpdate時,咱們就很是迫切地須要一個管理器來集中管理這些Service,而且當觸發onMessage方法時要告訴onMessage方法具體調用哪一個ICacheUpdate的實現類,因此咱們有了PubSubManager。而且咱們單獨開啓一個線程來維護髮布訂閱,因此管理器繼承了Thread類。

具體代碼:

統一接口

ICacheUpdate.java

public interface ICacheUpdate {
    public void update();
}
複製代碼

Service層

實現ICacheUpdate的update方法,執行具體的更新操做

InfoService.java

public class InfoService implements ICacheUpdate {
	private static Logger logger = LoggerFactory.getLogger(InfoService.class);
	@Autowired
	private RedisCache redisCache;
	@Autowired
	private InfoMapper infoMapper;
	/** * 按信息類型分類查詢信息 * @return */
	public Map<String, List<Map<String, Object>>> selectAllInfo(){
		Map<String, List<Map<String, Object>>> resultMap = new HashMap<String, List<Map<String, Object>>>();
		List<String> infoTypeList = infoMapper.selectInfoType();//信息表中全部涉及的信息類型
		logger.info("-------按信息類型查找公共信息開始----"+infoTypeList);
		if(infoTypeList!=null && infoTypeList.size()>0) {
			for (String infoType : infoTypeList) {
				List<Map<String, Object>> result = infoMapper.selectByInfoType(infoType);
				resultMap.put(infoType, result);
			}
		}
		return resultMap;
	}
	@Override
	public void update() {
		//緩存首頁信息
		logger.info("InfoService selectAllInfo 刷新緩存");
		Map<String, List<Map<String, Object>>> resultMap = this.selectAllInfo();
		Set<String> keySet = resultMap.keySet();
		for(String key:keySet){
			List<Map<String, Object>> value = resultMap.get(key);
			redisCache.putObject(GlobalSt.PUBLIC_INFO_ALL+key, value);
		}
	}
}
複製代碼

Redis發佈訂閱的擴展類

做用:

一、統一管理ICacheUpdate,把全部實現ICacheUpdate接口的類添加到updates容器

二、重寫onMessage方法,訂閱到消息後進行刷新緩存的操做

RedisMsgPubSub.java

/** * Redis發佈訂閱的擴展類 * 做用:一、統一管理ICacheUpdate,把全部實現ICacheUpdate接口的類添加到updates容器 * 二、重寫onMessage方法,訂閱到消息後進行刷新緩存的操做 */
public class RedisMsgPubSub extends JedisPubSub {
    private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSub.class);
    private Map<String , ICacheUpdate> updates = new HashMap<String , ICacheUpdate>();
    //一、由updates統一管理ICacheUpdate
    public boolean addListener(String key , ICacheUpdate update) {
        if(update == null) 
            return false;
	updates.put(key, update);
	return true;
    }
    /** * 二、重寫onMessage方法,訂閱到消息後進行刷新緩存的操做 * 訂閱頻道收到的消息 */
    @Override  
    public void onMessage(String channel, String message) {
        logger.info("RedisMsgPubSub onMessage channel:{},message :{}" ,channel, message);
        ICacheUpdate updater = null;
        if(StringUtil.isNotEmpty(message)) 
            updater = updates.get(message);
        if(updater!=null)
            updater.update();
    }
    //other code...
}
複製代碼

發佈訂閱的管理器

執行的操做:

一、將全部須要刷新加載的Service類(實現ICacheUpdate接口)添加到RedisMsgPubSub的updates中

二、啓動線程訂閱pubsub_config頻道,收到消息後的五秒後再次訂閱(避免訂閱到一次消息後結束訂閱)

PubSubManager.java

public class PubSubManager extends Thread{
    private static Logger logger = LoggerFactory.getLogger(PubSubManager.class);

    public static Jedis jedis;
    RedisMsgPubSub msgPubSub = new RedisMsgPubSub();
    //頻道
    public static final String PUNSUB_CONFIG = "pubsub_config";
    //1.將全部須要刷新加載的Service類(實現ICacheUpdate接口)添加到RedisMsgPubSub的updates中
    public boolean addListener(String key, ICacheUpdate listener){
        return msgPubSub.addListener(key,listener);
    }
    @Override
    public void run(){
        while (true){
            try {
                JedisPool jedisPool = SpringTools.getBean("jedisPool", JedisPool.class);
                if(jedisPool!=null){
                    jedis = jedisPool.getResource();
                    if(jedis!=null){
                        //2.啓動線程訂閱pubsub_config頻道 阻塞
                        jedis.subscribe(msgPubSub,PUNSUB_CONFIG);
                    }
                }
            } catch (Exception e) {
                logger.error("redis connect error!");
            } finally {
                if(jedis!=null)
                    jedis.close();
            }
            try {
                //3.收到消息後的五秒後再次訂閱(避免訂閱到一次消息後結束訂閱)
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error("InterruptedException in redis sleep!");
            }
        }
    }
}
複製代碼

到此,Redis的發佈訂閱大體已經實現。咱們何時啓用呢?咱們能夠選擇在啓動項目時完成訂閱和基礎數據的加載,因此咱們經過實現javax.servlet.SevletContextListener來完成這一操做。而後將監聽器添加到web.xml

CacheInitListener.java

/** * 加載系統參數 */
public class CacheInitListener implements ServletContextListener{
    private static Logger logger = LoggerFactory.getLogger(CacheInitListener.class);

    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
    }

    @Override
    public void contextInitialized(ServletContextEvent arg0) {
        logger.info("---CacheListener初始化開始---");
        init();
        logger.info("---CacheListener初始化結束---");
    }

    public void init() {
        try {
            //得到管理器
            PubSubManager pubSubManager = SpringTools.getBean("pubSubManager", PubSubManager.class);

            InfoService infoService = SpringTools.getBean("infoService", InfoService.class);
            //添加到管理器
            pubSubManager.addListener("infoService", infoService);
            //other service...

            //啓動線程執行訂閱操做
            pubSubManager.start();
            //初始化加載
            loadParamToRedis();
        } catch (Exception e) {
            logger.info(e.getMessage(), e);
        }
    }

    private void loadParamToRedis() {
        InfoService infoService = SpringTools.getBean("infoService", InfoService.class);
        infoService.update();
        //other service...
    }
}
複製代碼

web.xml

<listener>
	<listener-class>com.xxx.listener.CacheInitListener</listener-class>
</listener>
複製代碼

【end】

文章首發於公衆號@編程大道

相關文章
相關標籤/搜索