Redis應用----消息傳遞

一、摘要

  消息傳遞這一應用普遍存在於各個網站中,這個功能也是一個網站必不可少的。常見的消息傳遞應用有,新浪微博中的@我呀、給你評論而後的提示呀、贊贊贊提示、私信呀、甚至是發微博分享的新鮮事;知乎中的私信呀、live發送過來的消息、知乎團隊消息呀等等。php

 

二、實現方法

  消息傳遞即兩個或者多個客戶端在相互發送和接收消息。html

  一般有兩種方法實現:redis

  第一種爲消息推送。Redis內置有這種機制,publish往頻道推送消息、subscribe訂閱頻道。這種方法有一個缺點就是必須保證接收者時刻在線(便是此時程序不能停下來,一直保持監控狀態,倘若斷線後就會出現客戶端丟失信息)json

  第二種爲消息拉取。所謂消息拉取,就是客戶端自主去獲取存儲在服務器中的數據。Redis內部沒有實現消息拉取這種機制。所以咱們須要本身手動編寫代碼去實現這個功能。數組

  在這裏咱們,咱們進一步將消息傳遞再細分爲一對一的消息傳遞,多對多的消息傳遞(羣組消息傳遞)。服務器

【注:兩個類的代碼相對較多,所以將其摺疊起來了】併發

 

三、一對一消息傳遞

  例子1:一對一消息發送與獲取分佈式

  模塊要求:ide

  一、提示有多少個聯繫人發來新消息函數

  二、信息包含發送人、時間、信息內容

  三、可以獲取以前的舊消息

  四、而且消息可以保持7天,過時將會被動觸發刪除

  Redis實現思路:

  一、新消息與舊消息分別採用兩個鏈表來存儲

  二、原始消息的結構採用數組的形式存放,而且含有發送人、時間戳、信息內容

  三、在推入redis的鏈表前,須要將數據轉換爲json類型而後再進行存儲

  四、在取出新信息時應該使用rpoplpush來實現,將已讀的新消息推入舊消息鏈表中

  五、取出舊消息時,應該用舊消息的時間與如今的時間進行對比,若超時,則直接刪除後面的所有數據(由於數據是按時間一個一個壓進鏈表中的,因此對於時間是有序排列的)

數據存儲結構圖:

 

PHP的實現代碼:

#SinglePullMessage.class.php

  1 <?php
  2 #單接接收者接收消息
  3 class SinglePullMessage
  4 {
  5     private $redis='';  #存儲redis對象
  6     /**
  7     * @desc 構造函數
  8     * 
  9     * @param $host string | redis主機
 10     * @param $port int    | 端口
 11     */
 12     public function __construct($host,$port=6379)
 13     {
 14         $this->redis=new Redis();
 15         $this->redis->connect($host,$port);
 16     } 
 17 
 18     /**
 19     * @desc 發送消息(一我的)
 20     * 
 21     * @param $toUser   string    | 接收人
 22     * @param $messageArr array   | 發送的消息數組,包含sender、message、time 
 23     *
 24     * @return bool
 25     */
 26     public function sendSingle($toUser,$messageArr)
 27     {
 28         $json_message=json_encode($messageArr);    #編碼成json數據
 29         return $this->redis->lpush($toUser,$json_message);      #將數據推入鏈表 
 30     }
 31 
 32     /**
 33     * @desc 用戶獲取新消息
 34     *
 35     * @param $user string | 用戶名
 36     *
 37     * @return array 返回數組,包含多少個用戶發來新消息,以及具體消息
 38     */
 39     public function getNewMessage($user)
 40     {
 41         #接收新信息數據,而且將數據推入舊信息數據鏈表中,而且在原鏈表中刪除
 42         $messageArr=array();
 43         while($json_message=$this->redis->rpoplpush($user, 'preMessage_'.$user))
 44         {
 45             $temp=json_decode($json_message);   #將json數據變成對象
 46             $messageArr[$temp->sender][]=$temp;        #轉換成數組信息
 47         }
 48         if($messageArr)
 49         {
 50             $arr['count']=count($messageArr);   #統計有多少個用戶發來信息
 51             $arr['messageArr']=$messageArr;
 52             return $arr;
 53         }
 54         return false;
 55     }
 56 
 57     public function getPreMessage($user)
 58     {
 59         ##取出舊消息
 60         $messageArr=array();
 61         $json_pre=$this->redis->lrange('preMessage_'.$user, 0, -1);    #一次性將所有舊消息取出來
 62         foreach ($json_pre as $k => $v) 
 63         {
 64             $temp=json_decode($v);            #json反編碼
 65             $timeout=$temp->time+60*60*24*7;  #數據過時時間  七天過時
 66             if($timeout<time())               #判斷數據是否過時
 67             {
 68                 if($k==0)                     #如果最遲插入的數據都過時了,則將全部數據刪除
 69                 {
 70                     $this->redis->del('preMessage_'.$user);
 71                     break;
 72                 }
 73                 $this->redis->ltrim('preMessage_'.$user, 0, $k);  #若檢測出有過時的,則將比它以前插入的全部數據刪除
 74                 break;
 75             }
 76             $messageArr[$temp->sender][]=$temp;
 77         }
 78         return $messageArr;
 79     }
 80 
 81     /**
 82     * @desc 消息處理,沒什麼特別的做用。在這裏這是用來處理數組信息,而後將其輸出。 
 83     *
 84     * @param $arr array | 須要處理的信息數組
 85     *
 86     * @return 返回打印輸出
 87     */
 88     public function dealArr($arr)
 89     {
 90         foreach ($arr as $k => $v) 
 91         {
 92             foreach ($v as $k1 => $v2) 
 93             {
 94                 echo '發送人:'.$v2->sender.'    發送時間:'.date('Y-m-d h:i:s',$v2->time).'<br/>';
 95                 echo '消息內容:'.$v2->message.'<br/>';
 96             }
 97             echo "<hr/>";
 98         }
 99     }
100 
101 
102 }
View Code

 

 測試:

  一、發送消息

  #創建test1.php

