RabbitMQ解決分佈式事務

案例:
經典案例,以目前流行點外賣的案例,用戶下單後,調用訂單服務,讓後訂單服務調用派單系統通知送外賣人員送單,這時候訂單系統與派單系統採用MQ異步通信。java

RabbitMQ解決分佈式事務原理: 採用最終一致性原理。
須要保證如下三要素
一、確認生產者必定要將數據投遞到MQ服務器中(採用MQ消息確認機制)
二、MQ消費者消息可以正確消費消息,採用手動ACK模式,使用不補償機制(注意重試冪等性問題)
三、如何保證第一個事務先執行,採用補償機制(補單機制),在建立一個補單消費者進行監聽,若是訂單沒有建立成功,進行補單。mysql

 

用傳統的HTTP協議不能解決高併發:web

 

 

MQ解決分佈式事務一致性 spring

案例中 訂單表 和 派單表必須一致!sql

用MQ 能夠作流量削峯值數據庫

MQ解決分佈式事務最終一致性思想apache

 1.   確保生產者消息 必定要投遞到MQ服務器端成功json

     

 

 若是生產者投遞消息到MQ服務器成功springboot

  場景1  若是消費者消費消息失敗了 服務器

        生產者是不須要回滾事務。 消費者採用手動ack應答方式  進行補償機制,補償的過程當中注意 冪等性 問題。

       分佈式事務中遵循base理論 遵循cpa理論

 如何確保生產者發送消息必定發送到MQ消息服務器端成功? confirm機制 確認應答機制

 場景2    若是生產者發送消息到MQ服務器端失敗 

      使用生產者重試機制進行發消息

 

派件表:

create TABLE platoon(
  id INT PRIMARY KEY AUTO_INCREMENT,
  orderId VARCHAR(255),
  takeout_userId int
 
)

 

訂單表:

create TABLE order_info(
  id INT PRIMARY KEY AUTO_INCREMENT,
  name VARCHAR(30),  
  order_money INT,
  orderId VARCHAR(255)
  
);

 

生產者   

 1.實現接口 implements RabbitTemplate.ConfirmCallback

 2. 重寫回調方法   成功 失敗的  調用  

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {

 

send方法裏面調用回調函數:

this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);

 

yml須要配置回調機制:

###開啓消息確認機制 confirms
publisher-confirms: true
publisher-returns: true

 

 重試也是有必定次數限制的 若是超過必定次數 就須要進行人工補償了

 

上面已經實現了確保消息發送給 消費者   此時的數據不一致問題 就是:

場景3.  如何保證第一個事務先執行,生產者投遞消息到MQ服務器成功,消費者消費消息成功了,可是訂單事務回滾了。    (生產者投遞消息給消費者消費成功 而後 生產者回滾了)  

    

 

MQ解決分佈式原理經過最終一致性解決整體框架圖:  交換機採用路由鍵模式  補單隊列和派但隊列都綁定同一個路由鍵 。

補償過程注意冪等問題!

 

 

 

 下單:

 

pom:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mayikt</groupId>
    <artifactId>rabbitmq_order</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>
    <dependencies>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- mysql 依賴 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- 阿里巴巴數據源 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.14</version>
        </dependency>
        <!-- SpringBoot整合Web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- springboot- 整個 lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- 添加springboot對amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>
</project>

 

基礎包:

public interface ApiConstants {
    // 響應請求成功
    String HTTP_RES_CODE_200_VALUE = "success";
    // 系統錯誤
    String HTTP_RES_CODE_500_VALUE = "fial";
    // 響應請求成功code
    Integer HTTP_RES_CODE_200 = 200;
    // 系統錯誤
    Integer HTTP_RES_CODE_500 = 500;
    // 未關聯QQ帳號
    Integer HTTP_RES_CODE_201 = 201;

}
import org.springframework.stereotype.Component;

@Component
public class BaseApiService {

    public ResponseBase setResultError(Integer code, String msg) {
        return setResult(code, msg, null);
    }

    // 返回錯誤,能夠傳msg
    public ResponseBase setResultError(String msg) {
        return setResult(ApiConstants.HTTP_RES_CODE_500, msg, null);
    }

    // 返回成功,能夠傳data值
    public ResponseBase setResultSuccess(Object data) {
        return setResult(ApiConstants.HTTP_RES_CODE_200, ApiConstants.HTTP_RES_CODE_200_VALUE, data);
    }

    // 返回成功,沒有data值
    public ResponseBase setResultSuccess() {
        return setResult(ApiConstants.HTTP_RES_CODE_200, ApiConstants.HTTP_RES_CODE_200_VALUE, null);
    }

    // 返回成功,沒有data值
    public ResponseBase setResultSuccess(String msg) {
        return setResult(ApiConstants.HTTP_RES_CODE_200, msg, null);
    }

    // 通用封裝
    public ResponseBase setResult(Integer code, String msg, Object data) {
        return new ResponseBase(code, msg, data);
    }

}
import lombok.Data;

@Data
public class ResponseBase {

