如今memcache在服務器緩存應用比較普遍,下面我來介紹memcache實現消息隊列等待的一個例子,有須要瞭解的朋友可參考。php
memche消息隊列的原理就是在key上作文章,用以作一個連續的數字加上前綴記錄序列化之後消息或者日誌。而後經過定時程序將內容落地到文件或者數據庫。html
php實現消息隊列的用處好比在作發送郵件時發送大量郵件很費時間的問題,那麼能夠採起隊列。
方便實現隊列的輕量級隊列服務器是:
starling支持memcache協議的輕量級持久化服務器
https://github.com/starling/starling
Beanstalkd輕量、高效,支持持久化,每秒可處理3000左右的隊列
http://kr.github.com/beanstalkd/
php中也可使用memcache/memcached來實現消息隊列git
<?php /** * Memcache 消息隊列類 */ class QMC { const PREFIX = 'ASDFASDFFWQKE'; /** * 初始化mc * @staticvar string $mc * @return Memcache */ static private function mc_init() { static $mc = null; if (is_null($mc)) { $mc = new Memcache; $mc->connect('127.0.0.1', 11211); } return $mc; } /** * mc 計數器,增長計數並返回新的計數 * @param string $key 計數器 * @param int $offset 計數增量,可爲負數.0爲不改變計數 * @param int $time 時間 * @return int/false 失敗是返回false,成功時返回更新計數器後的計數 */ static public function set_counter( $key, $offset, $time=0 ){ $mc = self::mc_init(); $val = $mc->get($key); if( !is_numeric($val) || $val < 0 ){ $ret = $mc->set( $key, 0, $time ); if( !$ret ) return false; $val = 0; } $offset = intval( $offset ); if( $offset > 0 ){ return $mc->increment( $key, $offset ); }elseif( $offset < 0 ){ return $mc->decrement( $key, -$offset ); } return $val; } /** * 寫入隊列 * @param string $key * @param mixed $value * @return bool */ static public function input( $key, $value ){ $mc = self::mc_init(); $w_key = self::PREFIX.$key.'W'; $v_key = self::PREFIX.$key.self::set_counter($w_key, 1); return $mc->set( $v_key, $value ); } /** * 讀取隊列裏的數據 * @param string $key * @param int $max 最多讀取條數 * @return array */ static public function output( $key, $max=100 ){ $out = array(); $mc = self::mc_init(); $r_key = self::PREFIX.$key.'R'; $w_key = self::PREFIX.$key.'W'; $r_p = self::set_counter( $r_key, 0 );//讀指針 $w_p = self::set_counter( $w_key, 0 );//寫指針 if( $r_p == 0 ) $r_p = 1; while( $w_p >= $r_p ){ if( --$max < 0 ) break; $v_key = self::PREFIX.$key.$r_p; $r_p = self::set_counter( $r_key, 1 ); $out[] = $mc->get( $v_key ); $mc->delete($v_key); } return $out; } } /** 使用方法: QMC::input($key, $value );//寫入隊列 $list = QMC::output($key);//讀取隊列 */ ?>
基於PHP共享內存實現的消息隊列:github
<?php /** * 使用共享內存的PHP循環內存隊列實現 * 支持多進程, 支持各類數據類型的存儲 * 注: 完成入隊或出隊操做,儘快使用unset(), 以釋放臨界區 * * @author wangbinandi@gmail.com * @created 2009-12-23 */ class ShmQueue { private $maxQSize = 0; // 隊列最大長度 private $front = 0; // 隊頭指針 private $rear = 0; // 隊尾指針 private $blockSize = 256; // 塊的大小(byte) private $memSize = 25600; // 最大共享內存(byte) private $shmId = 0; private $filePtr = './shmq.ptr'; private $semId = 0; public function __construct() { $shmkey = ftok(__FILE__, 't'); $this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize ); $this->maxQSize = $this->memSize / $this->blockSize; // 申?一個信號量 $this->semId = sem_get($shmkey, 1); sem_acquire($this->semId); // 申請進入臨界區 $this->init(); } private function init() { if ( file_exists($this->filePtr) ){ $contents = file_get_contents($this->filePtr); $data = explode( '|', $contents ); if ( isset($data[0]) && isset($data[1])){ $this->front = (int)$data[0]; $this->rear = (int)$data[1]; } } } public function getLength() { return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize; } public function enQueue( $value ) { if ( $this->ptrInc($this->rear) == $this->front ){ // 隊滿 return false; } $data = $this->encode($value); shmop_write($this->shmId, $data, $this->rear ); $this->rear = $this->ptrInc($this->rear); return true; } public function deQueue() { if ( $this->front == $this->rear ){ // 隊空 return false; } $value = shmop_read($this->shmId, $this->front, $this->blockSize-1); $this->front = $this->ptrInc($this->front); return $this->decode($value); } private function ptrInc( $ptr ) { return ($ptr + $this->blockSize) % ($this->memSize); } private function encode( $value ) { $data = serialize($value) . "__eof"; echo ''; echo strlen($data); echo ''; echo $this->blockSize -1; echo ''; if ( strlen($data) > $this->blockSize -1 ){ throw new Exception(strlen($data)." is overload block size!"); } return $data; } private function decode( $value ) { $data = explode("__eof", $value); return unserialize($data[0]); } public function __destruct() { $data = $this->front . '|' . $this->rear; file_put_contents($this->filePtr, $data); sem_release($this->semId); // 出臨界區, 釋放信號量 } } /* // 進隊操做 $shmq = new ShmQueue(); $data = 'test data'; $shmq->enQueue($data); unset($shmq); // 出隊操做 $shmq = new ShmQueue(); $data = $shmq->deQueue(); unset($shmq); */ ?>
對於一個很大的消息隊列,頻繁進行進行大數據庫的序列化 和 反序列化,有太耗費。下面是我用PHP 實現的一個消息隊列,只須要在尾部插入一個數據,就操做尾部,不用操做整個消息隊列進行讀取,與操做。可是,這個消息隊列不是線程安全的,我只是儘可能的避免了衝突的可能性。若是消息不是很是的密集,好比幾秒鐘才一個,仍是能夠考慮這樣使用的。
若是你要實現線程安全的,一個建議是經過文件進行鎖定,而後進行操做。下面是代碼:
代碼以下:數據庫
class Memcache_Queue { private $memcache; private $name; private $prefix; function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__") { if ($memcache == null) { throw new Exception("memcache object is null, new the object first."); } $this->memcache = $memcache; $this->name = $name; $this->prefix = $prefix; $this->maxSize = $maxSize; $this->front = 0; $this->real = 0; $this->size = 0; } function __get($name) { return $this->get($name); } function __set($name, $value) { $this->add($name, $value); return $this; } function isEmpty() { return $this->size == 0; } function isFull() { return $this->size == $this->maxSize; } function enQueue($data) { if ($this->isFull()) { throw new Exception("Queue is Full"); } $this->increment("size"); $this->set($this->real, $data); $this->set("real", ($this->real + 1) % $this->maxSize); return $this; } function deQueue() { if ($this->isEmpty()) { throw new Exception("Queue is Empty"); } $this->decrement("size"); $this->delete($this->front); $this->set("front", ($this->front + 1) % $this->maxSize); return $this; } function getTop() { return $this->get($this->front); } function getAll() { return $this->getPage(); } function getPage($offset = 0, $limit = 0) { if ($this->isEmpty() || $this->size < $offset) { return null; } $keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize); $num = 1; for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $this->getKeyByPos($pos); $num++; if ($limit > 0 && $limit == $num) { break; } } return array_values($this->memcache->get($keys)); } function makeEmpty() { $keys = $this->getAllKeys(); foreach ($keys as $value) { $this->delete($value); } $this->delete("real"); $this->delete("front"); $this->delete("size"); $this->delete("maxSize"); } private function getAllKeys() { if ($this->isEmpty()) { return array(); } $keys[] = $this->getKeyByPos($this->front); for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $this->getKeyByPos($pos); } return $keys; } private function add($pos, $data) { $this->memcache->add($this->getKeyByPos($pos), $data); return $this; } private function increment($pos) { return $this->memcache->increment($this->getKeyByPos($pos)); } private function decrement($pos) { $this->memcache->decrement($this->getKeyByPos($pos)); } private function set($pos, $data) { $this->memcache->set($this->getKeyByPos($pos), $data); return $this; } private function get($pos) { return $this->memcache->get($this->getKeyByPos($pos)); } private function delete($pos) { return $this->memcache->delete($this->getKeyByPos($pos)); } private function getKeyByPos($pos) { return $this->prefix . $this->name . $pos; } }