今天經理讓我實現一個Redis發佈/訂閱功能,用來記錄審計信息。我查了一天,才弄出來。總結以下,大神勿噴。次日我百度「JedisPubSub 」發現好多代碼,後來測試了一下發現,百度關鍵詞「Jedis訂閱發佈」確實沒多少能夠參考的,可是百度關鍵詞「Jedis的Publish/Subscribe」代碼就有不少,我也是醉了。html
redis的發佈/訂閱模式是消息機制之一,另一個叫生成者消費者模式。Redis發佈訂閱模式講解能夠參考菜鳥教程的這篇文章http://www.runoob.com/redis/redis-pub-sub.html。java
一、Redis發佈訂閱模式客戶端實現。在打開Redis服務器後,再打開兩個客戶端,客戶端1用來接收消息,客戶端2用來發布消息。mysql
客戶端1訂閱bar頻道。格式:SUBSCRIBE name1 name2。
成功訂閱回覆,分別對應訂閱類型、訂閱頻道、訂閱數量。web
127.0.0.1:6379> SUBSCRIBE bar
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "bar"3) (integer) 1
客戶端2,發送消息。格式:publish channelName Message。redis
127.0.0.1:6379> publish bar val
(integer) 1
客戶端1訂閱回覆,分別對應消息類型,頻道,消息。spring
1) "message"2) "bar"3) "val"
效果圖以下:sql
Redis支持模式匹配訂閱,*爲模糊匹配符。服務器
訂閱全部頻道的消息ide
PSUBSCRIBE *
訂閱以news.開頭的全部頻道。工具
PSUBSCRIBE news.*
其餘操做這裏不在贅述。
二、Java實現Redis的發佈訂閱。
Java實現Redis的功能大部分是使用jedis的jar包來進行操做。我的感受,jedis封裝了操做redis的經常使用命令,寫多了就會發現知道redis命令怎麼寫的,就能夠猜出來jedis中怎麼寫的。
首先在咱們封裝的JedisUtils中加入發佈和訂閱操做的方法。你們沒有JedisUtils的能夠參考JeeSite中的JedisUtils怎麼寫的。
/** * 發佈一個消息 * @param channel * @param message */ public static void publishMsg(String channel,String message){ Jedis jedis = null; try { jedis = getResource(); jedis.publish(channel, message); logger.debug("publishMsg {} = {}", channel, message); } catch (Exception e) { logger.warn("publishMsg {} = {}", channel, message, e); } finally { returnResource(jedis); } } /** * 發佈一個消息 * @param channel * @param message */ public static void publishMsg(byte[] channel,byte[] message){ Jedis jedis = null; try { jedis = getResource(); jedis.publish(channel, message); logger.debug("publishMsg {} = {}", channel, message); } catch (Exception e) { logger.warn("publishMsg {} = {}", channel, message, e); } finally { returnResource(jedis); } } |
上面的兩個方法的核心處理就是jedis.publish(channel, message);參數channel是消息的頻道,message是消息的內容。在Junit測試或者其餘的地方,使用工具類的此方法便可發佈一個消息。
接收消息代碼多一些。首先定義一個類繼承JedisPubSub,而後實現其中的未實現的方法,最後在工具類JedisUtils中定義一個操做的方法便可。代碼以下:
/** * 接收消息。在main方法調用後,會一直執行下去。當有發佈對應消息時,就會在jedisPubSub中接收到! * @param jedisPubSub * @param channels */ public static void subscribeMsg(JedisPubSub jedisPubSub,String channels){ Jedis jedis = null; try { jedis = getResource(); jedis.subscribe(jedisPubSub, channels); logger.debug("subscribeMsg {} = {}", jedisPubSub, channels); } catch (Exception e) { logger.warn("subscribeMsg {} = {}", jedisPubSub, channels, e); } finally { returnResource(jedis); } } |
JedisPubSub類的定義以下: package com.aq.web.shiro.redis.msg; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisPubSub; import com.aq.dao.info.RedisMsgAuditInfo; import com.aq.web.shiro.redis.SelfObjectUtils; /** * Redis監聽訂閱事件 * @author * */ public class RedisMsgPubSubListener extends JedisPubSub{ private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class); /** * 取得訂閱的消息後的處理 */ @Override public void onMessage(String channel, String message) { logger.debug("onMessage: channel["+channel+"], message["+message+"]"); //若是消息類型與定義的審計專用類型一致 if(RedisMsgJedisUtils.getRedisMsgChannelString().equalsIgnoreCase(channel)){ //處理審計的消息 this.AuditMsgHandler(message); } } /** * 取得按表達式的方式訂閱的消息後的處理 */ @Override public void onPMessage(String pattern, String channel, String message) { logger.debug("onPMessage: channel["+channel+"], message["+message+"]"); } /** * 初始化訂閱時候的處理 */ @Override public void onSubscribe(String channel, int subscribedChannels) { logger.debug("onSubscribe: channel["+channel+"],"+ "subscribedChannels["+subscribedChannels+"]"); } /** * 取消訂閱時候的處理 */ @Override public void onUnsubscribe(String channel, int subscribedChannels) { logger.debug("onUnsubscribe: channel["+channel+"], "+ "subscribedChannels["+subscribedChannels+"]"); } /** * 取消按表達式的方式訂閱時候的處理 */ @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { logger.debug("onPUnsubscribe: pattern["+pattern+"],"+ "subscribedChannels["+subscribedChannels+"]"); } /** * 初始化按表達式的方式訂閱時候的處理 */ @Override public void onPSubscribe(String pattern, int subscribedChannels) { logger.debug("onPSubscribe: pattern["+pattern+"], "+ "subscribedChannels["+subscribedChannels+"]"); } /** * 私有方法:用於處理審計的消息 */ private void AuditMsgHandler(String message){ //審計日誌反序列化 String -> byte[] -> RedisMsgAuditInfo RedisMsgAuditInfo msg3 = (RedisMsgAuditInfo) SelfObjectUtils.unserialize(message.getBytes()); logger.debug(msg3.toString()); //TODO 後續怎麼存先不寫... } } 審計日誌反序列化部分代碼能夠省略掉。 |
最後Junit中測試以下:
package ap.shiro.redis.msg; import org.junit.Test; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests; import com.aq.web.shiro.redis.JedisUtils; import com.aq.web.shiro.redis.msg.RedisMsgJedisUtils; import com.aq.web.shiro.redis.msg.RedisMsgPubSubListener; @ContextConfiguration(locations = "/mysql-test.xml") public class RedisMsgPubSubTester extends AbstractJUnit4SpringContextTests{ @Test public void testMsg_pub(){ RedisMsgJedisUtils.publishMsg("news.share", "2016年3月28日 15:34:37"); } @Test public void testMsg_sub(){ RedisMsgPubSubListener pubsub = new RedisMsgPubSubListener(); RedisMsgJedisUtils.subscribeMsg(pubsub, "news.share"); } } 建議測試時,分開方法進行測試。其中testMsg_pub()方法是用來測試發佈消息的,testMsg_sub()是用來測試接收訂閱消息的。這裏只是使用了普通訂閱,你們還可使用模式訂閱。執行testMsg_sub()方法後,客戶端會一直開啓着,不會關閉。另外,在其餘的redis客戶端中發佈一條消息,控制檯就會馬上輸出該消息。 |
這樣Redis的訂閱發佈的Java實現就完成了。整體不太難,我今天搜資料的時候發現資料不少也很亂,固然沒在開源中國搜。
另外,上面例子中使用的JedisPubSub類的子類接收消息。次日測試時發現,JedisPubSub類的子類接收字符串類的消息沒問題,可是接收對象轉byte[]的消息後不能正確地轉換回對象。這裏的對象是自定義的,經過ByteArray流和Object流完成Object與byte[]之間的轉換。
通過查資料後發現,使用BinaryJedisPubSub類的子類接收消息能夠正確地轉換對象,不會出現上述問題。你們本身試下。繼承BinaryJedisPubSub類的監聽器跟JedisPubSub類的監聽器相似。