【趙強老師】Redis的消息發佈與訂閱

Redis 做爲一個publish/subscribe server,起到了消息路由的功能。訂閱者能夠經過subscribe和psubscribe命令向Redis server訂閱本身感興趣的消息類型,當發佈者經過publish命令向Redis server發送特定類型的消息時。訂閱該消息類型的所有client都會收到此消息。這裏消息的傳遞是多對多的。一個client能夠訂閱多個channel,也能夠向多個channel發送消息。redis

下圖爲你們展現了Redis消息機制的體系架構。服務器

發佈者和訂閱者都是Redis客戶端,Channel則爲Redis服務器端,發佈者將消息發送到某個的頻道,訂閱了這個頻道的訂閱者就能接收到這條消息。Redis的這種發佈訂閱機制與基於主題的發佈訂閱相似,Channel至關於主題。架構

下面列出來了Redis發佈消息、訂閱消息的相關命令。ide

publish:
發送消息:Redis採用PUBLISH命令發送消息,其返回值爲接收到該消息的訂閱者的數量。 

subscribe:
訂閱某個頻道:Redis採用SUBSCRIBE命令訂閱某個頻道,其返回值包括客戶端訂閱的頻道,目前已訂閱的頻道數量,以及接收到的消息,其中subscribe表示已經成功訂閱了某個頻道。 

psubscribe:
模式匹配:模式匹配功能容許客戶端訂閱符合某個模式的頻道,Redis採用PSUBSCRIBE訂閱符合某個模式全部頻道,用「」表示模式,「」能夠被任意值代替。
 

案例一:一個消息生產者,兩個消息消費者測試

案例二:兩個消息生產者,一個消息消費者this

案例三:Redis消息機制的Java APIspa

添加依賴:code

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.1.0</version>
</dependency> 

 

消息監聽器類:server

import redis.clients.jedis.JedisPubSub;
 
public class RedisMsgPubSubListener extends JedisPubSub {
    @Override
    public void unsubscribe() {
        super.unsubscribe();
    }
 
    @Override
    public void unsubscribe(String... channels) {
        super.unsubscribe(channels);
    }
 
    @Override
    public void subscribe(String... channels) {
        super.subscribe(channels);
    }
 
    @Override
    public void psubscribe(String... patterns) {
        super.psubscribe(patterns);
    }
 
    @Override
    public void punsubscribe() {
        super.punsubscribe();
    }
 
    @Override
    public void punsubscribe(String... patterns) {
        super.punsubscribe(patterns);
    }
 
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("channel:" + channel + "receives message :" + message);
        this.unsubscribe();
    }
 
    @Override
    public void onPMessage(String pattern, String channel, String message) {
 
    }
 
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("channel:" + channel + "is been subscribed:" + subscribedChannels);
    }
 
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
 
    }
 
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
 
    }
 
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println("channel:" + channel + "is been unsubscribed:" + subscribedChannels);
    }
}

 

測試程序:blog

import redis.clients.jedis.Jedis;

public class TestMain {

   @Test
    public void testSubscribe() throws Exception{
        Jedis jedis = new Jedis("localhost");
        RedisMsgPubSubListener listener = new RedisMsgPubSubListener();
        jedis.subscribe(listener, "redisChatTest");
        //other code
    }

    @Test
    public void testPublish() throws Exception{
        Jedis jedis = new Jedis("localhost");
        jedis.publish("redisChatTest", "Hello World");
        Thread.sleep(5000);
        jedis.publish("redisChatTest", "Hello Redis");
    }
}

相關文章
相關標籤/搜索