RabbitMQ 系列一:Hello World

生活不止眼前的苟且,還有永遠讀不懂的詩和到不了的遠方。html

https://user-gold-cdn.xitu.io/2019/6/19/16b6ea0613715a3b?w=1880&h=1249&f=jpeg&s=117932
https://user-gold-cdn.xitu.io/2019/6/19/16b6ea0613715a3b?w=1880&h=1249&f=jpeg&s=117932

概述

RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的,而集羣和故障轉移是構建在開放電信平臺框架上的。全部主要的編程語言均有與代理接口通信的客戶端庫。java

安裝RabbitMQ

RabbitMQ下載地址,本系列咱們採用docker的方式來安裝RabbitMQRabbitMQdocker鏡像地址。關於如何docker的安裝和使用可參考如下連接;git

啓動RabbitMQgithub

docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq --hostname=rabbitmqhostone daocloud.io/library/rabbitmq:3.7.14-management-alpinedocker

安裝完成以後可經過http://localhost:15672/#/訪問圖形界面(用戶名密碼均爲admin)編程

RabbitMQ 架構圖以下

https://user-gold-cdn.xitu.io/2019/6/19/16b6ea0613842390?w=640&h=434&f=jpeg&s=103636
https://user-gold-cdn.xitu.io/2019/6/19/16b6ea0613842390?w=640&h=434&f=jpeg&s=103636

  1. Server 簡單來講就是消息隊列服務器實體
  2. Exchange 消息交換機,它指定消息按什麼規則,路由到哪一個隊列
  3. Queue 消息隊列載體,每一個消息都會被投入到一個或多個隊列
  4. Binding: 綁定,它的做用就是把exchangequeue按照路由規則綁定起來
  5. Routing Key: 路由關鍵字,exchange根據這個關鍵字進行消息投遞
  6. VHost: 虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。
  7. Producer: 消息生產者,就是投遞消息的程序
  8. Consumer: 消息消費者,就是接受消息的程序
  9. Channel: 消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務

注意:由ExchangeQueueRoutingKey三個才能決定一個從ExchangeQueue的惟一的線路。服務器

RabbitMQ 消息模型

關於RabbitMQ 消息模型架構

  • Direct交換機:徹底根據key進行投遞。
  • Topic交換機:在key進行模式匹配後進行投遞。
  • Fanout交換機:它採起廣播模式,消息進來時,將會被投遞到與改交換機綁定的全部隊列中。

RabbitMQ Hello World

生產者 Procuder

package com.niocoder.quickstart;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Procuder {

    public static void main(String[] args) throws IOException, TimeoutException {

        // 1. 建立ConnectionFactory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        // 2. 經過連接工廠建立鏈接
        Connection connection = factory.newConnection();

        // 3. 經過connection 建立一個channel
        Channel channel = connection.createChannel();

        // 4. 經過channel發送數據
        for (int i = 0; i < 3; i++) {
            String msg = "Hello World";
            /** * * @param exchange the exchange to publish the message to * * @param routingKey the routing key * * @param props other properties for the message - routing headers etc * * @param body the message body */
            channel.basicPublish("", "hello", null, msg.getBytes());
        }

        // 5. 關閉鏈接
        channel.close();
        connection.close();
    }
}

複製代碼

消費者 Consumer

package com.niocoder.quickstart;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1. 建立ConnectionFactory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        // 2. 經過連接工廠建立鏈接
        Connection connection = factory.newConnection();

        // 3. 經過connection 建立一個channel
        Channel channel = connection.createChannel();

        // 4. 建立一個隊列
        String queueName = "hello";
        /** * * @param queue the name of the queue * * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * * @param arguments other properties (construction arguments) for the queue */
        channel.queueDeclare(queueName, true, false, false, null);

        // 5. 建立消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 6. 設置channel
        channel.basicConsume(queueName,true,consumer);

        while (true){
            // 7. 獲取消息
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消費端: " + msg);
        }
    }
}

複製代碼

效果以下:

先啓動消費者,再啓動生產者框架

https://user-gold-cdn.xitu.io/2019/6/19/16b6eb2d77fd16e0?w=1155&h=667&f=gif&s=1315918
https://user-gold-cdn.xitu.io/2019/6/19/16b6eb2d77fd16e0?w=1155&h=667&f=gif&s=1315918

代碼下載

相關文章
相關標籤/搜索