RabbitMQ 之topics (通配符)篇 初學

官網地址:https://www.rabbitmq.com/getstarted.htmlhtml

1、RabbitMQ簡介java

  MQ全稱爲Message Queue,即消息隊列, RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message
Queue 高級消息隊列協議)協議實現的消息隊列,它是一種應用程序之間的通訊方法,消息隊列在分佈式系統開
發中應用很是普遍。併發

2、開發中消息隊列一般有以下應用場景:
  一、任務異步處理。
將不須要同步處理的而且耗時長的操做由消息隊列通知消息接收方進行異步處理。提升了應用程序的響應時間。
  二、應用程序解耦合
  MQ至關於一箇中介,生產方經過MQ與消費方交互,它將應用程序進行解耦合。
  市場上還有哪些消息隊列?
    ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
3、爲何使用RabbitMQ呢?
  一、使得簡單,功能強大。
  二、基於AMQP協議。
  三、社區活躍,文檔完善。
  四、高併發性能好,這主要得益於Erlang語言。
  五、Spring Boot默認已集成RabbitMQ異步

4、RabbitMQ的全部模式(如下截圖都是官方連接裏面的)maven

 

 

 

 

5、代碼舉例Topics實現tcp

準備工做:下載RabbitMQ客戶端: http://www.rabbitmq.com/download.html分佈式

     下載erlang,由於RabbitMQ是erlang語言開發的,因此須要下載:  http://erlang.org/download/otp_win64_20.3.exe高併發

下載好之後進行安裝。安裝完成後進行開發測出:性能

一、首先第一步須要導入RabbitMQ的java客戶端,我這裏是建立的maven項目,因此直接導入依賴,以下:測試

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.1.2</version>
<scope>test</scope>
</dependency>

注意:通配符有兩個:
  1. # :表明沒有或一個或多個單詞(單詞與單詞之間用「.」分割);
  2. * :表明一個零個或一個單詞;
例如:
  aa.#.bb.* :
      匹配的: aa.HH.CC.bb 、 aa.bb 、aa.bb.cc
      不匹配: aa.cc 、 aa.bb.cc.dd
二、建立工程項目以及生產者P和消費者C1與C2(假設C1是短信SMS接收者,C2是郵件Email接收者)

 

三、編寫生產者P類:

package com.test.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class P {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
//交換機名稱
private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
//建立鏈接工廠
try {
//建立一個與MQ的鏈接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
//建立一個tcp鏈接
connection = factory.newConnection();
//經過鏈接建立一個通道,每一個通道表明一個會話
channel = connection.createChannel();
/**
* 聲明交換機:參數以下
* 一、交換機名稱
* 二、交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
/**
* 聲明隊列:參數以下:
* 一、隊列名稱
* 二、是否持久化
* 三、是否獨佔此隊列
* 四、隊列不用是否自動刪除
* 五、參數
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);

//發送郵件Email消息
for (int i = 0; i < 10; i++) {
String message = "email inform to user : " + i;
/**
* 向交換機發送消息 :參數明細
* 一、交換機名稱,不指令使用默認交換機名稱 Default Exchange
* 二、routingKey(路由key),根據key名稱將消息轉發到具體的隊列,這裏填寫隊列名稱表示消息將發到此隊列
* 三、消息屬性
* 四、消息內容
*/
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//發送短信消息
for (int i = 0; i < 10; i++) {
String message = "sms inform to user : " + i;
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//發送短信和郵件消息
for (int i = 0; i < 10; i++) {
String message = "sms and email inform to user" + i;
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.編寫消費者C1類:email接受者
package com.test.rabbitmq;

import com.rabbitmq.client.*;

public class C1 {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
//交換機名稱
private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

public static void main(String[] args) {
Connection connection = null;
Channel channel = null;

try {
//建立一個與MQ的鏈接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
//建立一個tcp鏈接
connection = factory.newConnection();
//經過鏈接建立一個通道,每一個通道表明一個會話
channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);

//綁定email通知隊列
channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM, "inform.#.email.#");

//消費消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received Email:'" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(QUEUE_INFORM_EMAIL, true, deliverCallback, consumerTag -> {
});

} catch (Exception e) {
e.printStackTrace();
}
}
}
5.編寫消費者C2類 SMS接收者
package com.test.rabbitmq;

import com.rabbitmq.client.*;

public class C2 {
//隊列名稱
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
//交換機名稱
private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

public static void main(String[] args) {
Connection connection = null;
Channel channel = null;

try {
//建立一個與MQ的鏈接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
//建立一個tcp鏈接
connection = factory.newConnection();
//經過鏈接建立一個通道,每一個通道表明一個會話
channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);

//綁定email通知隊列
channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM, "inform.#.sms.#");

//消費消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received SMS:'" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(QUEUE_INFORM_SMS, true, deliverCallback, consumerTag -> {
});

} catch (Exception e) {
e.printStackTrace();
}
}
}
6.測試:
啓動所有服務:先發送ssm消息:

 

 

 同時生成ssm消息和email消息併發送:

 

 

測試結果:
相關文章
相關標籤/搜索