rabbitMq與spring boot搭配實現監聽

  在我前面有一篇博客說到了rabbitMq實現與zk相似的watch功能,可是那一篇博客沒有代碼實例,後面本身補了一個demo,便於理解。demo中主要利用spring boot的配置方式,web

1、消費者(也就是watcher)配置spring

配置都採用spring的註解進行配置app

一、建立鏈接dom

  @Bean
    public ConnectionFactory createConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
     //設置rabbitMq的ip和端口 connectionFactory.setAddresses(
"127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; }

二、建立交換機spring-boot

    @Bean
    public Exchange fanoutExchange() {
        return new FanoutExchange("ex_rabbit_test");
    }

建立一個名爲ex_rabbit_test的交換機,交換機的類型爲廣播類型(爲了實現消息的廣播)工具

三、建立隊列,並綁定到交換機上測試

    @Bean
    public Queue queueOne() {
        return new Queue("queue_one", false, false, true);
    }

    @Bean
    public Binding bindingOne(Queue queueOne, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueOne)
                .to(fanoutExchange);
    }

每個消費者有本身的隊列,只消費本身隊列的消息;將隊列和交換機綁定以後,交換機會將生產者發出的消息放到全部綁定的隊列中,可是僅限廣播模式,其它模式會按照必定的路由規則進行消息路由,好比topic類型的交換機會按照routingKey路由消息。ui

注意:在廣播模式中,爲了實現消息監聽,每一個消費者須要各自起一個隊列,並且隊列名不相同,好比如今有另一個消費者:this

    @Bean
    public Queue queueTwo() {
        return new Queue("queue_two", false, false, true);
    }

    @Bean
    public Binding BingdingTwo(Queue queueTwo, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueTwo)
                .to(fanoutExchange);
    }

如此一來,當生產者將消息發到交換機ex_rabbit_test中時,交換機就將消息發給queue_one和queue_two兩個隊列中,兩個消費者分別取兩個隊列的消息進行消費。spa

四、消費消息

    @Bean
    public SimpleMessageListenerContainer execMessageContainerOne() {
     //設置監聽者「容器」 SimpleMessageListenerContainer container
= new SimpleMessageListenerContainer(createConnectionFactory());
     //設置隊列名 container.setQueueNames(
"queue_one");
     //設置監聽者數量,即消費線程數 container.setConcurrentConsumers(
1); container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { byte[] body = message.getBody(); if(null != body) { try { String msg = new String(body); String usr = "Consumer one"; consumerService.doProcess(usr, msg);//消費消息 } catch(Exception e) { e.printStackTrace(); } } }); return container; } @Bean public SimpleMessageListenerContainer execMessageContainerTwo() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(createConnectionFactory()); container.setQueueNames("queue_two"); container.setConcurrentConsumers(1); container.setMessageListener((ChannelAwareMessageListener) (message, channel) ->{ byte[] body = message.getBody(); if(null != body) { try { String msg = new String(body); String usr = "Consumer two"; consumerService.doProcess(usr, msg);//消費消息 } catch (Exception e) { e.printStackTrace(); } } }); return container; }

consumerService提供消費消息的服務,執行以下方法

    public void doProcess(String usr, String msg) {
        System.out.println(usr + " receive message from producer:" + msg);
    }

2、生產者配置

一、與消費者相同的方式創建rabbitMq的鏈接

二、與消費者相同的方式設置交換機,交換機名稱也爲ex_rabbit_test(若是rabbitmq中已經存在這個交換機,能夠不用建立)

三、關因而否創建隊列以及將隊列與交換機綁定,個人理解是這樣的:

  若是在生產者的代碼裏面創建隊列並將其與交換機綁定,那麼就必須創建全部的消費者的隊列,並將全部隊列與交換機綁定,若是這樣作,消費者中就能夠省掉這個配置。事實上,這樣作是有點得不償失的,我不贊同這樣作,這裏只是說明這樣作也能夠達到目的。

四、建立rabbit模板(org.springframework.amqp.rabbit.core.RabbitTemplate)

    @Bean
    public RabbitTemplate rabbitTemplateProducer() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(this.createConnectionFactory());
        rabbitTemplate.setExchange("ex_rabbit_test");
        return rabbitTemplate;
    }

五、實現消息發送

  demo中使用spring web的方式啓動消息發送,下面是controller和service的代碼

@Controller
@RequestMapping(value="/index")
public class ProducerController {

    @Autowired
    private ProducerService producerService;

    @RequestMapping(value = "/send")
    @ResponseBody
    public String sendMsg(@RequestParam String msg) {
        producerService.send(msg);
        return "Success";
    }
}
@Service
public class ProducerService {

    @Resource(name = "rabbitTemplateProducer")
    private RabbitTemplate rabbitTemplate;

    public void send(String msg) {
        String message = "Hello, consumer.Sending:" + msg;
        rabbitTemplate.convertAndSend(message);
    }
}

3、pom文件

在consumer中只須要引入spring ampq的依賴

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>1.5.3.RELEASE</version>
        </dependency>
    </dependencies>

在prudocer中須要引入spring ampq的依賴,另外因爲是啓動了web 項目,因此須要spring web的依賴

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>1.5.3.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>1.5.3.RELEASE</version>
        </dependency>
    </dependencies>

4、啓動項目和測試結果

使用spring boot能夠快速啓動項目,首先,在8882端口上啓動producer,而後啓動consumer。經過在controller中定義的訪問地址http://localhost:8882/index/send?msg=hello everybody(此處的msg必須有,由於@RequestParam註解),能夠看到兩個消費者都消費了這條消息

Consumer one receive message from producer:Hello, consumer.Sending:hello everybody
Consumer two receive message from producer:Hello, consumer.Sending:hello everybody

從rabbitMq的後臺(http://localhost:15672  usrname:guest  pasword:guest)能夠看到剛纔建立的交換機和隊列。

當消費者變多,或者爲了代碼的統一管理,每一個消費者的代碼須要相同,爲了實現廣播需求,須要爲每一個消費者設置不一樣的隊列名。這種狀況下,能夠採用UUID的方式,每一個消費者能夠建立一個惟一的隨機隊列名。UUID方式建立隊列名的代碼能夠在ampq的jar包中找到org.springframework.amqp.core.AnonymousQueue

     public String generateName() {
            UUID uuid = UUID.randomUUID();
            ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
            bb.putLong(uuid.getMostSignificantBits())
              .putLong(uuid.getLeastSignificantBits());
            // TODO: when Spring 4.2.4 is the minimum Spring Framework version, use encodeToUrlSafeString() SPR-13784.
            return this.prefix + Base64Utils.encodeToString(bb.array())
                                    .replaceAll("\\+", "-")
                                    .replaceAll("/", "_")
                                    // but this will remain
                                    .replaceAll("=", "");
        }

能夠將UUID方法的返回值加在固定隊列名的後面,這樣就生成了一個惟一的隨機隊列名。關於UUID的描述能夠自行百度。

ps:前段時間看了spring cloud,看到其中的一個工具,spring cloud bus也能夠用做消息監聽,細察以後發現,spring cloud bus也是封裝了rabbitMq,實現了消息隊列。

相關文章
相關標籤/搜索