    private Integer rtnCode;
    private String msg;
    private Object data;

    public ResponseBase() {

    }

    public ResponseBase(Integer rtnCode, String msg, Object data) {
        super();
        this.rtnCode = rtnCode;
        this.msg = msg;
        this.data = data;
    }

    @Override
    public String toString() {
        return "ResponseBase [rtnCode=" + rtnCode + ", msg=" + msg + ", data=" + data + "]";
    }

}

補償隊列:

import lombok.Data;

@Data
public class ResponseBase {

    private Integer rtnCode;
    private String msg;
    private Object data;

    public ResponseBase() {

    }

    public ResponseBase(Integer rtnCode, String msg, Object data) {
        super();
        this.rtnCode = rtnCode;
        this.msg = msg;
        this.data = data;
    }

    @Override
    public String toString() {
        return "ResponseBase [rtnCode=" + rtnCode + ", msg=" + msg + ", data=" + data + "]";
    }

}

entity:

import lombok.Data;

@Data
public class OrderEntity {

    private Long id;
    // 訂單名稱
    private String name;
    // 下單金額
    private Double orderMoney;
    // 訂單id
    private String orderId;
}

mapper:

import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

import com.mayikt.entity.OrderEntity;

public interface OrderMapper {

    @Insert(value = "INSERT INTO `order_info` VALUES (#{id}, #{name}, #{orderMoney},#{orderId})")
    @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
    public int addOrder(OrderEntity orderEntity);

    @Select("SELECT id as id ,name as name , order_money as orderMoney,orderId as orderId from order_info where orderId=#{orderId};")
    public OrderEntity findOrderId(@Param("orderId") String orderId);

}

MQ的配置:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class RabbitmqConfig {

    // 下單而且派單存隊列
    public static final String ORDER_DIC_QUEUE = "order_dic_queue";
    // 補單隊列,判斷訂單是否已經被建立
    public static final String ORDER_CREATE_QUEUE = "order_create_queue";
    // 下單而且派單交換機
    private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";

    // 1.定義訂單隊列
    @Bean
    public Queue directOrderDicQueue() {
        return new Queue(ORDER_DIC_QUEUE);
    }

    // 2.定義補訂單隊列
    @Bean
    public Queue directCreateOrderQueue() {
        return new Queue(ORDER_CREATE_QUEUE);
    }

    // 2.定義交換機
    @Bean
    DirectExchange directOrderExchange() {
        return new DirectExchange(ORDER_EXCHANGE_NAME);
    }

    // 3.訂單隊列與交換機綁定
    @Bean
    Binding bindingExchangeOrderDicQueue() {
        return BindingBuilder.bind(directOrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }

    // 3.補單隊列與交換機綁定
    @Bean
    Binding bindingExchangeCreateOrder() {
        return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }

}

service:

import java.util.UUID;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSONObject;
import com.mayikt.base.BaseApiService;
import com.mayikt.base.ResponseBase;
import com.mayikt.entity.OrderEntity;
import com.mayikt.mapper.OrderMapper;

@Service
public class OrderService extends BaseApiService implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public ResponseBase addOrderAndDispatch() {
        //先下單 訂單表插入數據
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setName("黃燜雞米飯");
        // 價格是300元
        orderEntity.setOrderMoney(300d);
        // 商品id
        String orderId = UUID.randomUUID().toString();
        orderEntity.setOrderId(orderId);
        // 1.先下單,建立訂單 (往訂單數據庫中插入一條數據)
        int orderResult = orderMapper.addOrder(orderEntity);
        System.out.println("orderResult:" + orderResult);
        if (orderResult <= 0) {
            return setResultError("下單失敗!");
        }
        // 2.訂單表插插入完數據後 訂單表發送 外賣小哥
        send(orderId);
    //    int i = 1/0;   //發生異常
        return setResultSuccess();
    }

    private void send(String orderId) {
        JSONObject jsonObect = new JSONObject();
        jsonObect.put("orderId", orderId);
        String msg = jsonObect.toJSONString();
        System.out.println("msg:" + msg);
        // 封裝消息
        Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8").setMessageId(orderId).build();
        // 構建回調返回的數據
        CorrelationData correlationData = new CorrelationData(orderId);
        // 發送消息
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData);

    }

    // 生產消息確認機制 生產者往服務器端發送消息的時候 採用應答機制
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String orderId = correlationData.getId(); //id 都是相同的哦  全局ID
        System.out.println("消息id:" + correlationData.getId()); 
        if (ack) { //消息發送成功
            System.out.println("消息發送確認成功");
        } else {
            //重試機制
            send(orderId); 
            System.out.println("消息發送確認失敗:" + cause);
        }

    }

}

controller:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.mayikt.base.BaseApiService;
import com.mayikt.base.ResponseBase;
import com.mayikt.service.OrderService;

@RestController
public class OrderController extends BaseApiService {
    @Autowired
    private OrderService orderService;

    @RequestMapping("/addOrder")
    public ResponseBase addOrder() {
        return orderService.addOrderAndDispatch();
    }

}

yml:

