Redis的Pub/Sub模式

Redis一樣支持消息的發佈/訂閱(Pub/Sub)模式,這和中間件activemq有些相似。訂閱者(Subscriber)能夠訂閱本身感興趣的頻道(Channel),發佈者(Publisher)能夠將消息發往指定的頻道(Channel),正式經過這種方式,能夠將消息的發送者和接收者解耦。另外,因爲能夠動態的Subscribe和Unsubscribe,也能夠提升系統的靈活性和可擴展性。java

關於如何搭建Redis環境,請參考其餘文章。這裏假設有一個可用的Redis環境(單節點和集羣都可)。redis

在redis-cli中使用Pub/Sub

普通channel的Pub/Sub

先用一個客戶端來訂閱頻道:shell

上圖中先使用redis-cli做爲客戶端鏈接了Redis,以後使用了SUBSCRIBE命令,後面的參數表示訂閱了china和hongkong兩個channel。能夠看到"SUBSCRIBE china hongkong"這條命令的輸出是6行(能夠分爲2組,每一組是一個Message)。由於訂閱、取消訂閱的操做跟發佈的消息都是經過消息(Message)的方式發送的,消息的第一個元素就是消息類型,它能夠是如下幾種類型:apache

subscribe: means that we successfully subscribed to the channel given as the second element in the reply. The third argument represents the number of channels we are currently subscribed to.app

unsubscribe: means that we successfully unsubscribed from the channel given as second element in the reply. The third argument represents the number of channels we are currently subscribed to. When the last argument is zero, we are no longer subscribed to any channel, and the client can issue any kind of Redis command as we are outside the Pub/Sub state.maven

message: it is a message received as result of a PUBLISH command issued by another client. The second element is the name of the originating channel, and the third argument is the actual message payload.ide

--from http://redis.io/topics/pubsub函數

上圖的訂閱命令將使得發往這兩個channel的消息會被這個客戶端接收到。須要注意的是,redis-cli客戶端在進入subscribe模式之後,將不能再響應其餘的任何命令測試

A client subscribed to one or more channels should not issue commands, although it can subscribe and unsubscribe to and from other channels.ui

The commands that are allowed in the context of a subscribed client are SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING and QUIT

--from http://redis.io/topics/pubsub

官網說客戶端在subscribe下除了可使用以上命令外,不能使用其餘命令了。可是本人在Subscribe狀態下使用上述幾個命令,根本沒反應。也就是說,使用redis-cli訂閱channel後,該客戶端將不能響應任何命令。除非按下(ctrl+c),但該操做不是取消訂閱,而是退出redis-cli,此時將回到shell命令行下。

關於這個狀況,我在官網上沒有找到對這種狀況的解釋,也有很多的人在網上問,找來找去,本人以爲還算合理的解釋是:

On this page: http://redis.io/commands/subscribe applies only to those clients.

The redis-cli is among those clients. So, the comment is not an instruction for users of redis-cli.

Instead, redis-cli blocks waiting for messages on the bus (only to be unsubcribed via a ctrl+c).

--from http://stackoverflow.com/questions/17621371/redis-unsubscribe

就是說,官網中說明的client,並不包含這裏使用的redis-cli,因而它能夠和其餘的client有不一樣表現。(先不糾結這個問題,稍後再用jedis來測試一下。)

接下來再用一個客戶端來發布消息:

能夠看到,新的一個客戶端使用PUBLISH命令往china頻道發佈了一條叫"China News"的消息,接下來再看看訂閱端:

能夠看見,這條消息已經被接收到了。能夠看到,收到的消息中第一個參數是類型"message",第二個參數是channel名字"china",第三個參數是消息內容"China News",這和開始說的message類型的結構一致。

通配符的Pub/Sub

Redis還支持通配符的訂閱和發佈。客戶端能夠訂閱知足一個或多個規則的channel消息,相應的命令是PSUBSCRIBE和PUNSUBSCRIBE。接下來咱們再用另外一個redis-cli客戶端來訂閱"chi*"的channel,如圖:

和subscribe/unsubscribe的輸出相似,能夠看到第一部分是消息類型「psubscribe」,第二部分是訂閱的規則「chi*」,第三部分則是該客戶端目前訂閱的全部規則個數。

接下來再發布一條消息到china這個channel中,此時,兩個訂閱者應該都能收到該消息:

實際測試結果跟預期相同。須要注意的是,訂閱者2經過通配符訂閱的,收到的消息類型是「pmessage」:

