php-經過共享內存實現消息隊列和進程通訊

Memcache 通常用於緩存服務。可是不少時候,好比一個消息廣播系統,須要一個消息隊列。直接從數據庫取消息,負載每每不行。若是將整個消息隊列用一個key緩存到memcache裏面。對於一個很大的消息隊列,頻繁進行進行大數據庫的序列化 和 反序列化,有太耗費 。下面是我用PHP 實現的一個消息隊列,只須要在尾部插入一個數據,就操做尾部,不用操做整個消息隊列進行讀取,與操做。php

 

php-經過共享內存實現消息隊列和進程通訊的兩個類html

<?php  

class MQ{  

    public static $client;  

    private static $m_real;  

    private static $m_front;  

    private static $m_data = array();  

    const QUEUE_MAX_NUM = 100000000;  

    const QUEUE_FRONT_KEY = '_queue_item_front';  

    const QUEUE_REAL_KEY = '_queue_item_real';  

    public static function setupMq($conf) {  

        self::$client = memcache_pconnect($conf);  

        self::$m_real = memcache_get(self::$client, self::QUEUE_REAL_KEY);  

        self::$m_front = memcache_get(self::$client, self::QUEUE_FRONT_KEY);  

        if (!isset(self::$m_real) || empty(self::$m_real)) {  

            self::$real= 0;  

        }  

        if (!isset(self::$m_front) || empty(self::$m_front)) {  

            self::$m_front = 0;  

        }  

        return self::$client;  

    }  

    public static function add($queue, $data) {  

        $result = false;  

        if (self::$m_real < self::QUEUE_MAX_NUM) {  

            if (memcache_add(self::$client, $queue.self::$m_real, $data)) {  

                self::mqRealChange();  

                $result = true;  

            }  

        }  

          

        return $result;  

    }  

    public static function get($key, $count) {  

        $num = 0;  

        for ($i=self::$m_front;$i<self::$m_front + $count;$i++) {  

            if ($dataTmp = memcache_get(self::$client, $key.$i)) {  

                self::$m_data[] = $dataTmp;  

                memcache_delete(self::$client, $key.$i);  

                $num++;  

            }  

        }  

        if ($num>0) {  

            self::mqFrontChange($num);  

        }  

        return self::$m_data;  

    }  

    private static function mqRealChange() {  

        memcache_add(self::$client, self::QUEUE_REAL_KEY, 0);  

        self::$m_real = memcache_increment(self::$client, self::QUEUE_REAL_KEY, 1);  

    }  

      

    private static function mqFrontChange($num) {  

        memcache_add(self::$client, self::QUEUE_FRONT_KEY, 0);  

        self::$m_front = memcache_increment(self::$client, self::QUEUE_FRONT_KEY, $num);  

    }  

    public static function mflush($memcache_obj) {  

        memcache_flush($memcache_obj);  

    }  

    public static function Debug() {  

        echo 'real:'.self::$m_real."<br>/r/n";  

        echo 'front:'.self::$m_front."<br>/r/n";  

        echo 'wait for process data:'.intval(self::$m_real - self::$m_front);  

        echo "<br>/r/n";  

        echo '<pre>';  

        print_r(self::$m_data);  

        echo '<pre>';  

    }  

}  

define('FLUSH_MQ',0);//CLEAN ALL DATA  

define('IS_ADD',0);//SET DATA  

$mobj = MQ::setupMq('127.0.0.1','11211');  

if (FLUSH_MQ) {  

    MQ::mflush($mobj);  

} else {  

    if (IS_ADD) {  

        MQ::add('user_sync', '1test');  

        MQ::add('user_sync', '2test');  

        MQ::add('user_sync', '3test');  

        MQ::add('user_sync', '4test');  

        MQ::add('user_sync', '5test');  

        MQ::add('user_sync', '6test');  

    } else {  

        MQ::get('user_sync', 10);  

    }  

      

}  

MQ::Debug();  

 

 

實現消息隊列,能夠使用比較專業的工具,例如:Apache ActiveMQ、memcacheq…..,下面是兩個基本簡單的實現方式:linux

使用memcache方法來實現redis

<?php  

/* 

 * memcache隊列類 

 * 支持多進程併發寫入、讀取 

 * 邊寫邊讀,AB面輪值替換 

 * @author lkk/lianq.net 

 * @create on 9:25 2012-9-28 

 * 

 * @example: 

    $obj = new memcacheQueue('duilie'); 

    $obj->add('1asdf'); 

    $obj->getQueueLength(); 

    $obj->read(11); 

    $obj->get(8); 

 */  

  

