Java實現Redis發佈/訂閱

    今天經理讓我實現一個Redis發佈/訂閱功能,用來記錄審計信息。我查了一天,才弄出來。總結以下,大神勿噴。次日我百度「JedisPubSub 」發現好多代碼,後來測試了一下發現,百度關鍵詞「Jedis訂閱發佈」確實沒多少能夠參考的,可是百度關鍵詞「Jedis的Publish/Subscribe」代碼就有不少,我也是醉了。html

    redis的發佈/訂閱模式是消息機制之一,另一個叫生成者消費者模式。Redis發佈訂閱模式講解能夠參考菜鳥教程的這篇文章http://www.runoob.com/redis/redis-pub-sub.htmljava

    一、Redis發佈訂閱模式客戶端實現。在打開Redis服務器後,再打開兩個客戶端,客戶端1用來接收消息,客戶端2用來發布消息。mysql

客戶端1訂閱bar頻道。格式:SUBSCRIBE name1 name2。 
成功訂閱回覆,分別對應訂閱類型、訂閱頻道、訂閱數量。
web

127.0.0.1:6379> SUBSCRIBE bar
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "bar"3) (integer) 1

客戶端2,發送消息。格式:publish channelName Message。redis

127.0.0.1:6379> publish bar val
(integer) 1

客戶端1訂閱回覆,分別對應消息類型,頻道,消息。spring

1) "message"2) "bar"3) "val"

效果圖以下:sql

Redis支持模式匹配訂閱,*爲模糊匹配符。服務器

訂閱全部頻道的消息ide

PSUBSCRIBE *  

訂閱以news.開頭的全部頻道。工具

PSUBSCRIBE news.*

    其餘操做這裏不在贅述。

    

    二、Java實現Redis的發佈訂閱。

    Java實現Redis的功能大部分是使用jedis的jar包來進行操做。我的感受,jedis封裝了操做redis的經常使用命令,寫多了就會發現知道redis命令怎麼寫的,就能夠猜出來jedis中怎麼寫的。

    首先在咱們封裝的JedisUtils中加入發佈和訂閱操做的方法。你們沒有JedisUtils的能夠參考JeeSite中的JedisUtils怎麼寫的。

 

 /**
  * 發佈一個消息
  * @param channel
  * @param message
  */
 public static void publishMsg(String channel,String message){
  Jedis jedis = null;
  try {
   jedis = getResource();
   
jedis.publish(channel, message);
   logger.debug("publishMsg {} = {}", channel, message);
  } catch (Exception e) {
   logger.warn("publishMsg {} = {}", channel, message, e);
  } finally {
   returnResource(jedis);
  }
 }
 
 /**
  * 發佈一個消息
  * @param channel
  * @param message
  */
 public static void publishMsg(byte[] channel,byte[] message){
  Jedis jedis = null;
  try {
   jedis = getResource();
  
 jedis.publish(channel, message);
   logger.debug("publishMsg {} = {}", channel, message);
  } catch (Exception e) {
   logger.warn("publishMsg {} = {}", channel, message, e);
  } finally {
   returnResource(jedis);
  }
 }

 

     上面的兩個方法的核心處理就是jedis.publish(channel, message);參數channel是消息的頻道,message是消息的內容。在Junit測試或者其餘的地方,使用工具類的此方法便可發佈一個消息。

    接收消息代碼多一些。首先定義一個類繼承JedisPubSub,而後實現其中的未實現的方法,最後在工具類JedisUtils中定義一個操做的方法便可。代碼以下:

 /**
  * 接收消息。在main方法調用後,會一直執行下去。當有發佈對應消息時,就會在jedisPubSub中接收到!
  * @param jedisPubSub
  * @param channels
  */
 public static void subscribeMsg(JedisPubSub jedisPubSub,String channels){
  Jedis jedis = null;
  try {
   jedis = getResource();
   jedis.subscribe(jedisPubSub, channels);
   logger.debug("subscribeMsg {} = {}", jedisPubSub, channels);
  } catch (Exception e) {
   logger.warn("subscribeMsg {} = {}", jedisPubSub, channels, e);
  } finally {
   returnResource(jedis);
  }
 }

JedisPubSub類的定義以下:

package com.aq.web.shiro.redis.msg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPubSub;
import com.aq.dao.info.RedisMsgAuditInfo;
import com.aq.web.shiro.redis.SelfObjectUtils;
/**
 * Redis監聽訂閱事件 
 * @author 
 *
 */
