在我前面有一篇博客說到了rabbitMq實現與zk相似的watch功能,可是那一篇博客沒有代碼實例,後面本身補了一個demo,便於理解。demo中主要利用spring boot的配置方式,web
1、消費者(也就是watcher)配置spring
配置都採用spring的註解進行配置app
一、建立鏈接dom
@Bean public ConnectionFactory createConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
//設置rabbitMq的ip和端口 connectionFactory.setAddresses("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; }
二、建立交換機spring-boot
@Bean public Exchange fanoutExchange() { return new FanoutExchange("ex_rabbit_test"); }
建立一個名爲ex_rabbit_test的交換機,交換機的類型爲廣播類型(爲了實現消息的廣播)工具
三、建立隊列,並綁定到交換機上測試
@Bean public Queue queueOne() { return new Queue("queue_one", false, false, true); } @Bean public Binding bindingOne(Queue queueOne, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueOne) .to(fanoutExchange); }
每個消費者有本身的隊列,只消費本身隊列的消息;將隊列和交換機綁定以後,交換機會將生產者發出的消息放到全部綁定的隊列中,可是僅限廣播模式,其它模式會按照必定的路由規則進行消息路由,好比topic類型的交換機會按照routingKey路由消息。ui
注意:在廣播模式中,爲了實現消息監聽,每一個消費者須要各自起一個隊列,並且隊列名不相同,好比如今有另一個消費者:this
@Bean public Queue queueTwo() { return new Queue("queue_two", false, false, true); } @Bean public Binding BingdingTwo(Queue queueTwo, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueTwo) .to(fanoutExchange); }
如此一來,當生產者將消息發到交換機ex_rabbit_test中時,交換機就將消息發給queue_one和queue_two兩個隊列中,兩個消費者分別取兩個隊列的消息進行消費。spa
四、消費消息
@Bean public SimpleMessageListenerContainer execMessageContainerOne() {
//設置監聽者「容器」 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(createConnectionFactory());
//設置隊列名 container.setQueueNames("queue_one");
//設置監聽者數量,即消費線程數 container.setConcurrentConsumers(1); container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { byte[] body = message.getBody(); if(null != body) { try { String msg = new String(body); String usr = "Consumer one"; consumerService.doProcess(usr, msg);//消費消息 } catch(Exception e) { e.printStackTrace(); } } }); return container; } @Bean public SimpleMessageListenerContainer execMessageContainerTwo() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(createConnectionFactory()); container.setQueueNames("queue_two"); container.setConcurrentConsumers(1); container.setMessageListener((ChannelAwareMessageListener) (message, channel) ->{ byte[] body = message.getBody(); if(null != body) { try { String msg = new String(body); String usr = "Consumer two"; consumerService.doProcess(usr, msg);//消費消息 } catch (Exception e) { e.printStackTrace(); } } }); return container; }
consumerService提供消費消息的服務,執行以下方法
public void doProcess(String usr, String msg) { System.out.println(usr + " receive message from producer:" + msg); }
2、生產者配置
一、與消費者相同的方式創建rabbitMq的鏈接
二、與消費者相同的方式設置交換機,交換機名稱也爲ex_rabbit_test(若是rabbitmq中已經存在這個交換機,能夠不用建立)
三、關因而否創建隊列以及將隊列與交換機綁定,個人理解是這樣的:
若是在生產者的代碼裏面創建隊列並將其與交換機綁定,那麼就必須創建全部的消費者的隊列,並將全部隊列與交換機綁定,若是這樣作,消費者中就能夠省掉這個配置。事實上,這樣作是有點得不償失的,我不贊同這樣作,這裏只是說明這樣作也能夠達到目的。
四、建立rabbit模板(org.springframework.amqp.rabbit.core.RabbitTemplate)
@Bean public RabbitTemplate rabbitTemplateProducer() { RabbitTemplate rabbitTemplate = new RabbitTemplate(this.createConnectionFactory()); rabbitTemplate.setExchange("ex_rabbit_test"); return rabbitTemplate; }
五、實現消息發送
demo中使用spring web的方式啓動消息發送,下面是controller和service的代碼
@Controller @RequestMapping(value="/index") public class ProducerController { @Autowired private ProducerService producerService; @RequestMapping(value = "/send") @ResponseBody public String sendMsg(@RequestParam String msg) { producerService.send(msg); return "Success"; } }
@Service public class ProducerService { @Resource(name = "rabbitTemplateProducer") private RabbitTemplate rabbitTemplate; public void send(String msg) { String message = "Hello, consumer.Sending:" + msg; rabbitTemplate.convertAndSend(message); } }
3、pom文件
在consumer中只須要引入spring ampq的依賴
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.5.3.RELEASE</version> </dependency> </dependencies>
在prudocer中須要引入spring ampq的依賴,另外因爲是啓動了web 項目,因此須要spring web的依賴
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.5.3.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>1.5.3.RELEASE</version> </dependency> </dependencies>
4、啓動項目和測試結果
使用spring boot能夠快速啓動項目,首先,在8882端口上啓動producer,而後啓動consumer。經過在controller中定義的訪問地址http://localhost:8882/index/send?msg=hello everybody(此處的msg必須有,由於@RequestParam註解),能夠看到兩個消費者都消費了這條消息
Consumer one receive message from producer:Hello, consumer.Sending:hello everybody
Consumer two receive message from producer:Hello, consumer.Sending:hello everybody
從rabbitMq的後臺(http://localhost:15672 usrname:guest pasword:guest)能夠看到剛纔建立的交換機和隊列。
當消費者變多,或者爲了代碼的統一管理,每一個消費者的代碼須要相同,爲了實現廣播需求,須要爲每一個消費者設置不一樣的隊列名。這種狀況下,能夠採用UUID的方式,每一個消費者能夠建立一個惟一的隨機隊列名。UUID方式建立隊列名的代碼能夠在ampq的jar包中找到org.springframework.amqp.core.AnonymousQueue
public String generateName() { UUID uuid = UUID.randomUUID(); ByteBuffer bb = ByteBuffer.wrap(new byte[16]); bb.putLong(uuid.getMostSignificantBits()) .putLong(uuid.getLeastSignificantBits()); // TODO: when Spring 4.2.4 is the minimum Spring Framework version, use encodeToUrlSafeString() SPR-13784. return this.prefix + Base64Utils.encodeToString(bb.array()) .replaceAll("\\+", "-") .replaceAll("/", "_") // but this will remain .replaceAll("=", ""); }
能夠將UUID方法的返回值加在固定隊列名的後面,這樣就生成了一個惟一的隨機隊列名。關於UUID的描述能夠自行百度。
ps:前段時間看了spring cloud,看到其中的一個工具,spring cloud bus也能夠用做消息監聽,細察以後發現,spring cloud bus也是封裝了rabbitMq,實現了消息隊列。