白話RabbitMQ(六): RPC

推廣

RabbitMQ專題講座

https://segmentfault.com/l/15...java

CoolMQ開源項目

咱們利用消息隊列實現了分佈式事務的最終一致性解決方案,請你們圍觀。能夠參考源碼:https://github.com/vvsuperman…,項目支持網站: http://rabbitmq.org.cn,最新文章或實現會更新在上面git

聲明RPC接口

爲了闡述RPC咱們先創建一個客戶端接口,它有一個方法,會發起一個RPC請求,並且會一直阻塞直到有結果返回github

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

留意RPC
雖然RPC很常見,但必定要很是當心的使用它,假設rpc調用的是一個很是慢的程序,將致使結果不可預料,並且很是難以調試。json

使用RPC時你能夠參考下列一些規範segmentfault

  1. 系統設計上要有詳細的文檔描述,使組件間的依賴講清晰,作到有據可查
  2. 作好錯誤的異常處理,特別是當RPC服務掛掉或很長時間沒有響應時
  3. 儘可能少用RPC,而使用異步管道,而非阻塞式的RPC,下降系統間的耦合

回調隊列(Callback queue)

用RabbitMQ實現RPC比較簡單,客戶端發起請求,服務端返回對這個請求的響應。爲了實現這個功能咱們須要一個可以"回調"的隊列,咱們直接用默認的隊列便可服務器

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the     callback_queue ...

消息屬性(Message properties)

AMQP 0-9-1 協議爲每一個消息定義了14個屬性,不少屬性不多會被用到,但咱們要特別留意以下幾個app

  1. 分發模式(deliveryMode): 標記一個消息是否須要持久化(persistent)或者是須要事務(transient)等,在第二章中有描述
  2. 消息體類型(contentType): 描述消息中傳遞具體內容的編碼方式,好比咱們常用的JSON能夠設置成:application/json
  3. 消息迴應(replyTo):用於回調隊列
  4. 關係Id(correlationId): 用於將RPC的返回值關聯到對應的請求。

咱們須要引入相應的包負載均衡

import com.rabbitmq.client.AMQP.BasicProperties;

關係Id(Correlation Id)

在前面的方法中咱們爲每個RPC請求都生成了一個隊列,這是徹底沒有必要的,咱們爲每個客戶端創建一個隊列就能夠了。dom

這會引發一個新的問題,由於全部的RPC都是用一個隊列,一旦有消息返回,你怎麼知道返回的消息對應的是哪一個請求呢?因此咱們就用到了Correlation Id,做爲每一個請求獨一無二的標識,當咱們收到返回值後,會檢查這個Id,匹配對應的響應。若是找不到Id所對應的請求,會直接拋棄它。異步

這裏你可能會有疑問,爲何要拋棄掉未知消息呢?而不是拋出異常啥的。這跟咱們服務端的競態條件(possibility of a race condition )會有關係。好比假設咱們RabbitMQ服務掛掉了,它剛給咱們回覆消息,還沒等到迴應,服務器就掛掉了,那麼當RabbitMQ服務重啓時,會重發消息,客戶端會收到一條重複的消息,爲了冥等性的考慮,咱們須要仔細的處理返回後的處理方式。

小結

clipboard.png

RPC工做過程以下

當客戶端啓動時,它會建立一個獨立的匿名回調隊列,而後發送RPC請求,這個RPC
請求會帶兩個屬性:replyTo - RPC調用成功後須要返回的隊列名稱;correlationId - 每一個請求獨一無二的標識。RPC服務提供者會等在隊列上,一旦有請求到達,它會當即響應,把本身的活幹完,而後返回一個結果,根據replyTo返回到對應的隊列。而客戶端也會等着隊列中的信息返回,一旦有一個消息出現,會檢查correlationId,將結果返回給響應的請求發起者

整合

Fibonacci級數

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

咱們定義個一個fibonacci級數,只能接受正整數,並且是效率不怎麼高的那種。
rpc.java以下所示

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] argv) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = null;
        try {
            connection      = factory.newConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                            .Builder()
                            .correlationId(properties.getCorrelationId())
                            .build();

                    String response = "";

                    try {
                        String message = new String(body,"UTF-8");
                        int n = Integer.parseInt(message);

                        System.out.println(" [.] fib(" + message + ")");
                        response += fib(n);
                    }
                    catch (RuntimeException e){
                        System.out.println(" [.] " + e.toString());
                    }
                    finally {
                        channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));

                        channel.basicAck(envelope.getDeliveryTag(), false);

            // RabbitMq consumer worker thread notifies the RPC server owner thread 
                    synchronized(this) {
                        this.notify();
                    }
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

            // Wait and be prepared to consume the message from RPC client.
        while (true) {
            synchronized(consumer) {
        try {
              consumer.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();          
            }
            }
         }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null)
        try {
                connection.close();
             } catch (IOException _ignore) {}
         }
    }
}

服務端的代碼比較直接,首先創建鏈接,創建channel以及聲明隊列。咱們以後可能會創建多個消費者,爲了更好的負載均衡,須要在channel.basicQos中設置prefetchCount,而後設置一個basicConsume監聽隊列,提供一個回調函數來處理請求以及返回值

RPCClient.java

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
    }

    public String call(String message) throws IOException, InterruptedException {
        String corrId = UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if (properties.getCorrelationId().equals(corrId)) {
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });

        return response.take();
    }

    public void close() throws IOException {
        connection.close();
    }

    //...
}

客戶端代碼以下,咱們創建一個鏈接,聲明一個'callback'隊列,咱們將會往'callback'隊列提交消息,並接收RPC的返回值,具體步驟以下:

咱們首先生成一個惟一的correlation Id,並保存,咱們將會使用它來區分以後所接受到的信息。而後發出這個消息,消息會包含兩個屬性: replyTo以及collelationId。由於消費消息是在另一個進程中,咱們須要阻塞咱們的進程直到結果返回,使用阻塞隊列BlockingQueue是一種很是好的方式,這裏咱們使用了長度爲1的ArrayBlockQueue,handleDelivery的功能是檢查消息的的correlationId是否是咱們以前所發送的,若是是,將返回值返回到BlockingQueue。此時主線程會等待返回並從ArrayBlockQueue取到返回值

從客戶端發起請求

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();

源代碼參考RPCClient.javaRPCServer.java
編譯

javac -cp $CP RPCClient.java RPCServer.java

咱們的rpc服務端好了,啓動服務

java -cp $CP RPCServer
# => [x] Awaiting RPC requests

爲了獲取fibonacci級數咱們只須要運行客戶端:

java -cp $CP RPCClient
# => [x] Requesting fib(30)

以上的實現方式並不是創建RPC請求惟一的方式,可是它有不少優勢:若是一個RPC服務過於緩慢,你能夠很是方便的水平擴展,只須要增長消費者的個數便可,咱們的代碼仍是比較簡單的,有些負責的問題並未解決,好比

  1. 若是服務所有掛了,客戶端要如何處理
  2. 若是服務超時該如何處理
  3. 非法信息該如何處理

基礎章節的內容到此就結束了,到這裏,你就可以基本明白消息隊列的基本用法,接下來咱們能夠進入中級內容內容的學習了。

相關文章
相關標籤/搜索