RabbitMQ 初級教程[1] - "Hello World!"

原文地址:RabbitMQ Tutorials - "Hello World!" - Javahtml

內容根據自身的理解有增刪java

RabbitMQ是一個消息代理(broker):它接受並轉發消息。git

  • 生產者(producer):是指發送消息的程序。
  • 隊列(queue):相似RabbitMQ內部的一個郵箱。能夠用於存儲生產者和消費者之間的消息。隊列只受主機的內存和磁盤限制,本質上是一個巨大的消息緩衝區(message buffer)。生產者們往隊列裏發送消息;消費者們嘗試從隊列裏接收消息。
  • 消費者(consumer):等待接收消息的程序。

三者一般不在同一個host裏。github


由於這是一個簡單的事例,因此忽略了許多 Java API 細節。api

以下圖所示,「P」是生產者,「C」是消費者。中間是隊列。數組

圖片描述

準備工做

建立maven工程,添加相關依賴:amqp-clientslf4j-apislf4j-simplemaven

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.1.1</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>test</scope>
</dependency>

發送

We'll call our message publisher (sender) Send and our message consumer (receiver) Recv. The publisher will connect to RabbitMQ, send a single message, then exit.

生產者鏈接到RabbitMQ,發送發消息,而後推出。ide

public class Send {
    //建立queue的名稱
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //建立一個broker的本地鏈接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        //建立channel
        Channel channel = connection.createChannel();
        
        //聲明一個queue(冪等),消息可發送至此
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        //message以字節數組的方式傳遞
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
        
        //最後關閉channel和connection
        channel.close();
        connection.close();
    }
}

接收

so unlike the publisher which publishes a single message, we'll keep it running to listen for messages and print them out.

消費者能夠是一致處於監聽狀態,等待消息的到來。函數

Note that we declare the queue here, as well. Because we might start the consumer before the publisher, we want to make sure the queue exists before we try to consume messages from it.

這段描述可能會讓讀者誤覺得必須先啓動消費者,實際上是不必定的。由於生產者發佈消息後,消息存放於 queue 中,消費者隨時均可以來獲取。文檔裏用的是we might start,而不是we must startspa

public class Recv {

    //名字和生產者建立時保持一致
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //建立connection、channel和queue;這一部分和Send.java同樣
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        //DefaultConsumer是Cousumer的實現類,經過回調函數接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

查看隊列

使用rabbitmqctl list_queues查看當前 queue 下的消息數量

$ rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
hello    2
相關文章
相關標籤/搜索