使用rabbitmq手動確認消息的,定時獲取隊列消息實現

描述問題

  最近項目中由於有些數據,須要推送到第三方系統中,由於數據會一直增長,而且須要與第三方系統作相關交互。html

相關業務

  本着不影響線上運行效率的思想,咱們將增長的消息放入rabbitmq,使用另外一個應用獲取消費,由於數據只是推送,而且業務的數據有15分鐘左右的更新策略,對實時性不是很高因此咱們須要一個定時任務來主動連接rabbit去消費,而後將數據以網絡方式傳送java

相關分析

  網絡上大體出現了相關的解決辦法,但因爲實現相關數據丟失及處理、性能和效率等相關基礎業務的工做量,望而卻步。。。。。。spring

  還好spring有相關的 org.springframework.amqp 工具包,簡化的大量麻煩>_> 讓咱們開始吧api

  瞭解rabbit的相關幾個概念數組

 瞭解了這幾個概念的時候你可能已經關注到了咱們今天的主題SimpleMessageListenerContainer網絡

 咱們使用SimpleMessageListenerContainer容器設置消費隊列監聽,而後設置具體的監聽Listener進行消息消費具體邏輯的編寫,經過SimpleRabbitListenerContainerFactory咱們能夠完成相關SimpleMessageListenerContainer容器的管理,app

  但對於使用此容器批量消費的方式,官方並無相關說明,網絡上你可能只找到這篇SimpleMessageListenerContainer批量消息處理對於問題描述是很清晰,可是回答只是說的比較簡單ide

  下面咱們就對這個問題的答案來個coding工具

