Redis的Keyspace notifications功能初探

本文出處:http://blog.csdn.net/chaijunkun/article/details/27361453,轉載請註明。因爲本人不按期會整理相關博文,會對相應內容做出完善。所以強烈建議在原始出處查看此文。 html


最近在作一套系統,其中要求若干個Worker服務器將心跳信息都上報給中央服務器。當必定時間中央服務器沒有獲得心跳信息時則認爲該Worker失效了,發出告警。 java


知足這種需求的解決方法多種多樣,我開始想到了memcache,上報一次心跳信息就刷新一次緩存,當緩存心裏跳信息對象超時被刪除,即認爲對應的Worker失效。然而因爲memcache的工做原理,刪除都是被動的,咱們沒法及時判斷數據是否過時,即使知道了數據過時,也沒有一種機制來回調方法來執行自定義的處理動做。難道緩存架構就真的不行了嗎? redis


答案是否認的。在Redis 2.8.0版本起,加入了「Keyspace notifications」(即「鍵空間通知」)的功能。如何理解該功能呢?咱們來看下官方是怎麼說的: 數據庫

鍵空間通知,容許Redis客戶端從「發佈/訂閱」通道中創建訂閱關係,以便客戶端可以在Redis中的數據因某種方式受到影響時收到相應事件。
緩存

可能接收到的事件舉例以下:
服務器

影響一個給出的鍵的全部命令(會告訴你哪一個鍵被執行了一個命令,這個命令是什麼);
架構

接收到了一個LPUSH操做的全部鍵(LPUSH命令:key v1 [v2 v3..]將指定的全部值從左到右進行壓棧操做,造成一個棧,並將該棧命名爲指定的key);
app

在數據庫0中失效的全部鍵(不必定非得是數據庫0,這裏這樣表述其實想表達能夠知道影響的哪一個數據庫)。 ide


看到這裏我聯想到,若是一條緩存數據失效了,經過訂閱關係,客戶端會收到消息,經過分析消息能夠得知何種消息,分析消息內容能夠知道是哪一個key失效了。這樣就能夠間接實現開頭所描述的功能。 測試


接下來咱們來看下實驗的步驟:

1.準備redis服務器

做爲開源軟件,redis下載後獲得的是源代碼,使用tar -xzvf redis-2.8.9.tar.gz解壓後對其進行編譯,過程也很簡單,make就能夠了。編譯完成以後可使用自帶的runtest進行測試,看是否編譯成功。而後就是安裝了,執行make PREFIX=/usr/local/redis-2.8.9 install,PREFIX參數指定的就是安裝路徑。如今安裝的只有可執行文件,尚未配置文件。其實在源碼目錄中有一個模板redis.conf,咱們對它進行改動就能夠了。
其餘配置咱們不關心,可是官方文檔中說Keyspace notifications功能默認是關閉的(默認地,Keyspace 時間通知功能是禁用的,由於它或多或少會使用一些CPU的資源),咱們須要打開它。打開的方法也很簡單,配置屬性:notify-keyspace-events

默認配置是這樣的:notify-keyspace-events ""
根據文檔中的說明:
[plain]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. K     Keyspace events, published with __keyspace@<db>__ prefix.  
  2. E     Keyevent events, published with __keyevent@<db>__ prefix.  
  3. g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...  
  4. $     String commands  
  5. l     List commands  
  6. s     Set commands  
  7. h     Hash commands  
  8. z     Sorted set commands  
  9. x     Expired events (events generated every time a key expires)  
  10. e     Evicted events (events generated when a key is evicted for maxmemory)  
  11. A     Alias for g$lshzxe, so that the "AKE" string means all the events.  
咱們配置爲: notify-keyspace-events Ex,含義爲:發佈key事件,使用過時事件(當每個key失效時,都會生成該事件)。

2.準備客戶端和鏈接配置

本文中使用的客戶端是Jedis,版本爲2.4.2,爲了代碼的通用性,我使用Spring來管理鏈接:

[html]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. <bean id="pool" class="redis.clients.jedis.JedisPool">  
  2.         <constructor-arg>  
  3.             <bean id="config" class="redis.clients.jedis.JedisPoolConfig">  
  4.                 <property name="maxIdle" value="0" />  
  5.                 <property name="maxTotal" value="20" />  
  6.                 <property name="maxWaitMillis" value="1000" />  
  7.                 <property name="testOnBorrow" value="true" />  
  8.         </bean>  
  9.         </constructor-arg>  
  10.         <constructor-arg>  
  11.             <value>192.168.1.100</value>  
  12.         </constructor-arg>  
  13.         <constructor-arg>  
  14.         <value>6379</value>  
  15.     </constructor-arg>  
  16. </bean>  
