什麼是消息隊列?php
就是生產者生產一條消息,發送到這個rabbitmq,消費者鏈接rabbitmq而且進行消費,生產者和消費者並須要知道對方是如何工做的,從而實現程序之間的解耦,異步和削峯,這也就是消息隊列的做用。json
使用的場景也有不少,好比用戶支付購買以後的發送短信,增長用戶積分等等,只要能將業務邏輯抽象出來,就能很好得使用它。服務器
下面進入正題:架構
先來介紹一下基本概念和參與生命週期的各個成員。併發
publisher:消息生產者,負責建立消息,併發送到代理服務器(rabbitmq)異步
message:發送的消息,由 有效負載(payload) 和 標籤 (label) 組成tcp
exchange:交換器,負責接收消息並路由給服務器的隊列函數
queue:消息隊列,就是消息最後要去的地方。而後等待消費者取走並消費性能
consumer:消息消費者,與生產者對應,程序的另一方,負責消費信息,並完成相應的業務邏輯測試
channel:信道,在tcp之上創建的通道,負責傳送消息。隊列的傳輸都是基於信道來完成的。
broker:消息隊列服務器實體
下面來解析一下這張圖,這張圖是網上找的,雖然不夠詳細,可是勉強能用。
準備前提,開啓rabbitmq服務,生命隊列和交換器,並將二者進行綁定。
生產邏輯:
publisher 經過 broker服務器ip 和 端口 嘗試創建與 broker 的 tcp 鏈接,鏈接成功以後,會嘗試驗證驗證用戶名和密碼,若是錯誤,則拒絕訪問。
驗證成功以後,在tcp上創建一個 信道(channel) ,publisher 經過信道,發送消息到 broker,進入指定的虛擬主機virtual host。而後查找到交換器綁定的隊列,並將消息推送進入隊列。
消費邏輯:
consumer 創建鏈接和驗證和生產者同樣。接下來經過隊列名,直接找到隊列進行監聽並消費。
有幾個比較重要的知識點:
1. rabbitmq的交換器並非真正意義的交換器,它本質上其實就是一張表,裏面存放和交換器名稱和消息隊列的映射關係。因此說隊列的傳輸都是經過信道來完成的。
2. 信道存在的意義在於 建立和銷燬tcp鏈接很是消耗資源
交換器的類型
當生產的消息進入虛擬主機時,會去尋找一張表,就是交換器和隊列映射關係的一張表。而交換器的類型也分了好多種,能夠根據不一樣的場景自由選擇。
目前總共分了4種,direct,fanout,topic,headers。其中headers由於性能問題幾乎不在使用,這裏就不作過多的討論。
1.direct
direct是直接,徹底匹配,單播的模式。
php簡單代碼實現:
//建立隊列 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE); //持久化 $q->declare(); //建立交換機對象 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $ex->setFlags(AMQP_DURABLE); //持久化 $ex->declare(); //綁定交換機與隊列,並指定路由鍵 $q->bind($e_name, $k_route);
//生產消息
$ex->publish($message, $k_route)
特性: 當生產消息時,未指定交換器,則會默認使用 (AMQP default) 交換器,而後路由到 和路由鍵名稱相同的隊列中去
2.fanout
若是說 direct 是 單對單 的關係,那麼 fanout就是單對多的關係,即一個交換器對應多個隊列。
fanout交換器不經過路由鍵路由到隊列,而是經過將隊列綁定在交換器上,當消息進來時,直接路由到該交換器綁定的隊列去。
3.topic
topic交換器經過路由鍵,將自動匹配容許匹配的隊列,相比fanout,更加靈活,不過對架構要求更高。以下圖所示
$e_name = 'logs-exchange'; //交換機名 $q_name = 'msg-inbox-errors'; //隊列名 //建立隊列 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE); //持久化 $q->declare(); //建立交換機對象 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_TOPIC); //direct類型 $ex->setFlags(AMQP_DURABLE); //持久化 $ex->declare(); $q->bind($e_name, '*.msg-inbox'); $ex->publish($message, 'wonima.msg-inbox');
高級特性:
爲了確保消息可靠性,有兩種處理方式.
1.rabbitmq事務
事務主要是對信道進行設置,示例代碼以下
$channel->startTransaction(); //開始事務 for($i=0; $i<5; ++$i){ $message = "TEST MESSAGE! 測試消息!"; $message = $message.$i."---"; echo "Send Message:".$ex->publish($message, 'xxxx')."\n"; } $channel->commitTransaction(); //提交事務
經測驗,使用事務以後,性能會形成至關大的影響,與不實用事務相比,性能能夠相差百倍以上。
2.confirm 模式
當信道設置未 confirm 模式的時候,每一條消息都會獲的惟一的id。當消費者接收到消息的時候,自動發送 或 手動發送消息 進行消息確認。
//建立隊列 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE); //持久化 echo "Message Total:".$q->declare()."\n"; //第一種:自動應答 //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答
//第二種:手動應答 $q->consume('processMessage'); /** * 消費回調函數 * 處理消息 */ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); sleep(2); $myfile = fopen("newfile2.txt", "a+") or die("Unable to open file!"); $txt = $msg.time()."\n"; fwrite($myfile, $txt); fclose($myfile); echo $msg.time()."\n"; //處理消息 $q->ack($envelope->getDeliveryTag()); //手動發送ACK應答 }
應答模式最大的好處是就是異步,執行效率高。事務和應答模式相比,後者使用更加頻繁,前者幾乎沒有見到過。
延遲隊列
首先聲明rabbitmq是不支持延遲隊列的,可是咱們能夠利用死信隊列來完成。
實現延遲隊列也有多種方式:
第一種:設置死信隊列,並將 過時時間 加到隊列裏面
try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { echo 'rabbit-mq 鏈接錯誤:', json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); $exchange->setFlags(AMQP_DURABLE);//持久化 $exchange->setName($params['exchangeName'] ?: ''); $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $exchange->declareExchange(); $queue = new AMQPQueue($channel); $queue->setName($params['queueName'] ?: ''); $queue->setFlags(AMQP_DURABLE); $queue->setArguments(array( 'x-dead-letter-exchange' => 'last_exchange', 'x-dead-letter-routing-key' => 'last_route', 'x-message-ttl' => 10000, )); $queue->declareQueue(); //綁定 $queue->bind($params['exchangeName'], $params['routeKey']); $exchange2 = new AMQPExchange($channel); $exchange2->setFlags(AMQP_DURABLE);//持久化 $exchange2->setName('last_exchange'); $exchange2->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $exchange2->declareExchange(); $queue2 = new AMQPQueue($channel); $queue2->setName('last_queue'); $queue2->setFlags(AMQP_DURABLE); $queue2->declareQueue(); $queue2->bind('last_exchange', 'last_queue'); } catch (Exception $e) { } $time = time(); //生成消息 $exchange->publish((string)$time, $params['routeKey'], AMQP_MANDATORY, [ 'delivery_mode' => 2, ]);
第二種:設置死信隊列,並將 過時時間 加到消息裏面,這一種更加自由。
$msg = [ 'x-message-ttl' => 5, 'ttl' => 5, 'body' => time() ]; $msg = json_encode($msg); $exchange->publish($msg, '', AMQP_MANDATORY, ['delivery_mode' => 2]);
第三種:使用延遲插件
集羣
先來談談rabbitmq的集羣是如何運行的
當你開啓來兩個rabbitmq(節點)服務,並將其組成爲一個集羣。每一個節點並不會將全部的隊列進行拷貝,元數據依舊保存在單個節點當中,其餘節點則是經過指針。
舉個例子:節點a和節點b組成了一個集羣,節點a保存着一堆元數據 c 和 元數據d的指針,用來指向節點b,節點b保存一堆元數據d 和 元數據 c的指針,用來指向節點a。
這樣作有兩個緣由
1 存儲空間 :若是一個節點存儲了1gb的數據,再添加節點,只會帶來一摸同樣的1gb的數據,很是浪費磁盤空間
2 性能:對於持久化消息來講,每一條消息都會觸發磁盤io,每次新增節點,網路和磁盤負載都會增長,相對於單機來講,性能不但不會提高,反而可能降低。
可是因爲交換器只是一張查詢表,並不是實際的路由器,所以將交換器在整個集羣進行復制也不會損耗太多的性能,因此交換器在每一個節點都會保存一份,以便於查詢。