<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.publisher-confirms=true spring.rabbitmq.dynamic=true spring.rabbitmq.cache.connection.mode=channel
@Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("hello"); } }
interface CityRepository extends Repository<City, Long> { Page<City> findAll(Pageable pageable); Page<City> findByNameContainingAndCountryContainingAllIgnoringCase(String name, String country, Pageable pageable); City findByNameAndCountryAllIgnoringCase(String name, String country); }
@Component public class RabbitConsumer { @RabbitHandler @RabbitListener(queues = "hello") public void process(@Payload String foo) { System.out.println(new Date() + ": " + foo); } }
@SpringBootApplication @EnableScheduling public class AmqpApplication { public static void main(String[] args) { SpringApplication.run(AmqpApplication.class, args); } }
控制檯輸出:git
Sun Sep 30 16:30:35 CST 2018: hello
到此,一個簡單的SpringBoot2.0
集成RabbitMQ
就完成了。 熟悉RabbitMQ
的小夥伴們應該知道,RabbitMQ
在通常的隊列基礎上,增長了ExChange
的概念。ExChange
有四種類型:Direct, Topic, Headers and Fanout。其中Headers實際不多使用,Direct較爲簡單。接下來將詳細介紹如何使用topic和Fanout。github
@Configuration public class TopicRabbitConfig { @Bean public Queue queueMessage1() { return new Queue(MQConst.TOPIC_QUEUENAME1); } @Bean public Queue queueMessage2() { return new Queue(MQConst.TOPIC_QUEUENAME2); } @Bean TopicExchange exchange() { return new TopicExchange(MQConst.TOPIC_EXCHANGE); } @Bean Binding bindingExchangeMessage(Queue queueMessage1, TopicExchange exchange) { // 將隊列1綁定到名爲topicKey.A的routingKey return BindingBuilder.bind(queueMessage1).to(exchange).with(MQConst.TOPIC_KEY1); } @Bean Binding bindingExchangeMessages(Queue queueMessage2, TopicExchange exchange) { // 將隊列2綁定到全部topicKey.開頭的routingKey return BindingBuilder.bind(queueMessage2).to(exchange).with(MQConst.TOPIC_KEYS); } }
@Component public class TopicConsumer { @RabbitListener(queues = MQConst.TOPIC_QUEUENAME1) @RabbitHandler public void process1(String message) { System.out.println("queue:topic.message1,message:" + message); } @RabbitListener(queues = MQConst.TOPIC_QUEUENAME2) @RabbitHandler public void process2(String message) { System.out.println("queue:topic.message2,message:" + message); } }
在Producer類中添加:spring
// Topic rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEYS, "from keys"); rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEY1, "from key1");
再次啓動主類,控制檯輸出:spring-boot
queue:topic.message2,message:from keys queue:topic.message1,message:from key1 queue:topic.message2,message:from key1
@Configuration public class FanoutRabbitConfig { @Bean public Queue MessageA() { return new Queue(MQConst.FANOUT_QUEUENAME1); } @Bean public Queue MessageB() { return new Queue(MQConst.FANOUT_QUEUENAME2); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(MQConst.FANOUT_EXCHANGE); } @Bean Binding bindingExchangeA(Queue MessageA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(MessageA).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue MessageB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(MessageB).to(fanoutExchange); } }
@Component public class FanoutConsumer { @RabbitListener(queues = MQConst.FANOUT_QUEUENAME1) @RabbitHandler public void process1(String message) { System.out.println("queue:fanout.message1,message:" + message); } @RabbitListener(queues = MQConst.FANOUT_QUEUENAME2) @RabbitHandler public void process2(String message) { System.out.println("queue:fanout.message2,message:" + message); } }
在Producer類中添加:ui
// FanOut rabbitTemplate.convertAndSend(MQConst.FANOUT_EXCHANGE, "", "fanout");
再次啓動主類,控制檯輸出:spa
queue:fanout.message2,message:fanout queue:fanout.message1,message:fanout
源碼地址:GitHubcode