class memcacheQueue{  

    public static   $client;            //memcache客戶端鏈接  

    public          $access;            //隊列是否可更新     

    private         $currentSide;       //當前輪值的隊列面:A/B  

    private         $lastSide;          //上一輪值的隊列面:A/B  

    private         $sideAHead;         //A面隊首值  

    private         $sideATail;         //A面隊尾值  

    private         $sideBHead;         //B面隊首值  

    private         $sideBTail;         //B面隊尾值  

    private         $currentHead;       //當前隊首值  

    private         $currentTail;       //當前隊尾值  

    private         $lastHead;          //上輪隊首值  

    private         $lastTail;          //上輪隊尾值   

    private         $expire;            //過時時間,秒,1~2592000,即30天內;0爲永不過時  

    private         $sleepTime;         //等待解鎖時間,微秒  

    private         $queueName;         //隊列名稱,惟一值  

    private         $retryNum;          //重試次數,= 10 * 理論併發數  

      

    const   MAXNUM      = 2000;                 //(單面)最大隊列數,建議上限10K  

    const   HEAD_KEY    = '_lkkQueueHead_';     //隊列首key  

    const   TAIL_KEY    = '_lkkQueueTail_';     //隊列尾key  

    const   VALU_KEY    = '_lkkQueueValu_';     //隊列值key  

    const   LOCK_KEY    = '_lkkQueueLock_';     //隊列鎖key  

    const   SIDE_KEY    = '_lkkQueueSide_';     //輪值面key  

      

    /* 

     * 構造函數 

     * @param   [config]    array   memcache服務器參數 

     * @param   [queueName] string  隊列名稱 

     * @param   [expire]    string  過時時間 

     * @return  NULL 

     */  

    public function __construct($queueName ='',$expire='',$config =''){  

        if(empty($config)){  

            self::$client = memcache_pconnect('localhost',11211);  

        }elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')  

            self::$client = memcache_pconnect($config['host'],$config['port']);  

        }elseif(is_string($config)){//"127.0.0.1:11211"  

            $tmp = explode(':',$config);  

            $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';  

            $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';  

            self::$client = memcache_pconnect($conf['host'],$conf['port']);       

        }  

        if(!self::$client) return false;  

          

        ignore_user_abort(TRUE);//當客戶斷開鏈接,容許繼續執行  

        set_time_limit(0);//取消腳本執行延時上限  

          

        $this->access = false;  

        $this->sleepTime = 1000;  

        $expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;  

        $this->expire = $expire;  

        $this->queueName = $queueName;  

        $this->retryNum = 10000;  

          

