場景1:對於消息處理失敗,有可能有因爲網絡波動致使的數據處理異常,待網絡穩定時消息就會正常處理,對於這種處理失敗,應該繼續嘗試去處理消息java
場景2:消息重複處理,例如咱們經過消息隊列向數據庫中添加數據,因爲數據庫網絡波動,致使數據庫鏈接超時,而咱們的系統認爲消息處理失敗,就會把消息回滾到消息隊列,繼續嘗試處理,這時就會形成消息重複處理的現象,對於重要的消息,咱們能夠每處理一條消息,就記錄一下,處理新的消息時,進行判斷消息是否已經處理,若是已經處理,就丟棄消息 設置firstQueue隊列爲手動ack處理數據庫
@Bean public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){ SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); simpleMessageListenerContainer.addQueues(queueConfig.firstQueue()); simpleMessageListenerContainer.setExposeListenerChannel(true); simpleMessageListenerContainer.setMaxConcurrentConsumers(5); simpleMessageListenerContainer.setConcurrentConsumers(1); simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認 simpleMessageListenerContainer.setMessageListener(firstConsumer); return simpleMessageListenerContainer; }
生產者服務器
public void send(String uuid,Object message) { CorrelationData correlationId = new CorrelationData(uuid); for (int i = 0; i < 5; i++) { rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY1, (Object) (String.valueOf(message)+i), correlationId); } }
@Component public class FirstConsumer implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); // 處理消息 System.out.println("FirstConsumer {} handleMessage :"+msg); } }
執行結果,發現只消費了一條(可是未ack)網絡
因爲未確認ack,故在rabbitmq的界面上看到的firstQueue隊列的信息見下圖ide
從圖中看到隊列中有5個消息,unacked(表示未ack確認的有一個),還有四個準備中,消息就算程序收到了 可是未確認ACK致使消息服務器覺得他是未成功消費的 後續還會再發。ui
此時發現程序重啓,有接受到了ss0的這條數據。3d
@Component public class FirstConsumer implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { // 每次只接收一個信息 channel.basicQos(1); String msg = null; try { msg = new String(message.getBody()); //告訴服務器收到這條消息 已經被我消費了 能夠在隊列刪掉 這樣之後就不會再發了 不然消息服務器覺得這條消息沒處理掉 後續還會在發 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); if (msg.equals("ss1")){ channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, "從新放入隊列中的數據".getBytes()); } } catch (Exception e) { e.printStackTrace(); System.out.println("FirstConsumer consumer fail"); // 丟棄信息 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); } // 處理消息 System.out.println("FirstConsumer {} handleMessage :"+msg); } }
打斷點查看下code
此時有斷點,再次去rabbitmq的界面查看如圖blog
此時會發現,隊列中只有2個消息了,還有一個未ack(由於打斷點了),故須要手動調用channel.basicAck告知服務器,消費者已經消費了能夠從隊列中刪除了。rabbitmq