一、背景java
Springboot4RabbitMQ爲消息的生產者git
RabbitProducer爲消息的消費者github
消息隊列爲:order-queuespring
二、問題app
生產者生產的消息發送到RabbitMQ後,消費者在讀取消息消息時反序列化消息失敗ide
消息類:post
public class Order implements Serializable { private static final long serialVersionUID = -7128203829971899888L; private String id; private String name; private String messageId; public String getId() { return id; } public void setId(String id) { this.id = id == null ? null : id.trim(); } public String getName() { return name; } public void setName(String name) { this.name = name == null ? null : name.trim(); } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId == null ? null : messageId.trim(); } @Override public String toString() { return "Order [id=" + id + ", name=" + name + ", messageId=" + messageId + "]"; } }
生產者發送消息後RabbitMQ中的數據爲測試
能夠看到order-queue隊列中有一條待確認的消息,ui
消費者異常信息爲:this
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method [public void com.rabbit.producer.RabbitProducer.receiver.OrderRecevier.onOrderMessage(com.rabbit.producer.RabbitProducer.entity.Order,com.rabbitmq.client.Channel,java.util.Map<java.lang.String, java.lang.Object>) throws java.lang.Exception] Bean [com.rabbit.producer.RabbitProducer.receiver.OrderRecevier@600b7b3d] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:185) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_71] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.rabbit.Springboot4RabbitMQ.entity.Order] to [com.rabbit.producer.RabbitProducer.entity.Order] for GenericMessage [payload=Order [id=RabbitMQTestId0002, name=HelloWorld, messageId=1538919928275$0836e0e7-4976-457e-92fb-44b937255855], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=order.ABC, amqp_receivedExchange=order-exchange, amqp_deliveryTag=1, amqp_consumerQueue=order-queue, amqp_redelivered=false, id=0ffe4dcd-048f-f274-bca9-5550f9ecebb1, amqp_consumerTag=amq.ctag-82Oo3kl1I138E2pvVRsczA, contentType=application/x-java-serialized-object, timestamp=1538919929083}] at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:182) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] ... 10 common frames omitted 2018-10-07 21:45:29.111 WARN 5264 --- [cTaskExecutor-2] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@75b3a922(byte[206])' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order-exchange, receivedRoutingKey=order.ABC, deliveryTag=1, consumerTag=amq.ctag-82Oo3kl1I138E2pvVRsczA, consumerQueue=order-queue])
從異常信息中能夠看到是消費者對消息反序列化的時候失敗了。雖然兩個項目中的Order類是徹底同樣的,但在進行反序列化的時候仍是失敗了
三、解決方案
方案一、消費者引用生產者項目中的消息體即Order.java
在消費者項目上【右鍵】->【Bulid Path】->【Configure Build Path】->【Projects】->【Add】 選擇生產者項目,而後消費者項目就能夠引用生產者項目中類,這樣徹底保證了兩個項目中JavaBean是一致的,因此能解決反序列失敗的問題
方案二、生產者在發送消息前將消息體轉換爲JSONObject,消費者以JSONObject接收消息,再轉換爲對應的JavaBean
四、測試結果
1)生產者將消息轉換爲JSONObject
public void sendOrder(Order order) throws Exception { rabbitTemplate.setConfirmCallback(confirmCallback); // 消息惟一ID CorrelationData correlationData = new CorrelationData(order.getMessageId()); rabbitTemplate.convertAndSend("order-exchange", "order.ABC", FastJsonConvertUtil.toJsonObject(order), correlationData); }
2) 消費者使用JSONObject接收消息再轉換成對應的JavaBean
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable = "${spring.rabbitmq.listener.order.queue.durable}"), exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable = "${spring.rabbitmq.listener.order.exchange.durable}", type = "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationeExceptions}"), key = "${spring.rabbitmq.listener.order.key}")) public void onOrderMessage(@Payload JSONObject object, Channel channel, @Headers Map<String, Object> headers) throws Exception { System.err.println("----------------------------------"); Order order = JsonConvertUtils.convertJSONToObject(object); System.err.println("消費端Order: " + order.toString()); Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); }
3) postman發送用於生產的消息體:
4)消費者控制檯成功打印消息體內容
項目的源代碼地址:https://github.com/KillMonkey/Springboot4RabbitMQ