        $side = memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $expire);  

        $this->getHeadNTail($queueName);  

        if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;  

        if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;  

        if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;  

        if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;  

    }  

      

    /* 

     * 獲取隊列首尾值 

     * @param   [queueName] string  隊列名稱 

     * @return  NULL 

     */  

    private function getHeadNTail($queueName){  

        $this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY);  

        $this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);  

        $this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY);  

        $this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY);  

    }  

      

    /* 

     * 獲取當前輪值的隊列面 

     * @return  string  隊列面名稱 

     */  

    public function getCurrentSide(){  

        $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);  

        if($currentSide == 'A'){  

            $this->currentSide = 'A';  

            $this->lastSide = 'B';     

  

            $this->currentHead   = $this->sideAHead;  

            $this->currentTail   = $this->sideATail;  

            $this->lastHead      = $this->sideBHead;  

            $this->lastTail      = $this->sideBTail;            

        }else{  

            $this->currentSide = 'B';  

            $this->lastSide = 'A';  

  

            $this->currentHead   = $this->sideBHead;  

            $this->currentTail   = $this->sideBTail;  

            $this->lastHead      = $this->sideAHead;  

            $this->lastTail      = $this->sideATail;                        

        }  

          

        return $this->currentSide;  

    }  

      

    /* 

     * 隊列加鎖 

     * @return boolean 

     */  

    private function getLock(){  

        if($this->access === false){  

            while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){  

                usleep($this->sleepTime);  

                @$i++;  

                if($i > $this->retryNum){//嘗試等待N次  

                    return false;  

                    break;  

                }  

            }  

            return $this->access = true;  

        }  

        return false;  

    }  

      

    /* 

     * 隊列解鎖 

     * @return NULL 

     */  

    private function unLock(){  

        memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);  

        $this->access = false;  

    }  

      

    /* 

     * 添加數據 

     * @param   [data]  要存儲的值 

     * @return  boolean 

     */  

    public function add($data){  

        $result = false;  

        if(!$this->getLock()){  

            return $result;  

        }   

        $this->getHeadNTail($this->queueName);  

        $this->getCurrentSide();  

          

        if($this->isFull()){  

            $this->unLock();  

            return false;  

        }  

          

        if($this->currentTail < self::MAXNUM){  

            $value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;  

            if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){  

                $this->changeTail();  

                $result = true;  

            }  

        }else{//當前隊列已滿,更換輪值面  

            $this->unLock();  

            $this->changeCurrentSide();  

            return $this->add($data);  

        }  

  

        $this->unLock();  

        return $result;  

    }  

      

    /* 

     * 取出數據 

     * @param   [length]    int 數據的長度 

     * @return  array 

     */  

    public function get($length=0){  

        if(!is_numeric($length)) return false;  

        if(empty($length)) $length = self::MAXNUM * 2;//默認讀取全部  

        if(!$this->getLock()) return false;  

  

        if($this->isEmpty()){  

            $this->unLock();  

            return false;  

        }  

          

        $keyArray   = $this->getKeyArray($length);  

        $lastKey    = $keyArray['lastKey'];  

        $currentKey = $keyArray['currentKey'];  

        $keys       = $keyArray['keys'];  

        $this->changeHead($this->lastSide,$lastKey);  

        $this->changeHead($this->currentSide,$currentKey);  

          

        $data   = @memcache_get(self::$client, $keys);  

        foreach($keys as $v){//取出以後刪除  

            @memcache_delete(self::$client, $v, 0);  

        }  

        $this->unLock();  

  

        return $data;  

    }  

      

    /* 

     * 讀取數據 

     * @param   [length]    int 數據的長度 

     * @return  array 

     */  

    public function read($length=0){  

        if(!is_numeric($length)) return false;  

        if(empty($length)) $length = self::MAXNUM * 2;//默認讀取全部  

        $keyArray   = $this->getKeyArray($length);  

        $data   = @memcache_get(self::$client, $keyArray['keys']);  

        return $data;  

    }  

      

    /* 

     * 獲取隊列某段長度的key數組 

     * @param   [length]    int 隊列長度 

     * @return  array 

     */  

    private function getKeyArray($length){  

        $result = array('keys'=>array(),'lastKey'=>array(),'currentKey'=>array());  

        $this->getHeadNTail($this->queueName);  

        $this->getCurrentSide();  

        if(empty($length)) return $result;  

          

        //先取上一面的key  

        $i = $result['lastKey'] = 0;  

        for($i=0;$i<$length;$i++){  

            $result['lastKey'] = $this->lastHead + $i;  

            if($result['lastKey'] >= $this->lastTail) break;  

            $result['keys'][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result['lastKey'];  

        }  

          

        //再取當前面的key  

        $j = $length - $i;  

        $k = $result['currentKey'] = 0;  

        for($k=0;$k<$j;$k++){  

            $result['currentKey'] = $this->currentHead + $k;  

            if($result['currentKey'] >= $this->currentTail) break;  

            $result['keys'][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result['currentKey'];  

        }  

  

        return $result;  

    }  

      

    /* 

     * 更新當前輪值面隊列尾的值 

     * @return  NULL 

     */  

    private function changeTail(){  

        $tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;  

        memcache_add(self::$client, $tail_key, 0,false, $this->expire);//若是沒有,則插入;有則false;  

        //memcache_increment(self::$client, $tail_key, 1);//隊列尾+1  

        $v = memcache_get(self::$client, $tail_key) +1;  

        memcache_set(self::$client, $tail_key,$v,false,$this->expire);  

    }  

      

    /* 

     * 更新隊列首的值 

     * @param   [side]      string  要更新的面 

     * @param   [headValue] int     隊列首的值 

     * @return  NULL 

     */  

    private function changeHead($side,$headValue){  

        if($headValue < 1) return false;  

        $head_key = $this->queueName .$side . self::HEAD_KEY;  

        $tail_key = $this->queueName .$side . self::TAIL_KEY;  

        $sideTail = memcache_get(self::$client, $tail_key);  

        if($headValue < $sideTail){  

            memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);  

        }elseif($headValue >= $sideTail){  

            $this->resetSide($side);  

        }  

    }  

      

    /* 

     * 重置隊列面,即將該隊列面的隊首、隊尾值置爲0 

     * @param   [side]  string  要重置的面 

     * @return  NULL 

     */  

    private function resetSide($side){  

        $head_key = $this->queueName .$side . self::HEAD_KEY;  

        $tail_key = $this->queueName .$side . self::TAIL_KEY;  

        memcache_set(self::$client, $head_key,0,false,$this->expire);  

        memcache_set(self::$client, $tail_key,0,false,$this->expire);  

    }  

      

      

    /* 

     * 改變當前輪值隊列面 

     * @return  string 

     */  

    private function changeCurrentSide(){  

        $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);  

        if($currentSide == 'A'){  

            memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'B',false,$this->expire);  

            $this->currentSide = 'B';  

        }else{  

            memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'A',false,$this->expire);  

            $this->currentSide = 'A';  

        }  

        return $this->currentSide;  

    }  

      

    /* 

     * 檢查當前隊列是否已滿 

     * @return  boolean 

     */  

    public function isFull(){  

        $result = false;  

        if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){  

            $result = true;  

        }  

        return $result;  

    }  

      

    /* 

     * 檢查當前隊列是否爲空 

     * @return  boolean 

     */  

    public function isEmpty(){  

        $result = true;  

        if($this->sideATail > 0 || $this->sideBTail > 0){  

            $result = false;  

        }  

        return $result;  

    }  

      

    /* 

     * 獲取當前隊列的長度 

     * 該長度爲理論長度,某些元素因爲過時失效而丟失,真實長度小於或等於該長度 

     * @return  int 

     */  

    public function getQueueLength(){  

        $this->getHeadNTail($this->queueName);  

        $this->getCurrentSide();  

  

        $sideALength = $this->sideATail - $this->sideAHead;  

        $sideBLength = $this->sideBTail - $this->sideBHead;  

        $result = $sideALength + $sideBLength;  

          

        return $result;  

    }  

      

  

    /* 

     * 清空當前隊列數據,僅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三個key 

     * @return  boolean 

     */  

    public function clear(){  

        if(!$this->getLock()) return false;  

        for($i=0;$i<self::MAXNUM;$i++){  

            @memcache_delete(self::$client, $this->queueName.'A'. self::VALU_KEY .$i, 0);  

            @memcache_delete(self::$client, $this->queueName.'B'. self::VALU_KEY .$i, 0);  

        }  

        $this->unLock();  

        $this->resetSide('A');  

        $this->resetSide('B');  

        return true;  

    }  

      

    /* 

     * 清除全部memcache緩存數據 

     * @return  NULL 

     */  

    public function memFlush(){  

        memcache_flush(self::$client);  

    }  

}  

