消息隊列 - Spring Boot 對rabbitmq批量處理數據的支持html
一丶前言git
在生產中,存在一些場景,須要對數據進行批量操做。如,能夠先將數據存放到redis,而後將數據進行批量寫進數據庫。可是使用redis,不得不面對一個數據容易丟失的問題。也能夠考慮使用消息隊列進行替換,在數據持久化,數據不丟失方面,消息隊列確實比redis好一點,畢竟設計不同。是否是使用消息隊列,就必定好呢?不是的,首先使用消息隊列,不能確保數據百分百不丟失,(若是要作到百分百不丟失,設計上就會比較複雜),除此以外,還要面對數據重複的問題。消息丟失,消息重複,是使用消息隊列必須面對的問題。web
AMQP在協議上規定每次只能傳送一條數據,所以作批量數據操做,須要在應用層上定義,Spring 目前已經提供 (來源資料)redis
M-m-m. No, there is no such a functionality. Only one message can be read at a time from the queue. And it is on the protocol level. That's why we have introduced that artificial
BatchingRabbitTemplate
to batch on the application level, before protocol. – Artem Bilan Nov 30 '16 at 12:02spring
二丶spring rabbit mq 支持批量操做的版本數據庫
關於起始版本,筆者還沒有查找到佐證資料,目前筆者所使用的是2.2.2版本緩存
批量發送app
批量監聽(其實不必定須要實現該接口,目前筆者的實現是使用該接口,其餘能夠自行查看官方文檔)ide
官方文檔資料測試
三丶實現
1. 簡單配置測試隊列
//測試批量 public static final String BATCH_QUEUE_NAME="batch.queue"; @Bean public Queue batchQueue(){ return new Queue(BATCH_QUEUE_NAME); }
2. 配置批量發送template
@Bean("batchQueueTaskScheduler") public TaskScheduler batchQueueTaskScheduler(){ TaskScheduler taskScheduler=new ThreadPoolTaskScheduler(); return taskScheduler; } //批量處理rabbitTemplate @Bean("batchQueueRabbitTemplate") public BatchingRabbitTemplate batchQueueRabbitTemplate(ConnectionFactory connectionFactory, @Qualifier("batchQueueTaskScheduler") TaskScheduler taskScheduler){ //!!!重點: 所謂批量, 就是spring 將多條message從新組成一條message, 發送到mq, 從mq接受到這條message後,在從新解析成多條message //一次批量的數量 int batchSize=10; // 緩存大小限制,單位字節, // simpleBatchingStrategy的策略,是判斷message數量是否超過batchSize限制或者message的大小是否超過緩存限制, // 緩存限制,主要用於限制"組裝後的一條消息的大小" // 若是主要經過數量來作批量("打包"成一條消息), 緩存設置大點 // 詳細邏輯請看simpleBatchingStrategy#addToBatch() int bufferLimit=1024; //1 K long timeout=10000; //注意,該策略只支持一個exchange/routingKey //A simple batching strategy that supports only one exchange/routingKey BatchingStrategy batchingStrategy=new SimpleBatchingStrategy(batchSize,bufferLimit,timeout); return new BatchingRabbitTemplate(connectionFactory,batchingStrategy,taskScheduler); }
3. 批量監聽 (注意, 批量發送和批量監聽能夠各自獨立使用)
a. 配置監聽容器(這裏是必須的!!!)
@Bean("batchQueueRabbitListenerContainerFactory") public SimpleRabbitListenerContainerFactory batchQueueRabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //設置批量 factory.setBatchListener(true); factory.setConsumerBatchEnabled(true);//設置BatchMessageListener生效 factory.setBatchSize(10);//設置監聽器一次批量處理的消息數量 return factory; }
b. 配置監聽器
@Slf4j @Component public class BatchQueueListener implements BatchMessageListener { //批量接收處理 @RabbitListener(queues = RabbitMqConfig2.BATCH_QUEUE_NAME,containerFactory = "batchQueueRabbitListenerContainerFactory") @Override public void onMessageBatch(List<Message> messages) { log.info("batch.queue.consumer 收到{}條message", messages.size()); if(messages.size()>0){ log.info("第一條數據是: {}", new String(messages.get(0).getBody())); } } }
4. 測試
// --------------------------- 測試batch @Autowired BatchingRabbitTemplate batchQueueRabbitTemplate; @Test public void batchSend() throws InterruptedException { // 除了send(String exchange, String routingKey, Message message, CorrelationData correlationData)方法是發送單條數據 // 其餘send都是批量發送 //批量發送 long timestamp=System.currentTimeMillis(); String msg; Message message; MessageProperties messageProperties=new MessageProperties(); for(int i=0;i<1000;i++){ msg="batch."+timestamp+"-"+i; message=new Message(msg.getBytes(), messageProperties); batchQueueRabbitTemplate.send(RabbitMqConfig2.BATCH_QUEUE_NAME,message); // defaultRabbitTemplate.convertAndSend(RabbitMqConfig2.BATCH_QUEUE_NAME, msg.getBytes()); } System.out.println("發送數據完畢"); System.out.println("等待30s"); TimeUnit.SECONDS.sleep(30); //等待消費者消費 }
5. 輸出結果
解析:
設置批量監聽處理的數量爲10,爲何輸出是100呢?
由於使用了批量發送, 配置批量發送是將10條數據壓縮成1條, 批量監聽收到的是壓縮後的10條,解析後,變成100條,沒毛病
參考資料: