Redis作消息隊列的好處在於它的輕量級,高併發,延遲敏感,應用場景有 即時數據分析、秒殺計數器、緩存等php
Redis作消息隊列待解決的問題:mysql
一、消息的可靠性: 沒有相應的機制保證消息的消費,當消費者消費失敗的時候,消息體丟失,須要手動處理。生產者只管向隊列中插入數據,無論消費者是否成功消費。redis
二、消費者掛掉消息不會丟失,可是須要從新觸發一下消費者,纔可以繼續消費消息。sql
代碼以下:數據庫
lib.php 是工具文件,裏面有數據庫的鏈接、Redis的鏈接:json
<?php /** * 獲取數據庫鏈接 * * @param $host * @param $username * @param $password * @param $database * @return mysqli */ function getDBConnection($host, $username, $password, $database){ $connection = new mysqli('p:'.$host, $username, $password, $database); if (!$connection) { echo "Error: Unable to connect to MySQL." . PHP_EOL; echo "Debugging errno: " . mysqli_connect_errno() . PHP_EOL; echo "Debugging error: " . mysqli_connect_error() . PHP_EOL; exit; } mysqli_query($connection, "set names 'utf8'"); return $connection; } /** * 獲取Redis鏈接 * * @param $host * @param $port * @param string $password * @param int $database * @return Redis */ function getRedis($host='127.0.0.1', $port='6379', $password=null, $database=0){ $redis = new Redis(); if(!$redis->connect($host, $port)){ die("Redis鏈接失敗:IP或端口有誤"); } if(!empty($password) && !$redis->auth($password)){ die("Redis鏈接失敗:密碼錯誤"); } if($database){ $redis->select($database); } // work中 subscribe 若是一段時間沒有接到消息,就會停掉而後停掉,因此加這個語句讓其永不超時 $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); return $redis; } /** * 打印消息日誌 * * @param $msg */ function stdout($msg=null){ $msg = '['.date('Y-m-d H:i:s').']'.$msg.chr(10);; fwrite(STDOUT, $msg); }
register.php 是消息發佈者,註釋的是將消息存入數據庫部分的代碼。緩存
首先想消息存入 register_users 隊列中,存入的 key是register_users;value是一個list,消息所有存入其中。用 redis-cli 查看數據的命令是:併發
LRANGE register_users 0 -1
register.php:高併發
<?php require './lib.php'; $name = $argv[1]; $mobile = $argv[2]; if(empty($name) || empty($mobile)){ die("參數錯誤"); } // $connection = getDBConnection('localhost:3306', 'root', 'root', 'blog'); // // 開啓事務 // mysqli_begin_transaction($connection); // $sql = "insert into mq_user(name, mobile) values ('$name', '$mobile')"; // if(!mysqli_query($connection, $sql)){ // die("寫入用戶信息失敗,緣由:".$connection->error); // } $redis = getRedis(); // 添加消息 $result = $redis->lpush('register_users', json_encode(array('name'=>$name, 'mobile'=>$mobile), JSON_UNESCAPED_UNICODE)); if($result === false){ mysqli_rollback($connection); die("添加消息隊列失敗"); } // 發佈消息 $redis->publish('register_success', 'ok'); // 全部操做完成後提交事務 // mysqli_commit($connection); // $connection->close(); $redis->close();
work.php 作爲消息的消費者工具
<?php require './lib.php'; $redis = getRedis(); $redis->subscribe(['register_success'], function ($instance, $channelName, $message) { if($channelName == "register_success" && $message = "ok") { $redis = getRedis(); while($redis->lsize("register_users")>0) { $arr = $redis->brPop(['register_users'], 20); if(count($arr)) { $userInfo = json_decode($arr[1], true); stdout("新註冊用戶信息:"); stdout("姓名:".$userInfo['name']); stdout("手機號:".$userInfo['mobile']); stdout(); sleep(3); } } } });
register.php將消息放入redis 的 register_users隊列中,而後再使用 publish 將 register_success 消息發不出去。work.php 使用 subscribe 訂閱register_success 的消息。接收到 register_success 消息以後,讀取 register_users 的消息進行處理。