在第二篇教程中,咱們學習瞭如何使用工做隊列在多個工做人員之間分配耗時的任務。html
可是若是咱們須要在遠程計算機上運行一個函數並等待結果呢?嗯,這是一個不一樣的故事。此模式一般稱爲遠程過程調用或RPC。java
在本教程中,咱們將使用RabbitMQ構建RPC系統:客戶端和可伸縮的RPC服務器。程序員
因爲咱們沒有任何值得分發的耗時任務,咱們將建立一個返回Fibonacci數字的虛擬RPC服務。spring
爲了說明如何使用RPC服務,咱們將從「Sender」和「Receiver to」Client「和」Server「更改咱們的配置文件的名稱。當咱們調用服務器時,咱們將返回參數的fibonacci咱們打電話給。json
Integer response = (Integer) template.convertSendAndReceive (exchange.getName(), "rpc", start++); System.out.println(" [.] Got '" + response + "'");
關於RPC的說明
儘管RPC在計算中是一種很是常見的模式,但它常常受到批評。當程序員不知道函數調用是本地的仍是慢的RPC時,會出現問題。這樣的混淆致使系統不可預測,並增長了調試的沒必要要的複雜性。錯誤使用RPC能夠致使不可維護的代碼,而不是簡化軟件。服務器
考慮到這一點,請考慮如下建議:app
- 確保明顯哪一個函數調用是本地的,哪一個是遠程的。
- 記錄您的系統。使組件之間的依賴關係變得清晰。
- 處理錯誤案例。當RPC服務器長時間停機時,客戶端應該如何反應?
若有疑問,請避免使用RPC。若是能夠,您應該使用異步管道 - 而不是相似RPC的阻塞,將結果異步推送到下一個計算階段。異步
通常來講,經過RabbitMQ進行RPC很容易。客戶端發送請求消息,服務器回覆響應消息。爲了接收響應,咱們須要發送帶有請求的「回調」隊列地址。當咱們使用上面的'convertSendAndReceive()'方法時,Spring-amqp的RabbitTemplate爲咱們處理回調隊列。使用RabbitTemplate時無需進行任何其餘設置。有關詳細說明,請參閱請求/回覆消息。ide
消息屬性
AMQP 0-9-1協議預約義了一組帶有消息的14個屬性。大多數屬性不多使用,但如下狀況除外:函數
- deliveryMode:將消息標記爲持久性(值爲2)或瞬態(任何其餘值)。您可能會記住第二個教程中的這個屬性。
- contentType:用於描述編碼的mime類型。例如,對於常用的JSON編碼,將此屬性設置爲:application / json是一種很好的作法。
- replyTo:一般用於命名回調隊列。
- correlationId:用於將RPC響應與請求相關聯。
Spring-amqp容許您專一於您正在使用的消息樣式,並隱藏支持此樣式所需的消息管道的詳細信息。例如,一般本機客戶端會爲每一個RPC請求建立一個回調隊列。這很是低效,所以另外一種方法是爲每一個客戶端建立一個回調隊列。
這引起了一個新問題,在該隊列中收到響應後,不清楚響應屬於哪一個請求。那是在使用correlationId屬性的時候 。Spring-amqp會自動爲每一個請求設置一個惟一值。此外,它還處理將響應與正確的correlationID匹配的詳細信息。
spring-amqp使rpc樣式更容易的一個緣由是,有時您可能但願忽略回調隊列中的未知消息,而不是失敗並出現錯誤。這是因爲服務器端可能存在競爭條件。雖然不太可能,但RPC服務器可能會在向咱們發送答案以後,但在發送請求的確認消息以前死亡。若是發生這種狀況,從新啓動的RPC服務器將再次處理請求。spring-amqp客戶端優雅地處理重複的響應,理想狀況下RPC應該是冪等的。
咱們的RPC將這樣工做:
啓動類RabbitMq0x06SpringAmqpRpcSampleApplication.java
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Profile; import org.springframework.scheduling.annotation.EnableScheduling; import com.xingyun.springamqp.config.RabbitAmqpTutorialsRunner; @SpringBootApplication @EnableScheduling public class RabbitMq0x06SpringAmqpRpcSampleApplication { public static void main(String[] args) { SpringApplication.run(RabbitMq0x06SpringAmqpRpcSampleApplication.class, args); } @Profile("usage_message") @Bean public CommandLineRunner usage() { return new CommandLineRunner() { @Override public void run(String... arg0) throws Exception { System.out.println("This app uses Spring Profiles to control its behavior.\n"); System.out.println("Sample usage: java -jar " + "RabbitMQ_0x06_SpringAMQP_RPC_Sample-0.0.1-SNAPSHOT.jar " + "--spring.profiles.active=rpc" + ",server"); } }; } @Profile("!usage_message") @Bean public CommandLineRunner tutorial() { return new RabbitAmqpTutorialsRunner(); } }
啓動輔助類RabbitAmqpTutorialsRunner.java
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.context.ConfigurableApplicationContext; public class RabbitAmqpTutorialsRunner implements CommandLineRunner { /** * application.properties文件中配置tutorial.client.duration=10000 須要 * */ @Value("${tutorial.client.duration:0}") private int duration; @Autowired private ConfigurableApplicationContext ctx; @Override public void run(String... args) throws Exception { // TODO Auto-generated method stub System.out.println("Ready ... running for " + duration + "ms"); Thread.sleep(duration); ctx.close(); } }
配置類
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import com.xingyun.springamqp.business.Tut6Client; import com.xingyun.springamqp.business.Tut6Server; @Profile({ "tut6", "rpc" }) @Configuration public class Tut6Config { @Profile("client") private static class ClientConfig { @Bean public DirectExchange exchange() { return new DirectExchange("tut.rpc"); } @Bean public Tut6Client client() { return new Tut6Client(); } } @Profile("server") private static class ServerConfig { @Bean public Queue queue() { return new Queue("tut.rpc.requests"); } @Bean public DirectExchange exchange() { return new DirectExchange("tut.rpc"); } @Bean public Binding binding(DirectExchange exchange, Queue queue) { return BindingBuilder.bind(queue).to(exchange).with("rpc"); } @Bean public Tut6Server server() { return new Tut6Server(); } } }
Server 端
import org.springframework.amqp.rabbit.annotation.RabbitListener; public class Tut6Server { @RabbitListener(queues = "tut.rpc.requests") // @SendTo("tut.rpc.replies") used when the // client doesn't set replyTo. public int fibonacci(int n) { System.out.println(" [x] Received request for " + n); int result = fib(n); System.out.println(" [.] Returned " + result); return result; } public int fib(int n) { return n == 0 ? 0 : n == 1 ? 1 : (fib(n - 1) + fib(n - 2)); } }
Client 端
import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; public class Tut6Client { @Autowired private RabbitTemplate template; @Autowired private DirectExchange exchange; int start = 0; @Scheduled(fixedDelay = 1000, initialDelay = 500) public void send() { System.out.println(" [x] Requesting fib(" + start + ")"); Integer response = (Integer) template.convertSendAndReceive (exchange.getName(), "rpc", start++); System.out.println(" [.] Got '" + response + "'"); } }
java -jar RabbitMQ_0x06_SpringAMQP_RPC_Sample-0.0.1-SNAPSHOT.jar
Client 端
java -jar RabbitMQ_0x06_SpringAMQP_RPC_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=rpc,client
Server 端
java -jar RabbitMQ_0x06_SpringAMQP_RPC_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=rpc,server