譯: 6. RabbitMQ Spring AMQP 之 RPC

Remote procedure call (RPC)

第二篇教程中,咱們學習瞭如何使用工做隊列在多個工做人員之間分配耗時的任務。html

可是若是咱們須要在遠程計算機上運行一個函數並等待結果呢?嗯,這是一個不一樣的故事。此模式一般稱爲遠程過程調用RPCjava

在本教程中,咱們將使用RabbitMQ構建RPC系統:客戶端和可伸縮的RPC服務器。程序員

因爲咱們沒有任何值得分發的耗時任務,咱們將建立一個返回Fibonacci數字的虛擬RPC服務。spring

客戶端界面

爲了說明如何使用RPC服務,咱們將從「Sender」和「Receiver to」Client「和」Server「更改咱們的配置文件的名稱。當咱們調用服務器時,咱們將返回參數的fibonacci咱們打電話給。json

Integer response = (Integer) template.convertSendAndReceive
    (exchange.getName(), "rpc", start++);
System.out.println(" [.] Got '" + response + "'");

關於RPC的說明

儘管RPC在計算中是一種很是常見的模式,但它常常受到批評。當程序員不知道函數調用是本地的仍是慢的RPC時,會出現問題。這樣的混淆致使系統不可預測,並增長了調試的沒必要要的複雜性。錯誤使用RPC能夠致使不可維護的代碼,而不是簡化軟件。服務器

考慮到這一點,請考慮如下建議:app

  • 確保明顯哪一個函數調用是本地的,哪一個是遠程的。
  • 記錄您的系統。使組件之間的依賴關係變得清晰。
  • 處理錯誤案例。當RPC服務器長時間停機時,客戶端應該如何反應?

若有疑問,請避免使用RPC。若是能夠,您應該使用異步管道 - 而不是相似RPC的阻塞,將結果異步推送到下一個計算階段。異步

回調隊列

通常來講,經過RabbitMQ進行RPC很容易。客戶端發送請求消息,服務器回覆響應消息。爲了接收響應,咱們須要發送帶有請求的「回調」隊列地址。當咱們使用上面的'convertSendAndReceive()'方法時,Spring-amqp的RabbitTemplate爲咱們處理回調隊列。使用RabbitTemplate時無需進行任何其餘設置。有關詳細說明,請參閱請求/回覆消息ide

消息屬性

AMQP 0-9-1協議預約義了一組帶有消息的14個屬性。大多數屬性不多使用,但如下狀況除外:函數

  • deliveryMode:將消息標記爲持久性(值爲2)或瞬態(任何其餘值)。您可能會記住第二個教程中的這個屬性
  • contentType:用於描述編碼的mime類型。例如,對於常用的JSON編碼,將此屬性設置爲:application / json是一種很好的作法
  • replyTo:一般用於命名回調隊列。
  • correlationId:用於將RPC響應與請求相關聯。

相關ID

Spring-amqp容許您專一於您正在使用的消息樣式,並隱藏支持此樣式所需的消息管道的詳細信息。例如,一般本機客戶端會爲每一個RPC請求建立一個回調隊列。這很是低效,所以另外一種方法是爲每一個客戶端建立一個回調隊列。

這引起了一個新問題,在該隊列中收到響應後,不清楚響應屬於哪一個請求。那是在使用correlationId屬性的時候 Spring-amqp會自動爲每一個請求設置一個惟一值。此外,它還處理將響應與正確的correlationID匹配的詳細信息。

spring-amqp使rpc樣式更容易的一個緣由是,有時您可能但願忽略回調隊列中的未知消息,而不是失敗並出現錯誤。這是因爲服務器端可能存在競爭條件。雖然不太可能,但RPC服務器可能會在向咱們發送答案以後,但在發送請求的確認消息以前死亡。若是發生這種狀況,從新啓動的RPC服務器將再次處理請求。spring-amqp客戶端優雅地處理重複的響應,理想狀況下RPC應該是冪等的。

 摘要

 

 

