慕課網《RabbitMQ消息中間件極速入門與實戰》學習總結
課程導航
初識RabbitMQ
RabbitMQ簡介
AMQP協議模型
學習筆記
0.安裝準備 官網地址:http://www.rabbitmq.com/ 安裝Linux必要依賴包<Linux7> 下載RabbitMQ安裝包 進行安裝,修改相關配置文件 vim /etc/hostname vim /etc/hosts 1.安裝Erlang wget https://packages.erlang-solutions.com/erlang-solutions_1.0_all.deb sudo dpkg -i erlang-solutions_1.0_all.deb sudo apt-get install erlang erlang-nox 2.安裝RabbitMQ echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - sudo apt-get install rabbitmq-server 3.安裝RabbitMQ web管理插件 sudo rabbitmq-plugins enable rabbitmq_management sudo systemctl restart rabbitmq-server 訪問:http://localhost:15672 默認用戶名密碼:guest/guest 4.修改RabbitMQ配置 vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.5.7/ebin/rabbit.app 比如修改密碼、配置等等;例如:loopback_users中的<<"guest">>,只保留guest 服務啓動:rabbitmq-server start & 服務停止:rabbitmqctl stop_app
RabbitMQ的整體架構
RabbitMQ核心概念
Channel:網絡信道
幾乎所有的操作都在Channel中進行
Channel是進行消息讀寫的通道
客戶端可創建多個Channel
每個Channel代表一個會話任務
Message:消息
服務器和應用程序之間傳送的數據,由Properties和Body組成
Properties可以對消息進行修飾,比如消息的優先級、延遲等高級特性
Body則就是消息體內容
Virtual host:虛擬主機
用於進行邏輯隔離,最上層的消息路由
一個Virtual host裏面可以有若干個Exchange和Queue
同一個Virtual host裏面不能有相同名稱的Exchange或Queue
RabbitMQ消息的流轉過程
SpringBoot與RabbitMQ集成
創建名爲rabbitmq-producer的Maven工程,pom.xml如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>47-rabbitmq</artifactId> <groupId>com.myimooc</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>rabbitmq-producer</artifactId> <properties> <spring.boot.version>2.0.4.RELEASE</spring.boot.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-parent</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!--RabbitMQ依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--工具類依賴--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.5</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.36</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
1.編寫Order類
package com.myimooc.rabbitmq.entity; import java.io.Serializable; /** * <br> * 標題: 訂單實體<br> * 描述: 訂單實體<br> * 時間: 2018/09/06<br> * * @author zc */ public class Order implements Serializable{ private static final long serialVersionUID = 6771608755338249746L; private String id; private String name; /** * 存儲消息發送的惟一標識 */ private String messageId; public Order() { } public Order(String id, String name, String messageId) { this.id = id; this.name = name; this.messageId = messageId; } @Override public String toString() { return "Order{" + "id='" + id + ''' + ", name='" + name + ''' + ", messageId='" + messageId + ''' + '}'; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } }
2.編寫OrderSender類
package com.myimooc.rabbitmq.producer.producer; import com.myimooc.rabbitmq.entity.Order; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * <br> * 標題: 訂單消息發送者<br> * 描述: 訂單消息發送者<br> * 時間: 2018/09/06<br> * * @author zc */ @Component public class OrderSender { private RabbitTemplate rabbitTemplate; @Autowired public OrderSender( RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } /** * 發送訂單 * * @param order 訂單 * @throws Exception 異常 */ public void send(Order order) throws Exception { CorrelationData correlationData = new CorrelationData(); correlationData.setId(order.getMessageId()); // exchange:交換機 // routingKey:路由鍵 // message:消息體內容 // correlationData:消息惟一ID this.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData); } }
3.編寫application.properties配置文件
# RabbitMQ配置 spring.rabbitmq.addresses=192.168.0.105:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 # Server配置 server.servlet.context-path=/ server.port=8080 spring.http.encoding.charset=UTF-8 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=NON_NULL
4.編寫Application類
package com.myimooc.rabbitmq.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Bootstrap class of the producer application.
 *
 * <p>Date: 2018/09/06
 *
 * @author zc
 */
@SpringBootApplication
public class Application {

    /**
     * Program entry point: starts Spring Boot with this class as the primary source.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        // Same as the static SpringApplication.run(Application.class, args) shortcut.
        SpringApplication app = new SpringApplication(Application.class);
        app.run(args);
    }
}
5.編寫OrderSenderTest類
package com.myimooc.rabbitmq.producer.producer;

import com.myimooc.rabbitmq.entity.Order;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

/**
 * Test for the order message sender.
 *
 * <p>Date: 2018/09/06
 *
 * @author zc
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class OrderSenderTest {

    @Autowired
    private OrderSender orderSender;

    /**
     * Builds a sample order with a fresh message id and sends it.
     *
     * @throws Exception if sending fails
     */
    @Test
    public void testSend1() throws Exception {
        // Message id format: <current millis>$<UUID without dashes>.
        long now = System.currentTimeMillis();
        String compactUuid = UUID.randomUUID().toString().replaceAll("-", "");
        String messageId = now + "$" + compactUuid;

        Order sample = new Order();
        sample.setId("201809062009010001");
        sample.setName("測試訂單1");
        sample.setMessageId(messageId);

        orderSender.send(sample);
    }
}
創建名爲rabbitmq-consumer的Maven工程,pom.xml如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>47-rabbitmq</artifactId> <groupId>com.myimooc</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>rabbitmq-consumer</artifactId> <properties> <spring.boot.version>2.0.4.RELEASE</spring.boot.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-parent</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!--RabbitMQ依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--工具類依賴--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.5</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.36</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
1.編寫Order類
package com.myimooc.rabbitmq.entity; import java.io.Serializable; /** * <br> * 標題: 訂單實體<br> * 描述: 訂單實體<br> * 時間: 2018/09/06<br> * * @author zc */ public class Order implements Serializable{ private static final long serialVersionUID = 6771608755338249746L; private String id; private String name; /** * 存儲消息發送的惟一標識 */ private String messageId; public Order() { } public Order(String id, String name, String messageId) { this.id = id; this.name = name; this.messageId = messageId; } @Override public String toString() { return "Order{" + "id='" + id + ''' + ", name='" + name + ''' + ", messageId='" + messageId + ''' + '}'; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } }
2.編寫OrderReceiver類
package com.myimooc.rabbitmq.consumer.consumer; import com.rabbitmq.client.Channel; import com.myimooc.rabbitmq.entity.Order; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.util.Map; /** * <br> * 標題: 訂單接收者<br> * 描述: 訂單接收者<br> * 時間: 2018/09/06<br> * * @author zc */ @Component public class OrderReceiver { /** * 接收消息 * * @param order 消息體內容 * @param headers 消息頭內容 * @param channel 網絡信道 * @throws Exception 異常 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "order-queue",durable = "true"), exchange = @Exchange(name = "order-exchange",type = "topic"), key = "order.*" )) @RabbitHandler public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception { // 消費者操做 System.out.println("收到消息:"); System.out.println("訂單信息:" + order.toString()); // 手動簽收消息 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); } }
3.編寫application.properties配置文件
# RabbitMQ鏈接配置 spring.rabbitmq.addresses=192.168.0.105:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 # RabbitMQ消費配置 # 基本併發:5 spring.rabbitmq.listener.simple.concurrency=5 # 最大併發:10 spring.rabbitmq.listener.simple.max-concurrency=10 # 簽收模式:手動簽收 spring.rabbitmq.listener.simple.acknowledge-mode=manual # 限流策略:同一時間只有1條消息發送過來消費 spring.rabbitmq.listener.simple.prefetch=1 # Server配置 server.servlet.context-path=/ server.port=8082 spring.http.encoding.charset=UTF-8 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=NON_NULL
4.編寫Application類
package com.myimooc.rabbitmq.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Bootstrap class of the consumer application.
 *
 * <p>Date: 2018/09/06
 *
 * @author zc
 */
@SpringBootApplication
public class Application {

    /**
     * Program entry point: starts Spring Boot with this class as the primary source.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        // Same as the static SpringApplication.run(Application.class, args) shortcut.
        SpringApplication app = new SpringApplication(Application.class);
        app.run(args);
    }
}
保障100%消息投遞成功設計方案(一)
因篇幅限制,完整源碼請到GitHub地址查看,這裏僅展示核心關鍵類
1.編寫OrderSender類
package com.myimooc.rabbitmq.ha.producer; import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.constant.Constants; import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper; import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * <br> * 標題: 訂單消息發送者<br> * 描述: 訂單消息發送者<br> * 時間: 2018/09/06<br> * * @author zc */ @Component public class OrderSender { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; /** * 回調方法:confirm確認 */ private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("correlationData:" + correlationData); String messageId = correlationData.getId(); if (ack) { // 若是confirm返回成功,則進行更新 BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO(); messageLogPO.setMessageId(messageId); messageLogPO.setStatus(Constants.OrderSendStatus.SEND_SUCCESS); brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO); } else { // 失敗則進行具體的後續操做:重試或者補償等 System.out.println("異常處理..."); } } }; /** * 發送訂單 * * @param order 訂單 */ public void send(Order order) { // 設置回調方法 this.rabbitTemplate.setConfirmCallback(confirmCallback); // 消息ID CorrelationData correlationData = new CorrelationData(order.getMessageId()); // 發送消息 this.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData); } }
2.編寫OrderService類
package com.myimooc.rabbitmq.ha.service; import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.constant.Constants; import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper; import com.myimooc.rabbitmq.ha.dao.mapper.OrderMapper; import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO; import com.myimooc.rabbitmq.ha.producer.OrderSender; import com.myimooc.rabbitmq.ha.util.FastJsonConvertUtils; import org.apache.commons.lang3.time.DateUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Date; /** * <br> * 標題: 訂單服務<br> * 描述: 訂單服務<br> * 時間: 2018/09/07<br> * * @author zc */ @Service public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; @Autowired private OrderSender orderSender; /** * 建立訂單 * * @param order 訂單 */ public void create(Order order) { // 當前時間 Date orderTime = new Date(); // 業務數據入庫 this.orderMapper.insert(order); // 消息日誌入庫 BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO(); messageLogPO.setMessageId(order.getMessageId()); messageLogPO.setMessage(FastJsonConvertUtils.convertObjectToJson(order)); messageLogPO.setTryCount(0); messageLogPO.setStatus(Constants.OrderSendStatus.SENDING); messageLogPO.setNextRetry(DateUtils.addMinutes(orderTime, Constants.ORDER_TIMEOUT)); this.brokerMessageLogMapper.insert(messageLogPO); // 發送消息 this.orderSender.send(order); } }
3.編寫RetryMessageTask類
package com.myimooc.rabbitmq.ha.task; import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.constant.Constants; import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper; import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO; import com.myimooc.rabbitmq.ha.producer.OrderSender; import com.myimooc.rabbitmq.ha.util.FastJsonConvertUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; /** * <br> * 標題: 重發消息定時任務<br> * 描述: 重發消息定時任務<br> * 時間: 2018/09/07<br> * * @author zc */ @Component public class RetryMessageTask { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private OrderSender orderSender; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; /** * 啓動完成3秒後開始執行,每隔10秒執行一次 */ @Scheduled(initialDelay = 3000, fixedDelay = 10000) public void retrySend() { logger.debug("重發消息定時任務開始"); // 查詢 status = 0 和 timeout 的消息日誌 List<BrokerMessageLogPO> pos = this.brokerMessageLogMapper.listSendFailureAndTimeoutMessage(); for (BrokerMessageLogPO po : pos) { logger.debug("處理消息日誌:{}",po); if (po.getTryCount() >= Constants.MAX_RETRY_COUNT) { // 更新狀態爲失敗 BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO(); messageLogPO.setMessageId(po.getMessageId()); messageLogPO.setStatus(Constants.OrderSendStatus.SEND_FAILURE); this.brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO); } else { // 進行重試,重試次數+1 this.brokerMessageLogMapper.updateRetryCount(po); Order reSendOrder = FastJsonConvertUtils.convertJsonToObject(po.getMessage(), Order.class); try { this.orderSender.send(reSendOrder); } catch (Exception ex) { // 異常處理 logger.error("消息發送異常:{}", ex); } } } logger.debug("重發消息定時任務結束"); } }
4.編寫ApplicationTest類
package com.myimooc.rabbitmq.ha; import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.service.OrderService; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.UUID; /** * <br> * 標題: 訂單建立測試<br> * 描述: 訂單建立測試<br> * 時間: 2018/09/07<br> * * @author zc */ @RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTest { @Autowired private OrderService orderService; @Test public void testCreateOrder(){ Order order = new Order(); order.setId(String.valueOf(System.currentTimeMillis())); order.setName("測試建立訂單"); order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString().replaceAll("-","")); this.orderService.create(order); } }