RabbitMQ基礎教程之基本使用篇

RabbitMQ基礎教程之基本使用篇

最近由於工做緣由使用到RabbitMQ,以前也接觸過其餘的mq消息中間件,從實際使用感受來看,卻不太同樣,正好趁着週末,能夠好好看一下RabbitMQ的相關知識點;但願能夠經過一些學習,能夠搞清楚如下幾點java

  • 基礎環境搭建
  • 能夠怎麼使用
  • 實現原理是怎樣的
  • 實際工程中的使用(好比結合SpringBoot能夠怎麼玩)

<!-- more -->git

相關博文,歡迎查看:github

I. 前提準備

在開始以前,先得搭建基本的環境,由於我的主要是mac進行的開發,全部寫了一篇mac上如何安裝rabbitmq的教程,能夠經過 《mac下安裝和測試rabbitmq》 查看服務器

1. Centos安裝過程

下面簡單說一下Linux系統下,能夠如何安裝ide

Centos 系統:學習

# 安裝erlang
rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm
yum install erlang

# 安裝RabbitMQ
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm
yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm

啓動和查看的命令測試

# 完成後啓動服務:
service rabbitmq-server start
# 能夠查看服務狀態:
service rabbitmq-server status

2. 注意

  • 安裝完畢以後,能夠開啓控制檯,主要就是 rabbitmq-plugins enable rabbitmq_management, 默認的端口號爲15672
  • 默認分配的用戶/密碼爲: guest/guest, 只容許本地訪問;若是跨應用讀寫數據時,請添加帳號和設置對應的權限(推薦參考上面mac安裝的博文,裏面有介紹)

II. 基本使用篇

直接使用amqp-client客戶端作基本的數據讀寫,先不考慮Spring容器的場景,咱們能夠怎樣進行塞數據,而後又怎樣能夠從裏面獲取數據;ui

在實際使用以前,有必要了解一下RabbitMQ的幾個基本概念,即什麼是Queue,Exchange,Binding,關於這些基本概念,能夠參考博文:3d

1. 基本使用姿式

首先是創建鏈接,通常須要設置服務器的IP,端口號,用戶名密碼之類的,公共代碼以下code

public class RabbitUtil {

    public static ConnectionFactory getConnectionFactory() {
        //建立鏈接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        return factory;
    }
}

a. 生產者

要使用,基本的就須要一個消息投遞和一個消息消費兩方,線看消息生產者的通常寫法

public class MsgProducer {
    public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message)
            throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //建立鏈接
        Connection connection = factory.newConnection();

        //建立消息通道
        Channel channel = connection.createChannel();

        // 聲明exchange中的消息爲可持久化,不自動刪除
        channel.exchangeDeclare(exchange, exchangeType, true, false, null);

        // 發佈消息
        channel.basicPublish(exchange, toutingKey, null, message.getBytes());

        channel.close();
        connection.close();
    }
}

針對上面的代碼,結合RabbitMQ的基本概念進行分析

基本結構

  • 不論是幹啥,第一步都是獲取鏈接,也就是上面的Connection
  • 《RabbitMq基礎教程之基本概念》直到,生產者消費者都是藉助Channel與Exchange或者Queue打交道,接下來就是經過Connection建立數據流通訊道Channel
  • Channel準備完畢以後,生產者就能夠向其中投遞數據
  • 投遞完畢以後,回收現場資源

疑問:

  • 在聲明Exchange時,是否就須要選擇消息綁定策略?
  • 不聲明時,默認是什麼策略?

b. 消費者

結合上面的代碼和分析,大膽的預測下消費者的流程

  • 獲取鏈接Connection
  • 建立Channel
  • 將Channel與Queue進行綁定
  • 建立一個Consumer,從Queue中獲取數據
  • 消息消費以後,ack

下面給出一個mq推數據的消費過程

public class MsgConsumer {

    public static void consumerMsg(String exchange, String queue, String routingKey)
            throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();
        //建立鏈接
        Connection connection = factory.newConnection();

        //建立消息信道
        final Channel channel = connection.createChannel();

        //消息隊列
        channel.queueDeclare(queue, true, false, false, null);
        //綁定隊列到交換機
        channel.queueBind(queue, exchange, routingKey);
        System.out.println("[*] Waiting for message. To exist press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                try {
                    System.out.println(" [x] Received '" + message);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 取消自動ack
        channel.basicConsume(queue, false, consumer);
    }
}

2. Direct方式

a. Producer

直接在前面的基礎上進行測試,咱們定義一個新的exchange名爲direct.exchange,而且制定ExchangeType爲直接路由方式 (先無論這種寫法的合理性)

public class DirectProducer {
    private static final String EXCHANGE_NAME = "direct.exchange";

