本文出處:http://blog.csdn.net/chaijunkun/article/details/27361453,轉載請註明。因爲本人不按期會整理相關博文,會對相應內容做出完善。所以強烈建議在原始出處查看此文。 html
最近在作一套系統,其中要求若干個Worker服務器將心跳信息都上報給中央服務器。當必定時間中央服務器沒有獲得心跳信息時則認爲該Worker失效了,發出告警。 java
知足這種需求的解決方法多種多樣,我開始想到了memcache,上報一次心跳信息就刷新一次緩存,當緩存心裏跳信息對象超時被刪除,即認爲對應的Worker失效。然而因爲memcache的工做原理,刪除都是被動的,咱們沒法及時判斷數據是否過時,即使知道了數據過時,也沒有一種機制來回調方法來執行自定義的處理動做。難道緩存架構就真的不行了嗎? redis
答案是否認的。在Redis 2.8.0版本起,加入了「Keyspace notifications」(即「鍵空間通知」)的功能。如何理解該功能呢?咱們來看下官方是怎麼說的: 數據庫
鍵空間通知,容許Redis客戶端從「發佈/訂閱」通道中創建訂閱關係,以便客戶端可以在Redis中的數據因某種方式受到影響時收到相應事件。
緩存
可能接收到的事件舉例以下:
服務器
影響一個給出的鍵的全部命令(會告訴你哪一個鍵被執行了一個命令,這個命令是什麼);
架構
接收到了一個LPUSH操做的全部鍵(LPUSH命令:key v1 [v2 v3..]將指定的全部值從左到右進行壓棧操做,造成一個棧,並將該棧命名爲指定的key);
app
在數據庫0中失效的全部鍵(不必定非得是數據庫0,這裏這樣表述其實想表達能夠知道影響的哪一個數據庫)。 ide
看到這裏我聯想到,若是一條緩存數據失效了,經過訂閱關係,客戶端會收到消息,經過分析消息能夠得知何種消息,分析消息內容能夠知道是哪一個key失效了。這樣就能夠間接實現開頭所描述的功能。 測試
接下來咱們來看下實驗的步驟:
1.準備redis服務器
做爲開源軟件,redis下載後獲得的是源代碼,使用tar -xzvf redis-2.8.9.tar.gz解壓後對其進行編譯,過程也很簡單,make就能夠了。編譯完成以後可使用自帶的runtest進行測試,看是否編譯成功。而後就是安裝了,執行make PREFIX=/usr/local/redis-2.8.9 install,PREFIX參數指定的就是安裝路徑。如今安裝的只有可執行文件,尚未配置文件。其實在源碼目錄中有一個模板redis.conf,咱們對它進行改動就能夠了。
其餘配置咱們不關心,可是官方文檔中說Keyspace notifications功能默認是關閉的(默認地,Keyspace 時間通知功能是禁用的,由於它或多或少會使用一些CPU的資源),咱們須要打開它。打開的方法也很簡單,配置屬性:notify-keyspace-events
默認配置是這樣的:notify-keyspace-events ""
根據文檔中的說明:
- K Keyspace events, published with __keyspace@<db>__ prefix.
- E Keyevent events, published with __keyevent@<db>__ prefix.
- g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
- $ String commands
- l List commands
- s Set commands
- h Hash commands
- z Sorted set commands
- x Expired events (events generated every time a key expires)
- e Evicted events (events generated when a key is evicted for maxmemory)
- A Alias for g$lshzxe, so that the "AKE" string means all the events.
咱們配置爲:
notify-keyspace-events Ex,含義爲:發佈key事件,使用過時事件(當每個key失效時,都會生成該事件)。
2.準備客戶端和鏈接配置
本文中使用的客戶端是Jedis,版本爲2.4.2,爲了代碼的通用性,我使用Spring來管理鏈接:
- <bean id="pool" class="redis.clients.jedis.JedisPool">
- <constructor-arg>
- <bean id="config" class="redis.clients.jedis.JedisPoolConfig">
- <property name="maxIdle" value="0" />
- <property name="maxTotal" value="20" />
- <property name="maxWaitMillis" value="1000" />
- <property name="testOnBorrow" value="true" />
- </bean>
- </constructor-arg>
- <constructor-arg>
- <value>192.168.1.100</value>
- </constructor-arg>
- <constructor-arg>
- <value>6379</value>
- </constructor-arg>
- </bean>
而後使用Spring Test和Junit來測試代碼
- @RunWith(SpringJUnit4ClassRunner.class)
- @ContextConfiguration("/applicationContext*.xml")
- public class RedisSubscribeDemo {
-
- private static final Log Log= LogFactory.getLog(RedisSubscribeDemo.class);
-
- @Resource
- private JedisPool pool;
-
- @Test
- public void doTest() throws InterruptedException {
- for (int i = 0; i < 1; i++) {
- TestThread thread= new TestThread(pool);
- thread.start();
- }
- Thread.sleep(50000L);
- Log.info("Test finish...");
- }
- }
因爲要使用必定的延遲,咱們把主要測試代碼放到了TestThread中。當測試線程啓動後,主線程停滯50秒,讓咱們有足夠的時間來操做。
- public class TestThread extends Thread {
-
- private Log log= LogFactory.getLog(TestThread.class);
-
- private JedisPool pool;
-
- public TestThread(JedisPool pool){
- log.info("loading test thread");
- this.pool= pool;
- }
-
- @Override
- public void run() {
- Jedis jedis= pool.getResource();
- jedis.psubscribe(new MySubscribe(), "*");
- try {
- Thread.sleep(10000L);
- } catch (InterruptedException e) {
- log.info("延時失敗", e);
- }
- jedis.close();
- log.info("Test run finished");
- }
- }
在測試線程中,咱們將自定義的MySubscribe加入到了Jedis的模板訂閱(即psubscribe,由於模板訂閱的channel是支持星號'*'通配的,這樣能夠收集到多個通配通道的消息,而與之相反的還有一個subscribe,此訂閱只能指定嚴格匹配的通道)中,一樣爲了測試過程可以將結果顯示出來,在綁定了訂閱後,對該線程進行了延時10秒。
- public class MySubscribe extends JedisPubSub {
-
- private static final Log log= LogFactory.getLog(MySubscribe.class);
-
- // 初始化按表達式的方式訂閱時候的處理
- public void onPSubscribe(String pattern, int subscribedChannels) {
- log.info(pattern + "=" + subscribedChannels);
- }
-
- // 取得按表達式的方式訂閱的消息後的處理
- public void onPMessage(String pattern, String channel, String message) {
- log.info(pattern + "=" + channel + "=" + message);
- }
-
- ...其餘未用到的重寫方法忽略
- }
做爲Jedis自定義訂閱,必須繼承redis.clients.jedis.JedisPubSub類,在
psubscribe模式下,重點重寫onPMessage方法,該方法爲接收到模板訂閱後處理事件的重要代碼。pattern爲在綁定訂閱時使用的通配模板,channel爲通配後符合條件的實際通道名稱,message就不用多說了,就是事件消息內容。
3.實戰
經過Redis自帶的redis-cli命令,咱們能夠在服務端經過命令行的方式直接操做。咱們運行上面的示例代碼,而後迅速切換到redis-cli命令中,創建一個生命週期很短暫的數據:
- 127.0.0.1:6379> set chaijunkun 123 PX 100
PX參數指定生命週期單位爲毫秒,100即聲明週期,即100毫秒。key爲chaijunkun的數據,其值爲123。
當執行語句後,回顯:
- *=__keyevent@0__:expired=chaijunkun
從輸出能夠看出,以前指定的通配符爲*,通配任何通道;以後是實際的通道名稱:__keyevent@0__:expired,這裏咱們能夠看到訂閱收到了一個keyevent位於數據庫0,事件類型爲:expired,是一個過時事件;最後是chaijunkun,這個是過時數據的key。
在官方文檔中,keyevent通道的格式永遠是這樣的:
__keyevent@<db>__:prefix
對於數據過時事件,咱們在綁定訂閱時通配模板也能夠精確地寫成:
__keyevent@*__:expired
經過示例代碼,咱們能夠看到確實印證了以前的構想,實現了數據過時的事件觸發(event)或者說回調(callback)。
4.其餘應用
以前的代碼中,對於事件的發佈都是由Redis本身生成的。實際上在命令中主動發佈自定義消息也是能夠的,在publish命令的幫助中咱們看到:
- 127.0.0.1:6379> help publish
-
- PUBLISH channel message
- summary: Post a message to a channel
- since: 2.0.0
- group: pubsub
經過參數,能夠自定義通道名稱和通道消息。而在Jedis中,發佈API甚至作到了字節數據的級別:
jedis.publish(byte[] channel, byte[] message)
所以咱們能夠構想,自定一套通信協議,channel爲命令字,message爲消息體,咱們能夠經過redis這種簡單的發佈/訂閱機制實現消息的分發。