消息隊列——RabbitMQ學習筆記

1. 寫在前面

昨天簡單學習了一個消息隊列項目——RabbitMQ,今天趁熱打鐵,將學到的東西記錄下來。html

學習的資料主要是官網給出的6個基本的消息發送/接收模型,或者稱爲6種不一樣的使用場景,本文即是對這6種模型加以敘述。java

2. Tutorials

在學習6種模型以前,咱們首先須要安裝RabbitMQ。RabbitMQ支持多種系統平臺,各平臺的安裝方法能夠點此查看。安裝好以後,咱們使用以下命令啓用Web端的管理插件:rabbitmq-plugins enable rabbitmq_management,而後啓動RabbitMQ。接着用瀏覽器訪問http://localhost:15672/,若能看到RabbitMQ相關Web頁面,說明啓動成功。web

2.1 Hello World

正所謂萬事開頭難,咱們先從最簡單的Hello World開始。首先固然是新建一個項目,導入RabiitMQ相關jar。我採用Maven來構建項目,所以只須要在pom文件中添加以下依賴:瀏覽器

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>

接下來學習最簡單的消息隊列模型,以下圖:服務器

Hello World 模型

在圖中,P表明producer,它是消息的生產者C表明consumer,它是消息的消費者;而紅色的矩形正是咱們所謂的消息隊列,它位於RabbitMQ中(RabbitMQ中能夠有不少這樣的隊列,而且每一個隊列都有一個惟一的名字)。生產者(們)能夠將消息發送到消息隊列中,消費者(們)能夠從消息隊列中取出消息。dom

這種模型是否是很簡單呢?下面咱們使用Java,藉助於RabbitMQ來實現這種模型的消息通訊。異步

首先咱們介紹如何send消息到消息隊列。send以前,固然是和RabbitMQ服務器創建鏈接:分佈式

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();

接下來咱們建立一個channel,大多數API都是經過這個對象來調用的:ide

Channel channel = connection.createChannel();

以後,咱們即可以調用channel的以下方法去聲明一個隊列:單元測試

channel.queueDeclare("hello", false, false, false, null);

該方法的第一個參數是隊列的名稱,其他的參數先無論,以後會介紹。咱們能夠嘗試着去執行以上的5行代碼,而後打開Web端,能夠看到新建了一個叫做hello的隊列:

有了隊列,咱們即可以向其中發送消息了,一樣仍是調用channel對象的API:

channel.basicPublish("", "hello", null, "Hello World".getBytes());

以上代碼所作的事情就是發送了一條字符串消息「Hello World」(第4個參數)到消息隊列。你可能注意到咱們調用了String對象的getBytes方法,沒錯,咱們發送的實際上二進制數據。所以,理論上你可以發送任何數據到消息隊列中,而不只僅是文本信息。

第2個參數叫作路由鍵(routingKey),在該模型下必須與隊列名相同,至於爲何,和其餘參數同樣,以後會了解到。

咱們能夠修改發送的文本,再次執行上述代碼,而後打開Web端查看,即可以查看到咱們發送的消息:

點擊上圖的name字段下的hello,能夠查看hello隊列中的具體信息:

接下來,咱們去嘗試着去獲取生產者發送的消息,和send方法同樣,咱們一樣須要鏈接服務器,建立channel,聲明隊列:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);

以後咱們能夠調用channel的相關方法去監聽隊列,接收消息:

channel.basicConsume("hello", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, "UTF-8"));
        }
});

以上basicConsume方法中,第一個參數是隊列的名字;第二個參數表示是否自動確認消息的接收狀況,咱們使用true,自動確認;第三個參數須要傳入一個實現了Consumer接口的對象,咱們簡單的new一個默認的Consumer的實現類DefaultConsumer,而後在handleDelivery方法中去處理接收到的消息(handleDelivery方法會在接收到消息時被回調)。

運行以上代碼,咱們能夠打印出以前向隊列中send的數據:

