消息隊列-一篇讀懂rabbitmq(生命週期,confirm模式,延遲隊列,集羣)

 

什麼是消息隊列?php

就是生產者生產一條消息,發送到這個rabbitmq,消費者鏈接rabbitmq而且進行消費,生產者和消費者並須要知道對方是如何工做的,從而實現程序之間的解耦,異步和削峯,這也就是消息隊列的做用。json

使用的場景也有不少,好比用戶支付購買以後的發送短信,增長用戶積分等等,只要能將業務邏輯抽象出來,就能很好得使用它。服務器

 

下面進入正題:架構

先來介紹一下基本概念和參與生命週期的各個成員。併發

publisher:消息生產者,負責建立消息,併發送到代理服務器(rabbitmq)異步

message:發送的消息,由 有效負載(payload) 和 標籤 (label) 組成tcp

exchange:交換器,負責接收消息並路由給服務器的隊列函數

queue:消息隊列,就是消息最後要去的地方。而後等待消費者取走並消費性能

consumer:消息消費者,與生產者對應,程序的另一方,負責消費信息,並完成相應的業務邏輯測試

channel:信道,在tcp之上創建的通道,負責傳送消息。隊列的傳輸都是基於信道來完成的。

broker:消息隊列服務器實體

下面來解析一下這張圖,這張圖是網上找的,雖然不夠詳細,可是勉強能用。

 

準備前提,開啓rabbitmq服務,生命隊列和交換器,並將二者進行綁定。

 

生產邏輯:

publisher 經過 broker服務器ip 和 端口 嘗試創建與 broker 的 tcp 鏈接,鏈接成功以後,會嘗試驗證驗證用戶名和密碼,若是錯誤,則拒絕訪問。

驗證成功以後,在tcp上創建一個 信道(channel) ,publisher 經過信道,發送消息到 broker,進入指定的虛擬主機virtual host。而後查找到交換器綁定的隊列,並將消息推送進入隊列。

 

消費邏輯:

consumer 創建鏈接和驗證和生產者同樣。接下來經過隊列名,直接找到隊列進行監聽並消費。

 

有幾個比較重要的知識點:

1.  rabbitmq的交換器並非真正意義的交換器,它本質上其實就是一張表,裏面存放和交換器名稱和消息隊列的映射關係。因此說隊列的傳輸都是經過信道來完成的。

2.  信道存在的意義在於 建立和銷燬tcp鏈接很是消耗資源

 

 

交換器的類型

當生產的消息進入虛擬主機時,會去尋找一張表,就是交換器和隊列映射關係的一張表。而交換器的類型也分了好多種,能夠根據不一樣的場景自由選擇。

目前總共分了4種,direct,fanout,topic,headers。其中headers由於性能問題幾乎不在使用,這裏就不作過多的討論。

1.direct

direct是直接,徹底匹配,單播的模式。

php簡單代碼實現:

//建立隊列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
$q->declare();

//建立交換機對象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$ex->setFlags(AMQP_DURABLE); //持久化
$ex->declare();

//綁定交換機與隊列,並指定路由鍵
$q->bind($e_name, $k_route);

//生產消息
$ex->publish($message, $k_route)

特性: 當生產消息時,未指定交換器,則會默認使用 (AMQP default) 交換器,而後路由到 和路由鍵名稱相同的隊列中去

 

 

2.fanout

若是說 direct 是 單對單 的關係,那麼 fanout就是單對多的關係,即一個交換器對應多個隊列。

fanout交換器不經過路由鍵路由到隊列,而是經過將隊列綁定在交換器上,當消息進來時,直接路由到該交換器綁定的隊列去。

 

 

3.topic

topic交換器經過路由鍵,將自動匹配容許匹配的隊列,相比fanout,更加靈活,不過對架構要求更高。以下圖所示

$e_name = 'logs-exchange'; //交換機名
$q_name = 'msg-inbox-errors'; //隊列名

//建立隊列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
$q->declare();

//建立交換機對象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_TOPIC); //direct類型
$ex->setFlags(AMQP_DURABLE); //持久化
$ex->declare();


$q->bind($e_name, '*.msg-inbox');

$ex->publish($message, 'wonima.msg-inbox');

 

高級特性:

爲了確保消息可靠性,有兩種處理方式.

1.rabbitmq事務

事務主要是對信道進行設置,示例代碼以下

