RabbitMQ入門:遠程過程調用(RPC)

假如咱們想要調用遠程的一個方法或函數並等待執行結果,也就是咱們一般說的遠程過程調用(Remote Procedure Call)。怎麼辦?dom

今天咱們就用RabbitMQ來實現一個簡單的RPC系統:客戶端發送一個請求消息,服務端以一個響應消息迴應。爲了可以接收到響應,客戶端在發送消息的同時發送一個回調隊列用來告訴服務端響應消息發送到哪一個隊列裏面。也就是說每一個消息一個回調隊列,在此基礎上咱們變下,將回調隊列定義成類的屬性,這個每一個客戶端一個隊列,同一個客戶端的請求共用一個隊列。那麼接下來有個問題,怎麼知道這個隊列裏面的響應消息是屬於哪一個隊列的呢?ide

咱們會用到關聯標識(correlationId),每一個請求咱們都會生成一個惟一的值做爲correlationId,這樣每次有響應消息來的時候,咱們就去看correlationId來肯定究竟是哪一個請求的響應消息,將請求和響應關聯起來。若是收到一個不知道的correlationId,就能夠肯定不是這個客戶端的請求的響應,能夠直接丟棄掉。函數

1、工做模型ui

  1. 客戶端發送啓動後,會建立獨特的回調隊列。對於一個請求發送配置了兩個屬性的消息:一個是回調隊列(圖中的replay_to),一個是correlation。 這個請求會發送到rpc_queue隊列,而後到達服務端處理。
  2. 服務端等待rpc_queue隊列的請求。當有請求到來時,它就會開始幹活並將結果經過發送消息來返回,該返回消息發送到replyTo指定的隊列。
  3. 客戶端將等待回調隊列返回數據。當返回的消息到達時,它將檢查correlation id屬性。若是該屬性值和請求匹配,就將響應返回給程序。

2、代碼實現spa

接下來看代碼實現:線程

  1.  客戶端
    public class RpcClient {
    
        Connection connection = null;
        Channel channel = null;
        //回調隊列:用來接收服務端的響應消息
        String queueName = "";
    
        // 定義RpcClient
        public RpcClient() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
            queueName = channel.queueDeclare().getQueue();
        }
    
        // 真正的處理邏輯
        public String call(String msg) throws IOException, InterruptedException {
            final String uuid = UUID.randomUUID().toString();
            //後續,服務端根據"replyTo"來指定將返回信息寫入到哪一個隊列
            //後續,服務端根據關聯標識"correlationId"來指定返回的響應是哪一個請求的
            AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().replyTo(queueName).correlationId(uuid).build();
    
            channel.basicPublish("", RpcServer.QUEUE_NAME, prop, msg.getBytes());
            final BlockingQueue<String> blockQueue = new ArrayBlockingQueue<String>(1);
            channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    if (properties.getCorrelationId().equals(uuid)) {
                        String msg = new String(body, "UTF-8");
    
                        blockQueue.offer(msg);
                        System.out.println("**** rpc client reciver response :[" + msg + "]");
                    }
                }
    
            });
    
            return blockQueue.take();
        }
    
        //關閉鏈接
        public void close() throws IOException {
            connection.close();
        }
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            RpcClient client = new RpcClient();
            client.call("4");
            client.close();
        }
    }

    發送請求的時候,它是生產者;接受響應的時候,它是消費者。3d

  2. 服務端
    public class RpcServer {
    
        //RPC隊列名
        public static final String QUEUE_NAME = "rpc_queue";
    
        //斐波那契數列,用來模擬工做任務
        public static int fib(int num) {
            if (num == 0)
                return 0;
            if (num == 1)
                return 1;
            return fib(num - 1) + fib(num - 2);
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            try {
                // 1.connection & channel
                connection = factory.newConnection();
                final Channel channel = connection.createChannel();
    
                // 2.declare queue
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
                System.out.println("****** rpc server waiting for client request ......");
    
                // 3.每次只接收一個消息(任務)
                channel.basicQos(1);
                //4.獲取消費實例
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        BasicProperties prop = new BasicProperties().builder().correlationId(properties.getCorrelationId())
                                .build();
                        String resp = "";
                        try {
                            String msg = new String(body, "UTF-8");
                            resp = fib(Integer.valueOf(msg)) + "";
                            System.out.println("*** will response to rpc client :" + resp);
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        } finally {
                            channel.basicPublish("", properties.getReplyTo(), prop, resp.getBytes());
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
    
                    }
                };
                // 5.消費消息(處理任務)
                channel.basicConsume(QUEUE_NAME, false, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    }

    接受請求的時候,它是消費者;發送響應的時候,它是生產者。code

  3. 運行服務端,開始等待請求

     

  4. 而後運行客戶端,控制檯log:
    服務端(多了一條打印):
    ****** rpc server waiting for client request ......
    *** will response to rpc client :3
    
    客戶端:
    **** rpc client reciver response :[3]

     

3、小插曲server

剛開始我在寫demo的時候,client中沒有用到阻塞隊列final BlockingQueue<String> blockQueue = new ArrayBlockingQueue<String>(1);,而是直接這樣寫:blog

@Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {

                if (properties.getCorrelationId().equals(uuid)) {
                    String msg = new String(body, "UTF-8");

                    //blockQueue.offer(msg);
                    System.out.println("**** rpc client reciver response :[" + msg + "]");
                }
            }

指望能打印出結果來,可是運行後發現並無打印:**** rpc client reciver response :[" + msg + "]的值。

緣由是handleDelivery()這個方法是在子線程中運行的,這個子線程運行的時候,主線程會繼續日後執行直到執行了client.close();方法而結束了。

因爲主線程終止了,致使沒有打印出結果。加了阻塞隊列以後將主線程阻塞不執行close()方法,問題就解決了。

相關文章
相關標籤/搜索