利用PHP操做Linux消息隊列完成進程間通訊

當咱們開發的系統須要使用多進程方式運行時,進程間通訊便成了相當重要的環節。消息隊列(message queue)是Linux系統進程間通訊的一種方式。
  關於Linux系統進程通訊的概念及實現可查看:http://www.ibm.com/developerworks/cn/linux/l-ipc/
  關於Linux系統消息隊列的概念及實現可查看:http://www.ibm.com/developerworks/cn/linux/l-ipc/part4/
  PHP的sysvmsg模塊是對Linux系統支持的System V IPC中的System V消息隊列函數族的封裝。咱們須要利用sysvmsg模塊提供的函數來進進程間通訊。先來看一段示例代碼_1:

Java代碼  
<?php  

$message_queue_key = ftok(__FILE__, 'a');  

  

$message_queue = msg_get_queue($message_queue_key, 0666);  

var_dump($message_queue);  

  

$message_queue_status = msg_stat_queue($message_queue);  

print_r($message_queue_status);  

  

//向消息隊列中寫  

msg_send($message_queue, 1, "Hello,World!");  

  

$message_queue_status = msg_stat_queue($message_queue);  

print_r($message_queue_status);  

  

//從消息隊列中讀  

msg_receive($message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT);  

print_r($message."\r\n");  

  

msg_remove_queue($message_queue);  

?>  

 這段代碼的運行結果以下:

Java代碼  
resource(4) of type (sysvmsg queue)  

Array  

