SpringBoot集成RabbitMQ(註解+手動確認)

 

1.pom文件

<!-- Spring Boot starter for AMQP messaging. No <version> is declared here; presumably it is managed by the Spring Boot parent/BOM (confirm against the full pom). -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.yml配置文件

spring:
  # RabbitMQ configuration
  rabbitmq:
    # NOTE(review): per the Spring Boot property reference, `host`/`port` are ignored
    # when `addresses` is set - confirm whether `port` below is meant to take effect.
    addresses: 127.0.0.1
    port: 5672
    username: adminmq
    password: 123456
    publisher-confirms: true
    publisher-returns: true
    template:
      retry:
        enabled: true
      mandatory: true
    listener:
      simple:
        # Consumers must ack/reject each delivery explicitly.
        acknowledge-mode: manual
        # Initial number of concurrent consumers
        concurrency: 10
        # Maximum number of concurrent consumers
        max-concurrency: 20
        # Number of messages each consumer may pull per fetch
        prefetch: 5
      direct:
        retry:
          enabled: true
          max-attempts: 1

3.消費者代碼(手動確認)

/**
 * Consumes a message from the bound queue and confirms it manually.
 *
 * <p>The delivery is acknowledged only after {@code monotoringInsertDB} returns
 * normally. If processing throws, the failure is logged and the delivery is rejected
 * without requeue. A delivery tag must be settled exactly once, so the ack and the
 * reject are mutually exclusive.
 *
 * <p>Alternative queue declaration:
 * {@code queuesToDeclare = @Queue("${queueropertie.queue-name}")}
 *
 * @param msgByte message header plus message body
 * @param channel channel used for the manual ack / reject
 * @param message the delivery; supplies the delivery tag
 * @throws IOException if the ack or reject call fails
 */
// Point-to-point
//@RabbitListener(queuesToDeclare=@Queue(QueueAndExchangeProperties.afsendfirQueue))
// Publish/subscribe
@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(value = "${queueropertie.exchange}", durable = "true", type = "direct"),
        value = @Queue(value = "${queueropertie.queue-name}", durable = "true"),
        key = "${queueropertie.exchangekey}"
))
@RabbitHandler
public void monitoringMethod(byte[] msgByte, Channel channel, Message message) throws IOException {
    Map<String, Object> logMap = new ConcurrentHashMap<>();
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        monotoringInsertDB(msgByte, new ConcurrentHashMap<>());
    } catch (Exception e) {
        // Log first so the root cause is not lost if the reject itself fails.
        LOGGER.error("mq接收消息失敗", e);
        // Processing failed: reject without requeue (second argument is the requeue flag).
        channel.basicReject(deliveryTag, false);
        commonRabbitService.insertLogError(logMap, ERROR_104, e.getMessage());
        return;
    }
    // Processing succeeded: acknowledge this single delivery (multiple = false).
    channel.basicAck(deliveryTag, false);
}

4.生產者

@Autowired RabbitTemplate rabbitTemplate; //發佈訂閱 rabbitTemplate.convertAndSend(QueueAndExchangeProperties.afMQExchange, QueueAndExchangeProperties.afIcf, msgStr.getBytes());
//點對點
//rabbitTemplate.convertAndSend(QueueAndExchangeProperties.afsendsecQueue,msgStr);
相關文章
相關標籤/搜索