rabbitmq消息隊列

1.Rabbitmq架構

image.png

  • producer: 生產者,發佈消息者,消息的源頭,向exchange發佈消息的應用程序
  • consumer: 消費者,消費消息,消息的接受者,向queue獲取到消息
  • broker: 具體的服務器
  • vhost: 虛擬主機,一個broker能夠構建多個vhost,一個vhost包含exchanger,queue,bingding和權限機制。
  • exchanger: 交換器,接收生產者發佈的消息,而且依據bingding規則發送給具體的隊列。
  • binding: 綁定,隊列與交換器之間的綁定關係。
  • queue: 存放來自交換器的消息,消費者消息的來源。
  • message: 消息,由消息頭和消息體組成,消息體是透明的,消息頭由一些可選屬性組成,包括路由鍵(routing-key),優先級(priority)和是否持久化存儲等。
  • routing-key:

2.Exchanger分發消息類型

exchanger分發消息一共有4種方式:headers,direct,fanout,topic;headers方式採用header匹配方式,同direct,性能不好,因此不被常用。java

  • fanout: 分發模式,一個消息發送到交換器上後,那麼就會將消息發送到全部與該交換器相關的隊列,不進行路由鍵的匹配。
  • direct: 消息的路由鍵和隊列的路由鍵徹底匹配,就將消息發送到該隊列
  • topic: 不一樣於direct的徹底匹配,topic方式依據制定的規則進行匹配。(topic路由鍵規則:*表明一個字符,#表明0個或多個字符)

demo: 參考服務器

引入依賴
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

生產者:架構

package org.study.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        //設置 RabbitMQ 地址
        factory.setHost("localhost");
        //創建到代理服務器到鏈接
        Connection conn = factory.newConnection();
        //得到信道
        Channel channel = conn.createChannel();
        //聲明交換器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);

        String routingKey = "hola";
        //發佈消息
        byte[] messageBodyBytes = "quit".getBytes();
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        channel.close();
        conn.close();
    }
}

消費者ide

package org.study.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        //創建到代理服務器到鏈接
        Connection conn = factory.newConnection();
        //得到信道
        final Channel channel = conn.createChannel();
        //聲明交換器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);
        //聲明隊列
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "hola";
        //綁定隊列,經過鍵 hola 將隊列和交換器綁定起來
        channel.queueBind(queueName, exchangeName, routingKey);

        while(true) {
            //消費消息
            boolean autoAck = false;
            String consumerTag = "";
            channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    System.out.println("消費的路由鍵:" + routingKey);
                    System.out.println("消費的內容類型:" + contentType);
                    long deliveryTag = envelope.getDeliveryTag();
                    //確認消息
                    channel.basicAck(deliveryTag, false);
                    System.out.println("消費的消息體內容:");
                    String bodyStr = new String(body, "UTF-8");
                    System.out.println(bodyStr);

                }
            });
        }
    }
}
相關文章
相關標籤/搜索