通訊過程
假設P1和C1註冊了相同的Broker,Exchange和Queue。P1發送的消息最終會被C1消費。基本的通訊流程大概以下所示:分佈式
- P1生產消息,發送給服務器端的Exchange
- Exchange收到消息,根據ROUTINKEY,將消息轉發給匹配的Queue1
- Queue1收到消息,將消息發送給訂閱者C1
- C1收到消息,發送ACK給隊列確認收到消息
- Queue1收到ACK,刪除隊列中緩存的此條消息
Consumer收到消息時須要顯式的向rabbit broker發送basic.ack消息或者consumer訂閱消息時設置auto_ack參數爲true。在通訊過程當中,隊列對ACK的處理有如下幾種狀況:
- 若是consumer接收了消息,發送ack,rabbitmq會刪除隊列中這個消息,發送另外一條消息給consumer。
- 若是cosumer接受了消息, 但在發送ack以前斷開鏈接,rabbitmq會認爲這條消息沒有被deliver,在consumer在次鏈接的時候,這條消息會被redeliver。
- 若是consumer接受了消息,可是程序中有bug,忘記了ack,rabbitmq不會重複發送消息。
- rabbitmq2.0.0和以後的版本支持consumer reject某條(類)消息,能夠經過設置requeue參數中的reject爲true達到目地,那麼rabbitmq將會把消息發送給下一個註冊的consumer。
php 生產者、消費者示例
先用 composer 加載 mq 拓展文件
{ "require": { "php-amqplib/php-amqplib": "2.7.*" //增長這行 } }
class RabbitMq extends Command
{
protected $config = [
'host' => '192.168.1.18',
'port' => '5672',
'user' => 'admin',
'pwd' => '123456',
'vhost'=> '/',
];
protected $exchangeName = 'kd_sms_send_ex'; //交換機名
protected $queueName = 'kd_sms_send_q'; //隊列名稱
protected $routingKey = 'sms_send'; //路由關鍵字(也能夠省略)
protected function configure()
{
$this->setName('mq')
->addOption('type', null, Option::VALUE_REQUIRED, 'date yyyymmdd', 'con') // pro
->setDescription('Mq test');
}
protected function execute(Input $input, Output $output)
{
$type = $input->getOption('type');
if ($type=='con') {
// 消費
$this->consumption();
}elseif ($type=='pro'){
// 生產
$this->production();
}
echo 'mq test end' .PHP_EOL;
}
// 消費
protected function consumption()
{
//鏈接RabbitMQ
$conn = new AMQPStreamConnection(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['pwd']
);
// 開啓一個通道
$channel = $conn->channel();
// 對於正在繁忙的客戶端,沒獲得迴應以前,不向其發送新消息
$channel->basic_qos(null,1,null);
// 聲明一個隊列 第三個參數爲聲明隊列持久性
$channel->queue_declare($this->queueName, false