主題模式與路由模式相似,發送到 topic
交換機的路由鍵必須是一個單詞列表,由點號分隔。php
例如:stock.usd.nyse
、nyse.vmw
、quick.orange.rabbit
。算法
路由鍵中能夠有任意多個單詞,最多 255 個字節。json
同時綁定鍵也必須使用相同的形式。composer
# composer.json { "require": { "php-amqplib/php-amqplib": ">=3.0" } } > composer.phar install
topic
交換機背後的路由算法相似於 direct
交換,使用特定路由鍵發送的消息將被傳遞到使用匹配綁定鍵綁定的全部隊列。函數
綁定鍵有兩個重要的特殊狀況:ui
上圖的綁定能夠總結爲:spa
路由鍵設置爲 quick.orange.rabbit
的消息將發送給兩個隊列。code
lazy.orange.elephant
也會發送給兩個隊列。rabbitmq
quick.orange.fox
只會進入第一個隊列。隊列
lazy.brown.fox
只會進入第二個隊列。
lazy.pink.rabbit
只會進入第二個隊列一次,即便命中了兩次綁定規則。
quick.brown.fox
不會進入任何隊列。
orange
和 quick.orange.male.rabbit
不會進入任何隊列,會被丟棄。
lazy.orange.male.rabbit
會進入第二個隊列,命中了最後一個規則。
主題交換機功能強大,支持其餘全部交換機的功能。
當隊列與 \#(井號)綁定鍵綁定時將接收全部消息,和
fanout
交換機同樣。當綁定中不使用特殊字符 *(星號)和 \#(井號)時,和
direct
交換機同樣。
生產者鏈接到RabbitMQ,發送一條消息,而後退出。
# send.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立鏈接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立通道 $channel = $connection->channel(); // 定義一個名爲 topic_logs 的 topic 交換機 $channel->exchange_declare('topic_logs', 'topic', false, false, false); // 從參數中獲取交換機的路由鍵 $routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info'; $data = implode(' ', array_slice($argv, 2)); if (empty($data)) { $data = "Hello World!"; } // $msg = new AMQPMessage($data); // 經過名爲 topic_logs 的 topic 交換機發送消息到隊列 (消息內容, 交換機, 路由鍵); $channel->basic_publish($msg, 'topic_logs', $routing_key); echo ' [x] Sent ', $routing_key, ':', $data, "\n"; $channel->close(); $connection->close();
消費者監聽來自RabbitMQ的消息,一般須要一直保持運行狀態以監聽消息。
# receive.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立鏈接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 建立通道 $channel = $connection->channel(); // 定義一個名爲 topic_logs 的 topic 交換機 $channel->exchange_declare('topic_logs', 'topic', false, false, false); // 建立一個隨機命名的新隊列,第三個參數爲關閉隊列持久化,第四個參數爲當聲明它的鏈接關閉時隊列會被自動刪除 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); // 定義全部的綁定鍵 $binding_keys = array_slice($argv, 1); if (empty($binding_keys)) { file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n"); exit(1); } // 將隨機命名的隊列綁定到 topic 交換機,生產者向交換機發送消息將被放到綁定的隊列中 foreach ($binding_keys as $binding_key) { $channel->queue_bind($queue_name, 'topic_logs', $binding_key); } echo " [*] Waiting for logs. To exit press CTRL+C\n"; // 定義回調函數 $callback = function ($msg) { echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; // 第四個參數設爲true開啓自動消息確認,即投遞消息後馬上標記爲刪除 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while ($channel->is_open()) { $channel->wait(); } $channel->close(); $connection->close();
打開一個終端,運行消費者,接收全部消息:
php receive.php "#"
打開另外一個終端,運行消費者,接收 kern.*
的消息:
php receive.php "kern.*"
打開另外一個終端,運行消費者,接收 *.critical
的消息:
php receive.php "*.critical"
打開另外一個終端,運行消費者,接收 kern.*
和 *.critical
的消息:
php receive.php "kern.*" "*.critical"
打開另外一個終端,運行生產者,發送一條 kern.critical
消息:
php send.php "kern.critical" "A critical kernel error"
sudo rabbitmqctl list_exchanges
sudo rabbitmqctl list_bindings