1 include './SinglePullMessage.class.php';
2 $object=new SinglePullMessage('192.168.95.11');
3 #發送消息
4 $sender='boss';     #發送者
5 $to='jane';         #接收者
6 $message='How are you';    #信息
7 $time=time();
8 $arr=array('sender'=>$sender,'message'=>$message,'time'=>$time);
9 echo $object->sendSingle($to,$arr);

 

  二、獲取新消息

  #創建test2.php

 1 include './SinglePullMessage.class.php';
 2 $object=new SinglePullMessage('192.168.95.11');
 3 #獲取新消息
 4 $arr=$object->getNewMessage('jane');
 5 if($arr)
 6 {
 7     echo $arr['count']."個聯繫人發來新消息<br/><hr/>";
 8     $object->dealArr($arr['messageArr']);   
 9 }
10 else
11     echo "無新消息";

 

   訪問結果:

  三、獲取舊消息

  #創建test3.php

 1 include './SinglePullMessage.class.php';
 2 $object=new SinglePullMessage('192.168.95.11');
 3 #獲取舊消息
 4 $arr=$object->getPreMessage('jane');
 5 if($arr)
 6 {
 7     $object->dealArr($arr);
 8 }
 9 else
10     echo "無舊數據";

 

 

 

四、多對多消息傳遞

  例子2:多對多消息發送與獲取(便是羣組)

  模塊要求:

  一、用戶可以自行建立羣組,併成爲羣主

  二、羣主能夠拉人進來做爲羣組成員、而且能夠踢人

  三、用戶能夠直接退出羣組

  四、能夠發送消息,每一位成員均可以拉取消息

  五、羣組的消息最大容納量爲5000條

  六、成員能夠拉取新消息,並提示有多少新消息

  七、成員能夠分頁獲取以前已讀的舊消息

  。。。。。功能就寫這幾個吧,有須要或者想練習的同窗們能夠增長其餘功能,例如禁言、匿名消息發送、文件發送等等。

  Redis實現思路:

   一、羣組的消息以及羣組的成員組成採用有序集合進行存儲。羣組消息有序集合的member存儲用戶發送的json數據消息,score存儲惟一值,將採用原子操做incr獲取string中的自增加值進行存儲;羣組成員有序集合的member存儲user,score存儲非零數字(在這裏這個score意義不大,個人例子代碼中使用數字1爲羣主的score,其餘的存儲爲2。固然這使用這個數據還能夠擴展別的功能,例如羣組中成員等級)可參考下面數據存儲結構簡圖。

  二、用戶所加入的羣組也是採用有序集合進行存儲。其中,member存儲羣組ID,score存儲用戶已經獲取該羣組的最大消息分值(對應羣組消息的score值)

  三、用戶建立羣組的時候,經過原子操做incr從而獲取一個惟一ID

  四、用戶在羣中發送消息時,也是經過原子操做incr獲取一個惟一自增加有序ID

  五、在執行incr時,爲防止併發致使競爭關係,所以須要進行加鎖操做【redis詳細鎖的講解能夠參考:Redis構建分佈式鎖http://www.cnblogs.com/phpstudy2015-6/p/6575775.html

  六、建立羣組方法簡要思路,任何一個用戶均可以建立羣組聊天,在建立的同時,能夠選擇時是否添加羣組成員(參數經過數組的形式)。建立過程將會爲這個羣組創建一個羣組成員有序集合(羣組信息有序集合暫時不建立),接着將羣主添加進去,再將羣ID添加用戶所參加的羣組有序集合中。

