案例:
經典案例,以目前流行點外賣的案例,用戶下單後,調用訂單服務,讓後訂單服務調用派單系統通知送外賣人員送單,這時候訂單系統與派單系統採用MQ異步通信。java
RabbitMQ解決分佈式事務原理: 採用最終一致性原理。
須要保證如下三要素
一、確認生產者必定要將數據投遞到MQ服務器中(採用MQ消息確認機制)
二、MQ消費者消息可以正確消費消息,採用手動ACK模式,使用不補償機制(注意重試冪等性問題)
三、如何保證第一個事務先執行,採用補償機制(補單機制),在建立一個補單消費者進行監聽,若是訂單沒有建立成功,進行補單。mysql
用傳統的HTTP協議不能解決高併發:web
MQ解決分佈式事務一致性 spring
案例中 訂單表 和 派單表必須一致!sql
用MQ 能夠作流量削峯值數據庫
MQ解決分佈式事務最終一致性思想apache
1. 確保生產者消息 必定要投遞到MQ服務器端成功json
若是生產者投遞消息到MQ服務器成功springboot
場景1 若是消費者消費消息失敗了 服務器
生產者是不須要回滾事務。 消費者採用手動ack應答方式 進行補償機制,補償的過程當中注意 冪等性 問題。
分佈式事務中遵循base理論 遵循cpa理論
如何確保生產者發送消息必定發送到MQ消息服務器端成功? confirm機制 確認應答機制
場景2 若是生產者發送消息到MQ服務器端失敗
使用生產者重試機制進行發消息
派件表:
create TABLE platoon( id INT PRIMARY KEY AUTO_INCREMENT, orderId VARCHAR(255), takeout_userId int )
訂單表:
create TABLE order_info( id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(30), order_money INT, orderId VARCHAR(255) );
生產者
1.實現接口 implements RabbitTemplate.ConfirmCallback
2. 重寫回調方法 成功 失敗的 調用
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
send方法裏面調用回調函數:
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
yml須要配置回調機制:
###開啓消息確認機制 confirms
publisher-confirms: true
publisher-returns: true
重試也是有必定次數限制的 若是超過必定次數 就須要進行人工補償了
上面已經實現了確保消息發送給 消費者 此時的數據不一致問題 就是:
場景3. 如何保證第一個事務先執行,生產者投遞消息到MQ服務器成功,消費者消費消息成功了,可是訂單事務回滾了。 (生產者投遞消息給消費者消費成功 而後 生產者回滾了)
MQ解決分佈式原理經過最終一致性解決整體框架圖: 交換機採用路由鍵模式 補單隊列和派但隊列都綁定同一個路由鍵 。
補償過程注意冪等問題!
下單:
pom:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.mayikt</groupId> <artifactId>rabbitmq_order</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.1.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.1.1</version> </dependency> <!-- mysql 依賴 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- 阿里巴巴數據源 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.14</version> </dependency> <!-- SpringBoot整合Web組件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- springboot- 整個 lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- 添加springboot對amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies> </project>
基礎包:
public interface ApiConstants { // 響應請求成功 String HTTP_RES_CODE_200_VALUE = "success"; // 系統錯誤 String HTTP_RES_CODE_500_VALUE = "fial"; // 響應請求成功code Integer HTTP_RES_CODE_200 = 200; // 系統錯誤 Integer HTTP_RES_CODE_500 = 500; // 未關聯QQ帳號 Integer HTTP_RES_CODE_201 = 201; }
import org.springframework.stereotype.Component; @Component public class BaseApiService { public ResponseBase setResultError(Integer code, String msg) { return setResult(code, msg, null); } // 返回錯誤,能夠傳msg public ResponseBase setResultError(String msg) { return setResult(ApiConstants.HTTP_RES_CODE_500, msg, null); } // 返回成功,能夠傳data值 public ResponseBase setResultSuccess(Object data) { return setResult(ApiConstants.HTTP_RES_CODE_200, ApiConstants.HTTP_RES_CODE_200_VALUE, data); } // 返回成功,沒有data值 public ResponseBase setResultSuccess() { return setResult(ApiConstants.HTTP_RES_CODE_200, ApiConstants.HTTP_RES_CODE_200_VALUE, null); } // 返回成功,沒有data值 public ResponseBase setResultSuccess(String msg) { return setResult(ApiConstants.HTTP_RES_CODE_200, msg, null); } // 通用封裝 public ResponseBase setResult(Integer code, String msg, Object data) { return new ResponseBase(code, msg, data); } }
import lombok.Data; @Data public class ResponseBase { private Integer rtnCode; private String msg; private Object data; public ResponseBase() { } public ResponseBase(Integer rtnCode, String msg, Object data) { super(); this.rtnCode = rtnCode; this.msg = msg; this.data = data; } @Override public String toString() { return "ResponseBase [rtnCode=" + rtnCode + ", msg=" + msg + ", data=" + data + "]"; } }
補償隊列:
import lombok.Data; @Data public class ResponseBase { private Integer rtnCode; private String msg; private Object data; public ResponseBase() { } public ResponseBase(Integer rtnCode, String msg, Object data) { super(); this.rtnCode = rtnCode; this.msg = msg; this.data = data; } @Override public String toString() { return "ResponseBase [rtnCode=" + rtnCode + ", msg=" + msg + ", data=" + data + "]"; } }
entity:
import lombok.Data; @Data public class OrderEntity { private Long id; // 訂單名稱 private String name; // 下單金額 private Double orderMoney; // 訂單id private String orderId; }
mapper:
import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Options; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import com.mayikt.entity.OrderEntity; public interface OrderMapper { @Insert(value = "INSERT INTO `order_info` VALUES (#{id}, #{name}, #{orderMoney},#{orderId})") @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") public int addOrder(OrderEntity orderEntity); @Select("SELECT id as id ,name as name , order_money as orderMoney,orderId as orderId from order_info where orderId=#{orderId};") public OrderEntity findOrderId(@Param("orderId") String orderId); }
MQ的配置:
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @Component public class RabbitmqConfig { // 下單而且派單存隊列 public static final String ORDER_DIC_QUEUE = "order_dic_queue"; // 補單隊列,判斷訂單是否已經被建立 public static final String ORDER_CREATE_QUEUE = "order_create_queue"; // 下單而且派單交換機 private static final String ORDER_EXCHANGE_NAME = "order_exchange_name"; // 1.定義訂單隊列 @Bean public Queue directOrderDicQueue() { return new Queue(ORDER_DIC_QUEUE); } // 2.定義補訂單隊列 @Bean public Queue directCreateOrderQueue() { return new Queue(ORDER_CREATE_QUEUE); } // 2.定義交換機 @Bean DirectExchange directOrderExchange() { return new DirectExchange(ORDER_EXCHANGE_NAME); } // 3.訂單隊列與交換機綁定 @Bean Binding bindingExchangeOrderDicQueue() { return BindingBuilder.bind(directOrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey"); } // 3.補單隊列與交換機綁定 @Bean Binding bindingExchangeCreateOrder() { return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange()).with("orderRoutingKey"); } }
service:
import java.util.UUID; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSONObject; import com.mayikt.base.BaseApiService; import com.mayikt.base.ResponseBase; import com.mayikt.entity.OrderEntity; import com.mayikt.mapper.OrderMapper; @Service public class OrderService extends BaseApiService implements RabbitTemplate.ConfirmCallback { @Autowired private OrderMapper orderMapper; @Autowired private RabbitTemplate rabbitTemplate; public ResponseBase addOrderAndDispatch() { //先下單 訂單表插入數據 OrderEntity orderEntity = new OrderEntity(); orderEntity.setName("黃燜雞米飯"); // 價格是300元 orderEntity.setOrderMoney(300d); // 商品id String orderId = UUID.randomUUID().toString(); orderEntity.setOrderId(orderId); // 1.先下單,建立訂單 (往訂單數據庫中插入一條數據) int orderResult = orderMapper.addOrder(orderEntity); System.out.println("orderResult:" + orderResult); if (orderResult <= 0) { return setResultError("下單失敗!"); } // 2.訂單表插插入完數據後 訂單表發送 外賣小哥 send(orderId); // int i = 1/0; //發生異常 return setResultSuccess(); } private void send(String orderId) { JSONObject jsonObect = new JSONObject(); jsonObect.put("orderId", orderId); String msg = jsonObect.toJSONString(); System.out.println("msg:" + msg); // 封裝消息 Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8").setMessageId(orderId).build(); // 構建回調返回的數據 CorrelationData correlationData = new CorrelationData(orderId); // 發送消息 this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData); } // 生產消息確認機制 生產者往服務器端發送消息的時候 採用應答機制 @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String orderId = correlationData.getId(); //id 都是相同的哦 全局ID System.out.println("消息id:" + correlationData.getId()); if (ack) { //消息發送成功 System.out.println("消息發送確認成功"); } else { //重試機制 send(orderId); System.out.println("消息發送確認失敗:" + cause); } } }
controller:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.mayikt.base.BaseApiService; import com.mayikt.base.ResponseBase; import com.mayikt.service.OrderService; @RestController public class OrderController extends BaseApiService { @Autowired private OrderService orderService; @RequestMapping("/addOrder") public ResponseBase addOrder() { return orderService.addOrderAndDispatch(); } }
yml:
spring: rabbitmq: ####鏈接地址 host: 192.168.91.6 ####端口號 port: 5672 ####帳號 username: admin ####密碼 password: admin ### 地址 virtual-host: /admin_toov5 ###開啓消息確認機制 confirms publisher-confirms: true publisher-returns: true #數據庫鏈接信息 datasource: name: test url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&rewriteBatchedStatements=TRUE username: root password: root # 使用druid數據源 type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver
啓動類:
package com.mayikt; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @MapperScan("com.mayikt.mapper") @SpringBootApplication public class AppOrder { public static void main(String[] args) { SpringApplication.run(AppOrder.class, args); } }
派單:
Entity:
package com.mayikt.entity; import lombok.Data; @Data public class DispatchEntity { private Long id; // 訂單號 private String orderId; // 外賣員id private Long takeoutUserId; }
Mapper:
package com.mayikt.mapper; import org.apache.ibatis.annotations.Insert; import com.mayikt.entity.DispatchEntity; public interface DispatchMapper { /** * 新增派單任務 */ @Insert("INSERT into platoon values (null,#{orderId},#{takeoutUserId});") public int insertDistribute(DispatchEntity distributeEntity); }
consumer:
作一些路由器 隊列 路由鍵的綁定 聲明工做
package com.mayikt.rabbitmq.consumer; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @Component public class RabbitmqConfig { // 下單而且派單存隊列 public static final String ORDER_DIC_QUEUE = "order_dic_queue"; // 補單隊列,判斷訂單是否已經被建立 public static final String ORDER_CREATE_QUEUE = "order_create_queue"; // 下單而且派單交換機 private static final String ORDER_EXCHANGE_NAME = "order_exchange_name"; // 1.定義派單隊列 @Bean public Queue OrderDicQueue() { return new Queue(ORDER_DIC_QUEUE); } /* // 2.定義補訂單隊列 @Bean public Queue directCreateOrderQueue() { return new Queue(ORDER_CREATE_QUEUE); }*/ // 2.定義交換機 @Bean DirectExchange directOrderExchange() { return new DirectExchange(ORDER_EXCHANGE_NAME); } // 3.訂單隊列與交換機綁定 @Bean Binding bindingExchangeOrderDicQueue() { return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey"); } /*// 3.補單隊列與交換機綁定 @Bean Binding bindingExchangeCreateOrder() { return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey"); }*/ }
派件消費:
package com.mayikt.rabbitmq.consumer; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.mayikt.entity.DispatchEntity; import com.mayikt.mapper.DispatchMapper; import com.rabbitmq.client.Channel; /** * 派單服務 */ @Component public class DispatchConsumer { @Autowired private DispatchMapper dispatchMapper; @RabbitListener(queues = "order_dic_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("派單服務平臺" + msg + ",消息id:" + messageId); JSONObject jsonObject = JSONObject.parseObject(msg); String orderId = jsonObject.getString("orderId"); if (StringUtils.isEmpty(orderId)) { // 日誌記錄 return; } DispatchEntity dispatchEntity = new DispatchEntity(); // 訂單id dispatchEntity.setOrderId(orderId); // 外賣員id dispatchEntity.setTakeoutUserId(12l); try { int insertDistribute = dispatchMapper.insertDistribute(dispatchEntity); if (insertDistribute > 0) { // 手動簽收消息,通知mq服務器端刪除該消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } catch (Exception e) { e.printStackTrace(); // // 丟棄該消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } }
啓動類:
package com.mayikt; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @MapperScan("com.mayikt.mapper") @SpringBootApplication public class AppDispatch { public static void main(String[] args) { SpringApplication.run(AppDispatch.class, args); } }
yml:
spring: rabbitmq: ####鏈接地址 host: 192.168.91.6 ####端口號 port: 5672 ####帳號 username: admin ####密碼 password: admin ### 地址 virtual-host: /admin_toov5 listener: simple: retry: ####開啓消費者(程序出現異常的狀況下會)進行重試 enabled: true ####最大重試次數 max-attempts: 5 ####重試間隔次數 initial-interval: 3000 ####開啓手動ack acknowledge-mode: manual #數據庫鏈接信息 datasource: name: test url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&rewriteBatchedStatements=TRUE username: root password: root # 使用druid數據源 type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver server: port: 8081
pom:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.itmayiedu</groupId> <artifactId>rabbitmq_stock</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.1.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.1.1</version> </dependency> <!-- mysql 依賴 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- 阿里巴巴數據源 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.14</version> </dependency> <!-- SpringBoot整合Web組件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- springboot- 整個 lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- 添加springboot對amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies> </project>