(  

    [msg_perm.uid] => 1000  

    [msg_perm.gid] => 1000  

    [msg_perm.mode] => 438  

    [msg_stime] => 0  

    [msg_rtime] => 0  

    [msg_ctime] => 1279849495  

    [msg_qnum] => 0  

    [msg_qbytes] => 16384  

    [msg_lspid] => 0  

    [msg_lrpid] => 0  

)  

Array  

(  

    [msg_perm.uid] => 1000  

    [msg_perm.gid] => 1000  

    [msg_perm.mode] => 438  

    [msg_stime] => 1279849495  

    [msg_rtime] => 0  

    [msg_ctime] => 1279849495  

    [msg_qnum] => 1  

    [msg_qbytes] => 16384  

    [msg_lspid] => 2184  

    [msg_lrpid] => 0  

)  

Hello,World!  

 能夠看到已成功從消息隊列中讀取「Hello,World!」字符串

下面列舉一下示例代碼中的主要函數:

Java代碼  
ftok ( string $pathname , string $proj )   

    手冊上給出的解釋是:Convert a pathname and a project identifier to a System V IPC key。這個函數返回的鍵值惟一對應linux系統中一個消息隊列。在得到消息隊列的引用以前都須要調用這個函數。  

  

msg_get_queue ( int $key [, int $perms ] )  

    msg_get_queue()會根據傳入的鍵值返回一個消息隊列的引用。若是linux系統中沒有消息隊列與鍵值對應,msg_get_queue()將會建立一個新的消息隊列。函數的第二個參數須要傳入一個int值,做爲新建立的消息隊列的權限值,默認爲0666。這個權限值與linux命令chmod中使用的數值是同一個意思,由於在linux系統中一切皆是文件。  

  

msg_send ( resource $queue , int $msgtype , mixed $message [, bool $serialize [, bool $blocking [, int &$errorcode ]]] )  

    顧名思義,該函數用來向消息隊列中寫數據。  

  

msg_stat_queue ( resource $queue )   

    這個函數會返回消息隊列的元數據。消息隊列元數據中的信息很完整,包括了消息隊列中待讀取的消息數、最後讀寫隊列的進程ID等。示例代碼在第8行調用該函數返回的數組中隊列中待讀取的消息數msg_qnum值爲0。  

  

msg_receive ( resource $queue , int $desiredmsgtype , int &$msgtype , int $maxsize , mixed &$message [, bool $unserialize [, int $flags [, int &$errorcode ]]] )   

    msg_receive用於讀取消息隊列中的數據。  

  

msg_remove_queue ( resource $queue )   

    msg_remove_queue用於銷燬一個隊列。  

 示例代碼_1只是展現了PHP操做消息隊列函數的應用。下面的代碼具體描述了進程間通訊的場景

Java代碼  
<?php  

$message_queue_key = ftok ( __FILE__, 'a' );  

$message_queue = msg_get_queue ( $message_queue_key, 0666 );  

  

$pids = array ();  

for($i = 0; $i < 5; $i ++) {  

    //建立子進程  

    $pids [$i] = pcntl_fork ();  

      

    if ($pids [$i]) {  

        echo "No.$i child process was created, the pid is $pids[$i]\r\n";  

    } elseif ($pids [$i] == 0) {  

        $pid = posix_getpid ();  

        echo "process.$pid is writing now\r\n";  

          

        msg_send ( $message_queue, 1, "this is process.$pid's data\r\n" );  

        posix_kill ( $pid, SIGTERM );  

    }  

}  

  

do {  

    msg_receive ( $message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT );  

    echo $message;  

  

    //須要判斷隊列是否爲空,若是爲空就退出  

//break;  

} while ( true )  

?>  

 

 

運行結果爲:數據庫

No.0 child process was created, the pid is 5249  

No.1 child process was created, the pid is 5250  

No.2 child process was created, the pid is 5251  

No.3 child process was created, the pid is 5252  

No.4 child process was created, the pid is 5253  

process.5251 is writing now  

this is process.5251's data  

process.5253 is writing now  

process.5252 is writing now  

process.5250 is writing now  

this is process.5253's data  

this is process.5252's data  

this is process.5250's data  

process.5249 is writing now  

this is process.5249's data  

 

redis
http://www.neatstudio.com/show-976-1.shtml數組

 

php自帶的三個消息隊列相關的函數
http://www.zhangguangda.com/?p=89 緩存

相關文章
相關標籤/搜索