沿用昨天的代碼,先定義交換機名稱和 routing key 名稱。
DLX 是 Dead Letter Exchange 的縮寫,又稱死信郵箱、死信交換機。DLX 就是一個普通的交換機,和一般的交換機沒有任何區別。當消息在一個隊列中變成死信(dead message)時,會通過這個交換機被發送到死信隊列中(指定好相關參數後,RabbitMQ 會自動發送)。
什麼是死信呢?什麼樣的消息會變成死信呢?
/**
 * Exchange and routing-key names used for user-center messaging.
 */
public interface UserCenterMq {

    /** Name of the user system's exchange. */
    String MQ_EXCHANGE_USER = "user.topic.exchange";

    /** Routing key for "post red packet" messages. */
    String ROUTING_KEY_POST_REDPACKET = "post.redpacket";

    /** Routing key associated with the dead-letter exchange. */
    String deadRoutingKey = "dead_routing_key";

    /** Name of the dead-letter exchange. */
    String deadExchangeName = "dead_exchange";
}
資源文件配置
spring:
  rabbitmq:
    # Broker connection settings.
    host: 192.168.5.182
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    # Publisher confirms and returns are both switched on.
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        # Listeners acknowledge messages manually.
        acknowledge-mode: manual
編寫 RabbitMQ 的配置類
/**
 * Declares the queues, exchanges and bindings for the red-packet flow and the dead-letter setup.
 */
@Configuration
public class RabbitmqConfig {

    /** Name of the red-packet queue. */
    public static final String RED_PACKET_QUEUE = "red.packet.queue";

    /** Name of the dead-letter queue. */
    public static final String deadQueueName = "dead_queue";

    /**
     * Declares the red-packet queue, which receives the user-registration messages.
     *
     * <p>NOTE(review): the queue is declared without {@code x-dead-letter-exchange} /
     * {@code x-dead-letter-routing-key} arguments, so rejected messages are presumably dropped
     * rather than routed to the dead-letter exchange. Confirm whether that is intended.
     *
     * @return the red-packet queue
     */
    @Bean
    public Queue redPacketQueue() {
        return new Queue(RED_PACKET_QUEUE);
    }

    /**
     * Declares the durable dead-letter queue.
     *
     * @return the dead-letter queue
     */
    @Bean
    public Queue deadQueue() {
        return new Queue(deadQueueName, true);
    }

    /**
     * Declares the direct exchange used for dead letters.
     *
     * @return the dead-letter exchange
     */
    @Bean
    public DirectExchange deadExchange() {
        // Qualified with UserCenterMq: this class does not implement that interface.
        return new DirectExchange(UserCenterMq.deadExchangeName);
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange with the dead routing key.
     *
     * @return the binding
     */
    @Bean
    public Binding bindingDeadExchange() {
        return BindingBuilder.bind(deadQueue())
                .to(deadExchange())
                .with(UserCenterMq.deadRoutingKey);
    }

    /**
     * Binds the red-packet queue to the dead-letter exchange with the dead routing key.
     *
     * <p>NOTE(review): with this binding, anything published to the dead-letter exchange with
     * the dead routing key is also routed into the red-packet queue. Verify this is wanted;
     * a dead-letter setup normally only binds the dead-letter queue here.
     *
     * @return the binding
     */
    @Bean
    public Binding bindingRedPacketDeadExchange() {
        return BindingBuilder.bind(redPacketQueue())
                .to(deadExchange())
                .with(UserCenterMq.deadRoutingKey);
    }

    /**
     * Declares the user system's topic exchange.
     *
     * @return the user topic exchange
     */
    @Bean
    public TopicExchange userTopicExchange() {
        return new TopicExchange(UserCenterMq.MQ_EXCHANGE_USER);
    }

    /**
     * Binds the red-packet queue to the user exchange with the "post red packet" routing key.
     *
     * @return the binding
     */
    @Bean
    public Binding bindingRedPacket() {
        return BindingBuilder.bind(redPacketQueue())
                .to(userTopicExchange())
                .with(UserCenterMq.ROUTING_KEY_POST_REDPACKET);
    }
}
重寫消息發送生產者
@Component public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void send(String exchange, String routingKey,String context) { System.out.println("send content = " + context); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); this.rabbitTemplate.convertAndSend(exchange, routingKey, context); } /** * 確認後回調: * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { System.out.println("send ack fail, cause = " + cause); } else { System.out.println("send ack success"); } } /** * 失敗後return回調: * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); } }
修改事務監聽代碼(事務確認完成後發送消息到 MQ)
@Component public class UserTransactionEventListener { @Autowired private MessageSender messageSender; @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) public void beforeCommit(PayloadApplicationEvent<User> event) { System.out.println("before commit, id: " + event.getPayload().getId()); } @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void afterCommit(PayloadApplicationEvent<User> event) { System.out.println("after commit, id: " + event.getPayload().getId()); } @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION) public void afterCompletion(PayloadApplicationEvent<User> event) { System.out.println("after completion, id: " + event.getPayload().getId()); messageSender.send(UserCenterMq.MQ_EXCHANGE_USER,UserCenterMq.ROUTING_KEY_POST_REDPACKET, JSONObject.toJSONString(event.getPayload())); //messageSender.send(UserCenterMq.MQ_EXCHANGE_USER,"post.redpacket", JSONObject.toJSONString(event.getPayload())); } @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK) public void afterRollback(PayloadApplicationEvent<User> event) { System.out.println("after rollback, id: " + event.getPayload().getId()); } }
在 RabbitMQ 的管理界面內可以看到
已經發送了一個消息到該隊列,即這個 user 對象。
在紅包模塊中,我們來監聽這個消息隊列,完成分佈式事務。
model
/**
 * Red packet model holding its id, a user id and the amount.
 */