解決辦法

  首先咱們由於須要失敗重試,使用spring的RepublishMessageRecoverer能夠解決這個問題,這顯然有一個缺點,即將在整個重試期間佔用線程。因此咱們使用了死信隊列性能

  相關配置

  1     @Bean
  2     ObjectMapper objectMapper() {
  3         ObjectMapper objectMapper = new ObjectMapper();
  4         DateFormat dateFormat = objectMapper.getDateFormat();
  5         JavaTimeModule javaTimeModule = new JavaTimeModule();
  6 
  7         SimpleModule module = new SimpleModule();
  8         module.addSerializer(new ToStringSerializer(Long.TYPE));
  9         module.addSerializer(new ToStringSerializer(Long.class));
 10         module.addSerializer(new ToStringSerializer(BigInteger.class));
 11 
 12         javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
 13         javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
 14         javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
 15 
 16         objectMapper.registerModule(module);
 17         objectMapper.registerModule(javaTimeModule);
 18         objectMapper.setConfig(objectMapper.getDeserializationConfig().with(new ObjectMapperDateFormatExtend(dateFormat)));//反序列化擴展日期格式支持
 19         objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
 20         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 21         return objectMapper;
 22 }
 23 
 24 
 25 
 26   @Bean
 27   RabbitAdmin admin (ConnectionFactory aConnectionFactory) {
 28     return new RabbitAdmin(aConnectionFactory);
 29   }
 30 
 31   @Bean
 32   MessageConverter jacksonAmqpMessageConverter( ) {
 33     return new Jackson2JsonMessageConverter(objectMapper());
 34   }
 35 
 36 
 37   @Bean
 38   Queue bcwPushControlQueue (RabbitAdmin rabbitAdmin) {
 39     Queue queue = new Queue(Queues.QUEUE_BCW_PUSH);
 40     rabbitAdmin.declareQueue(queue);
 41     return queue;
 42   }
 43   @Bean
 44   Queue bcwPayControlQueue (RabbitAdmin rabbitAdmin) {
 45     Queue queue = new Queue(Queues.QUEUE_BCW_PAY);
 46     rabbitAdmin.declareQueue(queue);
 47     return queue;
 48   }
 49   @Bean
 50   Queue bcwPullControlQueue (RabbitAdmin rabbitAdmin) {
 51     Queue queue = new Queue(Queues.QUEUE_BCW_PULL);
 52     rabbitAdmin.declareQueue(queue);
 53     return queue;
 54   }
 55     /**
 56      * 聲明一個交換機
 57      * @return
 58      */
 59   @Bean
 60   TopicExchange controlExchange () {
 61       return new TopicExchange(Exchanges.ExangeTOPIC);
 62   }
 63 
 64 
 65     /**
 66      * 延時重試隊列
 67      */
 68     @Bean
 69     public Queue bcwPayControlRetryQueue() {
 70         Map<String, Object> arguments = new HashMap<>();
 71         arguments.put("x-message-ttl", 10 * 1000);
 72         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
 73 //        若是設置死信會以路由鍵some-routing-key轉發到some.exchange.name,若是沒設默認爲消息發送到本隊列時用的routing key
 74         arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
 75         return new Queue("queue_bcw@pay@retry", true, false, false, arguments);
 76     }
 77     /**
 78      * 延時重試隊列
 79      */
 80     @Bean
 81     public Queue bcwPushControlRetryQueue() {
 82         Map<String, Object> arguments = new HashMap<>();
 83         arguments.put("x-message-ttl", 10 * 1000);
 84         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
 85 //        若是設置死信會以路由鍵some-routing-key轉發到some.exchange.name,若是沒設默認爲消息發送到本隊列時用的routing key
 86         arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
 87         return new Queue("queue_bcw@push@retry", true, false, false, arguments);
 88     }
 89     /**
 90      * 延時重試隊列
 91      */
 92     @Bean
 93     public Queue bcwPullControlRetryQueue() {
 94         Map<String, Object> arguments = new HashMap<>();
 95         arguments.put("x-message-ttl", 10 * 1000);
 96         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
 97 //        若是設置死信會以路由鍵some-routing-key轉發到some.exchange.name,若是沒設默認爲消息發送到本隊列時用的routing key
 98 //        arguments.put("x-dead-letter-routing-key", "queue_bcw");
 99         return new Queue("queue_bcw@pull@retry", true, false, false, arguments);
100     }
101     @Bean
102     public Binding  bcwPayControlRetryBinding() {
103         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pay.retry");
104     }
105     @Bean
106     public Binding  bcwPushControlRetryBinding() {
107         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.push.retry");
108     }
109     @Bean
110     public Binding   bcwPullControlRetryBinding() {
111         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pull.retry");
112     }
113 
114   /**
115    * 隊列綁定並關聯到RoutingKey
116    *
117    * @param queueMessages 隊列名稱
118    * @param exchange      交換機
119    * @return 綁定
120    */
121   @Bean
122   Binding bcwPushBindingQueue(@Qualifier("bcwPushControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
123     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.push");
124   }
125   /**
126    * 隊列綁定並關聯到RoutingKey
127    *
128    * @param queueMessages 隊列名稱
129    * @param exchange      交換機
130    * @return 綁定
131    */
132   @Bean
133   Binding bcwPayBindingQueue(@Qualifier("bcwPayControlQueue") Queue queueMessages, @Qualifier("controlExchange") TopicExchange exchange) {
134     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pay");
135   }
136   /**
137    * 隊列綁定並關聯到RoutingKey
138    *
139    * @param queueMessages 隊列名稱
140    * @param exchange      交換機
141    * @return 綁定
142    */
143   @Bean
144   Binding bcwPullBindingQueue(@Qualifier("bcwPullControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
145     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pull");
146   }
147 
148   @Bean
149   @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
150   public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
151           SimpleRabbitListenerContainerFactoryConfigurer configurer,
152           ConnectionFactory connectionFactory) {
153     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
154     configurer.configure(factory, connectionFactory);
155     factory.setMessageConverter(jacksonAmqpMessageConverter());
156     return factory;
157   }

下面就是咱們的主題,定時任務使用的是org.springframework.scheduling

  1 /**
  2  * 手動確認消息的,定時獲取隊列消息實現
  3  */
  4 public abstract class QuartzSimpleMessageListenerContainer extends SimpleMessageListenerContainer {
  5     protected final Logger logger = LoggerFactory.getLogger(getClass());
  6     private List<Message> body = new LinkedList<>();
  7     public long start_time;
  8     private Channel channel;
  9     @Autowired
 10     private ObjectMapper objectMapper;
 11     @Autowired
 12     private RabbitTemplate rabbitTemplate;
 13 
 14     public QuartzSimpleMessageListenerContainer() {
 15         // 手動確認
 16         this.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 17 
 18         this.setMessageListener((ChannelAwareMessageListener)  (message,channel)  -> {
 19             long current_time = System.currentTimeMillis();
 20             int time = (int) ((current_time - start_time)/1000);
 21             logger.info("====接收到{}隊列的消息=====",message.getMessageProperties().getConsumerQueue());
 22             Long retryCount = getRetryCount(message.getMessageProperties());
 23             if (retryCount > 3) {
 24                 logger.info("====此消息失敗超過三次{}從隊列的消息刪除=====",message.getMessageProperties().getConsumerQueue());
 25                 try {
 26                     channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
 27                 } catch (IOException ex) {
 28                     ex.printStackTrace();
 29                 }
 30                 return;
 31             }
 32 
 33             this.body.add(message);
 34             /**
 35              * 判斷數組數據是否滿了,判斷此監聽器時間是否大於執行時間
 36              * 若是在最後延時時間段內沒有業務消息,此監聽器會一直開着
 37              */
 38             if(body.size()>=3 || time>60){
 39                 this.channel = channel;
 40                 callback();
 41             }
 42         });
 43 
 44 
 45 
 46     }
 47     private void callback(){
 48 //         channel = getChannel(getTransactionalResourceHolder());
 49         if(body.size()>0 && channel !=null &&  channel.isOpen()){
 50             try {
 51                 callbackWork();
 52             }catch (Exception e){
 53                 logger.error("推送數據出錯:{}",e.getMessage());
 54 
 55                 body.stream().forEach(message -> {
 56                     Long retryCount = getRetryCount(message.getMessageProperties());
 57                     if (retryCount <= 3) {
 58                         logger.info("將消息置入延時重試隊列,重試次數:" + retryCount);
 59                         rabbitTemplate.convertAndSend(Exchanges.ExangeTOPIC, message.getMessageProperties().getReceivedRoutingKey()+".retry", message);
 60                     }
 61                 });
 62 
 63             } finally{
 64 
 65                 logger.info("flsher too data");
 66 
 67                 body.stream().forEach(message -> {
 68                     //手動acknowledge
 69                     try {
 70                         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 71                     } catch (IOException e) {
 72                         logger.error("手動確認消息失敗!");
 73                         e.printStackTrace();
 74                     }
 75                 });
 76 
 77                 body.clear();
 78                 this.stop();
 79 
 80             }
 81         }
 82 
 83     }
 84     abstract void callbackWork() throws Exception;
 85     /**
 86      * 獲取消息失敗次數
 87      * @param properties
 88      * @return
 89      */
 90     private long getRetryCount(MessageProperties properties){
 91         long retryCount = 0L;
 92         Map<String,Object> header = properties.getHeaders();
 93         if(header != null && header.containsKey("x-death")){
 94             List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death");
 95             if(deaths.size()>0){
 96                 Map<String,Object> death = deaths.get(0);
 97                 retryCount = (Long)death.get("count");
 98             }
 99         }
100         return retryCount;
101     }
102 
103     @Override
104     @Scheduled(cron = "0 0/2 * * * ? ")
105     public void start() {
106         logger.info("start push data scheduled!");
107         //初始化數據,將未處理的調用stop方法,返還至rabbit
108         body.clear();
109         super.stop();
110         start_time = System.currentTimeMillis();
111         super.start();
112 
113         logger.info("end push data scheduled!");
114     }
115 
116     public List<WDNJPullOrder> getBody() {
117 
118         List<WDNJPullOrder> collect = body.stream().map(data -> {
119                     byte[] body = data.getBody();
120                     WDNJPullOrder readValue = null;
121                     try {
122                         readValue = objectMapper.readValue(body, new TypeReference<WDNJPullOrder>() {
123                         });
124                     } catch (IOException e) {
125                         logger.error("處理數據出錯{}",e.getMessage());
126                     }
127                     return readValue;
128                 }
129         ).collect(Collectors.toList());
130 
131         return collect;
132 
133 
134     }
135 
136 }

 

後續

 

固然定時任務的啓動,你能夠寫到相關rabbit容器實現的裏面,可是這裏並非很須要,因此對於這個的小改動,同窗你能夠本身實現

 @Scheduled(cron = "0 0/2 * * * ? ")

public void start()
相關文章
相關標籤/搜索