Redis發佈訂閱機制

1. 什麼是Redis

Redis是一個開源的內存數據庫,它以鍵值對的形式存儲數據。因爲數據存儲在內存中,所以Redis的速度很快,可是每次重啓Redis服務時,其中的數據也會丟失,所以,Redis也提供了持久化存儲機制,將數據以某種形式保存在文件中,每次重啓時,能夠自動從文件加載數據到內存當中。 
這裏寫圖片描述 
Redis的架構包括兩個部分:Redis Client和Redis Server。Redis客戶端負責向服務器端發送請求並接受來自服務器端的響應。服務器端負責處理客戶端請求,例如,存儲數據,修改數據等。 
Redis一般用做數據庫,緩存以及消息系統。mysql

2. Redis發佈訂閱

2.1 Redis發佈訂閱架構

Redis提供了發佈訂閱功能,能夠用於消息的傳輸,Redis的發佈訂閱機制包括三個部分,發佈者,訂閱者和Channel。 
這裏寫圖片描述 
發佈者和訂閱者都是Redis客戶端,Channel則爲Redis服務器端,發佈者將消息發送到某個的頻道,訂閱了這個頻道的訂閱者就能接收到這條消息。Redis的這種發佈訂閱機制與基於主題的發佈訂閱相似,Channel至關於主題。redis

2.2 Redis發佈訂閱功能

(1)發送消息 
Redis採用PUBLISH命令發送消息,其返回值爲接收到該消息的訂閱者的數量。 
這裏寫圖片描述 
(2)訂閱某個頻道 
Redis採用SUBSCRIBE命令訂閱某個頻道,其返回值包括客戶端訂閱的頻道,目前已訂閱的頻道數量,以及接收到的消息,其中subscribe表示已經成功訂閱了某個頻道。 
這裏寫圖片描述 
(3)模式匹配 
模式匹配功能容許客戶端訂閱符合某個模式的頻道,Redis採用PSUBSCRIBE訂閱符合某個模式全部頻道,用「」表示模式,「」能夠被任意值代替。 
這裏寫圖片描述 
假設客戶端同時訂閱了某種模式和符合該模式的某個頻道,那麼發送給這個頻道的消息將被客戶端接收到兩次,只不過這兩條消息的類型不一樣,一個是message類型,一個是pmessage類型,但其內容相同。 
(4)取消訂閱 
Redis採用UNSUBSCRIBE和PUNSUBSCRIBE命令取消訂閱,其返回值與訂閱相似。 
因爲Redis的訂閱操做是阻塞式的,所以一旦客戶端訂閱了某個頻道或模式,就將會一直處於訂閱狀態直到退出。在SUBSCRIBE,PSUBSCRIBE,UNSUBSCRIBE和PUNSUBSCRIBE命令中,其返回值都包含了該客戶端當前訂閱的頻道和模式的數量,當這個數量變爲0時,該客戶端會自動退出訂閱狀態。sql

2.3 Redis發佈訂閱實現

因爲Redis是一個開源的系統,所以咱們能夠經過其源代碼查看內部的實現細節。 
(1)SUBSCRIBE 
這裏寫圖片描述 
當客戶端訂閱某個頻道時,Redis須要將該頻道和該客戶端綁定。首先,在客戶端結構體client中,有一個屬性爲pubsub_channels,該屬性代表了該客戶端訂閱的全部頻道,它是一個字典類型,經過哈希表實現,其中的每一個元素都包含了一個鍵值對以及指向下一個元素的指針,每次訂閱都要向其中插入一個結點,鍵表示訂閱的頻道,值爲空。而後,在表示服務器端的結構體redisServer中,也有一個屬性爲pubsub_channels,但此處它表示的是該服務器端中的全部頻道以及訂閱了這個頻道的客戶端,它也是一個字典類型,插入結點時,鍵表示頻道,值則是訂閱了這個頻道的全部客戶端組成的鏈表。最後Redis通知客戶端其訂閱成功。 
(2)PSUBSCRIBE 
這裏寫圖片描述 
當客戶端訂閱某個模式時,Redis一樣須要將該模式和該客戶端綁定。首先,在結構體client中,有一個屬性爲pubsub_patterns,該屬性表示該客戶端訂閱的全部模式,它是一個鏈表類型,每一個結點包括了訂閱的模式和指向下一個結點的指針,每次訂閱某個模式時,都要向其中插入一個結點。而後,在結構體redisServer中,有一個屬性也叫pubsub_patterns,它表示了該服務器端中的全部模式和訂閱了這些模式的客戶端,它也是一個鏈表類型,插入結點時,每一個結點都要包含訂閱的模式,以及訂閱這個模式的客戶端,和指向下一個結點的指針。 
(3)PUBLISH 
這裏寫圖片描述 
當客戶端向某個頻道發送消息時,Redis首先在結構體redisServer中的pubsub_channels中找出鍵爲該頻道的結點,遍歷該結點的值,即遍歷訂閱了該頻道的全部客戶端,將消息發送給這些客戶端。而後,遍歷結構體redisServer中的pubsub_patterns,找出包含該頻道的模式的結點,將消息發送給訂閱了該模式的客戶端。數據庫

