(using php-amqplib)php
本教程假設RabbitMQ是安裝在標準端口上運行(5672)。若是您使用不一樣的主機、端口或憑據,則鏈接設置須要調整。html
若是您在本教程中遇到困難,能夠經過郵件列表與咱們聯繫。git
在第二個教程中,咱們學習瞭如何使用工做隊列在多個工人之間分配耗時的任務。程序員
可是若是咱們須要在遠程計算機上運行一個函數並等待結果呢?嗯,那是另外一回事了。這種模式一般稱爲遠程過程調用或RPC。github
在本教程中咱們將使用RabbitMQ搭建一個RPC系統:一個客戶端和一個可擴展的RPC服務器。因爲咱們沒有任何值得分配的耗時的任務,因此咱們將建立一個返回Fibonacci
數的模擬一個RPC服務。安全
爲了說明如何使用RPC服務,咱們將建立一個簡單的客戶類。它將公開一個名爲調用的方法,該方法發送一個RPC請求並阻塞直到接收到結果爲止:服務器
$fibonacci_rpc = new FibonacciRpcClient(); $response = $fibonacci_rpc->call(30); echo " [.] Got ", $response, "\n";
關於RPC的一些建議
雖然RPC是計算中很是常見的模式,但它常常遭到批評。當程序員不知道函數調用是本地的,或者它是一個緩慢的RPC時,問題就出現了。這樣的混亂致使了不可預知的系統,並給調試增長了沒必要要的複雜性。而簡化軟件,濫用會致使難以維護的RPC代碼。網絡
考慮到這一點,請考慮如下建議:併發
確保很明顯哪一個函數調用是本地調用,而且它是遠程的。
記錄系統。使組件之間的依賴關係清晰。
處理錯誤案例。RPC服務器長時間處於下行狀態時,客戶端應如何響應?
有疑問時避免RPC。若是能夠,則應該使用異步管道,而不是像阻塞這樣的RPC,結果被異步推送到下一個計算階段。異步
通常在RabbitMQ作RPC是容易的。客戶端發送一條請求消息和一個響應消息的服務器回覆。爲了接收響應,咱們須要向請求發送一個「回調」隊列地址。咱們可使用默認隊列。讓咱們試試看:
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $msg = new AMQPMessage( $payload, array('reply_to' => $queue_name)); $channel->basic_publish($msg, '', 'rpc_queue'); # ... then code to read a response message from the callback_queue ...
消息屬性
AMQP協議(0-9-1 protocol)預約義了一套14個屬性,去一個消息。大多數屬性不多使用,除了如下內容:
delivery_mode
: 將消息標記爲持久性。 (with a value of 2) or transient (1). 您可能會從第二個教程中記住這個屬性。content_type
:用來描述編碼的MIME類型。例如,對於經常使用的JSON編碼,將此屬性設置爲應用程序/ JSON是一個很好的作法。reply_to
:經常使用的名字一個回調隊列。correlation_id
:有助於將RPC響應與請求關聯起來。
在上面介紹的方法中,咱們建議爲每一個RPC請求建立一個回調隊列。這是很是低效的,但幸運的是有一個更好的方法——讓咱們爲每一個客戶機建立一個回調隊列。
這引起了一個新問題,在隊列中收到了響應,不清楚響應的請求屬於哪一個。那時候correlation_id屬性用於。咱們將把它設置爲每一個請求的惟一值。稍後,當咱們在回調隊列中接收消息時,咱們將查看這個屬性,並在此基礎上,咱們將可以將響應與請求匹配。若是咱們看到一個未知的correlation_id值,咱們能夠安全地忽略信息-它不屬於咱們的請求。
您可能會問,爲何咱們應該忽略回調隊列中的未知消息,而不是失敗出錯呢?這是因爲服務器端可能出現競爭狀況。雖然不太可能,RPC服務器可能在發送完答案後死亡,但在發出請求的確認消息以前。若是發生這種狀況,從新啓動的RPC服務器將再次處理請求。這就是爲何在客戶機上咱們必須優雅地處理重複響應,而RPC應該理想地是冪等的。
咱們的RPC會像這樣工做:
當客戶端啓動時,它建立一個匿名的獨佔回調隊列。
一個RPC請求,客戶端發送消息,兩個屬性:reply_to
,設置回調隊列和correlation_id
,它被設置爲每一個請求的惟一值。
請求被髮送到一個rpc_queue隊列。
RPC worker(又名:服務器)正在等待該隊列上的請求。當一個請求時,它的工做和發送消息的結果返回給客戶端,使用從reply_to隊列。
客戶機等待回調隊列上的數據。當消息出現時,它檢查correlation_id屬性。若是它與請求的值匹配,則返回對應用程序的響應。
Fibonacci 遞歸源碼:
function fib($n) { if ($n == 0) return 0; if ($n == 1) return 1; return fib($n-1) + fib($n-2); } `` 咱們聲明fibonacci(斐波那契)函數。它只假設有效的正整數輸入。(不要期望這一個能爲大數字工做,並且這多是最慢的遞歸實現)。 咱們的RPC服務器rpc_server.php代碼看起來像這樣:
<?php
require_once DIR . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n) {
if ($n == 0) return 0; if ($n == 1) return 1; return fib($n-1) + fib($n-2);
}
echo " [x] Awaiting RPC requestsn";
$callback = function($req) {
$n = intval($req->body); echo " [.] fib(", $n, ")\n"; $msg = new AMQPMessage( (string) fib($n), array('correlation_id' => $req->get('correlation_id')) ); $req->delivery_info['channel']->basic_publish( $msg, '', $req->get('reply_to')); $req->delivery_info['channel']->basic_ack( $req->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
服務器代碼至關簡單: 像往常同樣,咱們從創建鏈接、通道和聲明隊列開始。 咱們可能須要運行多個服務器進程。爲了分散負載一樣多的服務器須要設置`prefetch_count`, 設置`$channel.basic_qos`美圓。 咱們用`basic_consume`訪問隊列。而後,咱們進入while循環,在其中等待請求消息,完成工做併發送響應。 咱們rpc_client.php RPC客戶端代碼:
<?php
require_once DIR . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
class FibonacciRpcClient {
private $connection; private $channel; private $callback_queue; private $response; private $corr_id; public function __construct() { $this->connection = new AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest'); $this->channel = $this->connection->channel(); list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, false, true, false); $this->channel->basic_consume( $this->callback_queue, '', false, false, false, false, array($this, 'on_response')); } public function on_response($rep) { if($rep->get('correlation_id') == $this->corr_id) { $this->response = $rep->body; } } public function call($n) { $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( (string) $n, array('correlation_id' => $this->corr_id, 'reply_to' => $this->callback_queue) ); $this->channel->basic_publish($msg, '', 'rpc_queue'); while(!$this->response) { $this->channel->wait(); } return intval($this->response); }
};
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "n";
?>
如今是一個很好的時間來讓咱們完整的示例源代碼rpc_client.php和rpc_server.php。 咱們的RPC服務如今準備好了。咱們能夠啓動服務器:
php rpc_server.php
# => [x] Awaiting RPC requests
請求斐波那契數運行客戶機:
php rpc_client.php
# => [x] Requesting fib(30)
``
這裏介紹的設計並非RPC服務的惟一實現,但它有一些重要的要點:
若是RPC服務器太慢,您能夠經過運行另外一個服務器來擴展。試着在一個新的控制檯再運行第一個:rpc_server.php。
在客戶端,RPC只須要發送和接收一條消息。不喜歡queue_declare須要同步調用。所以,RPC客戶機只須要一次RPC請求的一次網絡往返。
咱們的代碼仍然很是簡單,並無試圖解決更復雜(但重要)的問題,例如:
若是沒有服務器運行,客戶端應該如何反應?
客戶端應該對RPC有某種超時嗎?
若是服務器發生故障並引起異常,是否應該轉發給客戶端?
在處理前防止無效傳入消息(如檢查邊界、類型)。
若是您想進行實驗,您可能會發現management UI對於查看隊列很是有用。