基於消息隊列實現分佈式事務

 準備工做

changgou_order庫新增數據表

DROP TABLE IF EXISTS `tb_task`;
CREATE TABLE `tb_task` (
  `id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT '任務id',
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `delete_time` datetime DEFAULT NULL,
  `task_type` varchar(32) DEFAULT NULL COMMENT '任務類型',
  `mq_exchange` varchar(64) DEFAULT NULL COMMENT '交換機名稱',
  `mq_routingkey` varchar(64) DEFAULT NULL COMMENT 'routingkey',
  `request_body` varchar(512) DEFAULT NULL COMMENT '任務請求的內容',
  `status` varchar(32) DEFAULT NULL COMMENT '任務狀態',
  `errormsg` varchar(512) DEFAULT NULL COMMENT '任務錯誤信息',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `tb_task_his`;
CREATE TABLE `tb_task_his` (
  `id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT '任務id',
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `delete_time` datetime DEFAULT NULL,
  `task_type` varchar(32) DEFAULT NULL COMMENT '任務類型',
  `mq_exchange` varchar(64) DEFAULT NULL COMMENT '交換機名稱',
  `mq_routingkey` varchar(64) DEFAULT NULL COMMENT 'routingkey',
  `request_body` varchar(512) DEFAULT NULL COMMENT '任務請求的內容',
  `status` varchar(32) DEFAULT NULL COMMENT '任務狀態',
  `errormsg` varchar(512) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

changgou_service_order_api添加相關實體類

@Table(name="tb_task")
public class Task {
​
    @Id
    private Long id;
​
    @Column(name = "create_time")
    private Date createTime;
​
    @Column(name = "update_time")
    private Date updateTime;
​
    @Column(name = "delete_time")
    private Date deleteTime;
​
    @Column(name = "task_type")
    private String taskType;
​
    @Column(name = "mq_exchange")
    private String mqExchange;
​
    @Column(name = "mq_routingkey")
    private String mqRoutingkey;
​
    @Column(name = "request_body")
    private String requestBody;
​
    @Column(name = "status")
    private String status;
​
    @Column(name = "errormsg")
    private String errormsg;
    
    //getter,setter略
}
@Table(name="tb_task_his")
public class TaskHis {
​
    @Id
    private Long id;
​
    @Column(name = "create_time")
    private Date createTime;
​
    @Column(name = "update_time")
    private Date updateTime;
​
    @Column(name = "delete_time")
    private Date deleteTime;
​
    @Column(name = "task_type")
    private String taskType;
​
    @Column(name = "mq_exchange")
    private String mqExchange;
​
    @Column(name = "mq_routingkey")
    private String mqRoutingkey;
​
    @Column(name = "request_body")
    private String requestBody;
​
    @Column(name = "status")
    private String status;
​
    @Column(name = "errormsg")
    private String errormsg;
​
    //getter,setter略
}

changgou_user新增積分日誌表

DROP TABLE IF EXISTS `tb_point_log`;
CREATE TABLE `tb_point_log` (
  `order_id` varchar(200) NOT NULL,
  `user_id` varchar(200) NOT NULL,
  `point` int(11) NOT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

changgou_service_user_api添加實體類 PointLog

@Table(name="tb_point_log")
public class PointLog {
​
    private String orderId;
    private String userId;
    private Integer point;
    //getter,setter略
}

changgou_service_order添加rabbitMQ配置類

@Configuration
public class RabbitMQConfig {
    //添加積分任務交換機
    public static final String EX_BUYING_ADDPOINTUSER = "ex_buying_addpointuser";
​
    //添加積分消息隊列
    public static final String CG_BUYING_ADDPOINT = "cg_buying_addpoint";
​
    //完成添加積分消息隊列
    public static final String CG_BUYING_FINISHADDPOINT = "cg_buying_finishaddpoint";
​
    //添加積分路由key
    public static final String CG_BUYING_ADDPOINT_KEY = "addpoint";
​
    //完成添加積分路由key
    public static final String CG_BUYING_FINISHADDPOINT_KEY = "finishaddpoint";
​
    /**
     * 交換機配置
     * @return the exchange
     */
    @Bean(EX_BUYING_ADDPOINTUSER)
    public Exchange EX_BUYING_ADDPOINTUSER() {
        return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTUSER).durable(true).build();
    }
    //聲明隊列
    @Bean(CG_BUYING_FINISHADDPOINT)
    public Queue QUEUE_CG_BUYING_FINISHADDPOINT() {
        Queue queue = new Queue(CG_BUYING_FINISHADDPOINT);
        return queue;
    }
    //聲明隊列
    @Bean(CG_BUYING_ADDPOINT)
    public Queue QUEUE_CG_BUYING_ADDPOINT() {
        Queue queue = new Queue(CG_BUYING_ADDPOINT);
        return queue;
    }
    /**
     * 綁定隊列到交換機 .
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding BINDING_QUEUE_FINISHADDPOINT(@Qualifier(CG_BUYING_FINISHADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_FINISHADDPOINT_KEY).noargs();
    }
    @Bean
    public Binding BINDING_QUEUE_ADDPOINT(@Qualifier(CG_BUYING_ADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_ADDPOINT_KEY).noargs();
    }
}

訂單服務添加任務併發送

修改添加訂單方法

當添加訂單的時候,添加任務表中相關數據, 局部代碼以下redis

//增長任務表記錄
Task task = new Task();
task.setCreateTime(new Date());
task.setUpdateTime(new Date());
task.setMqExchange(RabbitMQConfig.EX_BUYING_ADDPOINTURSE);
task.setMqRoutingkey(RabbitMQConfig.CG_BUYING_ADDPOINT_KEY);
​
Map map = new HashMap();
map.put("userName",order.getUsername());
map.put("orderId",order.getId());
map.put("point",order.getPayMoney());
task.setRequestBody(JSON.toJSONString(map));
taskMapper.insertSelective(task);

定時掃描任務表最新數據

訂單服務新增定時任務類,獲取小於系統當前時間的全部任務數據api

修改訂單服務啓動類,添加開啓定時任務註解
@EnableScheduling
 定義定時任務類
查詢最新數據

更新taskMapper新增方法,查詢全部小於系統當前時間的數據併發

public interface TaskMapper extends Mapper<Task> {
​
    @Select("SELECT * from tb_task WHERE update_time<#{currentTime}")
    @Results({@Result(column = "create_time",property = "createTime"),
            @Result(column = "update_time",property = "updateTime"),
            @Result(column = "delete_time",property = "deleteTime"),
            @Result(column = "task_type",property = "taskType"),
            @Result(column = "mq_exchange",property = "mqExchange"),
            @Result(column = "mq_routingkey",property = "mqRoutingkey"),
            @Result(column = "request_body",property = "requestBody"),
            @Result(column = "status",property = "status"),
            @Result(column = "errormsg",property = "errormsg")})
    List<Task> findTaskLessTanCurrentTime(Date currentTime);
}

任務類實現app

@Component
public class QueryPointTask {
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    @Autowired
    private TaskMapper taskMapper;
​
    @Scheduled(cron = "0 0/2 * * * ?")
    public void queryTask(){
​
        //1.獲取小於系統當前時間數據
        List<Task> taskList = taskMapper.findTaskLessTanCurrentTime(new Date());
​
        if (taskList!=null && taskList.size()>0){
            //將任務數據發送到消息隊列
            for (Task task : taskList) {
 rabbitTemplate.convertAndSend(RabbitMQConfig.EX_BUYING_ADDPOINTURSE,RabbitMQConfig.CG_BUYING_ADDPOINT_KEY, JSON.toJSONString(task));
            }
        }
    }
}

用戶服務更改積分

添加rabbitmq配置類(與訂單服務相同)

@Configuration
public class RabbitMQConfig {
    //添加積分任務交換機
    public static final String EX_BUYING_ADDPOINTURSE = "ex_buying_addpointurse";
​
    //添加積分消息隊列
    public static final String CG_BUYING_ADDPOINT = "cg_buying_addpoint";
​
    //完成添加積分消息隊列
    public static final String CG_BUYING_FINISHADDPOINT = "cg_buying_finishaddpoint";
​
    //添加積分路由key
    public static final String CG_BUYING_ADDPOINT_KEY = "addpoint";
​
    //完成添加積分路由key
    public static final String CG_BUYING_FINISHADDPOINT_KEY = "finishaddpoint";
​
    /**
     * 交換機配置
     * @return the exchange
     */
    @Bean(EX_BUYING_ADDPOINTURSE)
    public Exchange EX_DECLARE() {
        return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTURSE).durable(true).build();
    }
    //聲明隊列
    @Bean(CG_BUYING_FINISHADDPOINT)
    public Queue QUEUE_CG_BUYING_FINISHADDPOINT() {
        Queue queue = new Queue(CG_BUYING_FINISHADDPOINT);
        return queue;
    }
    //聲明隊列
    @Bean(CG_BUYING_ADDPOINT)
    public Queue QUEUE_CG_BUYING_ADDPOINT() {
        Queue queue = new Queue(CG_BUYING_ADDPOINT);
        return queue;
    }
    /**
     * 綁定隊列到交換機 .
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding BINDING_QUEUE_FINISHADDPOINT(@Qualifier(CG_BUYING_FINISHADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTURSE) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_FINISHADDPOINT_KEY).noargs();
    }
    @Bean
    public Binding BINDING_QUEUE_ADDPOINT(@Qualifier(CG_BUYING_ADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTURSE) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_ADDPOINT_KEY).noargs();
    }
}

定義消息監聽類

@Component
public class AddPointListener {
​
    @Autowired
    private UserService userService;
​
    @Autowired
    private RedisTemplate redisTemplate;
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    @RabbitListener(queues = RabbitMQConfig.CG_BUYING_ADDPOINT)
    public void receiveMessage(String message){
​
        Task task = JSON.parseObject(message, Task.class);
​
        if (task == null || StringUtils.isEmpty(task.getRequestBody())){
            return;
        }
​
        //判斷redis中是否存在內容
        Object value = redisTemplate.boundValueOps(task.getId()).get();
        if (value != null){
            return;
        }
​
        //更新用戶積分
        int result = userService.updateUserPoints(task);
​
        if (result<=0){
            return;
        }
​
        //返回通知
rabbitTemplate.convertAndSend(RabbitMQConfig.EX_BUYING_ADDPOINTURSE,RabbitMQConfig.CG_BUYING_FINISHADDPOINT_KEY,JSON.toJSONString(task));
​
    }
}

定義修改用戶積分實現

實現思路:ide

1)判斷當前訂單是否操做過ui

2)將任務存入redisspa

3)修改用戶積分日誌

4)添加積分日誌表記錄code

5)刪除redis中記錄orm

@Autowired
private PointLogMapper pointLogMapper;
​
    /**
     * 修改用戶積分
     * @param task
     * @return
     */
    @Override
    @Transactional
    public int updateUserPoints(Task task) {
        Map info = JSON.parseObject(task.getRequestBody(), Map.class);
        String userName = info.get("userName").toString();
        String orderId = info.get("orderId").toString();
        int point = (int) info.get("point");
​
        //判斷當前訂單是否操做過
        PointLog pointLog = pointLogMapper.findLogInfoByOrderId(orderId);
        if (pointLog != null){
            return 0;
        }
​
        //將任務存入redis
        redisTemplate.boundValueOps(task.getId()).set("exist",1,TimeUnit.MINUTES);
​
        //修改用戶積分
        int result = userMapper.updateUserPoint(userName, point);
        if (result<=0){
            return result;
        }
​
        //添加積分日誌表記錄
        pointLog = new PointLog();
        pointLog.setOrderId(orderId);
        pointLog.setPoint(point);
        pointLog.setUserId(userName);
        result = pointLogMapper.insertSelective(pointLog);
        if (result<=0){
            return result;
        }
​
        //刪除redis中的記錄
        redisTemplate.delete(task.getId());
​
        return 1;
    }

定義根據訂單id查詢積分日誌表

定義PointLogMapper,實現根據訂單id查詢

public interface PointLogMapper extends Mapper<PointLog> {
​
    @Select("select * from tb_point_log where order_id=#{orderId}")
    PointLog findLogInfoByOrderId(@Param("orderId") String orderId);
}

訂單服務刪除原任務

定義監聽類

在訂單服務中定義監聽類,用於監聽隊列,若是隊列中有消息,則刪除原任務防止消息重複發送,並對任務信息進行記錄

@Component
public class DelTaskListener {
​
    @Autowired
    private TaskService taskService;
​
    @RabbitListener(queues = RabbitMQConfig.CG_BUYING_FINISHADDPOINT)
    public void receiveMessage(String message){
​
        Task task = JSON.parseObject(message, Task.class);
​
        taskService.delTask(task);
    }
}

定義任務service

public interface TaskService {
​
    void delTask(Task task);
}
@Service
@Transactional
public class TaskServiceImpl implements TaskService {
​
    @Autowired
    private TaskMapper taskMapper;
​
    @Autowired
    private TaskHisMapper taskHisMapper;
​
​
    @Override
    public void delTask(Task task) {
        //1. 設置刪除時間
        task.setDeleteTime(new Date());
        Long id = task.getId();
        task.setId(null);
​
        //bean複製
        TaskHis taskHis = new TaskHis();
        BeanUtils.copyProperties(task,taskHis);
​
       //記錄任務信息
        taskHisMapper.insertSelective(taskHis);
​
        //刪除原任務
        task.setId(id);
        taskMapper.deleteByPrimaryKey(task);
    }
}
相關文章
相關標籤/搜索