RabbitMQ+PHP 教程六(RPC)

(using php-amqplib)php

前提必讀

本教程假設RabbitMQ是安裝在標準端口上運行(5672)。若是您使用不一樣的主機、端口或憑據,則鏈接設置須要調整。html

若是您在本教程中遇到困難,能夠經過郵件列表與咱們聯繫。git

開始

在第二個教程中,咱們學習瞭如何使用工做隊列在多個工人之間分配耗時的任務。程序員

可是若是咱們須要在遠程計算機上運行一個函數並等待結果呢?嗯,那是另外一回事了。這種模式一般稱爲遠程過程調用或RPC。github

在本教程中咱們將使用RabbitMQ搭建一個RPC系統:一個客戶端和一個可擴展的RPC服務器。因爲咱們沒有任何值得分配的耗時的任務,因此咱們將建立一個返回Fibonacci數的模擬一個RPC服務。安全

Client interface

爲了說明如何使用RPC服務,咱們將建立一個簡單的客戶類。它將公開一個名爲調用的方法,該方法發送一個RPC請求並阻塞直到接收到結果爲止:服務器

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";

關於RPC的一些建議

雖然RPC是計算中很是常見的模式,但它常常遭到批評。當程序員不知道函數調用是本地的,或者它是一個緩慢的RPC時,問題就出現了。這樣的混亂致使了不可預知的系統,並給調試增長了沒必要要的複雜性。而簡化軟件,濫用會致使難以維護的RPC代碼。網絡

考慮到這一點,請考慮如下建議:併發

確保很明顯哪一個函數調用是本地調用,而且它是遠程的。
記錄系統。使組件之間的依賴關係清晰。
處理錯誤案例。RPC服務器長時間處於下行狀態時,客戶端應如何響應?
有疑問時避免RPC。若是能夠,則應該使用異步管道,而不是像阻塞這樣的RPC,結果被異步推送到下一個計算階段。異步

回調隊列(Callback queue)

通常在RabbitMQ作RPC是容易的。客戶端發送一條請求消息和一個響應消息的服務器回覆。爲了接收響應,咱們須要向請求發送一個「回調」隊列地址。咱們可使用默認隊列。讓咱們試試看:

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$msg = new AMQPMessage(
    $payload,
    array('reply_to' => $queue_name));

$channel->basic_publish($msg, '', 'rpc_queue');
# ... then code to read a response message from the callback_queue ...
消息屬性
AMQP協議(0-9-1 protocol)預約義了一套14個屬性,去一個消息。大多數屬性不多使用,除了如下內容:

delivery_mode: 將消息標記爲持久性。 (with a value of 2) or transient (1). 您可能會從第二個教程中記住這個屬性。
content_type:用來描述編碼的MIME類型。例如,對於經常使用的JSON編碼,將此屬性設置爲應用程序/ JSON是一個很好的作法。
reply_to:經常使用的名字一個回調隊列。
correlation_id:有助於將RPC響應與請求關聯起來。

Correlation Id

在上面介紹的方法中,咱們建議爲每一個RPC請求建立一個回調隊列。這是很是低效的,但幸運的是有一個更好的方法——讓咱們爲每一個客戶機建立一個回調隊列。

這引起了一個新問題,在隊列中收到了響應,不清楚響應的請求屬於哪一個。那時候correlation_id屬性用於。咱們將把它設置爲每一個請求的惟一值。稍後,當咱們在回調隊列中接收消息時,咱們將查看這個屬性,並在此基礎上,咱們將可以將響應與請求匹配。若是咱們看到一個未知的correlation_id值,咱們能夠安全地忽略信息-它不屬於咱們的請求。

您可能會問,爲何咱們應該忽略回調隊列中的未知消息,而不是失敗出錯呢?這是因爲服務器端可能出現競爭狀況。雖然不太可能,RPC服務器可能在發送完答案後死亡,但在發出請求的確認消息以前。若是發生這種狀況,從新啓動的RPC服務器將再次處理請求。這就是爲何在客戶機上咱們必須優雅地處理重複響應,而RPC應該理想地是冪等的。

總結

clipboard.png

咱們的RPC會像這樣工做:

當客戶端啓動時,它建立一個匿名的獨佔回調隊列。

一個RPC請求,客戶端發送消息,兩個屬性:reply_to,設置回調隊列和correlation_id,它被設置爲每一個請求的惟一值。