Hello World
Hello World2

下面是Hello World的完整代碼:

public class App {

    @Test
    public void send() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello", false, false, false, null);

        channel.basicPublish("", "hello", null, "Hello World2".getBytes());

        channel.close();
        connection.close();
    }

    @Test
    public void receive() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello", false, false, false, null);

        channel.basicConsume("hello", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body, "UTF-8"));
            }

        });
        synchronized (this){
            // 由於以上接收消息的方法是異步的(非阻塞),當採用單元測試方式執行該方法時,程序會在打印消息前結束,所以使用wait來防止程序提早終止。若使用main方法執行,則不須要擔憂該問題。
            wait();
        }
    }
}

2.2 Work queues

接下來咱們學習第二種模型——Work Queues。顧名思義,這種模型描述的是一個生產者(Boss)向隊列發消息(任務),多個消費者(worker)從隊列接受消息(任務),以下圖所示:

下面咱們用代碼去實現。先是生產者send消息到隊列,此次咱們多發送些數據:

@Test
public void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);

    for (int i = 0; i < 9; i++) {
        channel.basicPublish("", "hello", null, String.valueOf(i).getBytes());
    }

    channel.close();
    connection.close();
}

而後是消費者接收數據:

@Test
public void receive() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);

    channel.basicConsume("hello", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, "UTF-8"));
            try {
            //  Thread.sleep(1000);
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    synchronized (this) {
        wait();
    }
}

代碼基本上和Hello World的代碼同樣,只是加上句sleep來模擬消費者(worker)處理消息所花的時間。

咱們能夠先執行三次receive方法(修改sleep的時間,其中消費者1 sleep 10s,消費者2,3 sleep 1s),讓三個消費者(worker)一塊兒等待消息的到來,而後執行send方法發送9條消息,觀察三個消費者收到的消息狀況。

若不出意外,你會看到以下的打印結果:

// --------消費者1--------
0
// 10s 後
3
// 10s 後
6
// --------消費者2--------
1
// 1s 後
4
// 1s 後
7
// --------消費者3--------
2
// 1s 後
5
// 1s 後
8

經過打印結果,咱們能夠總結出Work queues的幾個特色:

  1. 一條消息只會被一個消費者接收;
  2. 消息是平均分配給消費者的;
  3. 消費者只有在處理完某條消息後,纔會收到下一條消息。

事實上,RabbitMQ會循環地(一個接一個地)發送消息給消費者,這種分配消息的方式被稱爲round-robin(輪詢)

2.2.1 消息確認

看到這裏,不知你是否會擔憂:因爲worker(消費者)執行任務須要必定的時間(以上用sleep模擬),要是某個worker在運行過程當中掛掉,那分配給它的任務豈不是丟失了(永遠不可能被執行了)。爲解決這個問題,RabbitMQ提供了消息確認機制,即worker須要主動的去確認消息已經接收了,RabbitMQ才認爲消息被「真正地接收了」,實現代碼以下:

// send的代碼不用變,只需改變basicConsume的第二個參數爲false,表示不要自動確認
channel.basicConsume("hello", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(new String(body, "UTF-8"));
        try {
            // 這裏把時間加長了一點便於測試
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 這裏手動地肯定
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
});

下面作測試。首先執行send方法,發送9條消息到隊列,查看web端狀況以下:

此時隊列中有9條未被分發的消息。接着運行改變後的receive方法,而後快速地去Web端查看隊列中的消息狀況(記得刷新):

發現隊列中沒有待分發(Ready字段)的消息了,而有9條未被確認(Unacked字段)的消息。但控制檯打印出數字6時,關閉程序,再次去web端查看:

此時隊列中又有3條待分發的消息了。緣由正是因爲咱們提早終止了receive方法的執行,致使最後3條消息沒有被確認而被從新歸還到Ready中。

2.2.2 消息持久化

