Java實現Redis發佈/訂閱

今天經理讓我實現一個Redis發佈/訂閱功能,用來記錄審計信息。我查了一天,才弄出來。總結以下,大神勿噴。html

redis的發佈/訂閱模式是消息機制之一,另一個叫生成者消費者模式。Redis發佈訂閱模式講解能夠參考菜鳥教程的這篇文章http://www.runoob.com/redis/redis-pub-sub.html。

一、Redis發佈訂閱模式客戶端實現。在打開Redis服務器後,再打開兩個客戶端,客戶端1用來接收消息,客戶端2用來發布消息。

客戶端1訂閱bar頻道。格式:SUBSCRIBE name1 name2。 成功訂閱回覆,分別對應訂閱類型、訂閱頻道、訂閱數量。 127.0.0.1:6379> SUBSCRIBE bar Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "bar"3) (integer) 1 客戶端2,發送消息。格式:publish channelName Message。 127.0.0.1:6379> publish bar val (integer) 1 客戶端1訂閱回覆,分別對應消息類型,頻道,消息。mysql

  1. "message"2) "bar"3) "val" 效果圖以下:

Redis支持模式匹配訂閱,*爲模糊匹配符。web

訂閱全部頻道的消息redis

PSUBSCRIBE *
訂閱以news.開頭的全部頻道。spring

PSUBSCRIBE news.* 其餘操做這裏不在贅述。sql

二、Java實現Redis的發佈訂閱。

Java實現Redis的功能大部分是使用jedis的jar包來進行操做。我的感受,jedis封裝了操做redis的經常使用命令,寫多了就會發現知道redis命令怎麼寫的,就能夠猜出來jedis中怎麼寫的。

首先在咱們封裝的JedisUtils中加入發佈和訂閱操做的方法。你們沒有JedisUtils的能夠參考JeeSite中的JedisUtils怎麼寫的。

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 /**服務器

  • 發佈一個消息
  • @param channel
  • @param message */ public static void publishMsg(String channel,String message){ Jedis jedis = null; try { jedis = getResource();

jedis.publish(channel, message); logger.debug("publishMsg {} = {}", channel, message); } catch (Exception e) { logger.warn("publishMsg {} = {}", channel, message, e); } finally { returnResource(jedis); } }ide

/**工具

  • 發佈一個消息
  • @param channel
  • @param message */ public static void publishMsg(byte[] channel,byte[] message){ Jedis jedis = null; try { jedis = getResource();

jedis.publish(channel, message); logger.debug("publishMsg {} = {}", channel, message); } catch (Exception e) { logger.warn("publishMsg {} = {}", channel, message, e); } finally { returnResource(jedis); } }測試

上面的兩個方法的核心處理就是jedis.publish(channel, message);參數channel是消息的頻道,message是消息的內容。在Junit測試或者其餘的地方,使用工具類的此方法便可發佈一個消息。

接收消息代碼多一些。首先定義一個類繼承JedisPubSub,而後實現其中的未實現的方法,最後在工具類JedisUtils中定義一個操做的方法便可。代碼以下:

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 /**

  • 接收消息。在main方法調用後,會一直執行下去。當有發佈對應消息時,就會在jedisPubSub中接收到!
  • @param jedisPubSub
  • @param channels */ public static void subscribeMsg(JedisPubSub jedisPubSub,String channels){ Jedis jedis = null; try { jedis = getResource(); jedis.subscribe(jedisPubSub, channels); logger.debug("subscribeMsg {} = {}", jedisPubSub, channels); } catch (Exception e) { logger.warn("subscribeMsg {} = {}", jedisPubSub, channels, e); } finally { returnResource(jedis); } } JedisPubSub類的定義以下:

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 package com.aq.web.shiro.redis.msg; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisPubSub; import com.aq.dao.info.RedisMsgAuditInfo; import com.aq.web.shiro.redis.SelfObjectUtils; /**

  • Redis監聽訂閱事件
  • @author

*/ public class RedisMsgPubSubListener extends JedisPubSub{ private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class);

/**

  • 取得訂閱的消息後的處理 */ @Override public void onMessage(String channel, String message) { logger.debug("onMessage: channel["+channel+"], message["+message+"]");

//若是消息類型與定義的審計專用類型一致 if(RedisMsgJedisUtils.getRedisMsgChannelString().equalsIgnoreCase(channel)){ //處理審計的消息 this.AuditMsgHandler(message); } } /**

  • 取得按表達式的方式訂閱的消息後的處理 */ @Override public void onPMessage(String pattern, String channel, String message) { logger.debug("onPMessage: channel["+channel+"], message["+message+"]");

} /**

  • 初始化訂閱時候的處理
    */ @Override public void onSubscribe(String channel, int subscribedChannels) { logger.debug("onSubscribe: channel["+channel+"],"+ "subscribedChannels["+subscribedChannels+"]");

} /**

  • 取消訂閱時候的處理 */ @Override public void onUnsubscribe(String channel, int subscribedChannels) { logger.debug("onUnsubscribe: channel["+channel+"], "+ "subscribedChannels["+subscribedChannels+"]");

} /**

  • 取消按表達式的方式訂閱時候的處理 */ @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { logger.debug("onPUnsubscribe: pattern["+pattern+"],"+ "subscribedChannels["+subscribedChannels+"]");

}

/**

  • 初始化按表達式的方式訂閱時候的處理 */ @Override public void onPSubscribe(String pattern, int subscribedChannels) { logger.debug("onPSubscribe: pattern["+pattern+"], "+ "subscribedChannels["+subscribedChannels+"]");

}

/**

  • 私有方法:用於處理審計的消息 */ private void AuditMsgHandler(String message){ //審計日誌反序列化 String -> byte[] -> RedisMsgAuditInfo RedisMsgAuditInfo msg3 = (RedisMsgAuditInfo) SelfObjectUtils.unserialize(message.getBytes()); logger.debug(msg3.toString()); //TODO 後續怎麼存先不寫... }

} 審計日誌反序列化部分代碼能夠省略掉。

最後Junit中測試以下:

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package ap.shiro.redis.msg; import org.junit.Test; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests; import com.aq.web.shiro.redis.JedisUtils; import com.aq.web.shiro.redis.msg.RedisMsgJedisUtils; import com.aq.web.shiro.redis.msg.RedisMsgPubSubListener; @ContextConfiguration(locations = "/mysql-test.xml") public class RedisMsgPubSubTester extends AbstractJUnit4SpringContextTests{

@Test public void testMsg_pub(){

RedisMsgJedisUtils.publishMsg("news.share", "2016年3月28日 15:34:37");

}

@Test public void testMsg_sub(){ RedisMsgPubSubListener pubsub = new RedisMsgPubSubListener(); RedisMsgJedisUtils.subscribeMsg(pubsub, "news.share");

} } 建議測試時,分開方法進行測試。其中testMsg_pub()方法是用來測試發佈消息的,testMsg_sub()是用來測試接收訂閱消息的。這裏只是使用了普通訂閱,你們還可使用模式訂閱。執行testMsg_sub()方法後,客戶端會一直開啓着,不會關閉。另外,在其餘的redis客戶端中發佈一條消息,控制檯就會馬上輸出該消息。

這樣Redis的訂閱發佈的Java實現就完成了。整體不太難,我今天搜資料的時候發現資料不少也很亂,固然沒在開源中國搜。
相關文章
相關標籤/搜索