https://segmentfault.com/l/15...java
咱們利用消息隊列實現了分佈式事務的最終一致性解決方案,請你們圍觀。能夠參考源碼:https://github.com/vvsuperman…,項目支持網站: http://rabbitmq.org.cn,最新文章或實現會更新在上面git
爲了闡述RPC咱們先創建一個客戶端接口,它有一個方法,會發起一個RPC請求,並且會一直阻塞直到有結果返回github
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
留意RPC
雖然RPC很常見,但必定要很是當心的使用它,假設rpc調用的是一個很是慢的程序,將致使結果不可預料,並且很是難以調試。json
使用RPC時你能夠參考下列一些規範segmentfault
用RabbitMQ實現RPC比較簡單,客戶端發起請求,服務端返回對這個請求的響應。爲了實現這個功能咱們須要一個可以"回調"的隊列,咱們直接用默認的隊列便可服務器
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
AMQP 0-9-1 協議爲每一個消息定義了14個屬性,不少屬性不多會被用到,但咱們要特別留意以下幾個app
咱們須要引入相應的包負載均衡
import com.rabbitmq.client.AMQP.BasicProperties;
在前面的方法中咱們爲每個RPC請求都生成了一個隊列,這是徹底沒有必要的,咱們爲每個客戶端創建一個隊列就能夠了。dom
這會引發一個新的問題,由於全部的RPC都是用一個隊列,一旦有消息返回,你怎麼知道返回的消息對應的是哪一個請求呢?因此咱們就用到了Correlation Id,做爲每一個請求獨一無二的標識,當咱們收到返回值後,會檢查這個Id,匹配對應的響應。若是找不到Id所對應的請求,會直接拋棄它。異步
這裏你可能會有疑問,爲何要拋棄掉未知消息呢?而不是拋出異常啥的。這跟咱們服務端的競態條件(possibility of a race condition )會有關係。好比假設咱們RabbitMQ服務掛掉了,它剛給咱們回覆消息,還沒等到迴應,服務器就掛掉了,那麼當RabbitMQ服務重啓時,會重發消息,客戶端會收到一條重複的消息,爲了冥等性的考慮,咱們須要仔細的處理返回後的處理方式。
RPC工做過程以下
當客戶端啓動時,它會建立一個獨立的匿名回調隊列,而後發送RPC請求,這個RPC
請求會帶兩個屬性:replyTo - RPC調用成功後須要返回的隊列名稱;correlationId - 每一個請求獨一無二的標識。RPC服務提供者會等在隊列上,一旦有請求到達,它會當即響應,把本身的活幹完,而後返回一個結果,根據replyTo返回到對應的隊列。而客戶端也會等着隊列中的信息返回,一旦有一個消息出現,會檢查correlationId,將結果返回給響應的請求發起者
Fibonacci級數
private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); }
咱們定義個一個fibonacci級數,只能接受正整數,並且是效率不怎麼高的那種。
rpc.java以下所示
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] argv) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; try { connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); String response = ""; try { String message = new String(body,"UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e){ System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized(this) { this.notify(); } } } }; channel.basicConsume(RPC_QUEUE_NAME, false, consumer); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized(consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (connection != null) try { connection.close(); } catch (IOException _ignore) {} } } }
服務端的代碼比較直接,首先創建鏈接,創建channel以及聲明隊列。咱們以後可能會創建多個消費者,爲了更好的負載均衡,須要在channel.basicQos中設置prefetchCount,而後設置一個basicConsume監聽隊列,提供一個回調函數來處理請求以及返回值
RPCClient.java
import com.rabbitmq.client.*; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); } public String call(String message) throws IOException, InterruptedException { String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) { response.offer(new String(body, "UTF-8")); } } }); return response.take(); } public void close() throws IOException { connection.close(); } //... }
客戶端代碼以下,咱們創建一個鏈接,聲明一個'callback'隊列,咱們將會往'callback'隊列提交消息,並接收RPC的返回值,具體步驟以下:
咱們首先生成一個惟一的correlation Id,並保存,咱們將會使用它來區分以後所接受到的信息。而後發出這個消息,消息會包含兩個屬性: replyTo以及collelationId。由於消費消息是在另一個進程中,咱們須要阻塞咱們的進程直到結果返回,使用阻塞隊列BlockingQueue是一種很是好的方式,這裏咱們使用了長度爲1的ArrayBlockQueue,handleDelivery的功能是檢查消息的的correlationId是否是咱們以前所發送的,若是是,將返回值返回到BlockingQueue。此時主線程會等待返回並從ArrayBlockQueue取到返回值
從客戶端發起請求
RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close();
源代碼參考RPCClient.java 和 RPCServer.java
編譯
javac -cp $CP RPCClient.java RPCServer.java
咱們的rpc服務端好了,啓動服務
java -cp $CP RPCServer # => [x] Awaiting RPC requests
爲了獲取fibonacci級數咱們只須要運行客戶端:
java -cp $CP RPCClient # => [x] Requesting fib(30)
以上的實現方式並不是創建RPC請求惟一的方式,可是它有不少優勢:若是一個RPC服務過於緩慢,你能夠很是方便的水平擴展,只須要增長消費者的個數便可,咱們的代碼仍是比較簡單的,有些負責的問題並未解決,好比
基礎章節的內容到此就結束了,到這裏,你就可以基本明白消息隊列的基本用法,接下來咱們能夠進入中級內容內容的學習了。