RabbitMQ入門:Hello RabbitMQ 代碼實例

本篇博客圍繞下面幾個方面展開:java

  1. 代碼前的理論熱身
  2. 代碼實例:Hello RabbitMQ
  3. 運行代碼並調試問題

Now, Let's begin !數組

1、代碼前的理論熱身服務器

咱們來看張圖:ide

 

Publisher(生產者)生成消息,而後publish(發佈)消息到exchange(路由器,也有資料翻譯成交換機),而後根據路由規則將消息傳遞到Queue(隊列),最終交由Consumer(消費者)進行消費處理。測試

這裏的生產者和消費者都是咱們的應用,所以咱們的代碼中要實現這兩個部分。spa

中間的節點就是RabbitMQ 提供的內容,須要再生產者和消費者裏面調用其接口來定義和使用這些節點。翻譯

 

2、代碼實例:Hello RabbitMQ調試

package com.sam.hello_rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Provider {

    //定義隊列名
    static String QUEUE_NAME = "helloRabbit";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = null;
        Channel channel = null;
        try {
            //1.建立鏈接和通道
            connection = factory.newConnection();
            channel = connection.createChannel();
            
            //2.爲通道聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            //3.發佈消息
            String msg = " hello rabbitmq, welcome to sam's blog.";
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("provider send a msg: " + msg);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            //4.關閉鏈接
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }

}

在第2步中,channel.queueDeclare 用來建立隊列,有5個參數:String queue, 隊列名; boolean durable, 該隊列是否須要持久化; boolean exclusive,該隊列是否爲該通道獨佔的(其餘通道是否能夠消費該隊列); boolean autoDelete,該隊列再也不使用的時候,是否讓RabbitMQ服務器自動刪除掉; Map<String, Object> arguments 其餘參數。第3步中,channel.basicPublish 發佈消息(用在生產者),有4個參數:String exchange, 路由器(有的資料翻譯成交換機)的名字,即將消息發到哪一個路由器; String routingKey, 路由鍵,即發佈消息時,該消息的路由鍵是什麼; BasicProperties props, 指定消息的基本屬性; byte[] body 消息體,也就是消息的內容,是字節數組。 可能你會疑惑,爲何沒有exchange呢?由於若是聲明瞭隊列,能夠不聲明路由器。code

2.接着來實現消費者,消費者實現和生產者過程差很少,可是在這裏並無關閉鏈接和通道,是由於要消費者一直等待隨時可能發來的消息。代碼以下:blog

package com.sam.hello_rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class HelloConsumer {

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = null;
        Channel channel = null;
        try {
            // 1.建立鏈接和通道
            connection = factory.newConnection();
            channel = connection.createChannel();

            // 2.爲通道聲明隊列
            channel.queueDeclare(Provider.QUEUE_NAME, false, false, false, null);
            System.out.println(" **** keep alive ,waiting for messages, and then deal them");
            // 3.經過回調生成消費者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                    
                    //獲取消息內容而後處理
                    String msg = new String(body, "UTF-8");
                    System.out.println("*********** HelloConsumer" + " get message :[" + msg +"]");
                }
            };
            
            //4.消費消息
            channel.basicConsume(Provider.QUEUE_NAME, true, consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

在第4步中,channel.basicConsume 用來接收消息,用在消費者,有3個參數:String queue, 隊列名字,即要從哪一個隊列中接收消息; boolean autoAck, 是否自動確認,默認true; Consumer callback 消費者,即誰接收消息。

3、運行代碼並調試問題

代碼寫好了,接下來進行測試,

  1. 先來執行下Provider.java,,後臺打印了內容,而且隊列中有了一條ready的消息。

     

     

  2. 執行HelloConsumer.java,預想的結果是在啓動後,控制檯直接打印出log而且RabbitMQ管理頁面沒有ready的消息:
相關文章
相關標籤/搜索