數據存儲結構圖:

 

PHP的代碼實現:

#ManyPullMessage.class.php

  1 <?php
  2 class ManyPullMessage
  3 {
  4     private $redis='';  #存儲redis對象
  5     /**
  6     * @desc 構造函數
  7     * 
  8     * @param $host string | redis主機
  9     * @param $port int    | 端口
 10     */
 11     public function __construct($host,$port=6379)
 12     {
 13         $this->redis=new Redis();
 14         $this->redis->connect($host,$port);
 15     } 
 16 
 17     /**
 18     * @desc 用於建立羣組的方法,在建立的同時還能夠拉人進羣組
 19     * 
 20     * @param $user   string   | 用戶名,建立羣組的主人
 21     * @param $addUser array   | 其餘用戶構成的數組
 22     *
 23     * @param $lockName string | 鎖的名字,用於獲取羣組ID的時候用
 24     * @return int 返回羣組ID
 25     */
 26     public function createGroupChat($user, $addUser=array(), $lockName='chatIdLock')
 27     {
 28         $identifier=$this->getLock($lockName);  #獲取鎖
 29         if($identifier)
 30         {
 31             $id=$this->redis->incr('groupChatID');       #獲取羣組ID
 32             $this->releaseLock($lockName,$identifier);   #釋放鎖
 33         }
 34         else
 35             return false;
 36         $messageCount=$this->redis->set('countMessage_'.$id, 0);  #初始化這個羣組消息計數器
 37         #開啓非事務型流水線,一次性將全部redis命令傳給redis,減小與redis的鏈接
 38         $pipe=$this->redis->pipeline();   
 39         $this->redis->zadd('groupChat_'.$id, 1, $user);  #建立羣組成員有序集合,並添加羣主
 40         #將這個羣組添加到user所參加的羣組有序集合中
 41         $this->redis->zadd('hasGroupChat_'.$user, 0, $id);  
 42         foreach ($addUser as $v)    #建立羣組的同時須要添加的用戶成員
 43         {
 44             $this->redis->zadd('groupChat_'.$id, 2, $v);
 45             $this->redis->zadd('hasGroupChat_'.$v, 0, $id);
 46         }
 47         $pipe->exec();
 48         return $id;    #返回羣組ID
 49     }
 50 
 51     /**
 52     * @desc 羣主主動拉人進羣
 53     *
 54     * @param $user       string | 羣主名
 55     * @param $groupChatID   int | 羣組ID
 56     * @param $addMembers array  | 須要拉進羣的用戶
 57     *
 58     * @return bool
 59     */
 60     public function addMembers($user, $groupChatID, $addMembers=array())
 61     {
 62         $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user);  #將groupChatName的羣主取出來
 63         if($groupMasterScore==1)     #判斷user是不是羣主
 64         {
 65             $pipe=$this->redis->pipeline(); #開啓非事務流水線
 66             foreach ($addMembers as $v) 
 67             {
 68                 $this->redis->zadd('groupChat_'.$groupChatID, 2, $v);                 #添加進羣
 69                 $this->redis->zadd('hasGroupChat_'.$v, 0, $groupChatID); #添加羣名到用戶的有序集合中
 70             }
 71             $pipe->exec();
 72             return true;
 73         }
 74         return false;
 75     }
 76 
 77     /**
 78     * @desc 羣主刪除成員
 79     *
 80     * @param $user       string | 羣主名
 81     * @param $groupChatID   int | 羣組ID
 82     * @param $delMembers  array | 須要刪除的成員名字
 83     *
 84     * @return bool
 85     */
 86     public function delMembers($user, $groupChatID, $delMembers=array())
 87     {
 88         $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); 
 89         if($groupMasterScore==1)     #判斷user是不是羣主
 90         {
 91             $pipe=$this->redis->pipeline(); #開啓非事務流水線
 92             foreach ($delMembers as $v) 
 93             {
 94                 $this->redis->zrem('groupChat_'.$groupChatID, $v);                 
 95                 $this->redis->zrem('hasGroupChat_'.$v, $groupChatID); 
 96             }
 97             $pipe->exec();
 98             return true;
 99         }
