(using php-amqplib)php
本教程假設RabbitMQ是安裝在標準端口上運行(5672)。若是您使用不一樣的主機、端口或憑據,則鏈接設置須要調整。html
在哪裏獲得幫助git
若是您在本教程中遇到困難,能夠經過郵件列表與咱們聯繫。github
在前面的教程中,咱們改進了日誌系統。咱們使用的是一種直接廣播方式,而不是隻使用一種直接(direct)廣播方式的fanout交換機,從而得到了有選擇地接收日誌的可能性。segmentfault
雖然使用直接direct
交換機改進了咱們的系統,但它仍然有侷限性——它不能根據多個標準進行路由。工具
在咱們的日誌系統中,咱們可能但願訂閱基於嚴重性的日誌,但也要基於發出日誌的源。你可能從syslog UNIX工具知道這個概念,路由日誌基於嚴重性(info/warn/crit…)和設備(auth/cron/kern…)。ui
這會給咱們很大的靈活性,咱們可能要聽關鍵的錯誤來自kern
, 全部日誌來自kern
」。spa
爲了在日誌系統中實現這一點,咱們須要瞭解一個更復雜的主題topic
交換機。翻譯
發送到一個話題交換機(topic exchange)信息,不能是任意routing_key -它必須是一個單詞的列表,用逗號分隔。這些詞能夠是任何東西,但一般它們指定鏈接到消息的某些特性。一些有效的路由鍵的例子:stock.usd.nyse
、nyse.vmw
、"quick.orange.rabbit"
。在你喜歡的路由鍵中,最多能夠有255個字節的單詞。日誌
綁定鍵也必須是相同的形式。主題交換背後的邏輯相似於一個直接的消息,用特定的路由鍵發送的消息將被髮送到綁定到綁定鍵的全部隊列中。可是有兩個重要的綁定鍵的特殊狀況:
*(星號)能夠代替一個詞。
#(哈希)能夠代替零個或更多的單詞。
在一個例子中解釋這一點是最容易的:
在這個示例中,咱們將發送全部描述動物的消息。消息將用一個包含三個單詞(兩個點)的路由鍵發送。路由鍵中的第一個字將描述速度,第二個顏色和第三個種:<speed>.<colour>.<species>
。
咱們建立三的綁定:Q1綁定綁定鍵*.orange.*
和 Q2 with *.*.rabbit
和 lazy.#
。
這些綁定能夠歸納爲:
Q1對全部的橙色(orange
)動物很感興趣。
Q2想聽關於兔子(rabbits
)的一切,關於懶惰(lazy
)動物的一切。
帶有quick.orange.rabbit
的路由鍵的消息將傳送到兩個隊列中。信息lazy.orange.elephant
也將去他們倆。另外一方面,quick.orange.fox
只會進入第一排,而lazy.brown.fox
只到第二個。lazy.pink.rabbit
將被送到第二個隊列只有一次,即便它匹配兩個綁定。quick.brown.fox
不匹配任何綁定,因此它將被丟棄。
若是咱們違背合同,用一個或四個詞,如orange
或quick.orange.male.rabbit
?那麼,這些消息將不匹配任何綁定並將丟失。
另外一方面,lazy.orange.male.rabbit
,即便它有四個詞,將匹配最後的綁定,並將交付給第二個隊列。
Topic exchange
主題交換(Topic exchange)功能強大,能夠像其餘交換機同樣。
當隊列綁定
#
(hash)綁定鍵-它將收到的全部郵件,無論路由關鍵同樣的fanout交換機。當特殊字符
*
(star)和#
(hash)中不使用綁定,主題交換機會表現的像一個direct交換機。
咱們將在日誌系統中使用主題交換機(topic exchange)。咱們將從一個工做假設開始,假設日誌的路由鍵有兩個詞:<facility>.<severity>
。
代碼與前面的教程幾乎相同。
emit_log_topic.php代碼:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('topic_logs', 'topic', false, false, false); $routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info'; $data = implode(' ', array_slice($argv, 2)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'topic_logs', $routing_key); echo " [x] Sent ",$routing_key,':',$data," \n"; $channel->close(); $connection->close(); ?>
receive_logs_topic.php代碼:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('topic_logs', 'topic', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $binding_keys = array_slice($argv, 1); if( empty($binding_keys )) { file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n"); exit(1); } foreach($binding_keys as $binding_key) { $channel->queue_bind($queue_name, 'topic_logs', $binding_key); } echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; $callback = function($msg){ echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
接收全部日誌:
php receive_logs_topic.php "#"
接受全部的日誌來自kern
:
php receive_logs_topic.php "kern.*"
或者,若是您只想聽關於critical
的日誌:
php receive_logs_topic.php "*.critical"
你能夠建立多個綁定:
php receive_logs_topic.php "kern.*" "*.critical"
觸發一個日誌來自路由鍵kern.critical
類型
php emit_log_topic.php "kern.critical" "A critical kernel error"
這些程序讓咱們以爲很好玩。請注意,代碼對路由或綁定鍵不做任何假設,您可能但願使用兩個以上的路由鍵參數。
(所有源碼:emit_log_topic.php 和 receive_logs_topic)
接下來,瞭解如何經過一個遠程過程調用來執行往返消息,你能夠閱讀下一章節:RabbitMQ+PHP 教程六(RPC)。