rabbitMQ是一個開源的消息代理和隊列服務器,用於經過普通協議在徹底不一樣的應用之間共享數據,RabbitMQ是使用Erlang語言來編寫的,而且RabbitMQ是基於AMQP協議的。html
AMQP定義:是具備現代特徵的二進制協議。是一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計,是一個規範,RabbitMQ就是這個規範的一種實現。node
消息流轉圖 redis
rabbitMQ之安裝和配置(一)
rabbitmq之配置文件詳解(二)數據庫
安裝完成以後咱們能夠看到有三個與rabbitmq相關的命令 api
服務的啓動:rabbitmq-server start &
服務的中止:rabbitmqctl stop_app
管理插件: rabbitmq-plugins enable rabbitmq_management
複製代碼
rabbitmq-plugins list
命令查看全部已經存在的插件
基本操做bash
rabbitmqctl stop_app #關閉應用
rabbitmqctl start_app #啓動應用
rabbitmqctl status #節點狀態
rabbitmqctl add_user username password #增長用戶
rabbitmqctl list_users #列出全部用戶
rabbitmqctl delete_user username #刪除用戶
rabbitmqctl clear_permissions -p vhostpath username #清除用戶權限
rabbitmqctl list_user_permissions username #列出用戶權限
rabbitmqctl change_password username newpassword #修改密碼
# 設置用戶權限
rabbitmqctl set_permissions -p vhostpath username ".*"".*"".*"
rabbitmqctl add_vhost vhostpath #建立虛擬主機
rabbitmqctl list_vhosts #列出全部虛擬主機
rabbitmqctl list_permissions -p vhostpath #列出虛擬主機上全部權限
rabbitmqctl delete_vhost vhostpath #刪除虛擬主機
rabbitmqctl list_queues #查看全部隊列信息
rabbitmqctl -p vhostpath purge_queue blue #清除隊列裏的消息
複製代碼
高級操做服務器
#移除全部數據,要在rabbitmqctl stop_app 以後使用
rabbitmqctl reset
#[--ram] 指定節點的存儲模式,ram是內存基本存儲
rabbitmqctl join_cluster <clusternode> [--ram] #組成集羣命令
rabbitmqctl cluster_status #查看集羣狀態
# 修改集羣節點的存儲形式 disc磁盤 ram內存
rabbitmqctl change_cluster_node_type disc|ram
# 忘記節點(摘除節點)
rabbitmqctl forget_cluster_node [--offline]
#修改節點名稱
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2...]
複製代碼
管控臺有的操做,ctl命令行也都有網絡
maven 依賴架構
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
複製代碼
生產者發佈消息app
package com.example.rabbitmq.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Procuder {
public static void main(String[] args) throws Exception {
// 1 建立一個ConnectionFactory,並進行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 經過鏈接工程建立鏈接
Connection connection = connectionFactory.newConnection();
// 3 經過connection建立一個channel
Channel channel = connection.createChannel();
for (int i = 0; i < 5; i++) {
String msg = "hello world!";
// 1 exchange 2 routing key
channel.basicPublish("", "test01", null, msg.getBytes());
}
channel.close();
connection.close();
}
}
複製代碼
消費者訂閱處理消息
package com.example.rabbitmq.quickstart;
import com.rabbitmq.client.*;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
public static void main(String[] args) throws Exception {
// 1 建立一個ConnectionFactory,並進行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 經過鏈接工程建立鏈接
Connection connection = connectionFactory.newConnection();
// 3 經過connection建立一個channel
Channel channel = connection.createChannel();
//4 聲明(建立)隊列
String queueName = "test01";
channel.queueDeclare(queueName, true, false, false, null);
//5 建立消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 6 設置channel
channel.basicConsume(queueName, true, queueingConsumer);
while (true) {
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消費端消息:" + msg);
Envelope envelope = delivery.getEnvelope();
}
}
}
複製代碼
全部發送到Direct Exchange的消息被轉發到RouteKey中指定到Queue
⚠️注意:Direct模式可使用RabbitMQ自帶的Exchange:default Exchange,因此不須要將Exchange進行任何綁定(binding)操做,消息傳遞時,RouteKey必須徹底匹配纔會被隊列接收,不然該消息會被拋棄。
示例代碼
package com.bfxy.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class ConsumerDirectExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
//表示聲明瞭一個交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//表示聲明瞭一個隊列
channel.queueDeclare(queueName, false, false, false, null);
//創建一個綁定關係:
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
複製代碼
package com.bfxy.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerDirectExchange {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 建立Connection
Connection connection = connectionFactory.newConnection();
//3 建立Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct111";
//5 發送
String msg = "Hello World RabbitMQ 4 Direct Exchange Message 111 ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
}
}
複製代碼
全部發送到Topic Exchange的消息會被轉發到全部關心該RouteKey中指定Topic的Queue上
Exchange將RouteKey和某Topic進行模糊匹配,此時隊列須要綁定一個Topic
⚠️注意:可使用通配符進行模糊匹配
符號 "#" 匹配一個或多個詞
符號 "*" 匹配很少很多一個詞
例如:"log.#" 可以匹配到 "log.info.oa"
"log.*" 只會匹配到 "log.error"
複製代碼
package com.bfxy.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class ConsumerTopicExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
//String routingKey = "user.*";
String routingKey = "user.*";
// 1 聲明交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 2 聲明隊列
channel.queueDeclare(queueName, false, false, false, null);
// 3 創建交換機和隊列的綁定關係:
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
複製代碼
package com.bfxy.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerTopicExchange {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 建立Connection
Connection connection = connectionFactory.newConnection();
//3 建立Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 發送
String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
}
}
複製代碼
示例代碼
package com.bfxy.rabbitmq.api.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class ConsumerFanoutExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = ""; //不設置路由鍵
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
複製代碼
package com.bfxy.rabbitmq.api.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerFanoutExchange {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 建立Connection
Connection connection = connectionFactory.newConnection();
//3 建立Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_fanout_exchange";
//5 發送
for(int i = 0; i < 10; i ++) {
String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
channel.basicPublish(exchangeName, "", null , msg.getBytes());
}
channel.close();
connection.close();
}
}
複製代碼
Binding-綁定
Queue-消息隊列
Message-消息
Virtual host-虛擬主機
更多內容閱讀:
rabbitmq核心概念總結
中文文檔