Redis訂閱廣播實現多級緩存

Redis應用場景不少,如今介紹一下它的幾大特性之一   發佈訂閱(pub/sub) redis

特性介紹: 設計模式

  什麼是redis的發佈訂閱(pub/sub)?   Pub/Sub功能(means Publish, Subscribe)即發佈及訂閱功能。基於事件的系統中,Pub/Sub是目前普遍使用的通訊模型,它採用事件做爲基本的通訊機制,提供大規模系統所要求的鬆散耦合的交互模式:訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;發佈者(如服務器)可將訂閱者感興趣的事件隨時通知相關訂閱者。熟悉設計模式的朋友應該瞭解這與23種設計模式中的觀察者模式極爲類似。  api

     一樣,Redis的pub/sub是一種消息通訊模式,主要的目的是解除消息發佈者和消息訂閱者之間的耦合,  Redis做爲一個pub/sub的server, 在訂閱者和發佈者之間起到了消息路由的功能。 服務器

  上面的都是概念,不知道不要緊,其實我也看不懂。 ide

簡單來說,這裏面還有個channel的概念,這裏就是頻道的意思,好比你訂閱了銀行的頻道,當你的資金髮生變更時,銀行就會經過它的頻道給你發送信息,在這裏,你是屬於被動接收的,而不是向銀行索要信息,這個例子中,你就是sub(訂閱者),而銀行就是pub(發佈者)。 測試

 

代碼: ui

先引入依賴 this

  1. <dependency>  
  2.     <groupId>redis.clients</groupId>  
  3.     <artifactId>jedis</artifactId>  
  4.     <version>2.9.0</version>  
  5. </dependency>  

新建一個發佈者 spa

  1. public class Publisher extends Thread{  
  2.     private final JedisPool jedisPool;  
  3.     
  4.     public Publisher(JedisPool jedisPool) {  
  5.         this.jedisPool = jedisPool;  
  6.     }  
  7.     
  8.     @Override  
  9.     public void run() {  
  10.         BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));  
  11.         Jedis jedis = jedisPool.getResource();   //鏈接池中取出一個鏈接  
  12.         while (true) {  
  13.             String line = null;  
  14.             try {  
  15.                 line = reader.readLine();  
  16.                 if (!"quit".equals(line)) {  
  17.                     jedis.publish("mychannel", line);   // mychannel 的頻道上推送消息  
  18.                 } else {  
  19.                     break;  
  20.                 }  
  21.             } catch (IOException e) {  
  22.                 e.printStackTrace();  
  23.             }  
  24.         }  
  25.     }  
  26. }  

新建一個訂閱者 設計

  1. public class Subscriber extends JedisPubSub {  
  2.     
  3.     public Subscriber(){}  
  4.     @Override  
  5.     public void onMessage(String channel, String message) {       //收到消息會調用  
  6.         System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));  
  7.     }  
  8.     @Override  
  9.     public void onSubscribe(String channel, int subscribedChannels) {    //訂閱了頻道會調用  
  10.         System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",  
  11.                 channel, subscribedChannels));  
  12.     }  
  13.     @Override  
  14.     public void onUnsubscribe(String channel, int subscribedChannels) {   //取消訂閱 會調用  
  15.         System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",  
  16.                 channel, subscribedChannels));  
  17.     
  18.     }  
  19. }

訂閱頻道

  1. public class SubThread extends Thread {  
  2.     private final JedisPool jedisPool;  
  3.     private final Subscriber subscriber = new Subscriber();  
  4.     
  5.     private final String channel = "mychannel";  
  6.     
  7.     public SubThread(JedisPool jedisPool) {  
  8.         super("SubThread");  
  9.         this.jedisPool = jedisPool;  
  10.     }  
  11.     
  12.     @Override  
  13.     public void run() {  
  14.         System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));  
  15.         Jedis jedis = null;  
  16.         try {  
  17.             jedis = jedisPool.getResource();   //取出一個鏈接  
  18.             jedis.subscribe(subscriber, channel);    //經過subscribe api去訂閱,入參是訂閱者和頻道名  
  19.         } catch (Exception e) {  
  20.             System.out.println(String.format("subsrcibe channel error, %s", e));  
  21.         } finally {  
  22.             if (jedis != null) {  
  23.                 jedis.close();  
  24.             }  
  25.         }  
  26.     }  
  27. }  
  28.    

測試下

  1. public class PubSubDemo {  
  2.     public static void main( String[] args )  
  3.     {  
  4.         // 鏈接redis服務端  
  5.         JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "172.31.12.18"7379);  
  6.     
  7.         System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d""127.0.0.1"7379));  
  8.     
  9.         SubThread subThread = new SubThread(jedisPool);  //訂閱者  
  10.         subThread.start();  
  11.     
  12.         Publisher publisher = new Publisher(jedisPool);    //發佈者  
  13.         publisher.start();  
  14.     }  
  15. }  

 

打印信息

相關文章
相關標籤/搜索