Memcache 通常用於緩存服務。可是不少時候,好比一個消息廣播系統,須要一個消息隊列。直接從數據庫取消息,負載每每不行。若是將整個消息隊列用一個key緩存到memcache裏面。對於一個很大的消息隊列,頻繁進行進行大數據庫的序列化 和 反序列化,有太耗費 。下面是我用PHP 實現的一個消息隊列,只須要在尾部插入一個數據,就操做尾部,不用操做整個消息隊列進行讀取,與操做。php
php-經過共享內存實現消息隊列和進程通訊的兩個類html
<?php class MQ{ public static $client; private static $m_real; private static $m_front; private static $m_data = array(); const QUEUE_MAX_NUM = 100000000; const QUEUE_FRONT_KEY = '_queue_item_front'; const QUEUE_REAL_KEY = '_queue_item_real'; public static function setupMq($conf) { self::$client = memcache_pconnect($conf); self::$m_real = memcache_get(self::$client, self::QUEUE_REAL_KEY); self::$m_front = memcache_get(self::$client, self::QUEUE_FRONT_KEY); if (!isset(self::$m_real) || empty(self::$m_real)) { self::$real= 0; } if (!isset(self::$m_front) || empty(self::$m_front)) { self::$m_front = 0; } return self::$client; } public static function add($queue, $data) { $result = false; if (self::$m_real < self::QUEUE_MAX_NUM) { if (memcache_add(self::$client, $queue.self::$m_real, $data)) { self::mqRealChange(); $result = true; } } return $result; } public static function get($key, $count) { $num = 0; for ($i=self::$m_front;$i<self::$m_front + $count;$i++) { if ($dataTmp = memcache_get(self::$client, $key.$i)) { self::$m_data[] = $dataTmp; memcache_delete(self::$client, $key.$i); $num++; } } if ($num>0) { self::mqFrontChange($num); } return self::$m_data; } private static function mqRealChange() { memcache_add(self::$client, self::QUEUE_REAL_KEY, 0); self::$m_real = memcache_increment(self::$client, self::QUEUE_REAL_KEY, 1); } private static function mqFrontChange($num) { memcache_add(self::$client, self::QUEUE_FRONT_KEY, 0); self::$m_front = memcache_increment(self::$client, self::QUEUE_FRONT_KEY, $num); } public static function mflush($memcache_obj) { memcache_flush($memcache_obj); } public static function Debug() { echo 'real:'.self::$m_real."<br>/r/n"; echo 'front:'.self::$m_front."<br>/r/n"; echo 'wait for process data:'.intval(self::$m_real - self::$m_front); echo "<br>/r/n"; echo '<pre>'; print_r(self::$m_data); echo '<pre>'; } } define('FLUSH_MQ',0);//CLEAN ALL DATA define('IS_ADD',0);//SET DATA $mobj = MQ::setupMq('127.0.0.1','11211'); if (FLUSH_MQ) { MQ::mflush($mobj); } else { if (IS_ADD) { MQ::add('user_sync', '1test'); MQ::add('user_sync', '2test'); MQ::add('user_sync', '3test'); MQ::add('user_sync', '4test'); MQ::add('user_sync', '5test'); MQ::add('user_sync', '6test'); } else { MQ::get('user_sync', 10); } } MQ::Debug();
實現消息隊列,能夠使用比較專業的工具,例如:Apache ActiveMQ、memcacheq…..,下面是兩個基本簡單的實現方式:linux
使用memcache方法來實現redis
<?php /* * memcache隊列類 * 支持多進程併發寫入、讀取 * 邊寫邊讀,AB面輪值替換 * @author lkk/lianq.net * @create on 9:25 2012-9-28 * * @example: $obj = new memcacheQueue('duilie'); $obj->add('1asdf'); $obj->getQueueLength(); $obj->read(11); $obj->get(8); */ class memcacheQueue{ public static $client; //memcache客戶端鏈接 public $access; //隊列是否可更新 private $currentSide; //當前輪值的隊列面:A/B private $lastSide; //上一輪值的隊列面:A/B private $sideAHead; //A面隊首值 private $sideATail; //A面隊尾值 private $sideBHead; //B面隊首值 private $sideBTail; //B面隊尾值 private $currentHead; //當前隊首值 private $currentTail; //當前隊尾值 private $lastHead; //上輪隊首值 private $lastTail; //上輪隊尾值 private $expire; //過時時間,秒,1~2592000,即30天內;0爲永不過時 private $sleepTime; //等待解鎖時間,微秒 private $queueName; //隊列名稱,惟一值 private $retryNum; //重試次數,= 10 * 理論併發數 const MAXNUM = 2000; //(單面)最大隊列數,建議上限10K const HEAD_KEY = '_lkkQueueHead_'; //隊列首key const TAIL_KEY = '_lkkQueueTail_'; //隊列尾key const VALU_KEY = '_lkkQueueValu_'; //隊列值key const LOCK_KEY = '_lkkQueueLock_'; //隊列鎖key const SIDE_KEY = '_lkkQueueSide_'; //輪值面key /* * 構造函數 * @param [config] array memcache服務器參數 * @param [queueName] string 隊列名稱 * @param [expire] string 過時時間 * @return NULL */ public function __construct($queueName ='',$expire='',$config =''){ if(empty($config)){ self::$client = memcache_pconnect('localhost',11211); }elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211') self::$client = memcache_pconnect($config['host'],$config['port']); }elseif(is_string($config)){//"127.0.0.1:11211" $tmp = explode(':',$config); $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1'; $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211'; self::$client = memcache_pconnect($conf['host'],$conf['port']); } if(!self::$client) return false; ignore_user_abort(TRUE);//當客戶斷開鏈接,容許繼續執行 set_time_limit(0);//取消腳本執行延時上限 $this->access = false; $this->sleepTime = 1000; $expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire; $this->expire = $expire; $this->queueName = $queueName; $this->retryNum = 10000; $side = memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $expire); $this->getHeadNTail($queueName); if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0; if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0; if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0; if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0; } /* * 獲取隊列首尾值 * @param [queueName] string 隊列名稱 * @return NULL */ private function getHeadNTail($queueName){ $this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY); $this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY); $this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY); $this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY); } /* * 獲取當前輪值的隊列面 * @return string 隊列面名稱 */ public function getCurrentSide(){ $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY); if($currentSide == 'A'){ $this->currentSide = 'A'; $this->lastSide = 'B'; $this->currentHead = $this->sideAHead; $this->currentTail = $this->sideATail; $this->lastHead = $this->sideBHead; $this->lastTail = $this->sideBTail; }else{ $this->currentSide = 'B'; $this->lastSide = 'A'; $this->currentHead = $this->sideBHead; $this->currentTail = $this->sideBTail; $this->lastHead = $this->sideAHead; $this->lastTail = $this->sideATail; } return $this->currentSide; } /* * 隊列加鎖 * @return boolean */ private function getLock(){ if($this->access === false){ while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){ usleep($this->sleepTime); @$i++; if($i > $this->retryNum){//嘗試等待N次 return false; break; } } return $this->access = true; } return false; } /* * 隊列解鎖 * @return NULL */ private function unLock(){ memcache_delete(self::$client, $this->queueName .self::LOCK_KEY); $this->access = false; } /* * 添加數據 * @param [data] 要存儲的值 * @return boolean */ public function add($data){ $result = false; if(!$this->getLock()){ return $result; } $this->getHeadNTail($this->queueName); $this->getCurrentSide(); if($this->isFull()){ $this->unLock(); return false; } if($this->currentTail < self::MAXNUM){ $value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail; if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){ $this->changeTail(); $result = true; } }else{//當前隊列已滿,更換輪值面 $this->unLock(); $this->changeCurrentSide(); return $this->add($data); } $this->unLock(); return $result; } /* * 取出數據 * @param [length] int 數據的長度 * @return array */ public function get($length=0){ if(!is_numeric($length)) return false; if(empty($length)) $length = self::MAXNUM * 2;//默認讀取全部 if(!$this->getLock()) return false; if($this->isEmpty()){ $this->unLock(); return false; } $keyArray = $this->getKeyArray($length); $lastKey = $keyArray['lastKey']; $currentKey = $keyArray['currentKey']; $keys = $keyArray['keys']; $this->changeHead($this->lastSide,$lastKey); $this->changeHead($this->currentSide,$currentKey); $data = @memcache_get(self::$client, $keys); foreach($keys as $v){//取出以後刪除 @memcache_delete(self::$client, $v, 0); } $this->unLock(); return $data; } /* * 讀取數據 * @param [length] int 數據的長度 * @return array */ public function read($length=0){ if(!is_numeric($length)) return false; if(empty($length)) $length = self::MAXNUM * 2;//默認讀取全部 $keyArray = $this->getKeyArray($length); $data = @memcache_get(self::$client, $keyArray['keys']); return $data; } /* * 獲取隊列某段長度的key數組 * @param [length] int 隊列長度 * @return array */ private function getKeyArray($length){ $result = array('keys'=>array(),'lastKey'=>array(),'currentKey'=>array()); $this->getHeadNTail($this->queueName); $this->getCurrentSide(); if(empty($length)) return $result; //先取上一面的key $i = $result['lastKey'] = 0; for($i=0;$i<$length;$i++){ $result['lastKey'] = $this->lastHead + $i; if($result['lastKey'] >= $this->lastTail) break; $result['keys'][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result['lastKey']; } //再取當前面的key $j = $length - $i; $k = $result['currentKey'] = 0; for($k=0;$k<$j;$k++){ $result['currentKey'] = $this->currentHead + $k; if($result['currentKey'] >= $this->currentTail) break; $result['keys'][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result['currentKey']; } return $result; } /* * 更新當前輪值面隊列尾的值 * @return NULL */ private function changeTail(){ $tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY; memcache_add(self::$client, $tail_key, 0,false, $this->expire);//若是沒有,則插入;有則false; //memcache_increment(self::$client, $tail_key, 1);//隊列尾+1 $v = memcache_get(self::$client, $tail_key) +1; memcache_set(self::$client, $tail_key,$v,false,$this->expire); } /* * 更新隊列首的值 * @param [side] string 要更新的面 * @param [headValue] int 隊列首的值 * @return NULL */ private function changeHead($side,$headValue){ if($headValue < 1) return false; $head_key = $this->queueName .$side . self::HEAD_KEY; $tail_key = $this->queueName .$side . self::TAIL_KEY; $sideTail = memcache_get(self::$client, $tail_key); if($headValue < $sideTail){ memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire); }elseif($headValue >= $sideTail){ $this->resetSide($side); } } /* * 重置隊列面,即將該隊列面的隊首、隊尾值置爲0 * @param [side] string 要重置的面 * @return NULL */ private function resetSide($side){ $head_key = $this->queueName .$side . self::HEAD_KEY; $tail_key = $this->queueName .$side . self::TAIL_KEY; memcache_set(self::$client, $head_key,0,false,$this->expire); memcache_set(self::$client, $tail_key,0,false,$this->expire); } /* * 改變當前輪值隊列面 * @return string */ private function changeCurrentSide(){ $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY); if($currentSide == 'A'){ memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'B',false,$this->expire); $this->currentSide = 'B'; }else{ memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'A',false,$this->expire); $this->currentSide = 'A'; } return $this->currentSide; } /* * 檢查當前隊列是否已滿 * @return boolean */ public function isFull(){ $result = false; if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){ $result = true; } return $result; } /* * 檢查當前隊列是否爲空 * @return boolean */ public function isEmpty(){ $result = true; if($this->sideATail > 0 || $this->sideBTail > 0){ $result = false; } return $result; } /* * 獲取當前隊列的長度 * 該長度爲理論長度,某些元素因爲過時失效而丟失,真實長度小於或等於該長度 * @return int */ public function getQueueLength(){ $this->getHeadNTail($this->queueName); $this->getCurrentSide(); $sideALength = $this->sideATail - $this->sideAHead; $sideBLength = $this->sideBTail - $this->sideBHead; $result = $sideALength + $sideBLength; return $result; } /* * 清空當前隊列數據,僅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三個key * @return boolean */ public function clear(){ if(!$this->getLock()) return false; for($i=0;$i<self::MAXNUM;$i++){ @memcache_delete(self::$client, $this->queueName.'A'. self::VALU_KEY .$i, 0); @memcache_delete(self::$client, $this->queueName.'B'. self::VALU_KEY .$i, 0); } $this->unLock(); $this->resetSide('A'); $this->resetSide('B'); return true; } /* * 清除全部memcache緩存數據 * @return NULL */ public function memFlush(){ memcache_flush(self::$client); } } 利用PHP操做Linux消息隊列完成進程間通訊 當咱們開發的系統須要使用多進程方式運行時,進程間通訊便成了相當重要的環節。消息隊列(message queue)是Linux系統進程間通訊的一種方式。 關於Linux系統進程通訊的概念及實現可查看:http://www.ibm.com/developerworks/cn/linux/l-ipc/ 關於Linux系統消息隊列的概念及實現可查看:http://www.ibm.com/developerworks/cn/linux/l-ipc/part4/ PHP的sysvmsg模塊是對Linux系統支持的System V IPC中的System V消息隊列函數族的封裝。咱們須要利用sysvmsg模塊提供的函數來進進程間通訊。先來看一段示例代碼_1: Java代碼 <?php $message_queue_key = ftok(__FILE__, 'a'); $message_queue = msg_get_queue($message_queue_key, 0666); var_dump($message_queue); $message_queue_status = msg_stat_queue($message_queue); print_r($message_queue_status); //向消息隊列中寫 msg_send($message_queue, 1, "Hello,World!"); $message_queue_status = msg_stat_queue($message_queue); print_r($message_queue_status); //從消息隊列中讀 msg_receive($message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT); print_r($message."\r\n"); msg_remove_queue($message_queue); ?> 這段代碼的運行結果以下: Java代碼 resource(4) of type (sysvmsg queue) Array ( [msg_perm.uid] => 1000 [msg_perm.gid] => 1000 [msg_perm.mode] => 438 [msg_stime] => 0 [msg_rtime] => 0 [msg_ctime] => 1279849495 [msg_qnum] => 0 [msg_qbytes] => 16384 [msg_lspid] => 0 [msg_lrpid] => 0 ) Array ( [msg_perm.uid] => 1000 [msg_perm.gid] => 1000 [msg_perm.mode] => 438 [msg_stime] => 1279849495 [msg_rtime] => 0 [msg_ctime] => 1279849495 [msg_qnum] => 1 [msg_qbytes] => 16384 [msg_lspid] => 2184 [msg_lrpid] => 0 ) Hello,World! 能夠看到已成功從消息隊列中讀取「Hello,World!」字符串 下面列舉一下示例代碼中的主要函數: Java代碼 ftok ( string $pathname , string $proj ) 手冊上給出的解釋是:Convert a pathname and a project identifier to a System V IPC key。這個函數返回的鍵值惟一對應linux系統中一個消息隊列。在得到消息隊列的引用以前都須要調用這個函數。 msg_get_queue ( int $key [, int $perms ] ) msg_get_queue()會根據傳入的鍵值返回一個消息隊列的引用。若是linux系統中沒有消息隊列與鍵值對應,msg_get_queue()將會建立一個新的消息隊列。函數的第二個參數須要傳入一個int值,做爲新建立的消息隊列的權限值,默認爲0666。這個權限值與linux命令chmod中使用的數值是同一個意思,由於在linux系統中一切皆是文件。 msg_send ( resource $queue , int $msgtype , mixed $message [, bool $serialize [, bool $blocking [, int &$errorcode ]]] ) 顧名思義,該函數用來向消息隊列中寫數據。 msg_stat_queue ( resource $queue ) 這個函數會返回消息隊列的元數據。消息隊列元數據中的信息很完整,包括了消息隊列中待讀取的消息數、最後讀寫隊列的進程ID等。示例代碼在第8行調用該函數返回的數組中隊列中待讀取的消息數msg_qnum值爲0。 msg_receive ( resource $queue , int $desiredmsgtype , int &$msgtype , int $maxsize , mixed &$message [, bool $unserialize [, int $flags [, int &$errorcode ]]] ) msg_receive用於讀取消息隊列中的數據。 msg_remove_queue ( resource $queue ) msg_remove_queue用於銷燬一個隊列。 示例代碼_1只是展現了PHP操做消息隊列函數的應用。下面的代碼具體描述了進程間通訊的場景 Java代碼 <?php $message_queue_key = ftok ( __FILE__, 'a' ); $message_queue = msg_get_queue ( $message_queue_key, 0666 ); $pids = array (); for($i = 0; $i < 5; $i ++) { //建立子進程 $pids [$i] = pcntl_fork (); if ($pids [$i]) { echo "No.$i child process was created, the pid is $pids[$i]\r\n"; } elseif ($pids [$i] == 0) { $pid = posix_getpid (); echo "process.$pid is writing now\r\n"; msg_send ( $message_queue, 1, "this is process.$pid's data\r\n" ); posix_kill ( $pid, SIGTERM ); } } do { msg_receive ( $message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT ); echo $message; //須要判斷隊列是否爲空,若是爲空就退出 //break; } while ( true ) ?>
運行結果爲:數據庫
No.0 child process was created, the pid is 5249 No.1 child process was created, the pid is 5250 No.2 child process was created, the pid is 5251 No.3 child process was created, the pid is 5252 No.4 child process was created, the pid is 5253 process.5251 is writing now this is process.5251's data process.5253 is writing now process.5252 is writing now process.5250 is writing now this is process.5253's data this is process.5252's data this is process.5250's data process.5249 is writing now this is process.5249's data
redis
http://www.neatstudio.com/show-976-1.shtml數組
php自帶的三個消息隊列相關的函數
http://www.zhangguangda.com/?p=89 緩存