而後使用Spring Test和Junit來測試代碼

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. @RunWith(SpringJUnit4ClassRunner.class)  
  2. @ContextConfiguration("/applicationContext*.xml")  
  3. public class RedisSubscribeDemo {  
  4.       
  5.     private static final Log Log= LogFactory.getLog(RedisSubscribeDemo.class);  
  6.       
  7.     @Resource   
  8.     private JedisPool pool;  
  9.       
  10.     @Test  
  11.     public void doTest() throws InterruptedException {  
  12.         for (int i = 0; i < 1; i++) {  
  13.             TestThread thread= new TestThread(pool);  
  14.             thread.start();  
  15.         }  
  16.         Thread.sleep(50000L);  
  17.         Log.info("Test finish...");  
  18.     }  
  19. }  

因爲要使用必定的延遲,咱們把主要測試代碼放到了TestThread中。當測試線程啓動後,主線程停滯50秒,讓咱們有足夠的時間來操做。

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. public class TestThread extends Thread {  
  2.       
  3.     private Log log= LogFactory.getLog(TestThread.class);  
  4.       
  5.     private JedisPool pool;  
  6.       
  7.     public TestThread(JedisPool pool){  
  8.         log.info("loading test thread");  
  9.         this.pool= pool;  
  10.     }  
  11.   
  12.     @Override  
  13.     public void run() {  
  14.         Jedis jedis= pool.getResource();  
  15.         jedis.psubscribe(new MySubscribe(), "*");  
  16.         try {  
  17.             Thread.sleep(10000L);  
  18.         } catch (InterruptedException e) {  
  19.             log.info("延時失敗", e);  
  20.         }  
  21.         jedis.close();  
  22.         log.info("Test run finished");  
  23.     }  
  24. }  
在測試線程中,咱們將自定義的MySubscribe加入到了Jedis的模板訂閱(即psubscribe,由於模板訂閱的channel是支持星號'*'通配的,這樣能夠收集到多個通配通道的消息,而與之相反的還有一個subscribe,此訂閱只能指定嚴格匹配的通道)中,一樣爲了測試過程可以將結果顯示出來,在綁定了訂閱後,對該線程進行了延時10秒。

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. public class MySubscribe extends JedisPubSub {  
  2.       
  3.     private static final Log log= LogFactory.getLog(MySubscribe.class);  
  4.   
  5.     // 初始化按表達式的方式訂閱時候的處理    
  6.     public void onPSubscribe(String pattern, int subscribedChannels) {    
  7.             log.info(pattern + "=" + subscribedChannels);  
  8.     }  
  9.       
  10.     // 取得按表達式的方式訂閱的消息後的處理    
  11.     public void onPMessage(String pattern, String channel, String message) {    
  12.             log.info(pattern + "=" + channel + "=" + message);  
  13.     }  
  14.   
  15.     ...其餘未用到的重寫方法忽略  
  16. }  
做爲Jedis自定義訂閱,必須繼承redis.clients.jedis.JedisPubSub類,在 psubscribe模式下,重點重寫onPMessage方法,該方法爲接收到模板訂閱後處理事件的重要代碼。pattern爲在綁定訂閱時使用的通配模板,channel爲通配後符合條件的實際通道名稱,message就不用多說了,就是事件消息內容。

3.實戰

經過Redis自帶的redis-cli命令,咱們能夠在服務端經過命令行的方式直接操做。咱們運行上面的示例代碼,而後迅速切換到redis-cli命令中,創建一個生命週期很短暫的數據:
[plain]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. 127.0.0.1:6379> set chaijunkun 123 PX 100  
PX參數指定生命週期單位爲毫秒,100即聲明週期,即100毫秒。key爲chaijunkun的數據,其值爲123。
當執行語句後,回顯:
[plain]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. OK  

這時咱們看實例程序的輸出:
[plain]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. *=__keyevent@0__:expired=chaijunkun  
從輸出能夠看出,以前指定的通配符爲*,通配任何通道;以後是實際的通道名稱:__keyevent@0__:expired,這裏咱們能夠看到訂閱收到了一個keyevent位於數據庫0,事件類型爲:expired,是一個過時事件;最後是chaijunkun,這個是過時數據的key。
在官方文檔中,keyevent通道的格式永遠是這樣的:
__keyevent@<db>__:prefix
對於數據過時事件,咱們在綁定訂閱時通配模板也能夠精確地寫成:
__keyevent@*__:expired
經過示例代碼,咱們能夠看到確實印證了以前的構想,實現了數據過時的事件觸發(event)或者說回調(callback)。

4.其餘應用

以前的代碼中,對於事件的發佈都是由Redis本身生成的。實際上在命令中主動發佈自定義消息也是能夠的,在publish命令的幫助中咱們看到:
[plain]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. 127.0.0.1:6379> help publish  
  2.   
  3.   PUBLISH channel message  
  4.   summary: Post a message to a channel  
  5.   since: 2.0.0  
  6.   group: pubsub  

經過參數,能夠自定義通道名稱和通道消息。而在Jedis中,發佈API甚至作到了字節數據的級別:
jedis.publish(byte[] channel, byte[] message)
所以咱們能夠構想,自定一套通信協議,channel爲命令字,message爲消息體,咱們能夠經過redis這種簡單的發佈/訂閱機制實現消息的分發。
相關文章
相關標籤/搜索