pmessage: it is a message received as result of a PUBLISH command issued by another client, matching a pattern-matching subscription. The second element is the original pattern matched, the third element is the name of the originating channel, and the last element the actual message payload.

--from http://redis.io/topics/pubsub

第二部分是匹配的模式「chi*」,第三部分是實際的channel名字「china」,第四部分是消息內容「China Daily」。

咱們再發布一條消息到chinnna中,此時只有訂閱者2能接收到消息了:

一樣,在使用PSUBSCRIBE進入訂閱模式之後,該redis-cli也不能再監聽其餘任何的命令,要退出該模式,只能使用ctrl+c。

使用Jedis實現Pub/Sub

Jedis是Redis客戶端的一種Java實現,在http://redis.io/clients#java中也能找到。

這裏使用maven來管理包的依賴,因爲使用了Log4j來輸出日誌,所以會用到log4j的jar包:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

Jedis中的JedisPubSub抽象類提供了訂閱和取消的功能。想處理訂閱和取消訂閱某些channel的相關事件,咱們得擴展JedisPubSub類並實現相關的方法:

package com.demo.redis;

import org.apache.log4j.Logger;
import redis.clients.jedis.JedisPubSub;

public class Subscriber extends JedisPubSub {//注意這裏繼承了抽象類JedisPubSub

    private static final Logger LOGGER = Logger.getLogger(Subscriber.class);

    @Override
    public void onMessage(String channel, String message) {
    	LOGGER.info(String.format("Message. Channel: %s, Msg: %s", channel, message));
    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
    	LOGGER.info(String.format("PMessage. Pattern: %s, Channel: %s, Msg: %s", 
    	    pattern, channel, message));
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
    	LOGGER.info("onSubscribe");
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
    	LOGGER.info("onUnsubscribe");
    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
    	LOGGER.info("onPUnsubscribe");
    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
    	LOGGER.info("onPSubscribe");
    }
}

有了訂閱者,咱們還須要一個發佈者:

package com.demo.redis;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;

public class Publisher {

    private static final Logger LOGGER = Logger.getLogger(Publisher.class);
    private final Jedis publisherJedis;
    private final String channel;

    public Publisher(Jedis publisherJedis, String channel) {
        this.publisherJedis = publisherJedis;
        this.channel = channel;
    }

    /**
     * 不停的讀取輸入,而後發佈到channel上面,遇到quit則中止發佈。
     */
    public void startPublish() {
    	LOGGER.info("Type your message (quit for terminate)");
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String line = reader.readLine();
                if (!"quit".equals(line)) {
                    publisherJedis.publish(channel, line);
                } else {
                    break;
                }
            }
        } catch (IOException e) {
            LOGGER.error("IO failure while reading input", e);
        }
    }
}

爲簡單起見,這個發佈者接收控制檯的輸入,而後將輸入的消息發佈到指定的channel上面,若是輸入quit,則中止發佈消息。

接下來是主函數:

package com.demo.redis;

import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class Program {
    
    public static final String CHANNEL_NAME = "MyChannel";
    //我這裏的Redis是一個集羣,192.168.56.101和192.168.56.102均可以使用
    public static final String REDIS_HOST = "192.168.56.101";
    public static final int REDIS_PORT = 7000;
    
    private final static Logger LOGGER = Logger.getLogger(Program.class);
    private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();
    private final static JedisPool JEDIS_POOL = 
            new JedisPool(POOL_CONFIG, REDIS_HOST, REDIS_PORT, 0);
    
    public static void main(String[] args) throws Exception {
        final Jedis subscriberJedis = JEDIS_POOL.getResource();
        final Jedis publisherJedis = JEDIS_POOL.getResource();
        final Subscriber subscriber = new Subscriber();
        //訂閱線程:接收消息
        new Thread(new Runnable() {
            public void run() {
                try {
                    LOGGER.info("Subscribing to \"MyChannel\". This thread will be blocked.");
                    //使用subscriber訂閱CHANNEL_NAME上的消息,這一句以後,線程進入訂閱模式,阻塞。
                    subscriberJedis.subscribe(subscriber, CHANNEL_NAME);
                    
                    //當unsubscribe()方法被調用時,才執行如下代碼
                    LOGGER.info("Subscription ended.");
                } catch (Exception e) {
                    LOGGER.error("Subscribing failed.", e);
                }
            }
        }).start();
        
        //主線程:發佈消息到CHANNEL_NAME頻道上
        new Publisher(publisherJedis, CHANNEL_NAME).startPublish();
        publisherJedis.close();
        
        //Unsubscribe
        subscriber.unsubscribe();
        subscriberJedis.close();
    }
}

