Workflow: first obtain a ConnectionFactory, use it to open a Connection, create a Channel (the data-communication channel) on that connection, then use the Channel to send or receive messages.
Code location:
https://github.com/hmilyos/rabbitmqdemo.git, in the quickstart package of the rabbitmq-api project
Maven configuration:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.15.RELEASE</version>
<!--<version>2.1.0.RELEASE</version>-->
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.6</version>
</dependency>
</dependencies>
Define the RabbitMQ connection settings and assign their values:
public interface RabbitMQCommon {
final static String RABBITMQ_HOST = "192.168.0.7";
final static int RABBITMQ_PORT = 5672;
final static String RABBITMQ_DEFAULT_VIRTUAL_HOST = "/";
public final static String RABBITMQ_USERNAME = "guest";
public final static String RABBITMQ_PASSWORD = "guest";
}
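The constants above are hard-coded for one test machine. A minimal sketch, assuming you would like to override them per environment without editing the source: read each setting from a JVM system property and fall back to the original value. The class name `RabbitMQSettings` and the property names `rabbitmq.host` and `rabbitmq.port` are hypothetical and not part of the demo project.

```java
class RabbitMQSettings {

    // Hypothetical property name; the default mirrors RabbitMQCommon.RABBITMQ_HOST.
    static String host() {
        return System.getProperty("rabbitmq.host", "192.168.0.7");
    }

    // Hypothetical property name; the default mirrors RabbitMQCommon.RABBITMQ_PORT.
    static int port() {
        // Integer.getInteger reads a system property and parses it as an int,
        // returning the default when the property is missing or not numeric.
        return Integer.getInteger("rabbitmq.port", 5672);
    }

    public static void main(String[] args) {
        System.out.println(host() + ":" + port());
    }
}
```

With this in place, starting the JVM with a flag such as `-Drabbitmq.host=localhost` would switch the target broker, while running without any flag keeps the values used in this article.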
Consumer code:
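import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Quick start: consumer
 */
@Slf4j // generates the static "log" field, so no explicit Logger declaration is needed
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // queueName, durable, exclusive, autoDelete, arguments
        channel.queueDeclare("test1001", true, false, false, null);
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        // queueName, autoAck, Consumer callback
        channel.basicConsume("test1001", true, queueingConsumer);
        log.info("Consumer started");
        while (true) {
            // Blocks until the next message arrives
            Delivery delivery = queueingConsumer.nextDelivery();
            // Decode with an explicit charset instead of the platform default
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            log.info("Consumer received: {}", msg);
        }
    }
}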
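The `while (true)` loop in the consumer works because `nextDelivery()` blocks until a message arrives. As a rough mental model only (this is not the client library's code and it needs no broker), the sketch below imitates that hand-off with a `java.util.concurrent.BlockingQueue`: one thread plays the broker and pushes message bodies, while the calling thread blocks on `take()` much as the consumer blocks on `nextDelivery()`. The class name `BlockingConsumerModel` and the fixed message count are assumptions made for the illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingConsumerModel {

    // Plays both roles inside one JVM and returns what the "consumer" decoded.
    static List<String> run(int messageCount) throws InterruptedException {
        BlockingQueue<byte[]> deliveries = new LinkedBlockingQueue<>();

        // Stand-in for the broker pushing message bodies to the consumer.
        Thread broker = new Thread(() -> {
            for (int i = 0; i < messageCount; i++) {
                deliveries.add(("hello RabbitMQ + " + i).getBytes(StandardCharsets.UTF_8));
            }
        });
        broker.start();

        List<String> received = new ArrayList<>();
        // The real demo loops forever; the model stops after messageCount so it can exit.
        while (received.size() < messageCount) {
            byte[] body = deliveries.take(); // blocks until a body is available
            received.add(new String(body, StandardCharsets.UTF_8));
        }
        broker.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String msg : run(5)) {
            System.out.println("received: " + msg);
        }
    }
}
```

One design note: `QueueingConsumer` is deprecated in the 4.x Java client and was removed in 5.x, so on newer client versions a callback-style consumer such as `DefaultConsumer` is the usual replacement for this blocking loop.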
Producer code:
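import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Quick start: producer
 */
@Slf4j // generates the static "log" field, so no explicit Logger declaration is needed
public class Procuder {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        for (int i = 0; i < 5; i++) {
            String msg = "hello RabbitMQ + " + i;
            log.info("Producer sending message: {}", msg);
            // exchange, routingKey, properties, body.
            // "" is the default exchange, which routes to the queue named by the routing key.
            channel.basicPublish("", "test1001", null, msg.getBytes(StandardCharsets.UTF_8));
        }
        log.info("Producer finished sending messages");
        channel.close();
        connection.close();
    }
}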
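The producer turns each string into bytes and the consumer turns those bytes back into a string, so both sides have to agree on the encoding. A small broker-free sketch of that round trip, assuming UTF-8 on both ends; the class name `PayloadCodec` and its two helpers are hypothetical and exist only to illustrate the idea.

```java
import java.nio.charset.StandardCharsets;

class PayloadCodec {

    // What the producer would pass to basicPublish as the message body.
    static byte[] encode(String msg) {
        return msg.getBytes(StandardCharsets.UTF_8);
    }

    // What the consumer would do with the bytes from delivery.getBody().
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "hello RabbitMQ + 0";
        byte[] body = encode(original);
        // The decoded text matches the original when both sides use the same charset.
        System.out.println(decode(body).equals(original));
    }
}
```

Calling `getBytes()` or `new String(bytes)` without a charset uses the platform default, which can differ between the producer's and the consumer's machines; naming the charset explicitly avoids that mismatch for non-ASCII payloads.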
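Run the consumer code.
Open the management console and confirm that the queue has been created.
Run the producer code and check its log output.
Click the queue in the console to see that messages were just published to it.
Check the consumer's log in IDEA.
The messages sent by the producer have been consumed by the consumer, which completes the quick-start demo.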