rabbitmq - (消息隊列) 的基本原理介紹

介紹php

MQ全稱爲Message Queue, 是一種分佈式應用程序的的通訊方法,它是消費-生產者模型的一個典型的表明,producer往消息隊列中不斷寫入消息,而另外一端consumer則能夠讀取或者訂閱隊列中的消息。RabbitMQ是MQ產品的典型表明,是一款基於AMQP協議可複用的企業消息系統html

系統架構算法

Rabbitmq系統最核心的組件是Exchange和Queue,Exchange和Queue是在rabbitmq server(又叫作broker)端,producer和consumer在應用端。緩存

原理大體圖(MQ:Message Queue):服務器

Queue

消息隊列,提供了FIFO的處理機制,具備緩存消息的能力。rabbitmq中,隊列消息能夠設置爲持久化,臨時或者自動刪除。markdown

  1. 設置爲持久化的隊列,queue中的消息會在server本地硬盤存儲一份,防止系統crash,數據丟失
  2. 設置爲臨時隊列,queue中的數據在系統重啓以後就會丟失
  3. 設置爲自動刪除的隊列,當不存在用戶鏈接到server,隊列中的數據會被自動刪除

Exchange

Exchange相似於數據通訊網絡中的交換機,提供消息路由策略。rabbitmq中,producer不是經過信道直接將消息發送給queue,而是先發送給Exchange。一個Exchange能夠和多個Queue進行綁定,producer在傳遞消息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由算法,將消息路由給指定的queue。和Queue同樣,Exchange也可設置爲持久化,臨時或者自動刪除。網絡

Exchange有4種類型:direct(默認),fanout, topic, 和headers,不一樣類型的Exchange轉發消息的策略有所區別:架構

  1. Direct直接交換器,工做方式相似於單播,Exchange會將消息發送徹底匹配ROUTING_KEY的Queue
  2. fanout廣播是式交換器,無論消息的ROUTING_KEY設置爲何,Exchange都會將消息轉發給全部綁定的Queue
  3. topic主題交換器,工做方式相似於組播,Exchange會將消息轉發和ROUTING_KEY匹配模式相同的全部隊列,好比,ROUTING_KEY爲user.stock的Message會轉發給綁定匹配模式爲 * .stock,user.stock, * . * 和#.user.stock.#的隊列。( * 表是匹配一個任意詞組,#表示匹配0個或多個詞組)
  4. headers消息體的header匹配(ignore)

Binding

所謂綁定就是將一個特定的 Exchange 和一個特定的 Queue 綁定起來。Exchange 和Queue的綁定能夠是多對多的關係composer

通訊過程

假設P1和C1註冊了相同的Broker,Exchange和Queue。P1發送的消息最終會被C1消費。基本的通訊流程大概以下所示:分佈式

  1. P1生產消息,發送給服務器端的Exchange
  2. Exchange收到消息,根據ROUTINKEY,將消息轉發給匹配的Queue1
  3. Queue1收到消息,將消息發送給訂閱者C1
  4. C1收到消息,發送ACK給隊列確認收到消息
  5. Queue1收到ACK,刪除隊列中緩存的此條消息

Consumer收到消息時須要顯式的向rabbit broker發送basic.ack消息或者consumer訂閱消息時設置auto_ack參數爲true。在通訊過程當中,隊列對ACK的處理有如下幾種狀況:

  1. 若是consumer接收了消息,發送ack,rabbitmq會刪除隊列中這個消息,發送另外一條消息給consumer。
  2. 若是cosumer接受了消息, 但在發送ack以前斷開鏈接,rabbitmq會認爲這條消息沒有被deliver,在consumer在次鏈接的時候,這條消息會被redeliver。
  3. 若是consumer接受了消息,可是程序中有bug,忘記了ack,rabbitmq不會重複發送消息。
  4. rabbitmq2.0.0和以後的版本支持consumer reject某條(類)消息,能夠經過設置requeue參數中的reject爲true達到目地,那麼rabbitmq將會把消息發送給下一個註冊的consumer。

php 生產者、消費者示例

先用 composer 加載 mq 拓展文件

{ 
  "require": { 
    "php-amqplib/php-amqplib": "2.7.*" //增長這行 
  } 
}

 

 
  
class RabbitMq extends Command
{
protected $config = [
'host' => '192.168.1.18',
'port' => '5672',
'user' => 'admin',
'pwd' => '123456',
'vhost'=> '/',
];

protected $exchangeName = 'kd_sms_send_ex'; //交換機名
protected $queueName = 'kd_sms_send_q'; //隊列名稱
protected $routingKey = 'sms_send'; //路由關鍵字(也能夠省略)

protected function configure()
{
$this->setName('mq')
->addOption('type', null, Option::VALUE_REQUIRED, 'date yyyymmdd', 'con') // pro
->setDescription('Mq test');
}

protected function execute(Input $input, Output $output)
{
$type = $input->getOption('type');

if ($type=='con') {
// 消費
$this->consumption();
}elseif ($type=='pro'){
// 生產
$this->production();
}

echo 'mq test end' .PHP_EOL;
}


// 消費
protected function consumption()
{
//鏈接RabbitMQ
$conn = new AMQPStreamConnection(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['pwd']
);
// 開啓一個通道
$channel = $conn->channel();

// 對於正在繁忙的客戶端,沒獲得迴應以前,不向其發送新消息
$channel->basic_qos(null,1,null);

// 聲明一個隊列 第三個參數爲聲明隊列持久性
$channel->queue_declare($this->queueName, false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function ($msg){
echo date('Y-m-d H:i:s') . ' body is '. $msg->body ."\n";
};
// 開始隊列消費 第四個參數表示通知服務端消費狀況
$channel->basic_consume($this->queueName, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
$channel->wait();
}

// 關閉通道
$channel->close();
// 關閉鏈接
$conn->close();
}

// 生產
protected function production()
{
//創建生產者與mq之間的鏈接
$conn = new AMQPStreamConnection(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['pwd']
);
//在已鏈接基礎上創建生產者與mq之間的通道
$channel = $conn->channel();

//聲明初始化交換機
$channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
//聲明初始化一條隊列 第三個參數爲聲明隊列持久性
$channel->queue_declare($this->queueName, false, true, false, false);
//將隊列與某個交換機進行綁定,並使用路由關鍵字
$channel->queue_bind($this->queueName, $this->exchangeName, $this->routingKey);

//生成消息
$msgBody = serialize(["name" => "dxx", "age" => 18]);
$param = [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // 消息設爲持久化
];
$msg = new AMQPMessage($msgBody, $param);
//推送消息到某個交換機
$channel->basic_publish($msg, $this->exchangeName, $this->routingKey);

    // 關閉通道
    $channel->close();
    // 關閉鏈接
    $conn->close();
  }

}

 

轉自:https://www.cnblogs.com/jun-ma/p/4840869.html
相關文章
相關標籤/搜索