    public void publishMsg(String routingKey, String msg) {
        try {
            MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        DirectProducer directProducer = new DirectProducer();
        String[] routingKey = new String[]{"aaa", "bbb"};
        String msg = "hello >>> ";


        for (int i = 0; i < 30; i++) {
            directProducer.publishMsg(routingKey[i % 2], msg + i);
        }
        System.out.println("----over-------");
    }
}

上面的代碼執行一遍以後,看控制檯會發現新增了一個Exchange

exchange

b. consumer

一樣的咱們寫一下對應的消費者,一個用來消費aaa,一個消費bbb

public class DirectConsumer {

    private static final String exchangeName = "direct.exchange";

    public void msgConsumer(String queueName, String routingKey) {
        try {
            MsgConsumer.consumerMsg(exchangeName, queueName, routingKey);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        DirectConsumer consumer = new DirectConsumer();
        String[] routingKey = new String[]{"aaa", "bbb"};
        String[] queueNames = new String[]{"qa", "qb"};


        for (int i = 0; i < 2; i++) {
            consumer.msgConsumer(queueNames[i], routingKey[i]);
        }

        Thread.sleep(1000 * 60 * 10);
    }
}

執行上面的代碼以後,就會多兩個Queue,且增長了Exchange到Queue的綁定

binding

queue

當上面兩個代碼配合起來使用時,就能夠看到對於消費者而言,qa一直消費的是偶數,qb一直消費的是奇數,一次輸出以下:

[qa] Waiting for message. To exist press CTRL+C
[qb] Waiting for message. To exist press CTRL+C
 [qa] Received 'hello >>> 0
 [qb] Received 'hello >>> 1
 [qa] Received 'hello >>> 2
 [qb] Received 'hello >>> 3
 [qa] Received 'hello >>> 4
...

3. Fanout方式

有了上面的case以後,這個的實現和測試就比較簡單了

a. Producer

public class FanoutProducer {
    private static final String EXCHANGE_NAME = "fanout.exchange";

    public void publishMsg(String routingKey, String msg) {
        try {
            MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        FanoutProducer directProducer = new FanoutProducer();
        String[] routingKey = new String[]{"aaa", "bbb"};
        String msg = "hello >>> ";


        for (int i = 0; i < 30; i++) {
            directProducer.publishMsg(routingKey[i % 2], msg + i);
        }
        System.out.println("----over-------");
    }
}

b. consumer

public class FanoutProducer {
    private static final String EXCHANGE_NAME = "fanout.exchange";

    public void publishMsg(String routingKey, String msg) {
        try {
            MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        FanoutProducer directProducer = new FanoutProducer();
        String[] routingKey = new String[]{"aaa", "bbb"};
        String msg = "hello >>> ";


        for (int i = 0; i < 30; i++) {
            directProducer.publishMsg(routingKey[i % 2], msg + i);
        }
        System.out.println("----over-------");
    }
}

這個的輸出就比較有意思了,fa,fb兩個隊列均可以接收到發佈的消息,並且單獨的執行一次上面的投遞數據以後,發現fa/fb兩個隊列的數據都是30條

30

而後消費的結果以下

[qa] Waiting for message. To exist press CTRL+C
[qb] Waiting for message. To exist press CTRL+C
 [qa] Received 'hello >>> 0
 [qb] Received 'hello >>> 0
 [qa] Received 'hello >>> 1
 [qb] Received 'hello >>> 1
 [qb] Received 'hello >>> 2
 [qa] Received 'hello >>> 2
 [qa] Received 'hello >>> 3
 [qb] Received 'hello >>> 3
 [qb] Received 'hello >>> 4
 [qa] Received 'hello >>> 4
 ...

4. Topic方式

代碼和上面差很少,就不重複拷貝了,接下來卡另外幾個問題

III. 基礎進階

在上面的基礎使用中,會有幾個疑問以下:

  • Exchange聲明的問題(是否必須聲明,若是不聲明會怎樣)
  • Exchange聲明的幾個參數(durable, autoDelete)有啥區別
  • 當沒有隊列和Exchange綁定時,直接往隊列中塞數據,好像不會有數據增長(即先塞數據,而後建立queue,創建綁定,從控制檯上看這個queue裏面也不會有數據)
  • 消息消費的兩種姿式(一個主動去拿數據,一個是rabbit推數據)對比
  • ack/nack怎麼用,nack以後消息能夠怎麼處理

以上內容,留待下一篇進行講解

IV. 其餘

1. 相關博文

2. 一灰灰Bloghttps://liuyueyi.github.io/hexblog

一灰灰的我的博客,記錄全部學習和工做中的博文,歡迎你們前去逛逛

3. 聲明

盡信書則不如,已上內容,純屬一家之言,因我的能力有限,不免有疏漏和錯誤之處,如發現bug或者有更好的建議,歡迎批評指正,不吝感激

4. 掃描關注

QrCode

相關文章
相關標籤/搜索