2.4 Redis發佈訂閱在Redis中的應用

Redis的發佈訂閱功能與Redis中的數據存儲時無關的,它不會影響Redis的key space,即不會影響Redis中存儲的數據,但經過發佈訂閱機制,Redis還提供了另外一個功能,即Keyspace Notification,容許客戶端經過訂閱特定的頻道,從而得知是否有改變Redis中的數據的事件。例如,有一個客戶端刪除了Redis中鍵爲mykey的數據,該操做會觸發兩條消息,mykey del和del mykey,前者屬於頻道keysapce,表示keyspace發生的變化,後者屬於頻道keyevent,表示執行的操做。 
這裏寫圖片描述api

2.5 Redis發佈訂閱與ActiveMQ的比較

(1)ActiveMQ支持多種消息協議,包括AMQP,MQTT,Stomp等,而且支持JMS規範,但Redis沒有提供對這些協議的支持; 
(2)ActiveMQ提供持久化功能,但Redis沒法對消息持久化存儲,一旦消息被髮送,若是沒有訂閱者接收,那麼消息就會丟失; 
(3)ActiveMQ提供了消息傳輸保障,當客戶端鏈接超時或事務回滾等狀況發生時,消息會被從新發送給客戶端,Redis沒有提供消息傳輸保障。 
總之,ActiveMQ所提供的功能遠比Redis發佈訂閱要複雜,畢竟Redis不是專門作發佈訂閱的,可是若是系統中已經有了Redis,而且須要基本的發佈訂閱功能,就沒有必要再安裝ActiveMQ了,由於可能ActiveMQ提供的功能大部分都用不到,而Redis的發佈訂閱機制就能知足需求。緩存

創建一個Publisher (發佈者)服務器

複製代碼

public class Publisher extends Thread{

    private final JedisPool jedisPool;

    public Publisher(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }
    
    @Override
    public void run() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        Jedis jedis = jedisPool.getResource();   //鏈接池中取出一個鏈接
        while (true) {
            String line = null;
            try {
                line = reader.readLine();
                if (!"quit".equals(line)) {
                    jedis.publish("mychannel", line);   //從 mychannel 的頻道上推送消息
                } else {
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

複製代碼

再創建一個訂閱者架構

複製代碼

public class Subscriber extends JedisPubSub {

    public Subscriber(){}
    @Override
    public void onMessage(String channel, String message) {       //收到消息會調用
        System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
    }
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {    //訂閱了頻道會調用
        System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
                channel, subscribedChannels));
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {   //取消訂閱 會調用
        System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
                channel, subscribedChannels));

    }
}

複製代碼

這裏訂閱者須要繼承JedisPubSub,來重寫它的三個方法。用途 註釋上已經寫了,很簡單。ide

咱們這裏只是定義了一個訂閱者,下面去訂閱頻道。測試

複製代碼

public class SubThread extends Thread {

    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();

    private final String channel = "mychannel";

    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }

    @Override
    public void run() {
        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();   //取出一個鏈接
            jedis.subscribe(subscriber, channel);    //經過subscribe 的api去訂閱,入參是訂閱者和頻道名
        } catch (Exception e) {
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}

複製代碼

最後,再寫一個測試類去跑一下。鍵盤輸入消息,訂閱者就會觸發onMessage方法

複製代碼

public class PubSubDemo {

    public static void main( String[] args )
    {
        // 鏈接redis服務端
        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379);
        
        System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", "127.0.0.1", 6379));

        SubThread subThread = new SubThread(jedisPool);  //訂閱者
        subThread.start();

        Publisher publisher = new Publisher(jedisPool);    //發佈者
        publisher.start();
    }
}

複製代碼

相關文章
相關標籤/搜索