Sending Red Packets to Registered Users with RabbitMQ (Distributed Transactions, Part 2)

Continuing with yesterday's code, first define the exchange name and the routing key names.

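Dead-letter queues

DLX is short for Dead Letter Exchange. A DLX is an ordinary exchange, no different from any other exchange. When a message in a queue becomes a dead letter (dead message), it is republished through this exchange to the dead-letter queue. Once the relevant arguments are set on the queue, RabbitMQ does this automatically.

What is a dead letter, and which messages become dead letters?

  • The message is rejected (basic.reject or basic.nack) with requeue=false.
  • The message's TTL expires.
  • The queue reaches its maximum length (the queue is full; by default RabbitMQ then drops messages from the head of the queue to make room).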
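
The "relevant arguments" mentioned above are optional queue arguments that RabbitMQ reads when a queue is declared. As a rough sketch, the map below collects the four standard argument names that relate to the three cases in the list. The class name `DeadLetterArgs`, the method `build`, and the TTL and length values are made up for illustration; only the `x-...` keys are RabbitMQ's own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DeadLetterArgs {

    /** Builds queue arguments that send dead letters to the given exchange. */
    static Map<String, Object> build(String deadExchange, String deadRoutingKey,
                                     long ttlMillis, int maxLength) {
        Map<String, Object> args = new LinkedHashMap<>();
        // Where dead letters from this queue are republished
        args.put("x-dead-letter-exchange", deadExchange);
        // Routing key used when republishing (defaults to the original key if omitted)
        args.put("x-dead-letter-routing-key", deadRoutingKey);
        // Messages older than this many milliseconds become dead letters
        args.put("x-message-ttl", ttlMillis);
        // Maximum number of messages the queue holds
        args.put("x-max-length", maxLength);
        return args;
    }

    public static void main(String[] args) {
        // Illustrative values: 60 second TTL, at most 1000 messages
        System.out.println(build("dead_exchange", "dead_routing_key", 60_000L, 1_000));
    }
}
```

A map like this would be passed as the arguments when the business queue (not the dead-letter queue) is declared.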
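
The exchange and routing key names, including those for the dead-letter exchange, are kept as constants in one interface:

public interface UserCenterMq {

   /**
    * Name of the user system's exchange
    */
   String MQ_EXCHANGE_USER = "user.topic.exchange";

   /**
    * Routing key for sending red packets
    */
   String ROUTING_KEY_POST_REDPACKET = "post.redpacket";
   //String ROUTING_KEY_POST_REDPACKET = "post.#";
   /**
    * Dead-letter routing key and dead-letter exchange name
    */
   String deadRoutingKey = "dead_routing_key";
   String deadExchangeName = "dead_exchange";
}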
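
The commented-out `post.#` value above is a topic pattern rather than a literal key. On a topic exchange, a binding pattern is split into dot-separated words, where `*` stands for exactly one word and `#` for zero or more words. The matcher below is only an illustrative re-implementation of that rule in plain Java (the class `TopicMatch` is hypothetical and is not how the broker is implemented):

```java
final class TopicMatch {

    /** Returns true if the routing key matches the topic binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the key
            for (int n = j; n <= k.length; n++) {
                if (matches(p, i + 1, k, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;
        }
        // '*' matches exactly one word; otherwise the words must be equal
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("post.#", "post.redpacket"));
        System.out.println(matches("post.*", "post.redpacket.retry"));
    }
}
```

With the literal key `post.redpacket`, only messages published with exactly that routing key reach the bound queue; with `post.#`, any key beginning with the word `post` would.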

Configuration in the resource file

spring:
  rabbitmq:
      host: 192.168.5.182
      port: 5672
      username: admin
      password: admin
      virtual-host: /
      publisher-confirms: true
      publisher-returns: true
      listener:
        simple:
          acknowledge-mode: manual

Write the RabbitMQ configuration class

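import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {

   /**
    * Name of the red packet queue
    */
   public static final String RED_PACKET_QUEUE = "red.packet.queue";
   /**
    * Name of the dead-letter queue
    */
   public final static String deadQueueName = "dead_queue";

   /**
    * Declares the queue that receives user-registration messages.
    * The x-dead-letter-* arguments tell RabbitMQ where to republish
    * messages from this queue that become dead letters.
    * Note: a queue that already exists without these arguments must be
    * deleted first, otherwise the redeclaration is refused.
    *
    * @return the red packet queue
    */
   @Bean
   public Queue redPacketQueue() {
      Map<String, Object> args = new HashMap<>();
      args.put("x-dead-letter-exchange", UserCenterMq.deadExchangeName);
      args.put("x-dead-letter-routing-key", UserCenterMq.deadRoutingKey);
      // durable, non-exclusive, not auto-deleted
      return new Queue(RED_PACKET_QUEUE, true, false, false, args);
   }

   /**
    * The dead-letter queue (durable)
    */
   @Bean
   public Queue deadQueue() {
      return new Queue(deadQueueName, true);
   }

   /**
    * The dead-letter exchange: an ordinary direct exchange
    */
   @Bean
   public DirectExchange deadExchange() {
      return new DirectExchange(UserCenterMq.deadExchangeName);
   }

   /**
    * Binds the dead-letter queue to the dead-letter exchange
    */
   @Bean
   public Binding bindingDeadExchange() {
      return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(UserCenterMq.deadRoutingKey);
   }

   @Bean
   public TopicExchange userTopicExchange() {
      return new TopicExchange(UserCenterMq.MQ_EXCHANGE_USER);
   }

   /**
    * Binds the red packet queue to the user exchange
    *
    * @return the binding
    */
   @Bean
   public Binding bindingRedPacket() {
      return BindingBuilder.bind(redPacketQueue()).to(userTopicExchange())
            .with(UserCenterMq.ROUTING_KEY_POST_REDPACKET);
   }
}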

Rewrite the message producer

@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

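    /**
     * Registers this bean as the template's confirm and return callbacks
     * once at startup, instead of on every send.
     */
    @PostConstruct
    public void init() {
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
    }

    public void send(String exchange, String routingKey, String context) {
        System.out.println("send content = " + context);
        this.rabbitTemplate.convertAndSend(exchange, routingKey, context);
    }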

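    /**
     * Confirm callback: called when the broker acks or nacks a published message.
     * @param correlationData correlation data supplied when sending, if any
     * @param ack true if the broker accepted the message
     * @param cause reason for a nack, if available
     */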
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            System.out.println("send ack fail, cause = " + cause);
        } else {
            System.out.println("send ack success");
        }
    }

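    /**
     * Return callback: called when a message could not be routed to any queue
     * and the broker returns it.
     *
     * @param message the returned message
     * @param replyCode the broker's reply code
     * @param replyText the broker's reply text
     * @param exchange the exchange the message was published to
     * @param routingKey the routing key that was used
     */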
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
    }
}
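
The confirm callback above only prints the outcome. One possible extension, sketched here in plain Java rather than against the Spring API, is to remember each message under an ID before sending and to release it only when the broker acks, so that a nacked message can be resent. The class `PendingConfirms` and its methods are hypothetical. In Spring AMQP the ID could travel in a `CorrelationData` object passed to `convertAndSend`, which the sender above does not currently do.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PendingConfirms {

    // Messages that were sent but not yet confirmed, keyed by message ID
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Call before publishing: remember the body until the broker confirms it. */
    void beforeSend(String id, String body) {
        pending.put(id, body);
    }

    /**
     * Call from the confirm callback.
     * Returns the body to resend when the broker nacked, or null when it acked
     * (or when the ID is unknown).
     */
    String onConfirm(String id, boolean ack) {
        if (ack) {
            pending.remove(id);
            return null;
        }
        return pending.get(id);
    }

    /** Number of messages still waiting for a confirm. */
    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        confirms.beforeSend("msg-1", "{\"id\":1}");
        System.out.println("to resend: " + confirms.onConfirm("msg-1", false));
        confirms.onConfirm("msg-1", true);
        System.out.println("still pending: " + confirms.size());
    }
}
```

A real implementation would also need a retry limit and persistence across restarts, both of which are left out of this sketch.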

Modify the transaction listener (send the message to MQ after the transaction has been committed)

@Component
public class UserTransactionEventListener {
    @Autowired
    private MessageSender messageSender;

    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
    public void beforeCommit(PayloadApplicationEvent<User> event) {
        System.out.println("before commit, id: " + event.getPayload().getId());
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void afterCommit(PayloadApplicationEvent<User> event) {
        System.out.println("after commit, id: " + event.getPayload().getId());
        // Publish only once the user row is committed; AFTER_COMPLETION would also fire after a rollback
        messageSender.send(UserCenterMq.MQ_EXCHANGE_USER,UserCenterMq.ROUTING_KEY_POST_REDPACKET, JSONObject.toJSONString(event.getPayload()));
        //messageSender.send(UserCenterMq.MQ_EXCHANGE_USER,"post.redpacket", JSONObject.toJSONString(event.getPayload()));
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
    public void afterCompletion(PayloadApplicationEvent<User> event) {
        System.out.println("after completion, id: " + event.getPayload().getId());
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void afterRollback(PayloadApplicationEvent<User> event) {
        System.out.println("after rollback, id: " + event.getPayload().getId());
    }
}
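
The four phases differ in when they fire: AFTER_COMMIT runs only after a successful commit, AFTER_ROLLBACK only after a rollback, and AFTER_COMPLETION after either outcome. The plain-Java model below makes that explicit. It is not Spring code; the class `PhaseDemo` and the method `phasesFor` are hypothetical, and the model simplifies by assuming a rollback happens before any commit is attempted.

```java
import java.util.ArrayList;
import java.util.List;

final class PhaseDemo {

    enum Phase { BEFORE_COMMIT, AFTER_COMMIT, AFTER_ROLLBACK, AFTER_COMPLETION }

    /** Lists, in order, the listener phases that fire for a transaction outcome. */
    static List<Phase> phasesFor(boolean committed) {
        List<Phase> fired = new ArrayList<>();
        if (committed) {
            fired.add(Phase.BEFORE_COMMIT);
            fired.add(Phase.AFTER_COMMIT);
        } else {
            fired.add(Phase.AFTER_ROLLBACK);
        }
        // Fires for both outcomes
        fired.add(Phase.AFTER_COMPLETION);
        return fired;
    }

    public static void main(String[] args) {
        System.out.println("commit:   " + phasesFor(true));
        System.out.println("rollback: " + phasesFor(false));
    }
}
```

Because AFTER_COMPLETION appears in both lists, a listener that must run only for data that was really saved, such as one that publishes the registration message, is safest in AFTER_COMMIT.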

In the RabbitMQ management UI you can see that one message, namely this User object serialized as JSON, has been published to the queue.

In the red packet module, we listen on this message queue to complete the distributed transaction.

model

@Data
public class RedPacket implements Serializable {
    private long redPacketId;
    private long userId;
    private Double redPacketAmount;
}

dao (there is a red_packet table here with three columns, one of them auto-increment)

@Mapper
public interface RedPacketDao {
    // keyProperty names the Java property (redPacketId), not the column (red_packet_id)
    @Options(useGeneratedKeys = true, keyProperty = "redPacketId")
    @Insert("insert into red_packet (user_id,red_packet_amount) values (#{userId},#{redPacketAmount})")
    void add(RedPacket redPacket);
}

service

public interface RedPacketService {
    public void add(RedPacket redPacket);
}
@Transactional
@Service
public class RedPacketServiceImpl implements RedPacketService {
    @Autowired
    private RedPacketDao redPacketDao;
    @Override
    public void add(RedPacket redPacket) {
        redPacketDao.add(redPacket);
    }
}

RabbitMQ consumer (here, every newly registered user receives one random red packet of less than ten yuan)

@Component
@RabbitListener(queues = RabbitmqConfig.RED_PACKET_QUEUE)
public class PostRedPacketConsumer {
    @Autowired
    private RedPacketService redPacketService;

    @RabbitHandler
    public void postRedPacket(String userStr, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            User user = JSONObject.parseObject(userStr,User.class);
            RedPacket redPacket = new RedPacket();
            redPacket.setUserId(user.getId());
            redPacket.setRedPacketAmount(Math.random() * 10);
            redPacketService.add(redPacket);
            // Ack only after the red packet is saved, so the broker can remove the message from the queue.
            // Without an ack the broker treats the message as unprocessed and will deliver it again.
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            e.printStackTrace();
            // Reject without requeueing: the message is dropped from this queue,
            // or dead-lettered if the queue has a dead-letter exchange configured
            channel.basicNack(deliveryTag, false, false);
            System.out.println("receiver fail");
        }
    }
}
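
The consumer above stores `Math.random() * 10` in a `Double`, which yields amounts with many decimal places and can in principle be zero. If amounts should be whole cents, one alternative is to draw a random number of cents and convert it to a `BigDecimal`. This is only a sketch: the class `RedPacketAmounts` is hypothetical, and the range of 0.01 to 10.00 inclusive is an assumption, not something the original code specifies.

```java
import java.math.BigDecimal;
import java.util.concurrent.ThreadLocalRandom;

final class RedPacketAmounts {

    /** Returns a random amount from 0.01 to 10.00 inclusive, with two decimals. */
    static BigDecimal randomAmount() {
        // nextLong(1, 1001) yields 1..1000 cents (upper bound is exclusive)
        long cents = ThreadLocalRandom.current().nextLong(1, 1001);
        // Unscaled value plus scale 2, e.g. 375 cents becomes 3.75
        return BigDecimal.valueOf(cents, 2);
    }

    public static void main(String[] args) {
        System.out.println(randomAmount());
    }
}
```

Using this in the module would mean changing the `redPacketAmount` field and the table column to a decimal type, which the original model does not do.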

After running the application, the message in the queue is consumed

and a new row is added to the red packet table.
