消息傳遞這一應用普遍存在於各個網站中,這個功能也是一個網站必不可少的。常見的消息傳遞應用有,新浪微博中的@我呀、給你評論而後的提示呀、贊贊贊提示、私信呀、甚至是發微博分享的新鮮事;知乎中的私信呀、live發送過來的消息、知乎團隊消息呀等等。php
消息傳遞即兩個或者多個客戶端在相互發送和接收消息。html
一般有兩種方法實現:redis
第一種爲消息推送。Redis內置有這種機制,publish往頻道推送消息、subscribe訂閱頻道。這種方法有一個缺點就是必須保證接收者時刻在線(便是此時程序不能停下來,一直保持監控狀態,倘若斷線後就會出現客戶端丟失信息)json
第二種爲消息拉取。所謂消息拉取,就是客戶端自主去獲取存儲在服務器中的數據。Redis內部沒有實現消息拉取這種機制。所以咱們須要本身手動編寫代碼去實現這個功能。數組
在這裏咱們,咱們進一步將消息傳遞再細分爲一對一的消息傳遞,多對多的消息傳遞(羣組消息傳遞)。服務器
【注:兩個類的代碼相對較多,所以將其摺疊起來了】併發
例子1:一對一消息發送與獲取分佈式
模塊要求:ide
一、提示有多少個聯繫人發來新消息函數
二、信息包含發送人、時間、信息內容
三、可以獲取以前的舊消息
四、而且消息可以保持7天,過時將會被動觸發刪除
Redis實現思路:
一、新消息與舊消息分別採用兩個鏈表來存儲
二、原始消息的結構採用數組的形式存放,而且含有發送人、時間戳、信息內容
三、在推入redis的鏈表前,須要將數據轉換爲json類型而後再進行存儲
四、在取出新信息時應該使用rpoplpush來實現,將已讀的新消息推入舊消息鏈表中
五、取出舊消息時,應該用舊消息的時間與如今的時間進行對比,若超時,則直接刪除後面的所有數據(由於數據是按時間一個一個壓進鏈表中的,因此對於時間是有序排列的)
數據存儲結構圖:
PHP的實現代碼:
#SinglePullMessage.class.php
1 <?php 2 #單接接收者接收消息 3 class SinglePullMessage 4 { 5 private $redis=''; #存儲redis對象 6 /** 7 * @desc 構造函數 8 * 9 * @param $host string | redis主機 10 * @param $port int | 端口 11 */ 12 public function __construct($host,$port=6379) 13 { 14 $this->redis=new Redis(); 15 $this->redis->connect($host,$port); 16 } 17 18 /** 19 * @desc 發送消息(一我的) 20 * 21 * @param $toUser string | 接收人 22 * @param $messageArr array | 發送的消息數組,包含sender、message、time 23 * 24 * @return bool 25 */ 26 public function sendSingle($toUser,$messageArr) 27 { 28 $json_message=json_encode($messageArr); #編碼成json數據 29 return $this->redis->lpush($toUser,$json_message); #將數據推入鏈表 30 } 31 32 /** 33 * @desc 用戶獲取新消息 34 * 35 * @param $user string | 用戶名 36 * 37 * @return array 返回數組,包含多少個用戶發來新消息,以及具體消息 38 */ 39 public function getNewMessage($user) 40 { 41 #接收新信息數據,而且將數據推入舊信息數據鏈表中,而且在原鏈表中刪除 42 $messageArr=array(); 43 while($json_message=$this->redis->rpoplpush($user, 'preMessage_'.$user)) 44 { 45 $temp=json_decode($json_message); #將json數據變成對象 46 $messageArr[$temp->sender][]=$temp; #轉換成數組信息 47 } 48 if($messageArr) 49 { 50 $arr['count']=count($messageArr); #統計有多少個用戶發來信息 51 $arr['messageArr']=$messageArr; 52 return $arr; 53 } 54 return false; 55 } 56 57 public function getPreMessage($user) 58 { 59 ##取出舊消息 60 $messageArr=array(); 61 $json_pre=$this->redis->lrange('preMessage_'.$user, 0, -1); #一次性將所有舊消息取出來 62 foreach ($json_pre as $k => $v) 63 { 64 $temp=json_decode($v); #json反編碼 65 $timeout=$temp->time+60*60*24*7; #數據過時時間 七天過時 66 if($timeout<time()) #判斷數據是否過時 67 { 68 if($k==0) #如果最遲插入的數據都過時了,則將全部數據刪除 69 { 70 $this->redis->del('preMessage_'.$user); 71 break; 72 } 73 $this->redis->ltrim('preMessage_'.$user, 0, $k); #若檢測出有過時的,則將比它以前插入的全部數據刪除 74 break; 75 } 76 $messageArr[$temp->sender][]=$temp; 77 } 78 return $messageArr; 79 } 80 81 /** 82 * @desc 消息處理,沒什麼特別的做用。在這裏這是用來處理數組信息,而後將其輸出。 83 * 84 * @param $arr array | 須要處理的信息數組 85 * 86 * @return 返回打印輸出 87 */ 88 public function dealArr($arr) 89 { 90 foreach ($arr as $k => $v) 91 { 92 foreach ($v as $k1 => $v2) 93 { 94 echo '發送人:'.$v2->sender.' 發送時間:'.date('Y-m-d h:i:s',$v2->time).'<br/>'; 95 echo '消息內容:'.$v2->message.'<br/>'; 96 } 97 echo "<hr/>"; 98 } 99 } 100 101 102 }
測試:
一、發送消息
#創建test1.php
1 include './SinglePullMessage.class.php'; 2 $object=new SinglePullMessage('192.168.95.11'); 3 #發送消息 4 $sender='boss'; #發送者 5 $to='jane'; #接收者 6 $message='How are you'; #信息 7 $time=time(); 8 $arr=array('sender'=>$sender,'message'=>$message,'time'=>$time); 9 echo $object->sendSingle($to,$arr);
二、獲取新消息
#創建test2.php
1 include './SinglePullMessage.class.php'; 2 $object=new SinglePullMessage('192.168.95.11'); 3 #獲取新消息 4 $arr=$object->getNewMessage('jane'); 5 if($arr) 6 { 7 echo $arr['count']."個聯繫人發來新消息<br/><hr/>"; 8 $object->dealArr($arr['messageArr']); 9 } 10 else 11 echo "無新消息";
訪問結果:
三、獲取舊消息
#創建test3.php
1 include './SinglePullMessage.class.php'; 2 $object=new SinglePullMessage('192.168.95.11'); 3 #獲取舊消息 4 $arr=$object->getPreMessage('jane'); 5 if($arr) 6 { 7 $object->dealArr($arr); 8 } 9 else 10 echo "無舊數據";
例子2:多對多消息發送與獲取(便是羣組)
模塊要求:
一、用戶可以自行建立羣組,併成爲羣主
二、羣主能夠拉人進來做爲羣組成員、而且能夠踢人
三、用戶能夠直接退出羣組
四、能夠發送消息,每一位成員均可以拉取消息
五、羣組的消息最大容納量爲5000條
六、成員能夠拉取新消息,並提示有多少新消息
七、成員能夠分頁獲取以前已讀的舊消息
。。。。。功能就寫這幾個吧,有須要或者想練習的同窗們能夠增長其餘功能,例如禁言、匿名消息發送、文件發送等等。
Redis實現思路:
一、羣組的消息以及羣組的成員組成採用有序集合進行存儲。羣組消息有序集合的member存儲用戶發送的json數據消息,score存儲惟一值,將採用原子操做incr獲取string中的自增加值進行存儲;羣組成員有序集合的member存儲user,score存儲非零數字(在這裏這個score意義不大,個人例子代碼中使用數字1爲羣主的score,其餘的存儲爲2。固然這使用這個數據還能夠擴展別的功能,例如羣組中成員等級)可參考下面數據存儲結構簡圖。
二、用戶所加入的羣組也是採用有序集合進行存儲。其中,member存儲羣組ID,score存儲用戶已經獲取該羣組的最大消息分值(對應羣組消息的score值)
三、用戶建立羣組的時候,經過原子操做incr從而獲取一個惟一ID
四、用戶在羣中發送消息時,也是經過原子操做incr獲取一個惟一自增加有序ID
五、在執行incr時,爲防止併發致使競爭關係,所以須要進行加鎖操做【redis詳細鎖的講解能夠參考:Redis構建分佈式鎖http://www.cnblogs.com/phpstudy2015-6/p/6575775.html】
六、建立羣組方法簡要思路,任何一個用戶均可以建立羣組聊天,在建立的同時,能夠選擇時是否添加羣組成員(參數經過數組的形式)。建立過程將會爲這個羣組創建一個羣組成員有序集合(羣組信息有序集合暫時不建立),接着將羣主添加進去,再將羣ID添加用戶所參加的羣組有序集合中。
數據存儲結構圖:
PHP的代碼實現:
#ManyPullMessage.class.php
1 <?php 2 class ManyPullMessage 3 { 4 private $redis=''; #存儲redis對象 5 /** 6 * @desc 構造函數 7 * 8 * @param $host string | redis主機 9 * @param $port int | 端口 10 */ 11 public function __construct($host,$port=6379) 12 { 13 $this->redis=new Redis(); 14 $this->redis->connect($host,$port); 15 } 16 17 /** 18 * @desc 用於建立羣組的方法,在建立的同時還能夠拉人進羣組 19 * 20 * @param $user string | 用戶名,建立羣組的主人 21 * @param $addUser array | 其餘用戶構成的數組 22 * 23 * @param $lockName string | 鎖的名字,用於獲取羣組ID的時候用 24 * @return int 返回羣組ID 25 */ 26 public function createGroupChat($user, $addUser=array(), $lockName='chatIdLock') 27 { 28 $identifier=$this->getLock($lockName); #獲取鎖 29 if($identifier) 30 { 31 $id=$this->redis->incr('groupChatID'); #獲取羣組ID 32 $this->releaseLock($lockName,$identifier); #釋放鎖 33 } 34 else 35 return false; 36 $messageCount=$this->redis->set('countMessage_'.$id, 0); #初始化這個羣組消息計數器 37 #開啓非事務型流水線,一次性將全部redis命令傳給redis,減小與redis的鏈接 38 $pipe=$this->redis->pipeline(); 39 $this->redis->zadd('groupChat_'.$id, 1, $user); #建立羣組成員有序集合,並添加羣主 40 #將這個羣組添加到user所參加的羣組有序集合中 41 $this->redis->zadd('hasGroupChat_'.$user, 0, $id); 42 foreach ($addUser as $v) #建立羣組的同時須要添加的用戶成員 43 { 44 $this->redis->zadd('groupChat_'.$id, 2, $v); 45 $this->redis->zadd('hasGroupChat_'.$v, 0, $id); 46 } 47 $pipe->exec(); 48 return $id; #返回羣組ID 49 } 50 51 /** 52 * @desc 羣主主動拉人進羣 53 * 54 * @param $user string | 羣主名 55 * @param $groupChatID int | 羣組ID 56 * @param $addMembers array | 須要拉進羣的用戶 57 * 58 * @return bool 59 */ 60 public function addMembers($user, $groupChatID, $addMembers=array()) 61 { 62 $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); #將groupChatName的羣主取出來 63 if($groupMasterScore==1) #判斷user是不是羣主 64 { 65 $pipe=$this->redis->pipeline(); #開啓非事務流水線 66 foreach ($addMembers as $v) 67 { 68 $this->redis->zadd('groupChat_'.$groupChatID, 2, $v); #添加進羣 69 $this->redis->zadd('hasGroupChat_'.$v, 0, $groupChatID); #添加羣名到用戶的有序集合中 70 } 71 $pipe->exec(); 72 return true; 73 } 74 return false; 75 } 76 77 /** 78 * @desc 羣主刪除成員 79 * 80 * @param $user string | 羣主名 81 * @param $groupChatID int | 羣組ID 82 * @param $delMembers array | 須要刪除的成員名字 83 * 84 * @return bool 85 */ 86 public function delMembers($user, $groupChatID, $delMembers=array()) 87 { 88 $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); 89 if($groupMasterScore==1) #判斷user是不是羣主 90 { 91 $pipe=$this->redis->pipeline(); #開啓非事務流水線 92 foreach ($delMembers as $v) 93 { 94 $this->redis->zrem('groupChat_'.$groupChatID, $v); 95 $this->redis->zrem('hasGroupChat_'.$v, $groupChatID); 96 } 97 $pipe->exec(); 98 return true; 99 } 100 return false; 101 } 102 103 /** 104 * @desc 退出羣組 105 * 106 * @param $user string | 用戶名 107 * @param $groupChatID int | 羣組名 108 */ 109 public function quitGroupChat($user, $groupChatID) 110 { 111 $this->redis->zrem('groupChat_'.$groupChatID, $user); 112 $this->redis->zrem('hasGroupChat_'.$user, $groupChatID); 113 return true; 114 } 115 116 /** 117 * @desc 發送消息 118 * 119 * @param $user string | 用戶名 120 * @param $groupChatID int | 羣組ID 121 * @param $messageArr array | 包含發送消息的數組 122 * @param $preLockName string | 羣消息鎖前綴,羣消息鎖全名爲countLock_羣ID 123 * 124 * @return bool 125 */ 126 public function sendMessage($user, $groupChatID, $messageArr, $preLockName='countLock_') 127 { 128 $memberScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); #成員score 129 if($memberScore) 130 { 131 $identifier=$this->getLock($preLockName.$groupChatID); #獲取鎖 132 if($identifier) #判斷獲取鎖是否成功 133 { 134 $messageCount=$this->redis->incr('countMessage_'.$groupChatID); 135 $this->releaseLock($preLockName.$groupChatID,$identifier); #釋放鎖 136 } 137 else 138 return false; 139 $json_message=json_encode($messageArr); 140 $this->redis->zadd('groupChatMessage_'.$groupChatID, $messageCount, $json_message); 141 $count=$this->redis->zcard('groupChatMessage_'.$groupChatID); #查看信息量大小 142 if($count>5000) #判斷數據量有沒有達到5000條 143 { #數據量超5000,則須要清除舊數據 144 $start=5000-$count; 145 $this->redis->zremrangebyrank('groupChatMessage_'.$groupChatID, $start, $count); 146 } 147 return true; 148 } 149 return false; 150 } 151 152 /** 153 * @desc 獲取新信息 154 * 155 * @param $user string | 用戶名 156 * 157 * @return 成功則放回json數據數組,無新信息返回false 158 */ 159 public function getNewMessage($user) 160 { 161 $arrID=$this->redis->zrange('hasGroupChat_'.$user, 0, -1, 'withscores'); #獲取用戶擁有的羣組ID 162 $json_message=array(); #初始化 163 foreach ($arrID as $k => $v) #遍歷循環全部羣組,查看是否有新消息 164 { 165 $messageCount=$this->redis->get('countMessage_'.$k); #羣組最大信息分值數 166 if($messageCount>$v) #判斷用戶是否存在未讀新消息 167 { 168 $json_message[$k]['message']=$this->redis->zrangebyscore('groupChatMessage_'.$k, $v+1, $messageCount); 169 $json_message[$k]['count']=count($json_message[$k]['message']); #統計新消息數量 170 $this->redis->zadd('hasGroupChat_'.$user, $messageCount, $k); #更新已獲取消息 171 } 172 } 173 if($json_message) 174 return $json_message; 175 return false; 176 } 177 178 /** 179 * @desc 分頁獲取羣組信息 180 * 181 * @param $user string | 用戶名 182 * @param $groupChatID int | 羣組ID 183 * @param $page int | 第幾頁 184 * @param $size int | 每頁多少條數據 185 * 186 * @return 成功返回json數據,失敗返回false 187 */ 188 public function getPartMessage($user, $groupChatID, $page=1, $size=10) 189 { 190 $start=$page*$size-$size; #開始截取數據位置 191 $stop=$page*$size-1; #結束截取數據位置 192 $json_message=$this->redis->zrevrange('groupChatMessage_'.$groupChatID, $start, $stop); 193 if($json_message) 194 return $json_message; 195 return false; 196 } 197 198 199 /** 200 * @desc 加鎖方法 201 * 202 * @param $lockName string | 鎖的名字 203 * @param $timeout int | 鎖的過時時間 204 * 205 * @return 成功返回identifier/失敗返回false 206 */ 207 public function getLock($lockName, $timeout=2) 208 { 209 $identifier=uniqid(); #獲取惟一標識符 210 $timeout=ceil($timeout); #確保是整數 211 $end=time()+$timeout; 212 while(time()<$end) #循環獲取鎖 213 { 214 /* 215 #這裏的set操做能夠等同於下面那個if操做,而且能夠減小一次與redis通信 216 if($this->redis->set($lockName, $identifier array('nx', 'ex'=>$timeout))) 217 return $identifier; 218 */ 219 if($this->redis->setnx($lockName, $identifier)) #查看$lockName是否被上鎖 220 { 221 $this->redis->expire($lockName, $timeout); #爲$lockName設置過時時間 222 return $identifier; #返回一維標識符 223 } 224 elseif ($this->redis->ttl($lockName)===-1) 225 { 226 $this->redis->expire($lockName, $timeout); #檢測是否有設置過時時間,沒有則加上 227 } 228 usleep(0.001); #中止0.001ms 229 } 230 return false; 231 } 232 233 /** 234 * @desc 釋放鎖 235 * 236 * @param $lockName string | 鎖名 237 * @param $identifier string | 鎖的惟一值 238 * 239 * @param bool 240 */ 241 public function releaseLock($lockName,$identifier) 242 { 243 if($this->redis->get($lockName)==$identifier) #判斷是鎖有沒有被其餘客戶端修改 244 { 245 $this->redis->multi(); 246 $this->redis->del($lockName); #釋放鎖 247 $this->redis->exec(); 248 return true; 249 } 250 else 251 { 252 return false; #其餘客戶端修改了鎖,不能刪除別人的鎖 253 } 254 } 255 256 257 } 258 259 ?>
測試:
一、創建createGroupChat.php(測試建立羣組功能)
執行代碼並建立56八、569羣組(羣主爲jack)
1 include './ManyPullMessage.class.php'; 2 $object=new ManyPullMessage('192.168.95.11'); 3 #建立羣組 4 $user='jack'; 5 $arr=array('jane1','jane2'); 6 $a=$object->createGroupChat($user,$arr); 7 echo "<pre>"; 8 print_r($a); 9 echo "</pre>";die;
二、創建addMembers.php(測試添加成員功能)
執行代碼並添加新成員
1 include './ManyPullMessage.class.php'; 2 $object=new ManyPullMessage('192.168.95.11'); 3 $b=$object->addMembers('jack','568',array('jane1','jane2','jane3','jane4')); 4 echo "<pre>"; 5 print_r($b); 6 echo "</pre>";die;
三、創建delete.php(測試羣主刪除成員功能)
1 include './ManyPullMessage.class.php'; 2 $object=new ManyPullMessage('192.168.95.11'); 3 #羣主刪除成員 4 $c=$object->delMembers('jack', '568', array('jane1','jane4')); 5 echo "<pre>"; 6 print_r($c); 7 echo "</pre>";die;
四、創建sendMessage.php(測試發送消息功能)
多執行幾遍,56八、569都發幾條
1 include './ManyPullMessage.class.php'; 2 $object=new ManyPullMessage('192.168.95.11'); 3 #發送消息 4 $user='jane2'; 5 $message='go go go'; 6 $groupChatID=568; 7 $arr=array('sender'=>$user, 'message'=>$message, 'time'=>time()); 8 $d=$object->sendMessage($user,$groupChatID,$arr); 9 echo "<pre>"; 10 print_r($d); 11 echo "</pre>";die;
五、創建getNewMessage.php(測試用戶獲取新消息功能)
1 include './ManyPullMessage.class.php'; 2 $object=new ManyPullMessage('192.168.95.11'); 3 #用戶獲取新消息 4 $e=$object->getNewMessage('jane2'); 5 echo "<pre>"; 6 print_r($e); 7 echo "</pre>";die;
六、創建getPartMessage.php(測試用戶獲取某個羣組部分消息)
(多發送幾條消息,用於測試。568中共18條數據)
1 include './ManyPullMessage.class.php'; 2 $object=new ManyPullMessage('192.168.95.11'); 3 #用戶獲取某個羣組部分消息 4 $f=$object->getPartMessage('jane2', 568, 1, 10); 5 echo "<pre>"; 6 print_r($f); 7 echo "</pre>";die;
page=1,size=10
page=2,size=10
測試完畢,還須要別的功能能夠本身進行修改添加測試。
此次整理這篇文章相對比較趕,內心已經想着快點整理完趕忙學習其餘的技術啦,哈哈22333。各位大神請留步,懇請各位給點學習redis的指導意見,本人職業方向是PHP
(以上是本身的一些看法,如有不足或者錯誤的地方請各位指出)
做者:那一葉隨風
聲明:本博客文章爲原創,只表明本人在工做學習中某一時間內總結的觀點或結論。轉載時請在文章頁面明顯位置給出原文連接