請求被髮送到一個rpc_queue隊列。

RPC worker(又名:服務器)正在等待該隊列上的請求。當一個請求時,它的工做和發送消息的結果返回給客戶端,使用從reply_to隊列。

客戶機等待回調隊列上的數據。當消息出現時,它檢查correlation_id屬性。若是它與請求的值匹配,則返回對應用程序的響應。

彙總

Fibonacci 遞歸源碼:

function fib($n) {
    if ($n == 0)
        return 0;
    if ($n == 1)
        return 1;
    return fib($n-1) + fib($n-2);
}
``
咱們聲明fibonacci(斐波那契)函數。它只假設有效的正整數輸入。(不要期望這一個能爲大數字工做,並且這多是最慢的遞歸實現)。

咱們的RPC服務器rpc_server.php代碼看起來像這樣:

<?php

require_once DIR . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('rpc_queue', false, false, false, false);

function fib($n) {

if ($n == 0)
    return 0;
if ($n == 1)
    return 1;
return fib($n-1) + fib($n-2);

}

echo " [x] Awaiting RPC requestsn";
$callback = function($req) {

$n = intval($req->body);
echo " [.] fib(", $n, ")\n";

$msg = new AMQPMessage(
    (string) fib($n),
    array('correlation_id' => $req->get('correlation_id'))
    );

$req->delivery_info['channel']->basic_publish(
    $msg, '', $req->get('reply_to'));
$req->delivery_info['channel']->basic_ack(
    $req->delivery_info['delivery_tag']);

};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {

$channel->wait();

}

$channel->close();
$connection->close();

?>

服務器代碼至關簡單:

像往常同樣,咱們從創建鏈接、通道和聲明隊列開始。

咱們可能須要運行多個服務器進程。爲了分散負載一樣多的服務器須要設置`prefetch_count`, 設置`$channel.basic_qos`美圓。

咱們用`basic_consume`訪問隊列。而後,咱們進入while循環,在其中等待請求消息,完成工做併發送響應。

咱們rpc_client.php RPC客戶端代碼:

<?php

require_once DIR . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

class FibonacciRpcClient {

private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;

public function __construct() {
    $this->connection = new AMQPStreamConnection(
        'localhost', 5672, 'guest', 'guest');
    $this->channel = $this->connection->channel();
    list($this->callback_queue, ,) = $this->channel->queue_declare(
        "", false, false, true, false);
    $this->channel->basic_consume(
        $this->callback_queue, '', false, false, false, false,
        array($this, 'on_response'));
}
public function on_response($rep) {
    if($rep->get('correlation_id') == $this->corr_id) {
        $this->response = $rep->body;
    }
}

public function call($n) {
    $this->response = null;
    $this->corr_id = uniqid();

    $msg = new AMQPMessage(
        (string) $n,
        array('correlation_id' => $this->corr_id,
              'reply_to' => $this->callback_queue)
        );
    $this->channel->basic_publish($msg, '', 'rpc_queue');
    while(!$this->response) {
        $this->channel->wait();
    }
    return intval($this->response);
}

};

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "n";

?>

如今是一個很好的時間來讓咱們完整的示例源代碼rpc_client.php和rpc_server.php。

咱們的RPC服務如今準備好了。咱們能夠啓動服務器:

php rpc_server.php
# => [x] Awaiting RPC requests

請求斐波那契數運行客戶機:

php rpc_client.php
# => [x] Requesting fib(30)
``
這裏介紹的設計並非RPC服務的惟一實現,但它有一些重要的要點:

若是RPC服務器太慢,您能夠經過運行另外一個服務器來擴展。試着在一個新的控制檯再運行第一個:rpc_server.php。

在客戶端,RPC只須要發送和接收一條消息。不喜歡queue_declare須要同步調用。所以,RPC客戶機只須要一次RPC請求的一次網絡往返。

咱們的代碼仍然很是簡單,並無試圖解決更復雜(但重要)的問題,例如:

若是沒有服務器運行,客戶端應該如何反應?

客戶端應該對RPC有某種超時嗎?

若是服務器發生故障並引起異常,是否應該轉發給客戶端?

在處理前防止無效傳入消息(如檢查邊界、類型)。

若是您想進行實驗,您可能會發現management UI對於查看隊列很是有用。

相關文章
相關標籤/搜索