rabbitmq 之 ack

場景1:對於消息處理失敗,有可能有因爲網絡波動致使的數據處理異常,待網絡穩定時消息就會正常處理,對於這種處理失敗,應該繼續嘗試去處理消息java

場景2:消息重複處理,例如咱們經過消息隊列向數據庫中添加數據,因爲數據庫網絡波動,致使數據庫鏈接超時,而咱們的系統認爲消息處理失敗,就會把消息回滾到消息隊列,繼續嘗試處理,這時就會形成消息重複處理的現象,對於重要的消息,咱們能夠每處理一條消息,就記錄一下,處理新的消息時,進行判斷消息是否已經處理,若是已經處理,就丟棄消息 設置firstQueue隊列爲手動ack處理數據庫

@Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        simpleMessageListenerContainer.addQueues(queueConfig.firstQueue());
        simpleMessageListenerContainer.setExposeListenerChannel(true);
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
        simpleMessageListenerContainer.setConcurrentConsumers(1);
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認
        simpleMessageListenerContainer.setMessageListener(firstConsumer);
        return simpleMessageListenerContainer;
    }

 生產者服務器

public void send(String uuid,Object message) {
        CorrelationData correlationId = new CorrelationData(uuid);
        for (int i = 0; i < 5; i++) {
            rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY1,
                    (Object) (String.valueOf(message)+i), correlationId);
        }
    }

未確認ack的消費者 

@Component
public class FirstConsumer implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        String msg = new String(message.getBody());
       
        // 處理消息
        System.out.println("FirstConsumer {} handleMessage :"+msg);
    }
}

執行結果,發現只消費了一條(可是未ack)網絡

因爲未確認ack,故在rabbitmq的界面上看到的firstQueue隊列的信息見下圖ide

從圖中看到隊列中有5個消息,unacked(表示未ack確認的有一個),還有四個準備中,消息就算程序收到了 可是未確認ACK致使消息服務器覺得他是未成功消費的 後續還會再發。ui

此時發現程序重啓,有接受到了ss0的這條數據。3d

手動確認ack消費者

@Component
public class FirstConsumer implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        // 每次只接收一個信息
        channel.basicQos(1);
        String msg = null;
        try {
             msg = new String(message.getBody());
            //告訴服務器收到這條消息 已經被我消費了 能夠在隊列刪掉 這樣之後就不會再發了 不然消息服務器覺得這條消息沒處理掉 後續還會在發
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            if (msg.equals("ss1")){
                channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(),
                        MessageProperties.PERSISTENT_TEXT_PLAIN, "從新放入隊列中的數據".getBytes());
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("FirstConsumer  consumer fail");
            // 丟棄信息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
        }
        // 處理消息
        System.out.println("FirstConsumer {} handleMessage :"+msg);
    }
}

打斷點查看下code

 

此時有斷點,再次去rabbitmq的界面查看如圖blog

此時會發現,隊列中只有2個消息了,還有一個未ack(由於打斷點了),故須要手動調用channel.basicAck告知服務器,消費者已經消費了能夠從隊列中刪除了。rabbitmq

相關文章
相關標籤/搜索