Erlang and RabbitMQ: http://www.javashuo.com/article/p-wenfljnx-mo.html
http://www.javashuo.com/article/p-minjciwm-mk.html
Recommended: create the Spring Cloud project online at https://start.spring.io/
You do not have to use the site above; the code and dependencies are listed below.
Gradle dependencies (much the same as with Maven):
    buildscript {
        ext {
            springBootVersion = '2.1.1.RELEASE'
        }
        repositories {
            mavenCentral()
        }
        dependencies {
            classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        }
    }

    apply plugin: 'java'
    apply plugin: 'eclipse'
    apply plugin: 'org.springframework.boot'
    apply plugin: 'io.spring.dependency-management'

    group = 'xy.study'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = 1.8

    repositories {
        mavenCentral()
    }

    dependencies {
        implementation('org.springframework.boot:spring-boot-starter-amqp')
        implementation('org.projectlombok:lombok:1.16.+')
        runtimeOnly('org.springframework.boot:spring-boot-devtools')
        testImplementation('org.springframework.boot:spring-boot-starter-test')
    }
Configuration file application.properties:
    spring.application.name=spring-boot-rabbitmq
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    server.port=5678
RabbitMQ configuration class (the commented-out code can simply be deleted):
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * Topic is the most flexible exchange type in RabbitMQ: queues can be bound
     * freely according to the routing key.
     * This class configures the topic rules first.
     */
    //@Configuration
    public class TopicRabbitConfig {

        public static final String QUEUE_NAME = "queue.name";
        public static final String TEST_TOPIC_ROUTINGKEY = "test.topic.routingKey";
        public static final String TEST_EXCHANGE_HAHA = "test.exchange.haha";

        // Sets the name of the exchange.
        // @Bean
        // TopicExchange exchange() {
        //     return new TopicExchange(TopicRabbitConfig.TEST_EXCHANGE_HAHA);
        // }

        // Sets the name of the queue.
        // @Bean
        // public Queue queueMessage() {
        //     return new Queue(TopicRabbitConfig.QUEUE_NAME);
        // }

        // Binds the queue to the exchange with the given routing key.
        // @Bean
        // Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        //     return BindingBuilder.bind(queueMessage).to(exchange).with(TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY);
        // }

        // Matches routing keys that start with "topic".
        // One exchange can be bound to several queues.
        // @Bean
        // Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        //     return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
        // }
    }
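The binding pattern "topic.#" in the commented-out code relies on the topic exchange wildcards: `*` stands for exactly one dot-separated word and `#` for zero or more words. The following is a minimal plain-Java sketch of that matching rule, written only to illustrate the semantics. The class name `TopicPatternSketch` and its `matches` method are hypothetical; this is not how RabbitMQ or Spring AMQP implement routing.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative sketch of topic-pattern matching; not RabbitMQ's actual implementation. */
final class TopicPatternSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        List<String> patternWords = Arrays.asList(pattern.split("\\."));
        List<String> keyWords = routingKey.isEmpty()
                ? Collections.<String>emptyList()
                : Arrays.asList(routingKey.split("\\."));
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return j == k.size();
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = j; next <= k.size(); next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            // Key exhausted while the pattern still expects a word.
            return false;
        }
        // '*' matches any single word; otherwise the words must be identical.
        if (word.equals("*") || word.equals(k.get(j))) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("topic.#", "topic.a.b"));             // true
        System.out.println(matches("topic.*", "topic.a.b"));             // false
        System.out.println(matches("topic.#", "test.topic.routingKey")); // false
    }
}
```

Note that the routing key used throughout this article, "test.topic.routingKey", does not start with "topic", so a queue bound only with "topic.#" would not receive those messages; the listeners below bind with the exact key instead.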
Producer: sends a string directly to the given Exchange with the given Routing Key:
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import xy.study.rabbitmq.conf.TopicRabbitConfig;

    @Component
    @Slf4j
    public class HelloSender {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * Sends via an exchange and a routing key, using
         * rabbitTemplate.convertAndSend(String exchange, String routingKey, Object object).
         *
         * @param i number appended to the message text
         */
        public void send(int i) {
            String context = "hello " + i;
            log.info("Sender : {}", context);
            this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TEST_EXCHANGE_HAHA, TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY, context);
        }
    }
Consumer: binds the corresponding Exchange, Queue and Routing Key, and simply logs the message it receives:
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    import xy.study.rabbitmq.conf.TopicRabbitConfig;

    @Component
    @Slf4j
    public class HelloReceiver {

        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = TopicRabbitConfig.QUEUE_NAME, durable = "true"),
                exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
                key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
        )
        public void onOrgDeleted(@Payload String msg) {
            log.info("HelloReceiver msg : {}", msg);
        }
    }
Test class: calls the producer's send function; the consumer listens for the message and consumes it:
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    import xy.study.rabbitmq.producer.HelloSender;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqApplicationTests {

        @Autowired
        private HelloSender sender;

        @Test
        public void testSend() {
            sender.send(666);
        }
    }
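As shown in the figure, the console log confirms that the message is produced and is consumed by the corresponding consumer.
Modify the consumer code as follows: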
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    import xy.study.rabbitmq.conf.TopicRabbitConfig;

    @Component
    @Slf4j
    public class HelloReceiver {

        /**
         * The four consumers below all use the same exchange and routing key;
         * the last two also use the same queue name.
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = TopicRabbitConfig.QUEUE_NAME, durable = "true"),
                exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
                key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
        )
        public void queueName(@Payload String msg) {
            log.info("{}-----HelloReceiver msg : {}", TopicRabbitConfig.QUEUE_NAME, msg);
        }

        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = TopicRabbitConfig.QUEUE_NAME + ".test", durable = "true"),
                exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
                key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
        )
        public void queueNameTest(@Payload String msg) {
            log.info("{}-----HelloReceiver msg : {}", TopicRabbitConfig.QUEUE_NAME + ".test", msg);
        }

        /**
         * The queue name "123445" used here is arbitrary; this consumer still
         * receives the message.
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = 123445 + "", durable = "true"),
                exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
                key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
        )
        public void queueNameNumber(@Payload String msg) {
            log.info("{}-----HelloReceiver msg : {}", 123445 + "" + ".test", msg);
        }

        /**
         * This consumer has exactly the same Exchange, RoutingKey and queue as the
         * one above, so only one of the two receives any given message
         * (which one varies from message to message).
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = 123445 + "", durable = "true"),
                exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
                key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
        )
        public void queueNameNumberSame(@Payload String msg) {
            log.info("same+{}-----HelloReceiver msg : {}", 123445 + "" + ".test", msg);
        }
    }
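Run the test again; the results are as follows:
The four consumers above all use the same Exchange and RoutingKey, and the last two also share the same queue name.
The results show that when the Exchange and RoutingKey are the same but the queues differ, every consumer receives its own copy of the message.
When the Exchange, RoutingKey and queue are all the same (the last two consumers), only one of those consumers receives a given message; the others on that queue do not.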
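The behaviour observed here can be modelled without a broker. The sketch below is a simplified plain-Java simulation, not RabbitMQ code: the class `QueueDeliverySketch` and its methods `listen` and `publish` are hypothetical names. It assumes every queue is bound with the same routing key (as in the listeners above), so each queue gets one copy of each message, and it assumes consumers on a shared queue take turns in round-robin order, which is RabbitMQ's default dispatch behaviour when prefetch limits do not interfere.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of one exchange delivering to queues that share a routing key. */
final class QueueDeliverySketch {

    /** A queue with named consumers; each message goes to exactly one of them, in turn. */
    static final class SimQueue {
        final List<String> consumers = new ArrayList<>();
        private int next = 0;

        String deliver() {
            String chosen = consumers.get(next % consumers.size());
            next++;
            return chosen;
        }
    }

    // Queues bound to the exchange, keyed by queue name, in declaration order.
    private final Map<String, SimQueue> boundQueues = new LinkedHashMap<>();

    /** Registers a consumer on a queue, creating and binding the queue if needed. */
    void listen(String queueName, String consumerName) {
        boundQueues.computeIfAbsent(queueName, n -> new SimQueue()).consumers.add(consumerName);
    }

    /** Publishes one message; returns the consumers that receive it (one per queue). */
    List<String> publish() {
        List<String> receivers = new ArrayList<>();
        for (SimQueue queue : boundQueues.values()) {
            receivers.add(queue.deliver());
        }
        return receivers;
    }

    public static void main(String[] args) {
        QueueDeliverySketch exchange = new QueueDeliverySketch();
        exchange.listen("queue.name", "queueName");
        exchange.listen("queue.name.test", "queueNameTest");
        exchange.listen("123445", "queueNameNumber");
        exchange.listen("123445", "queueNameNumberSame");

        // prints [queueName, queueNameTest, queueNameNumber]
        System.out.println(exchange.publish());
        // prints [queueName, queueNameTest, queueNameNumberSame]
        System.out.println(exchange.publish());
    }
}
```

Three queues exist, so each publish reaches three consumers; the two listeners on queue "123445" alternate, which matches the observation that only one of them handles any single message.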