主類Program中定義了channel名字、鏈接redis的地址和端口,並使用JedisPool來獲取Jedis實例。因爲訂閱者(subscriber)在進入訂閱狀態後會阻塞線程,所以新起一個線程(new Thread())做爲訂閱線程,並是用主線程來發布消息。待發布者(類中的new Publisher)中止發佈消息(控制檯中輸入quit便可)時,解除訂閱者的訂閱(subscriber.unsubscribe()方法)。此時訂閱線程解除阻塞,打印結束的日誌並退出。

運行程序以前,還須要一個簡單的log4j配置以觀察輸出:

log4j.rootLogger=INFO,stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %m%n

運行Program,如下是執行結果:

從結果看,當訂閱者訂閱後,訂閱線程阻塞,主線程中的Publisher接收輸入後,發佈消息到MyChannel中,此時訂閱該channel的訂閱者收到消息並打印。


Jedis源碼簡要分析

關於使用UNSUBSCRIBE

開始使用redis-cli時,在subscriber進入監聽狀態後,並不能使用UNSUBSCRIBE和PUNSUBSCRIBE命令,如今在Jedis中,在訂閱線程阻塞時,經過在main線程中調用改subscriber的unsubscribe()方法來解除阻塞。查看Jedis源碼,其實該方法也就是給redis發送了一個UNSUBSCRIBE命令而已:

所以這裏是支持在「客戶端」使用UNSUBSCRIBE命令的。

關於訂閱者接收消息

在接收消息前,須要訂閱channel,訂閱完成以後,會執行一個循環,這個循環會一直阻塞,直到該Client沒有訂閱數爲止,以下圖:

中間省略的其餘行,主要是用於解析收到的Redis響應,這段代碼也是根據響應的第一部分肯定響應的消息類型,而後挨個解析響應的後續內容,最後根據解析到消息類型,並使用後續解析到的內容做爲參數來回調相應的方法,省略的內容以下:

final byte[] resp = (byte[]) firstObj;
if (Arrays.equals(SUBSCRIBE.raw, resp)) {
  subscribedChannels = ((Long) reply.get(2)).intValue();
  final byte[] bchannel = (byte[]) reply.get(1);
  final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
  //調用onSubscribe方法,該方法在咱們的Subscriber類中實現
  onSubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
  subscribedChannels = ((Long) reply.get(2)).intValue();
  final byte[] bchannel = (byte[]) reply.get(1);
  final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
  //調用onUnsubscribe方法,該方法在咱們的Subscriber類中實現
  onUnsubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.raw, resp)) {
  final byte[] bchannel = (byte[]) reply.get(1);
  final byte[] bmesg = (byte[]) reply.get(2);
  final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
  final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
  //調用onMessage方法,該方法在咱們的Subscriber類中實現
  onMessage(strchannel, strmesg);
} else if (Arrays.equals(PMESSAGE.raw, resp)) {
  final byte[] bpattern = (byte[]) reply.get(1);
  final byte[] bchannel = (byte[]) reply.get(2);
  final byte[] bmesg = (byte[]) reply.get(3);
  final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
  final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
  final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
  //調用onPMessage方法,該方法在咱們的Subscriber類中實現
  onPMessage(strpattern, strchannel, strmesg);
} else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
  subscribedChannels = ((Long) reply.get(2)).intValue();
  final byte[] bpattern = (byte[]) reply.get(1);
  final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
  onPSubscribe(strpattern, subscribedChannels);
} else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
  subscribedChannels = ((Long) reply.get(2)).intValue();
  final byte[] bpattern = (byte[]) reply.get(1);
  final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
  //調用onPUnsubscribe方法,該方法在咱們的Subscriber類中實現
  onPUnsubscribe(strpattern, subscribedChannels);
} else {
  //對於其餘Redis沒有定義的返回消息類型,則直接報錯
  throw new JedisException("Unknown message type: " + firstObj);
}

以上就是爲何咱們須要在Subscriber中實現這幾個方法的緣由了(這些方法並非抽象的,能夠選擇實現使用到的方法)。


參考:

http://redis.io/topics/pubsub

http://basrikahveci.com/a-simple-jedis-publish-subscribe-example

相關文章
相關標籤/搜索