100         return false;
101     }
102 
103     /**
104     * @desc 退出羣組
105     *
106     * @param $user string     | 用戶名
107     * @param $groupChatID int | 羣組名
108     */
109     public function quitGroupChat($user, $groupChatID)
110     {
111         $this->redis->zrem('groupChat_'.$groupChatID, $user);
112         $this->redis->zrem('hasGroupChat_'.$user, $groupChatID);
113         return true;
114     }
115 
116     /**
117     * @desc 發送消息
118     *
119     * @param $user string        | 用戶名
120     * @param $groupChatID int    | 羣組ID
121     * @param $messageArr array   | 包含發送消息的數組
122     * @param $preLockName string | 羣消息鎖前綴,羣消息鎖全名爲countLock_羣ID
123     *
124     * @return bool
125     */
126     public function sendMessage($user, $groupChatID, $messageArr, $preLockName='countLock_')
127     {
128         $memberScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); #成員score
129         if($memberScore)
130         {
131             $identifier=$this->getLock($preLockName.$groupChatID);  #獲取鎖
132             if($identifier)     #判斷獲取鎖是否成功
133             {
134                 $messageCount=$this->redis->incr('countMessage_'.$groupChatID);
135                 $this->releaseLock($preLockName.$groupChatID,$identifier);  #釋放鎖
136             }
137             else
138                 return false;
139             $json_message=json_encode($messageArr);
140             $this->redis->zadd('groupChatMessage_'.$groupChatID, $messageCount, $json_message);
141             $count=$this->redis->zcard('groupChatMessage_'.$groupChatID);   #查看信息量大小
142             if($count>5000) #判斷數據量有沒有達到5000條
143             {   #數據量超5000,則須要清除舊數據
144                 $start=5000-$count;
145                 $this->redis->zremrangebyrank('groupChatMessage_'.$groupChatID, $start, $count);
146             }
147             return true;
148         }
149         return false;
150     }
151 
152     /**
153     * @desc 獲取新信息
154     *
155     * @param $user string | 用戶名
156     *
157     * @return 成功則放回json數據數組,無新信息返回false
158     */
159     public function getNewMessage($user)
160     {
161         $arrID=$this->redis->zrange('hasGroupChat_'.$user, 0, -1, 'withscores');    #獲取用戶擁有的羣組ID
162         $json_message=array();  #初始化
163         foreach ($arrID as $k => $v)    #遍歷循環全部羣組,查看是否有新消息
164         {
165             $messageCount=$this->redis->get('countMessage_'.$k);    #羣組最大信息分值數
166             if($messageCount>$v)    #判斷用戶是否存在未讀新消息
167             {
168                 $json_message[$k]['message']=$this->redis->zrangebyscore('groupChatMessage_'.$k, $v+1, $messageCount);
169                 $json_message[$k]['count']=count($json_message[$k]['message']);  #統計新消息數量
170                 $this->redis->zadd('hasGroupChat_'.$user, $messageCount, $k);    #更新已獲取消息
171             }   
172         }
173         if($json_message)
174             return $json_message;
175         return false;
176     }
177 
178     /**
179     * @desc 分頁獲取羣組信息
180     *
181     * @param $user    string  | 用戶名 
182     * @param $groupChatID int | 羣組ID
183     * @param $page        int | 第幾頁
184     * @param $size        int | 每頁多少條數據
185     *
186     * @return 成功返回json數據,失敗返回false
187     */
188     public function getPartMessage($user, $groupChatID, $page=1, $size=10)
189     {
190         $start=$page*$size-$size;   #開始截取數據位置
191         $stop=$page*$size-1;        #結束截取數據位置
192         $json_message=$this->redis->zrevrange('groupChatMessage_'.$groupChatID, $start, $stop);
193         if($json_message)
194             return $json_message;
195         return false;
196     }
197 
198 
199     /**
200     * @desc 加鎖方法
201     *
202     * @param $lockName string | 鎖的名字
203     * @param $timeout int | 鎖的過時時間
204     *
205     * @return 成功返回identifier/失敗返回false
206     */
207     public function getLock($lockName, $timeout=2)
208     {
209         $identifier=uniqid();       #獲取惟一標識符
210         $timeout=ceil($timeout);    #確保是整數
211         $end=time()+$timeout;
212         while(time()<$end)          #循環獲取鎖
213         {
214             /*
215             #這裏的set操做能夠等同於下面那個if操做,而且能夠減小一次與redis通信
216             if($this->redis->set($lockName, $identifier array('nx', 'ex'=>$timeout)))
217                 return $identifier;
218             */
219             if($this->redis->setnx($lockName, $identifier))    #查看$lockName是否被上鎖
220             {
221                 $this->redis->expire($lockName, $timeout);     #爲$lockName設置過時時間
222                 return $identifier;                             #返回一維標識符
223             }
224             elseif ($this->redis->ttl($lockName)===-1) 
225             {
226                 $this->redis->expire($lockName, $timeout);     #檢測是否有設置過時時間,沒有則加上
227             }
228             usleep(0.001);         #中止0.001ms
229         }
230         return false;
231     }
232 
233     /**
234     * @desc 釋放鎖
235     *
236     * @param $lockName string   | 鎖名
237     * @param $identifier string | 鎖的惟一值
238     *
239     * @param bool
240     */
241     public function releaseLock($lockName,$identifier)
242     {
243         if($this->redis->get($lockName)==$identifier)   #判斷是鎖有沒有被其餘客戶端修改
244         { 
245             $this->redis->multi();
246             $this->redis->del($lockName);   #釋放鎖
247             $this->redis->exec();
248             return true;
249         }
250         else
251         {
252             return false;   #其餘客戶端修改了鎖,不能刪除別人的鎖
253         }
254     }
255 
256 
257 }
258 
259 ?>
View Code

 