若是你不是一次性跟着本文運行代碼到這裏,而是次日接着昨天的工做繼續進行,你可能會發現昨天你建立的隊列和添加到隊列裏的消息沒有了。極可能的緣由就是消息沒有持久化,即按照以上代碼運行生成的隊列和添加到隊列中的消息都是儲存在內存中的,RabbitMQ一旦關閉它們就沒有了。若是你想將下次啓動時還能看到關閉前的消息,你應該將其持久化:

// 將第二個參數設爲true,表示聲明一個須要持久化的隊列。
// 須要注意的是,若你已經定義了一個非持久的,同名字的隊列,要麼將其先刪除(否則會報錯),要麼換一個名字。
channel.queueDeclare("hello", true, false, false, null);
// 修改了第三個參數,這是代表消息須要持久化
channel.basicPublish("", "hello",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

總的來講,Work queues(Task Queuess)的概念在一些Web場景的應用中是頗有用的,好比咱們可以用它來構建一個master-slave結構的分佈式爬蟲系統:系統中有一個master節點和多個slave節點,master節點負責向各個slave節點分配爬取任務。

2.3 Publish/Subscribe

但有些時候,咱們可能但願一條消息可以被多個消費者接受到,好比一些公告信息等,這時候用Work Queue模型顯然不合適,而Publish/Subscribe模型正是對應這種使用場景的。

在介紹Publish/Subscribe以前,咱們快速回顧以前的兩個模型,它們好像都是生產者將消息直接發送到消息隊列,但其實不是這樣的,甚至有可能生產者根本就不知道消息發送到了哪個消息隊列。

先彆着急,下面咱們完整地介紹RabbitMQ消息發送/接受的方式。

事實上,生產者是把消息發送到了交換機(exchange)中,而後交換機負責(決定)將消息發送到(哪個)消息隊列中。其模型以下圖:

這時候你可能會疑惑:既然消息是被髮送到了交換機中,那咱們以前發送的消息是被髮送到了哪個交換機中了?它有沒有機制可以讓特定的消息發送到指定的隊列?

先回答第一個問題。還記得咱們在Hello World中寫的發送消息的代碼嗎?

channel.basicPublish("", "hello", null, message.getBytes());

事實上第一個參數即是指定交換機的名字,即指定消息被髮送到哪個交換機。空字符串表示默認交換機(Default Exchange),即咱們以前發送的消息都是先發送到默認交換機,而後它再路由到相應的隊列中。其實咱們能夠經過Web頁面去查看全部存在的交換機:

接着回答第二個問題。路由的依據即是經過第二個參數——路由鍵(routing key)指定的,以前已經提到過。在以前代碼中,咱們指定第二個參數爲"hello",即是指定消息應該被交換機路由到路由鍵爲hello的隊列中。而默認交換機(Default Exchange)有一個很是有用的性質:

每個被建立的隊列都會被自動的綁定到默認交換機上,而且路由鍵就是隊列的名字。

交換機還有4種不一樣的類型,分別是directfanouttopicheaders,每種類型路由的策略不一樣。

direct類型的交換機要求和它綁定的隊列帶有一個路由鍵K,如有一個帶有路由鍵R的消息到達了交換機,交換機會將此消息路由到路由鍵K = R的隊列。默認交換機即是該類型。所以,在下圖中,消息會沿着綠色箭頭路由:

fanout類型的交換機會路由每一條消息到全部和它綁定的隊列,忽略路由鍵。

剩下的兩種類型以後再作介紹。

在以上概念基礎上,咱們來看第3種消息模型:Publish/Subscribe。以下圖:

該模型是要讓全部的消費者都可以接收到每一條消息。顯然,fanout類型的交換機更符合咱們當前的需求。爲此,先建立一個fanout類型的交換機。

channel.exchangeDeclare("notice", "fanout");

其中,第一個參數是交換機的名稱;第二個參數是交換機的類型。

而後咱們能夠send消息了:

channel.basicPublish( "notice", "", null, message.getBytes());

對於消費者,咱們須要爲每個消費者建立一個獨立的隊列,而後將隊列綁定到剛纔指定的交換機上便可:

// 該方法會建立一個名稱隨機的臨時隊列
String queueName = channel.queueDeclare().getQueue();
// 將隊列綁定到指定的交換機("notice")上
channel.queueBind(queueName, "notice", "");

如下完整的代碼:

@Test
public void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("notice", "fanout");
    channel.basicPublish( "notice", "", null, "Hello China".getBytes());
    channel.close();
    connection.close();
}