spring:
  rabbitmq:
  ####鏈接地址
    host: 192.168.91.6
   ####端口號   
    port: 5672
   ####帳號 
    username: admin
   ####密碼  
    password: admin
   ### 地址
    virtual-host: /admin_toov5
    ###開啓消息確認機制 confirms
    publisher-confirms: true
    publisher-returns: true 
  #數據庫鏈接信息
  datasource: 
        name: test
        url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&rewriteBatchedStatements=TRUE
        username: root
        password: root
        # 使用druid數據源
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver

啓動類:

package com.mayikt;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@MapperScan("com.mayikt.mapper")
@SpringBootApplication
public class AppOrder {

    public static void main(String[] args) {
        SpringApplication.run(AppOrder.class, args);
    }

}

 

派單:

 

 

Entity:

package com.mayikt.entity;

import lombok.Data;

@Data
public class DispatchEntity {

    private Long id;
    // 訂單號
    private String orderId;
    // 外賣員id
    private Long takeoutUserId;
    
}

Mapper:

package com.mayikt.mapper;

import org.apache.ibatis.annotations.Insert;

import com.mayikt.entity.DispatchEntity;

public interface DispatchMapper {

    /**
     * 新增派單任務
     */
    @Insert("INSERT into platoon values (null,#{orderId},#{takeoutUserId});")
    public int insertDistribute(DispatchEntity distributeEntity);

}

consumer:

 作一些路由器 隊列 路由鍵的綁定 聲明工做

package com.mayikt.rabbitmq.consumer;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class RabbitmqConfig {

    // 下單而且派單存隊列
    public static final String ORDER_DIC_QUEUE = "order_dic_queue";
    // 補單隊列,判斷訂單是否已經被建立
    public static final String ORDER_CREATE_QUEUE = "order_create_queue";
    // 下單而且派單交換機
    private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";

    // 1.定義派單隊列
    @Bean
    public Queue OrderDicQueue() {
        return new Queue(ORDER_DIC_QUEUE);
    }
/*
    // 2.定義補訂單隊列
    @Bean
    public Queue directCreateOrderQueue() {
        return new Queue(ORDER_CREATE_QUEUE);
    }*/

    // 2.定義交換機
    @Bean
    DirectExchange directOrderExchange() {
        return new DirectExchange(ORDER_EXCHANGE_NAME);
    }

    // 3.訂單隊列與交換機綁定
    @Bean
    Binding bindingExchangeOrderDicQueue() {
        return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }

    /*// 3.補單隊列與交換機綁定
    @Bean
    Binding bindingExchangeCreateOrder() {
        return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }*/

}

派件消費:

package com.mayikt.rabbitmq.consumer;

import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.mayikt.entity.DispatchEntity;
import com.mayikt.mapper.DispatchMapper;
import com.rabbitmq.client.Channel;

/**
 * 派單服務
 */
@Component
public class DispatchConsumer {
    @Autowired
    private DispatchMapper dispatchMapper;

    @RabbitListener(queues = "order_dic_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("派單服務平臺" + msg + ",消息id:" + messageId);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String orderId = jsonObject.getString("orderId");
        if (StringUtils.isEmpty(orderId)) {
            // 日誌記錄
            return;
        }
        DispatchEntity dispatchEntity = new DispatchEntity();
        // 訂單id
        dispatchEntity.setOrderId(orderId);
        // 外賣員id
        dispatchEntity.setTakeoutUserId(12l);
        
        try {
            int insertDistribute = dispatchMapper.insertDistribute(dispatchEntity);
            if (insertDistribute > 0) {
                // 手動簽收消息,通知mq服務器端刪除該消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        } catch (Exception e) {
            e.printStackTrace();
            // // 丟棄該消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }

}

啓動類:

package com.mayikt;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@MapperScan("com.mayikt.mapper")
@SpringBootApplication
public class AppDispatch {

    public static void main(String[] args) {
        SpringApplication.run(AppDispatch.class, args);
    }

}

yml:

spring:
  rabbitmq:
  ####鏈接地址
    host: 192.168.91.6
   ####端口號   
    port: 5672
   ####帳號 
    username: admin
   ####密碼  
    password: admin
   ### 地址
    virtual-host: /admin_toov5
    listener:
      simple:
        retry:
        ####開啓消費者(程序出現異常的狀況下會)進行重試
          enabled: true
         ####最大重試次數
          max-attempts: 5
        ####重試間隔次數
          initial-interval: 3000
        ####開啓手動ack  
        acknowledge-mode: manual
         
  #數據庫鏈接信息
  datasource: 
        name: test
        url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&rewriteBatchedStatements=TRUE
        username: root
        password: root
        # 使用druid數據源
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver
server:
  port: 8081

pom:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.itmayiedu</groupId>
    <artifactId>rabbitmq_stock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- mysql 依賴 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- 阿里巴巴數據源 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.14</version>
        </dependency>
        <!-- SpringBoot整合Web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- springboot- 整個 lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- 添加springboot對amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>
</project>
相關文章
相關標籤/搜索