$channel->startTransaction(); //開始事務
for($i=0; $i<5; ++$i){
    $message = "TEST MESSAGE! 測試消息!";
    $message = $message.$i."---";
    echo "Send Message:".$ex->publish($message, 'xxxx')."\n";

}
$channel->commitTransaction(); //提交事務

經測驗,使用事務以後,性能會形成至關大的影響,與不實用事務相比,性能能夠相差百倍以上。

 

2.confirm 模式

當信道設置未 confirm 模式的時候,每一條消息都會獲的惟一的id。當消費者接收到消息的時候,自動發送 或 手動發送消息  進行消息確認。

//建立隊列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
echo "Message Total:".$q->declare()."\n";

//第一種:自動應答
//$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答

//第二種:手動應答 $q->consume('processMessage'); /** * 消費回調函數 * 處理消息 */ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); sleep(2); $myfile = fopen("newfile2.txt", "a+") or die("Unable to open file!"); $txt = $msg.time()."\n"; fwrite($myfile, $txt); fclose($myfile); echo $msg.time()."\n"; //處理消息 $q->ack($envelope->getDeliveryTag()); //手動發送ACK應答 }

 

應答模式最大的好處是就是異步,執行效率高。事務和應答模式相比,後者使用更加頻繁,前者幾乎沒有見到過。

 

延遲隊列

首先聲明rabbitmq是不支持延遲隊列的,可是咱們能夠利用死信隊列來完成。

 

 

實現延遲隊列也有多種方式:

第一種:設置死信隊列,並將 過時時間 加到隊列裏面

try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {

        echo 'rabbit-mq 鏈接錯誤:', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    $exchange->setFlags(AMQP_DURABLE);//持久化
    $exchange->setName($params['exchangeName'] ?: '');
    $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
    $exchange->declareExchange();

    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName'] ?: '');
    $queue->setFlags(AMQP_DURABLE);
    $queue->setArguments(array(
        'x-dead-letter-exchange' => 'last_exchange',
        'x-dead-letter-routing-key' => 'last_route',
        'x-message-ttl' => 10000,
    ));
    $queue->declareQueue();

    //綁定
    $queue->bind($params['exchangeName'], $params['routeKey']);

    $exchange2 = new AMQPExchange($channel);
    $exchange2->setFlags(AMQP_DURABLE);//持久化
    $exchange2->setName('last_exchange');
    $exchange2->setType(AMQP_EX_TYPE_DIRECT); //direct類型
    $exchange2->declareExchange();

    $queue2 = new AMQPQueue($channel);
    $queue2->setName('last_queue');
    $queue2->setFlags(AMQP_DURABLE);
    $queue2->declareQueue();

    $queue2->bind('last_exchange', 'last_queue');

} catch (Exception $e) {

}

$time = time();
//生成消息
$exchange->publish((string)$time, $params['routeKey'], AMQP_MANDATORY, [
    'delivery_mode' => 2,
]);

 

第二種:設置死信隊列,並將 過時時間 加到消息裏面,這一種更加自由。

$msg = [
    'x-message-ttl' => 5,
    'ttl' => 5,
    'body' => time()
];
$msg = json_encode($msg);
$exchange->publish($msg, '', AMQP_MANDATORY, ['delivery_mode' => 2]);

 

第三種:使用延遲插件

 

集羣

先來談談rabbitmq的集羣是如何運行的

 

當你開啓來兩個rabbitmq(節點)服務,並將其組成爲一個集羣。每一個節點並不會將全部的隊列進行拷貝,元數據依舊保存在單個節點當中,其餘節點則是經過指針。

舉個例子:節點a和節點b組成了一個集羣,節點a保存着一堆元數據 c 和 元數據d的指針,用來指向節點b,節點b保存一堆元數據d 和 元數據 c的指針,用來指向節點a。

這樣作有兩個緣由

1 存儲空間 :若是一個節點存儲了1gb的數據,再添加節點,只會帶來一摸同樣的1gb的數據,很是浪費磁盤空間

2 性能:對於持久化消息來講,每一條消息都會觸發磁盤io,每次新增節點,網路和磁盤負載都會增長,相對於單機來講,性能不但不會提高,反而可能降低。

可是因爲交換器只是一張查詢表,並不是實際的路由器,所以將交換器在整個集羣進行復制也不會損耗太多的性能,因此交換器在每一個節點都會保存一份,以便於查詢。

相關文章
相關標籤/搜索