新的閱讀體驗地址:http://www.zhouhong.icu/post/141
本篇文章全部的代碼:https://github.com/Tom-shushu/Distributed-system-learning-notes/tree/master/rabbitmq-api-demo
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.4/erlang-23.0.4-1.el7.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm
啓動服務
systemctl start rabbitmq-server
查看是否啓動
lsof -i:5672
rabbitmq-plugins enable rabbitmq_management
lsof -i:15672
或者:
netstat -tnlp | grep 15672
#添加用戶 用戶名 admin 密碼 admin web管理工具可用此用戶登陸
sudo rabbitmqctl add_user admin admin
#設置用戶角色 管理員
sudo rabbitmqctl set_user_tags admin administrator
#設置用戶權限(接受來自全部Host的全部操做)
sudo rabbitmqctl set_permissions -p / admin "." "." ".*"
#查看用戶權限
sudo rabbitmqctl list_user_permissions admin
systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
2.發送端:html
package com.zhouhong.rabbitmq.api.helloworld; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Sender { public static void main(String[] args) throws Exception { // 1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.121"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); // 2 建立Connection Connection connection = connectionFactory.newConnection(); // 3 建立Channel Channel channel = connection.createChannel(); // 4 聲明 String queueName = "test001"; // 參數: queue名字,是否持久化,獨佔的queue(僅供此鏈接),不使用時是否自動刪除, 其餘參數 channel.queueDeclare(queueName, false, false, false, null); Map<String, Object> headers = new HashMap<String, Object>(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .headers(headers).build(); for(int i = 0; i < 5;i++) { String msg = "Hello World RabbitMQ " + i; channel.basicPublish("", queueName , props , msg.getBytes()); } } }
3.接收端java
package com.zhouhong.rabbitmq.api.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queueName = "test001";
// durable 是否持久化消息
channel.queueDeclare(queueName, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
// 循環獲取消息
while(true){
// 獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
4.結果(先啓動接收端進行監控,再啓動發送端)c++
收到消息:Hello World RabbitMQ 0
收到消息:Hello World RabbitMQ 1
收到消息:Hello World RabbitMQ 2
收到消息:Hello World RabbitMQ 3
收到消息:Hello World RabbitMQ 4
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4DirectExchange {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
//2 建立Connection
Connection connection = connectionFactory.newConnection();
//3 建立Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_direct_exchange";
//必需要和接收端 routingKey 一一對應
String routingKey = "test_direct_routingKey";
//5 發送
String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
}
}
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4DirectExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test_direct_routingKey";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環獲取消息
while(true){
//獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4TopicExchange1 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
// 只能匹配一個 例如:user.txt、user.py均可以,可是user.txt.py 不行
//String routingKey = "user.*";
// user.txt、user.py 、user.txt.py 均可以匹配到
String routingKey = "user.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
System.err.println("consumer1 start.. ");
// 循環獲取消息
while(true){
// 獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
}
}
}
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4TopicExchange {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.121");
connectionFactory.setPort(5672);
connectionFactory.setPassword("admin");
connectionFactory.setUsername("admin");
connectionFactory.setVirtualHost("/");
//2 建立Connection
Connection connection = connectionFactory.newConnection();
//3 建立Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 發送
String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
}
}