本文主要研究一下如何使用reactor-rabbitmqreact
<dependency> <groupId>io.projectreactor.rabbitmq</groupId> <artifactId>reactor-rabbitmq</artifactId> <version>1.0.0.M2</version> </dependency>
@Test public void testProducer() throws InterruptedException { int count = 100; ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.useNio(); connectionFactory.setUsername("myuser"); connectionFactory.setPassword("mypass"); SenderOptions senderOptions = new SenderOptions() .connectionFactory(connectionFactory) .connectionSupplier(cf -> cf.newConnection( new Address[] {new Address("192.168.99.100",5672), new Address("192.168.99.100",5673), new Address("192.168.99.100",5674)}, "reactive-sender")) .resourceCreationScheduler(Schedulers.elastic()); Sender sender = ReactorRabbitMq.createSender(senderOptions); Flux<OutboundMessageResult> confirmations = sender.sendWithPublishConfirms(Flux.range(1, count) .map(i -> new OutboundMessage("", QUEUE, ("Message_" + i).getBytes()))); CountDownLatch latch = new CountDownLatch(count); sender.declareQueue(QueueSpecification.queue(QUEUE)) .thenMany(confirmations) .doOnError(e -> LOGGER.error("Send failed", e)) .subscribe(r -> { if (r.isAck()) { LOGGER.info("Message {} sent successfully", new String(r.getOutboundMessage().getBody())); latch.countDown(); } }); latch.await(10, TimeUnit.SECONDS); sender.close(); } @Test public void testConsumer() throws InterruptedException { int count = 100; CountDownLatch latch = new CountDownLatch(count); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.useNio(); connectionFactory.setUsername("myuser"); connectionFactory.setPassword("mypass"); SenderOptions senderOptions = new SenderOptions() .connectionFactory(connectionFactory) .connectionSupplier(cf -> cf.newConnection( new Address[] {new Address("192.168.99.100",5672), new Address("192.168.99.100",5673), new Address("192.168.99.100",5674)}, "reactive-sender")) .resourceCreationScheduler(Schedulers.elastic()); Sender sender = ReactorRabbitMq.createSender(senderOptions); Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE)); ReceiverOptions receiverOptions = new ReceiverOptions() .connectionFactory(connectionFactory) .connectionSupplier(cf -> cf.newConnection( new Address[] {new Address("192.168.99.100",5672), new Address("192.168.99.100",5673), new Address("192.168.99.100",5674)}, "reactive-receiver")) .connectionSubscriptionScheduler(Schedulers.elastic()); Receiver receiver = ReactorRabbitMq.createReceiver(receiverOptions); Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE); Disposable disposable = queueDeclaration.thenMany(messages).subscribe(m -> { LOGGER.info("Received message {}", new String(m.getBody())); latch.countDown(); }); latch.await(10, TimeUnit.SECONDS); disposable.dispose(); sender.close(); receiver.close(); }
reactor-rabbitmq對rabbitmq的api進行封裝,改造爲reactive streams模式,提供了Non-blocking Back-pressure以及End-to-end Reactive Pipeline特性。git