public class RedisMsgPubSubListener extends JedisPubSub{
 private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class);
 
 
 /**
  * 取得訂閱的消息後的處理
  */
 @Override
 public void onMessage(String channel, String message) {
  logger.debug("onMessage: channel["+channel+"], message["+message+"]");
  
  //若是消息類型與定義的審計專用類型一致
  if(RedisMsgJedisUtils.getRedisMsgChannelString().equalsIgnoreCase(channel)){
   //處理審計的消息
   this.AuditMsgHandler(message);
  }
 }
 /**
  * 取得按表達式的方式訂閱的消息後的處理
  */
 @Override
 public void onPMessage(String pattern, String channel, String message) {
  logger.debug("onPMessage: channel["+channel+"], message["+message+"]");
  
 }
 /**
  * 初始化訂閱時候的處理   
  */
 @Override
 public void onSubscribe(String channel, int subscribedChannels) {
   logger.debug("onSubscribe: channel["+channel+"],"+
     "subscribedChannels["+subscribedChannels+"]");
  
 }
 /**
  * 取消訂閱時候的處理 
  */
 @Override
 public void onUnsubscribe(String channel, int subscribedChannels) {
  logger.debug("onUnsubscribe: channel["+channel+"], "+
                                               "subscribedChannels["+subscribedChannels+"]");
  
 }
 /**
  * 取消按表達式的方式訂閱時候的處理
  */
 @Override
 public void onPUnsubscribe(String pattern, int subscribedChannels) {
   logger.debug("onPUnsubscribe: pattern["+pattern+"],"+
                                               "subscribedChannels["+subscribedChannels+"]");
  
 }
 
 /**
  * 初始化按表達式的方式訂閱時候的處理 
  */
 @Override
 public void onPSubscribe(String pattern, int subscribedChannels) {
  logger.debug("onPSubscribe: pattern["+pattern+"], "+
                                               "subscribedChannels["+subscribedChannels+"]");
  
 }
 
 /**
  * 私有方法:用於處理審計的消息
  */
 private void AuditMsgHandler(String message){
  //審計日誌反序列化 String -> byte[] -> RedisMsgAuditInfo
  RedisMsgAuditInfo msg3 = (RedisMsgAuditInfo) SelfObjectUtils.unserialize(message.getBytes());
  logger.debug(msg3.toString());
  //TODO 後續怎麼存先不寫...
 }
 
}

 審計日誌反序列化部分代碼能夠省略掉。

 

 最後Junit中測試以下:

 

package ap.shiro.redis.msg;
import org.junit.Test;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import com.aq.web.shiro.redis.JedisUtils;
import com.aq.web.shiro.redis.msg.RedisMsgJedisUtils;
import com.aq.web.shiro.redis.msg.RedisMsgPubSubListener;
@ContextConfiguration(locations = "/mysql-test.xml")
public class RedisMsgPubSubTester extends AbstractJUnit4SpringContextTests{
 
 @Test
 public void testMsg_pub(){
  
  RedisMsgJedisUtils.publishMsg("news.share", "2016年3月28日 15:34:37");
  
 }
 
 @Test
 public void testMsg_sub(){
  RedisMsgPubSubListener pubsub = new RedisMsgPubSubListener();
  RedisMsgJedisUtils.subscribeMsg(pubsub, "news.share");
  
 }
}

    建議測試時,分開方法進行測試。其中testMsg_pub()方法是用來測試發佈消息的,testMsg_sub()是用來測試接收訂閱消息的。這裏只是使用了普通訂閱,你們還可使用模式訂閱。執行testMsg_sub()方法後,客戶端會一直開啓着,不會關閉。另外,在其餘的redis客戶端中發佈一條消息,控制檯就會馬上輸出該消息。

    這樣Redis的訂閱發佈的Java實現就完成了。整體不太難,我今天搜資料的時候發現資料不少也很亂,固然沒在開源中國搜。

    另外,上面例子中使用的JedisPubSub類的子類接收消息。次日測試時發現,JedisPubSub類的子類接收字符串類的消息沒問題,可是接收對象轉byte[]的消息後不能正確地轉換回對象。這裏的對象是自定義的,經過ByteArray流和Object流完成Object與byte[]之間的轉換。

    通過查資料後發現,使用BinaryJedisPubSub類的子類接收消息能夠正確地轉換對象,不會出現上述問題。你們本身試下。繼承BinaryJedisPubSub類的監聽器跟JedisPubSub類的監聽器相似。

相關文章
相關標籤/搜索