@Test
public void receive() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("notice", "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "notice", "");

    channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, "UTF-8"));
        }

    });
    synchronized (this) {
        wait();
    }
}

首先運行兩次receive方法,讓兩個消費者等待接收消息,而後能夠在Web端查看此時的隊列狀況,以下圖所示:

能夠看到圖中有兩個名稱隨機的隊列。接着運行send方法發送一條消息,最終咱們會看到兩個消費者都打印出了Hello China。而後中止虛擬機讓消費者斷開鏈接,再次在Web端查看隊列狀況,會發現剛纔的兩個隊列被自動刪除了。

2.4 Routing

以上是Publish/Subscribe模式,它已經能讓咱們的通知(notice)系統正常運轉了。如今再考慮這樣一個新需求:對於一些機密通知,咱們只想讓部分人看到。這就要求交換機對綁定在其上的隊列進行篩選,因而引出了又一個新的模型:Routing

以前咱們說過,對於direct類型的交換機,它會根據routing key進行路由,所以咱們能夠藉助它來實現咱們的需求,模型結構以下圖:

下面用代碼來實現。先看生產者。

首先要聲明一個direct類型的交換機:

// 這裏名稱改成notice2
channel.exchangeDeclare("notice2", "direct");

須要注意的是,由於咱們以前聲明瞭一個fanout類型的名叫notice的交換機,所以不能再聲明一個同名的類型卻不同的交換機。

而後能夠發送消息了,咱們發送10條消息,其中偶數條消息是祕密消息,只能被routing key 爲s的隊列接受,其他的消息全部的隊列都能接受。

for (int i = 0; i < 10; i++) {
            String routingKey = "n"; // normal
            if (i % 2 == 0) {
                routingKey = "s"; // secret
            }
            channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());
        }

接下來是消費者:

// 聲明一個名稱隨機的臨時的隊列
String queueName = channel.queueDeclare().getQueue();
// 綁定交換機,同時帶上routing key
channel.queueBind(queueName, "notice2", "n");
// 消費者2號運行時,打開如下注釋
// channel.queueBind(queueName, "notice2", "s");

注意,咱們能夠屢次調用隊列綁定方法,調用時,隊列名和交換機名都相同,而routing key不一樣,這樣可使一個隊列帶有多個routing key

如下是完整代碼:

@Test
public void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("notice2", "direct");
    for (int i = 0; i < 10; i++) {
        String routingKey = "n"; // normal
        if (i % 2 == 0) {
            routingKey = "s"; // secret
        }
        channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());
    }
    channel.close();
    connection.close();
}

@Test
public void receive() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("notice2", "direct");

    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "notice2", "n");
    // channel.queueBind(queueName, "notice2", "s");

    channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, "UTF-8"));
        }
    });
    synchronized (this) {
        wait();
    }
}

測試時,咱們能夠先運行一個receive,而後打開channel.queueBind(queueName, "notice2", "s")註釋,再運行一次receive,這樣就有兩個消費者綁定到notice2交換機上,其中消費者1只能收到normal類型的消息,而消費者2既能收到normal類型的消息,又能收到secret類型的消息。接着能夠運行send方法。如不出意外,能夠看到以下打印結果:

// 消費者1
1
3
5
7
9
// 消費者2
0
1
2
3
4
5
6
7
8
9

2.5 Topic

