在上篇教程中,咱們搭建了一個工做隊列。每一個任務之分發給一個工做者(worker)。在本篇教程中,咱們要作的以前徹底不同——分發一個消息給多個消費者(consumers)。這種模式被稱爲「發佈/訂閱」。php
爲了描述這種模式,咱們將會構建一個簡單的日誌系統。它包括兩個程序——第一個程序負責發送日誌消息,第二個程序負責獲取消息並輸出內容。web
在咱們的這個日誌系統中,全部正在運行的接收方程序都會接受消息。咱們用其中一個接收者(receiver)把日誌寫入硬盤中,另一個接受者(receiver)把日誌輸出到屏幕上。服務器
最終,日誌消息被廣播給全部的接受者(receivers)。spa
前面的教程,咱們發送消息到隊列並從中取出消息。如今是時候介紹RabbitMq中完整的消息模型了。日誌
讓咱們簡單的歸納一下以前的教程:code
RabbitMQ消息模型的核心理念是:發佈者(producer)不會直接發送任何消息給隊列。事實上,發佈者(producer)甚至不知道消息是否已經被投遞到隊列。教程
發佈者(producer)只須要把消息發送給一個交換器(exchange)。交換器很是簡單,它一邊從發佈者方接收消息,一邊把消息推入隊列。交換器必須知道如何處理它接收到的消息,是應該推送到指定的隊列仍是是多個隊列,或者是直接忽略消息。這些規則是經過exchange type來定義的。rabbitmq
有幾個可供選擇的交換器類型:AMQPEXTYPEDIRECT, AMQPEXTYPEFANOUT,AMQPEXTYPEHEADER or AMQPEXTYPETOPIC。咱們在這裏主要說明AMQPEXTYPE_FANOUT。先建立一個fanout類型的交換器,命名爲logs:隊列
$exchange->setName('logs'); $exchange->setType(AMQP_EX_TYPE_FANOUT); $exchange->declare();
fanout交換器很簡單,你可能從名字上就能猜想出來,它把消息發送給它所知道的全部隊列。這正是咱們的日誌系統所須要的。字符串
交換器列表
rabbitmqctl可以列出服務器上全部的交換器:
$ sudo rabbitmqctl list_exchanges Listing exchanges ... logs fanout amq.direct direct amq.topic topic amq.fanout fanout amq.headers headers ...done.
這個列表中有一些叫作amq.*的交換器。這些都是默認建立的,不過這時候你還不須要使用他們。
匿名的交換器
前面的教程中咱們對交換器一無所知,但仍然可以發送消息到隊列中。由於咱們使用了命名爲空字符串(「」)默認的交換器。
回想咱們以前是如何發佈一則消息:
``` $exchange->publish($message, $routeKey);
```
exchange參數就是交換器的名稱。空字符串表明默認或者匿名交換器:消息將會根據指定的routing_key分發到指定的隊列。
在PHP的AMQP中若是exchange設置爲匿名的話,是報錯的:PHP Fatal error: Uncaught exception ‘AMQPExchangeException’ with message ‘Invalid exchange name given, must be between 1 and 255 characters long.’
如今,咱們就能夠發送消息到一個具名交換器了:
$exchange->publish($message, '');
你還記得以前咱們使用的隊列名嗎( hello和task_queue)?給一個隊列命名是很重要的——咱們須要把工做者(workers)指向正確的隊列。若是你打算在發佈者(producers)和消費者(consumers)之間共享同隊列的話,給隊列命名是十分重要的。
可是這並不適用於咱們的日誌系統。咱們打算接收全部的日誌消息,而不只僅是一小部分。咱們關心的是最新的消息而不是舊的。爲了解決這個問題,咱們須要作兩件事情。
首先,當咱們鏈接上RabbitMQ的時候,咱們須要一個全新的、空的隊列。咱們能夠手動建立一個隨機的隊列名,或者讓服務器爲咱們選擇一個隨機的隊列名(推薦)。咱們只要在調用$queue->declare();方法的時候,不提供queue參數就能夠了:
$queue = new AMQPQueue($channel); $queue->setFlags(AMQP_EXCLUSIVE); $queue->declare();
這時候咱們能夠經過$queue->getName();得到已經生成的隨機隊列名。它多是這樣子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。
第二步,當與消費者(consumer)斷開鏈接的時候,這個隊列應當被刪除。咱們可使用exclusive標識。
$queue->setFlags(AMQP_EXCLUSIVE);
咱們已經建立了一個fanout類型的交換器和一個隊列。如今咱們須要告訴交換器如何發送消息給咱們的隊列。交換器和隊列之間的關係咱們稱之爲綁定(binding)。
$queue->bind($exchangeName, $queue->getName());
如今,logs交換器將會把消息添加到咱們的隊列中。
綁定列表。
你可使用rabbitmqctl list_bindings隊列出全部存在的綁定。.
發佈日誌消息的程序看起來和以前的沒有太大區別。最重要的改變就是咱們把消息發送給logs交換器而不是匿名交換器。在發送的時候咱們須要提供routingkey參數,可是它的值會被fanout交換器忽略。如下是emitlog.php腳本:
<?php /** * PHP amqp(RabbitMQ) Demo-3 * @author yuansir &lt;yuansir@live.cn/yuansir-web.com> */ $exchangeName = 'logs'; $message = empty($argv[1]) ? 'info:Hello World!' : ' '.$argv[1]; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); $exchange->declare(); $exchange->publish($message, ''); var_dump("[x] Sent $message"); $connection->disconnect();
正如你看到的那樣,在鏈接成功以後,咱們聲明瞭一個交換器,這一個是很重要的,由於不容許發佈消息到不存在的交換器。
若是沒有綁定隊列到交換器,消息將會丟失。但這個沒有所謂,若是沒有消費者監聽,那麼消息就會被忽略。
receive_logs.php的代碼:
<?php /** * PHP amqp(RabbitMQ) Demo-3 * @author yuansir &lt;yuansir@live.cn/yuansir-web.com> */ $exchangeName = 'logs'; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setFlags(AMQP_EXCLUSIVE); $queue->declare(); $queue->bind($exchangeName, ''); var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) { $queue->consume('callback'); } $connection->disconnect(); function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); $queue->nack($envelope->getDeliveryTag()); }
這樣咱們就完成了。若是你想把日誌保存到文件中,只須要打開控制檯輸入:
$ php receive_logs.php > logs_from_rabbit.log
若是你想在屏幕中查看日誌,那麼打開一個新的終端而後運行:
$ php receive_logs.php
固然還要發送日誌:
$ php emit_log.php
使用rabbitmqctl listbindings你可確認已經建立了隊列的綁定。你能夠看到運行中的兩個receivelogs.php程序:
$ sudo rabbitmqctl list_bindings Listing bindings ... ... logs amq.gen-TJWkez28YpImbWdRKMa8sg== [] logs amq.gen-x0kymA4yPzAT6BoC/YP+zw== [] ...done.
顯示結果很直觀:logs交換器把數據發送給兩個系統命名的隊列。這就是咱們所指望的。
如何監聽消息的子集呢?讓咱們移步教程4