(翻譯)RabbitMQ Java Client教程(一)Hello World

原文地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.htmlhtml

介紹

RabbitMQ是一個接收並轉發消息的消息代理。你能夠把它當成是一座郵局,當你把想要發出的郵件放到信箱裏以後,郵遞員會把它運送到你指定的收件人那裏。 RabbitMQ就是郵局、信箱和郵遞員的集合。java

RabbitMQ和郵局的最大區別在於它處理的是消息——也就是二進制的數據,而非信件。數組

在RabbitMQ中使用了一些術語:緩存

  • 生產(producing)表示發送消息。一個發送消息的程序被稱做生產者(producer)

  • 隊列(queue)相似於RabbitMQ內部的郵箱。雖然消息能夠在應用和RabbitMQ之間流轉,可是它們只能保存在隊列(queue)中。隊列(queue)本質上是一個很大的消息緩衝區,它只會受到主機內存和磁盤容量的限制。多個生產者(producers)能夠發送消息給同一個隊列,同時多個消費者(consumers )也能夠從同一個隊列中接收消息。咱們用下邊這張圖表示一個隊列:

  • 消費(consuming)表示接收消息。一個等待接收消息的程序被稱做消費者(consumer

注意生產者、消費者以及消息代理不須要部署在同一臺主機上,並且大多數狀況下它們也確實不在一塊兒。另外,一個程序也不能同時當生產者和消費者。併發

「Hello World」

在本章節咱們將編寫兩段Java程序:一個發送消息的生產者和一個接收消息並打印的消費者。咱們將僅着眼於那些須要用到的簡單的東西而忽略一些細節。畢竟這只是一個入門的例子。異步

在下邊這張圖中,「P」是生產者,「C」是消費者。中間的方框是一個隊列—— RabbitMQ爲消費者保留的消息緩衝區。socket

發送消息

咱們將消息發送者成爲==Send==,將消息接收者稱爲==Recv==。消息發送者將鏈接到RabbitMQ併發送一條消息,而後退出。編碼

在==Send.java==中,咱們須要導入一些類3d

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

設置類並命名隊列:代理

public class Send {
  private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception {
      ...
  }
}

而後咱們建立一個鏈接到Rabbitmq服務端:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

}

Connection類抽象了底層的socket鏈接,並幫咱們處理包括協議版本協商和受權的一系列事情。這裏咱們鏈接的是本地的RabbitMQ,因此使用了localhost。若是咱們想要鏈接到其餘機器的RabbitMQ,咱們須要將localhost改爲對應的域名或者IP。

接下來咱們建立一個channel,咱們大多數的工做均可以經過這個類提供的API完成。注意咱們可使用try-with-resources語法來建立,由於==Connection==和==Channel==都實現了==java.io.Closeable==接口。這樣咱們就不須要手動關閉它們了。

爲了發送消息,咱們還須要聲明一個隊列,全部的這些代碼都包裹在try-with-resources語句塊中

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

聲明隊列的代碼是冪等的——它只有在不存在的時候纔會建立。它能夠接收的消息爲字節數組,因此你能夠任意編碼你的消息。

下邊是Send.java的完整代碼

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消息發送不成功!

若是這是你第一次使用RabbitMQ而且沒有看到"Sent"消息,你可能抓耳撓腮想要知道哪裏出了問題。或許是RabbitMQ啓動的時候磁盤空間不足(默認狀況下至少須要200MB的剩餘空間),因此它拒絕接收消息。檢查RabbitMQ的日誌文件查明緣由,並在須要的時候調低這個限制。這份配置文檔會告訴你怎麼設置disk_free_limit參數。

接收消息

咱們的消費者不像生產者那樣只須要發送一條消息,它須要一直監聽來自RabbitMQ的消息,因此咱們須要保持它持續運行接收消息並打印。

==Recv.java==的代碼與Send.java的代碼很相近:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

多出來的DefaultConsumer類實現了 Consumer接口,咱們將會用它來接收來自RabbitMQ的消息。

相應的配置與生產者相同:咱們開啓一個 connection和一個 channel,並聲明咱們想要消費的隊列。注意這裏隊列的名字必須與生產者中的隊列名字相同。

public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  }
}

注意這裏咱們也聲明瞭一個隊列。由於咱們的消費者可能比生產者先啓動,因此在咱們消費消息以前須要先確保隊列存在。

爲何咱們不使用try-with-resource語句來自動關閉channel和connection?由於若是這樣作的話,咱們的程序會自動關閉關閉這些資源並退出!而實際上咱們須要這個程序持續運行以異步監聽消息的到達。

以後咱們須要告訴RabbitMQ服務端將隊列中的消息發送給咱們。因爲RabbitMQ會將消息異步發送給咱們,因此咱們提供了一個回調方法來幫咱們緩存消息,直到咱們去處理這些消息。這就是DeliverCallback類所作的事情。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

下邊是Recv.java的完整代碼

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
相關文章
相關標籤/搜索