有了以上的改進,咱們的notice系統基本ok了。但有些時候,咱們還須要更加靈活的消息刷選方式。好比咱們對於電影信息,咱們可能須要對它的地區,類型,限制級進行篩選。這時候就要藉助Topics模型了。

Topics模型中,咱們「升級」了routing key,它能夠由多個關鍵詞組成,詞與詞之間由點號(.)隔開。特別地,規定*表示任意的一個詞;#號表示任意的0個或多個詞

假設咱們如今須要接收電影信息,每條電影消息附帶的routingKey有地區、類型、限制級3個關鍵字,即:district.type.age。如今想實現的功能以下圖:

如上圖所示,隊列Q1只關心美國適合13歲以上的電影信息,隊列Q2對動做片感興趣,而隊列Q3喜歡中國電影。

下面用Java代碼去實現上述功能,相較於以前基本上沒有什麼改變,下面直接給出代碼:

@Test
public void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("movie", "topic");

    channel.basicPublish("movie", "American.action.13", null, "The Bourne Ultimatum".getBytes());
    channel.basicPublish("movie", "American.comedy.R", null, "The Big Lebowski".getBytes());
    channel.basicPublish("movie", "American.romantic.13", null, "Titanic".getBytes());

    channel.basicPublish("movie", "Chinese.action.13", null, "臥虎藏龍".getBytes());
    channel.basicPublish("movie", "Chinese.comedy.13", null, "大話西遊".getBytes());
    channel.basicPublish("movie", "Chinese.romantic.13", null, "梁山伯與祝英臺".getBytes());

    channel.close();
    connection.close();
}

@Test
public void receive() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("movie", "topic");
    // 隊列1
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "movie", "American.*.13");
    // 隊列2
//        String queueName = channel.queueDeclare().getQueue();
//        channel.queueBind(queueName, "movie", "*.action.*");
    // 隊列3
//        String queueName = channel.queueDeclare().getQueue();
//        channel.queueBind(queueName, "movie", "Chinese.#");


    channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, "UTF-8"));
        }

    });
    synchronized (this) {
        wait();
    }
}

運行3次receive方法,注意打開或關閉相應的註釋;再運行send方法,能夠看到控制檯輸出以下內容:

// 消費者1
The Bourne Ultimatum
Titanic
// 消費者2
The Bourne Ultimatum
臥虎藏龍
// 消費者3
臥虎藏龍
大話西遊
梁山伯與祝英臺

2.6 RPC

第6種模型是用來作RPC(Remote procedure call, 遠程程序調用)的。這裏直接貼上代碼,就不作解釋了,想要了解更多細節,可參考這裏。代碼演示的是,客戶端調用服務端的fib方法,獲得返回結果。

RPCServer.java

import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Description:
 *
 * @author derker
 * @Time 2016-10-26 18:24
 */
public class RPCServer {


    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
            System.out.println(" [x] Awaiting RPC requests");
            while (true) {
                String response = null;
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                AMQP.BasicProperties props = delivery.getProperties();
                BasicProperties replyProps = new BasicProperties
                        .Builder()
                        .correlationId(props.getCorrelationId())
                        .build();
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [.] fib(" + message + ")");
                    response = "" + fib(n);
                } catch (Exception e) {
                    System.out.println(" [.] " + e.toString());
                    response = "";
                } finally {
                    channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}

RPCClient.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;

/**
 * Description:
 *
 * @author derker
 * @Time 2016-10-26 18:36
 */
public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        String corrId = UUID.randomUUID().toString();

        BasicProperties props = new BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody(), "UTF-8");
                break;
            }
        }

        return response;
    }

    public void close() throws Exception {
        connection.close();
    }

    public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new RPCClient();
            System.out.println(" [x] Requesting fib(10)");
            response = fibonacciRpc.call("10");
            System.out.println(" [.] Got '" + response + "'");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (fibonacciRpc != null) {
                try {
                    fibonacciRpc.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}
相關文章
相關標籤/搜索