@Data
public class RedPacket implements Serializable {

    /** Identifier of the red packet. */
    private long redPacketId;

    /** Identifier of the user. */
    private long userId;

    /** Amount of the red packet. */
    private Double redPacketAmount;
}
dao(此處有一個red_packet表,3個字段,1個自增)
/**
 * MyBatis mapper for the {@code red_packet} table.
 */
@Mapper
public interface RedPacketDao {

    /**
     * Inserts a red packet row with the user id and amount of the given object.
     *
     * <p>{@code keyProperty} must name the Java property ({@code redPacketId}), not the
     * column, so that the generated key can be written back to the object.
     *
     * @param redPacket the red packet to insert
     */
    @Options(useGeneratedKeys = true, keyProperty = "redPacketId")
    @Insert("insert into red_packet (user_id,red_packet_amount) values (#{userId},#{redPacketAmount})")
    void add(RedPacket redPacket);
}
service
/**
 * Service operations for red packets.
 */
public interface RedPacketService {

    /**
     * Adds the given red packet.
     *
     * @param redPacket the red packet to add
     */
    void add(RedPacket redPacket);
}
/**
 * Transactional {@link RedPacketService} implementation that delegates to {@link RedPacketDao}.
 */
@Service
@Transactional
public class RedPacketServiceImpl implements RedPacketService {

    @Autowired
    private RedPacketDao redPacketDao;

    /**
     * Hands the red packet to the DAO for insertion.
     *
     * @param redPacket the red packet to add
     */
    @Override
    public void add(RedPacket redPacket) {
        this.redPacketDao.add(redPacket);
    }
}
RabbitMQ 消費者(此處爲每註冊一個用戶就發一個十塊以內的隨機紅包)
@Component @RabbitListener(queues = RabbitmqConfig.RED_PACKET_QUEUE) public class PostRedPacketConsumer { @Autowired private RedPacketService redPacketService; @RabbitHandler public void postRedPacket(String userStr, Channel channel, Message message) throws IOException { try { //告訴服務器收到這條消息 已經被我消費了 能夠在隊列刪掉;不然消息服務器覺得這條消息沒處理掉 後續還會在發 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); User user = JSONObject.parseObject(userStr,User.class); RedPacket redPacket = new RedPacket(); redPacket.setUserId(user.getId()); redPacket.setRedPacketAmount((double)(Math.random() * 10)); redPacketService.add(redPacket); } catch (IOException e) { e.printStackTrace(); //丟棄這條消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); System.out.println("receiver fail"); } } }
運行後,該隊列被消費掉
紅包表增加了數據