測試:

  一、創建createGroupChat.php(測試建立羣組功能)

  執行代碼並建立56八、569羣組(羣主爲jack)

1 include './ManyPullMessage.class.php';
2 $object=new ManyPullMessage('192.168.95.11');
3 #建立羣組
4 $user='jack';
5 $arr=array('jane1','jane2');
6 $a=$object->createGroupChat($user,$arr);
7 echo "<pre>";
8 print_r($a);
9 echo "</pre>";die;

 

 

 

  二、創建addMembers.php(測試添加成員功能)

  執行代碼並添加新成員

1 include './ManyPullMessage.class.php';
2 $object=new ManyPullMessage('192.168.95.11');
3 $b=$object->addMembers('jack','568',array('jane1','jane2','jane3','jane4'));
4 echo "<pre>";
5 print_r($b);
6 echo "</pre>";die;

  三、創建delete.php(測試羣主刪除成員功能)

1 include './ManyPullMessage.class.php';
2 $object=new ManyPullMessage('192.168.95.11');
3 #羣主刪除成員
4 $c=$object->delMembers('jack', '568', array('jane1','jane4'));
5 echo "<pre>";
6 print_r($c);
7 echo "</pre>";die;

  四、創建sendMessage.php(測試發送消息功能)

  多執行幾遍,56八、569都發幾條

 1 include './ManyPullMessage.class.php';
 2 $object=new ManyPullMessage('192.168.95.11');
 3 #發送消息
 4 $user='jane2';
 5 $message='go go go';
 6 $groupChatID=568;
 7 $arr=array('sender'=>$user, 'message'=>$message, 'time'=>time());
 8 $d=$object->sendMessage($user,$groupChatID,$arr);
 9 echo "<pre>";
10 print_r($d);
11 echo "</pre>";die;

 

  五、創建getNewMessage.php(測試用戶獲取新消息功能)

1 include './ManyPullMessage.class.php';
2 $object=new ManyPullMessage('192.168.95.11');
3 #用戶獲取新消息
4 $e=$object->getNewMessage('jane2');
5 echo "<pre>";
6 print_r($e);
7 echo "</pre>";die;

 

  六、創建getPartMessage.php(測試用戶獲取某個羣組部分消息)

  (多發送幾條消息,用於測試。568中共18條數據)

1 include './ManyPullMessage.class.php';
2 $object=new ManyPullMessage('192.168.95.11');
3 #用戶獲取某個羣組部分消息
4 $f=$object->getPartMessage('jane2', 568, 1, 10); 
5 echo "<pre>";
6 print_r($f);
7 echo "</pre>";die;

 

page=1,size=10

page=2,size=10

測試完畢,還須要別的功能能夠本身進行修改添加測試。

此次整理這篇文章相對比較趕,內心已經想着快點整理完趕忙學習其餘的技術啦,哈哈22333。各位大神請留步,懇請各位給點學習redis的指導意見,本人職業方向是PHP

 

(以上是本身的一些看法,如有不足或者錯誤的地方請各位指出)

做者:那一葉隨風

 聲明:本博客文章爲原創,只表明本人在工做學習中某一時間內總結的觀點或結論。轉載時請在文章頁面明顯位置給出原文連接

相關文章
相關標籤/搜索