nodejs redis 發佈訂閱機制封裝

   最近項目使用 redis,對 publish 和 subscribe 的使用進行了瞭解,並進行了封裝。

  

 1 var config = require('../config/config');
 2 var log = require("./loghelp");
 3 var redis = require("redis");
 /**
  * Create a redis client using the host/port from the shared config.
  *
  * @param {Object} [param] - Optional extra client options; its keys are
  *     copied over the configured host/port defaults.
  * @returns {Object} The client returned by `redis.createClient`, with an
  *     "error" listener attached that forwards errors to `log.error`.
  */
 function initialclient(param) {
     const option = { host: config.redis.host, port: config.redis.port };
     if (param) {
         // Caller-supplied options win over the configured defaults.
         Object.assign(option, param);
     }
     const client = redis.createClient(option);
     // Forward client errors to the logger loaded from ./loghelp.
     client.on("error", function (err) {
         log.error(err);
     });
     return client;
 }

 

 /**
  * Small wrapper around redis publish/subscribe.
  *
  * One client (`this.sub`) is used for subscriptions; a second client
  * (`this.pub`) is created on first publish and reused afterwards.
  * Publishes issued before the first subscription is acknowledged are
  * queued and sent once that acknowledgement arrives.
  *
  * Example:
  *   let channel = "ryan";
  *   redis.pubSub.registerHandlers("ryan", msg => console.log(msg));
  *   redis.pubSub.subscribe(channel);
  *
  *   redis.pubSub.publish(channel, "hello from chen");
  */
 class PubSub
 {
     constructor() {
         this.sub = initialclient();
         // Publisher client; created lazily on first publish and reused.
         this.pub = null;
         // channel name -> Set of handler functions
         this.handlers = new Map();
         // Invoke every handler registered for `channel` with `message`.
         this.subAction = (channel, message) => {
             const actions = this.handlers.get(channel);
             if (!actions) {
                 return;
             }
             for (const action of actions) {
                 action(message);
             }
         };
         // Publishes deferred until a subscription has been acknowledged.
         this.alredyPublishs = [];
         this.subConnected = false;
         // Attach the message listener exactly once; attaching it inside
         // subscribe() would run every handler once per subscribe() call.
         this.sub.on("message", (channel, message) => {
             this.subAction(channel, message);
         });
     }

     /**
      * Publish `message` on `channel`. If no subscription has been
      * acknowledged yet, the publish is queued until one is.
      *
      * @param {string} channel - Channel name.
      * @param {string} message - Payload to publish.
      */
     publish(channel, message)
     {
         const action = () => {
             if (!this.pub) {
                 // Reuse a single publisher instead of opening a new
                 // connection for every message.
                 this.pub = initialclient();
             }
             this.pub.publish(channel, message);
         };
         if (this.subConnected === false) {
             this.alredyPublishs.push(action);
         } else {
             action();
         }
     }

     /**
      * Register a handler that is called with each message received on
      * `channel`. Registering the same function twice has no extra effect.
      *
      * @param {string} channel - Channel name.
      * @param {Function} action - Called as `action(message)`.
      */
     registerHandlers(channel, action)
     {
         const actions = this.handlers.get(channel) || new Set();
         actions.add(action);
         this.handlers.set(channel, actions);
     }

     /**
      * Subscribe the subscriber client to `channel`. On acknowledgement,
      * marks the instance as connected and sends any queued publishes.
      * If the subscribe call reports an error it is logged and the queue
      * is left untouched.
      *
      * @param {string} channel - Channel name.
      */
     subscribe(channel)
     {
         this.sub.subscribe(channel, (err, reply) => {
             if (err) {
                 log.error(err);
                 return;
             }
             this.subConnected = true;
             // Drain the queue so later acknowledgements do not resend
             // messages that were already published.
             const pending = this.alredyPublishs.splice(0);
             for (const publish of pending) {
                 publish();
             }
             console.log(reply);
         });
     }

     /**
      * Close the subscriber client and, if one was created, the
      * publisher client.
      */
     tearDown()
     {
         this.sub.quit();
         if (this.pub) {
             this.pub.quit();
             this.pub = null;
         }
     }
 }

而後經過 exports.pubSub = new PubSub() 將其暴露,可保證是單例。在程序啓動時,調用

registerHandlers  註冊特定通道的處理邏輯,而後調用
subscribe  訂閱通道。在合適時機調用publish,這個機制能夠實現分佈式下全部客戶端watch 同一個數據的更改。
相關文章
相關標籤/搜索