咱們的RPC將這樣工做:

  • Tut6Config將設置一個新的DirectExchange和一個客戶端
  • 客戶端將利用convertSendAndReceive傳遞交換名稱,routingKey和消息。
  • 請求被髮送到rpc_queue(「tut.rpc」)隊列。
  • RPC worker(aka:server)正在等待該隊列上的請求。當請求出現時,它執行任務並使用來自replyTo字段的隊列將帶有結果的消息發送回客戶端
  • 客戶端等待回調隊列上的數據。出現消息時,它會檢查correlationId屬性。若是它與請求中的值匹配,則返回對應用程序的響應。一樣,這是經過RabbitTemplate自動完成的。

 總體來看

啓動類RabbitMq0x06SpringAmqpRpcSampleApplication.java

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.xingyun.springamqp.config.RabbitAmqpTutorialsRunner;

@SpringBootApplication
@EnableScheduling
public class RabbitMq0x06SpringAmqpRpcSampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMq0x06SpringAmqpRpcSampleApplication.class, args);
    }
    
    @Profile("usage_message")
    @Bean
    public CommandLineRunner usage() {
        return new CommandLineRunner() {

            @Override
            public void run(String... arg0) throws Exception {
                System.out.println("This app uses Spring Profiles to control its behavior.\n");
                System.out.println("Sample usage: java -jar "
                        + "RabbitMQ_0x06_SpringAMQP_RPC_Sample-0.0.1-SNAPSHOT.jar "
                        + "--spring.profiles.active=rpc"
                        + ",server");
            }
        };
    }
    
    @Profile("!usage_message")
    @Bean
    public CommandLineRunner tutorial() {
        return new RabbitAmqpTutorialsRunner();
    }
}

 

啓動輔助類RabbitAmqpTutorialsRunner.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ConfigurableApplicationContext;

public class RabbitAmqpTutorialsRunner implements CommandLineRunner {

    /**
     * application.properties文件中配置tutorial.client.duration=10000 須要
     * */
    @Value("${tutorial.client.duration:0}")
    private int duration;

    @Autowired
    private ConfigurableApplicationContext ctx;

    @Override
    public void run(String... args) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("Ready ... running for " + duration + "ms");
        Thread.sleep(duration);
        ctx.close();
    }

}

 

配置類

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

import com.xingyun.springamqp.business.Tut6Client;
import com.xingyun.springamqp.business.Tut6Server;

@Profile({ "tut6", "rpc" })
@Configuration
public class Tut6Config {
    @Profile("client")
    private static class ClientConfig {

        @Bean
        public DirectExchange exchange() {
            return new DirectExchange("tut.rpc");
        }

        @Bean
        public Tut6Client client() {
            return new Tut6Client();
        }

    }

    @Profile("server")
    private static class ServerConfig {

        @Bean
        public Queue queue() {
            return new Queue("tut.rpc.requests");
        }

        @Bean
        public DirectExchange exchange() {
            return new DirectExchange("tut.rpc");
        }

        @Bean
        public Binding binding(DirectExchange exchange, Queue queue) {
            return BindingBuilder.bind(queue).to(exchange).with("rpc");
        }

        @Bean
        public Tut6Server server() {
            return new Tut6Server();
        }

    }
}

 

Server 端

import org.springframework.amqp.rabbit.annotation.RabbitListener;

public class Tut6Server {
    
    @RabbitListener(queues = "tut.rpc.requests")
    // @SendTo("tut.rpc.replies") used when the 
    // client doesn't set replyTo.
    public int fibonacci(int n) {
        System.out.println(" [x] Received request for " + n);
        int result = fib(n);
        System.out.println(" [.] Returned " + result);
        return result;
    }

    public int fib(int n) {
        return n == 0 ? 0 : n == 1 ? 1 : (fib(n - 1) + fib(n - 2));
    }
}

 

Client 端

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class Tut6Client {
    
    @Autowired
    private RabbitTemplate template;

    @Autowired
    private DirectExchange exchange;

    int start = 0;

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        System.out.println(" [x] Requesting fib(" + start + ")");
        Integer response = (Integer) template.convertSendAndReceive
            (exchange.getName(), "rpc", start++);
        System.out.println(" [.] Got '" + response + "'");
    }
}

查看用法

java -jar RabbitMQ_0x06_SpringAMQP_RPC_Sample-0.0.1-SNAPSHOT.jar

Client 端

java -jar RabbitMQ_0x06_SpringAMQP_RPC_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=rpc,client

 

Server 端

java -jar RabbitMQ_0x06_SpringAMQP_RPC_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=rpc,server

 

相關文章
相關標籤/搜索