springboot+rabbitmq整合

1.安裝好rabbitmq

2.新建一個springBoot項目:rabbitmq_demo

3.添加pom依賴:

<!-- No <version> elements are given; NOTE(review): presumably versions come from Spring Boot's
     dependency management (parent POM / BOM not shown in this snippet) - confirm. -->
<dependencies>
    <!-- Web starter: backs the REST controller used to trigger the sends. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Test starter, limited to the test scope. -->
    <dependency>  
        <groupId>org.springframework.boot</groupId>  
        <artifactId>spring-boot-starter-test</artifactId>  
        <scope>test</scope>  
    </dependency>  
    <!-- AMQP starter: provides the Spring AMQP / RabbitMQ classes imported by the examples. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

4.application.properties:

# HTTP port of the web application; the test URLs in this tutorial use it.
server.port=8080
spring.application.name=rabbitmq_demo
# RabbitMQ broker connection settings (host, AMQP port, credentials).
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Turns on publisher confirms.
# NOTE(review): newer Spring Boot releases replace this key with
# spring.rabbitmq.publisher-confirm-type - confirm against the Boot version in use.
spring.rabbitmq.publisher-confirms=true
# Virtual host to connect to ("/" here).
spring.rabbitmq.virtual-host=/

5.啓動類聲明一個Queue,用於測試:

package com;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * Spring Boot entry point for the RabbitMQ demo; also declares the queue used by the examples.
 */
@SpringBootApplication
public class RabbitmqDemoApplication {

    /** Name of the single queue declared by this application. */
    private static final String HELLO_QUEUE = "helloQueue";

    /**
     * Exposes the {@code helloQueue} queue definition as a bean.
     *
     * @return a queue named {@code helloQueue}
     */
    @Bean
    public Queue helloQueue() {
        final Queue queue = new Queue(HELLO_QUEUE);
        return queue;
    }

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemoApplication.class, args);
    }
}

多場景實現:

1.單生產者和單消費者測試

    生產者1:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 1: publishes a timestamped greeting with the routing key {@code helloQueue}.
 */
@Component
public class Sender1 {

    /** Routing key handed to the template; equal to the queue name used by the demo. */
    private static final String HELLO_QUEUE = "helloQueue";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello1 " + <current date>}, echoes it to stdout and sends it.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello1 " + now;
        System.out.println("Sender1:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }
}

    消費者1:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 1 listening on the {@code helloQueue} queue.
 */
@RabbitListener(queues = "helloQueue")
@Component
public class HelloReceiver1 {

    /**
     * Handles a text message by echoing it to stdout with the {@code Receiver1:} prefix.
     *
     * @param hello the received message body
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver1:" + hello;
        System.out.println(line);
    }
}

    測試controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: an HTTP request triggers one send by producer 1.
 */
@RestController
public class RabbitController {

    /** Response body returned once the send has been issued. */
    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    /**
     * Handles {@code /hello}: asks producer 1 to send a single message.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        return OK;
    }
}

    運行項目,訪問 http://localhost:8080/hello :

    Sender1:hello1 Thu May 11 17:23:31 CST 2017

    Receiver1:hello1 Thu May 11 17:23:31 CST 2017

2.單生產者-多消費者

    生產者1不變

    增加消費者2:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 2 listening on the {@code helloQueue} queue.
 */
@RabbitListener(queues = "helloQueue")
@Component
public class HelloReceiver2 {

    /**
     * Handles a text message by echoing it to stdout with the {@code Receiver2:} prefix.
     *
     * @param mesg the received message body
     */
    @RabbitHandler
    public void process(String mesg) {
        final String line = "Receiver2:" + mesg;
        System.out.println(line);
    }
}

    測試controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: an HTTP request triggers two consecutive sends by producer 1, so that the
 * messages can be observed being shared between the consumers.
 */
@RestController
public class RabbitController {

    /** Response body returned once the sends have been issued. */
    private static final String OK = "ok";

    /** Number of messages sent per request. */
    private static final int SEND_COUNT = 2;

    @Autowired
    private Sender1 helloSender1;

    /**
     * Handles {@code /hello}: asks producer 1 to send twice.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        for (int sent = 0; sent < SEND_COUNT; sent++) {
            helloSender1.send();
        }
        return OK;
    }
}

    運行項目,訪問 http://localhost:8080/hello :

    Sender1:hello1 Thu May 11 17:23:31 CST 2017
    Sender1:hello1 Thu May 11 17:23:31 CST 2017

    Receiver1:hello1 Thu May 11 17:23:31 CST 2017
    Receiver2:hello1 Thu May 11 17:23:31 CST 2017

    消息會被多個消費者交替消費,每條消息只能被一個消費者所接收。

3.多生產者-多消費者

    增加生產者2:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 2: publishes a timestamped greeting with the routing key {@code helloQueue}.
 */
@Component
public class Sender2 {

    /** Routing key handed to the template; equal to the queue name used by the demo. */
    private static final String HELLO_QUEUE = "helloQueue";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello2 " + <current date>}, echoes it to stdout and sends it.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello2 " + now;
        System.out.println("Sender2:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }
}

    消費者1、2不變

    測試controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: an HTTP request triggers one send from each of the two producers.
 */
@RestController
public class RabbitController {

    /** Response body returned once the sends have been issued. */
    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    @Autowired
    private Sender2 helloSender2;

    /**
     * Handles {@code /hello}: producer 1 sends first, then producer 2.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        helloSender2.send();
        return OK;
    }
}

    運行項目,訪問 http://localhost:8080/hello :

    Sender1:hello1 Thu May 11 17:23:31 CST 2017
    Sender2:hello2 Thu May 11 17:23:31 CST 2017

    Receiver1:hello2 Thu May 11 17:23:31 CST 2017
    Receiver2:hello1 Thu May 11 17:23:31 CST 2017

    多個生產者將消息放入helloQueue的隊列中,隊列中的消息會被多個消費者交替消費,每條消息只能被一個消費者所接收。

4.實體類傳輸

    支持對象的發送和接收,實體類只需要支持序列化即可。

    實體類

package com.demo.model;

import java.io.Serializable;

/**
 * Serializable user payload exchanged between the demo producers and consumers.
 *
 * <p>All properties are plain strings with conventional getters and setters; a freshly
 * constructed instance has every property set to {@code null}.
 */
public class User implements Serializable {

    /**
     * Explicit stream version. Without it the JVM derives a value from the class shape, so
     * recompiling or adding a member could make already-serialized instances undeserializable.
     */
    private static final long serialVersionUID = 1L;

    private String userName;

    private String password;

    private String sex;

    private String level;

    /**
     * @return the user name, or {@code null} if none has been set
     */
    public String getUserName() {
        return userName;
    }

    /**
     * @param userName the user name to store
     */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /**
     * @return the password, or {@code null} if none has been set
     */
    public String getPassword() {
        return password;
    }

    /**
     * @param password the password to store
     */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * @return the sex value, or {@code null} if none has been set
     */
    public String getSex() {
        return sex;
    }

    /**
     * @param sex the sex value to store
     */
    public void setSex(String sex) {
        this.sex = sex;
    }

    /**
     * @return the level, or {@code null} if none has been set
     */
    public String getLevel() {
        return level;
    }

    /**
     * @param level the level to store
     */
    public void setLevel(String level) {
        this.level = level;
    }
}

    生產者1:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 1: publishes text greetings and {@link User} objects with the routing key
 * {@code helloQueue}.
 */
@Component
public class Sender1 {

    /** Routing key handed to the template; equal to the queue name used by the demo. */
    private static final String HELLO_QUEUE = "helloQueue";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello1 " + <current date>}, echoes it to stdout and sends it.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello1 " + now;
        System.out.println("Sender1:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }

    /**
     * Echoes {@code userName/password} of the given user to stdout and sends the object itself.
     *
     * @param user the user to publish; must not be {@code null}
     */
    public void sendUser(User user) {
        final String name = user.getUserName();
        // NOTE(review): the password is printed in clear text; tolerable for a demo only.
        final String secret = user.getPassword();
        System.out.println("user Sender1:" + name + "/" + secret);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, user);
    }
}

    生產者2:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 2: publishes text greetings and {@link User} objects with the routing key
 * {@code helloQueue}.
 */
@Component
public class Sender2 {

    /** Routing key handed to the template; equal to the queue name used by the demo. */
    private static final String HELLO_QUEUE = "helloQueue";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello2 " + <current date>}, echoes it to stdout and sends it.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello2 " + now;
        System.out.println("Sender2:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }

    /**
     * Echoes {@code userName/password} of the given user to stdout and sends the object itself.
     *
     * @param user the user to publish; must not be {@code null}
     */
    public void sendUser(User user) {
        final String name = user.getUserName();
        // NOTE(review): the password is printed in clear text; tolerable for a demo only.
        final String secret = user.getPassword();
        System.out.println("user Sender2:" + name + "/" + secret);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, user);
    }
}

    消費者1:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 1 listening on the {@code helloQueue} queue; has one handler per payload type.
 */
@RabbitListener(queues = "helloQueue")
@Component
public class HelloReceiver1 {

    /**
     * Handles a text message by echoing it to stdout with the {@code Receiver1:} prefix.
     *
     * @param hello the received message body
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver1:" + hello;
        System.out.println(line);
    }

    /**
     * Handles a {@link User} message by echoing {@code userName/password} to stdout.
     *
     * @param user the received user object
     */
    @RabbitHandler
    public void processUser(User user) {
        final String name = user.getUserName();
        final String secret = user.getPassword();
        System.out.println("user receive1:" + name + "/" + secret);
    }
}

    消費者2:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 2 listening on the {@code helloQueue} queue; has one handler per payload type.
 */
@RabbitListener(queues = "helloQueue")
@Component
public class HelloReceiver2 {

    /**
     * Handles a text message by echoing it to stdout with the {@code Receiver2:} prefix.
     *
     * @param mesg the received message body
     */
    @RabbitHandler
    public void process(String mesg) {
        final String line = "Receiver2:" + mesg;
        System.out.println(line);
    }

    /**
     * Handles a {@link User} message by echoing {@code userName/password} to stdout.
     *
     * @param user the received user object
     */
    @RabbitHandler
    public void processUser(User user) {
        final String name = user.getUserName();
        final String secret = user.getPassword();
        System.out.println("user receive2:" + name + "/" + secret);
    }
}

    測試的controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller exposing endpoints that trigger text and {@link User} sends from both producers.
 */
@RestController
public class RabbitController {

    /** Response body returned once the sends have been issued. */
    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    @Autowired
    private Sender2 helloSender2;

    /**
     * Handles {@code /hello}: producer 1 sends first, then producer 2.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        helloSender2.send();
        return OK;
    }

    /**
     * Handles {@code /user}: builds one sample user and has both producers send that same object.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/user")
    public String user() {
        final User sample = sampleUser();
        helloSender1.sendUser(sample);
        helloSender2.sendUser(sample);
        return OK;
    }

    /** Creates the fixed sample user (name "a", password "1", sex "m", level "1"). */
    private static User sampleUser() {
        final User sample = new User();
        sample.setUserName("a");
        sample.setPassword("1");
        sample.setSex("m");
        sample.setLevel("1");
        return sample;
    }
}

    運行項目,訪問 http://localhost:8080/user :

    user Sender1:a/1
    user Sender2:a/1

    user receive1:a/1
    user receive2:a/1

5.TopicExchange的使用

    啓動類新增聲明兩個Queue,用於測試:

package com;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * Spring Boot entry point for the RabbitMQ demo. Besides bootstrapping the application it declares
 * the queues, the topic exchange and the bindings between them.
 */
@SpringBootApplication
public class RabbitmqDemoApplication {

    private static final String HELLO_QUEUE = "helloQueue";
    private static final String TOPIC_MESSAGE_QUEUE = "topicMessage";
    private static final String TOPIC_MESSAGES_QUEUE = "topicMessages";
    private static final String TOPIC_EXCHANGE = "topicExchange";

    /** Binding key that matches the routing key {@code topic.Message} only. */
    private static final String EXACT_BINDING_KEY = "topic.Message";

    /** Binding key with the {@code #} wildcard, matching routing keys that begin with {@code topic}. */
    private static final String WILDCARD_BINDING_KEY = "topic.#";

    // ------------------------------------------------------------------ queues

    /**
     * @return a queue named {@code helloQueue}
     */
    @Bean
    public Queue helloQueue() {
        final Queue queue = new Queue(HELLO_QUEUE);
        return queue;
    }

    /**
     * @return a queue named {@code topicMessage}
     */
    @Bean
    public Queue topicMessage() {
        final Queue queue = new Queue(TOPIC_MESSAGE_QUEUE);
        return queue;
    }

    /**
     * @return a queue named {@code topicMessages}
     */
    @Bean
    public Queue topicMessages() {
        final Queue queue = new Queue(TOPIC_MESSAGES_QUEUE);
        return queue;
    }

    // ---------------------------------------------------------------- exchange

    /**
     * @return a topic exchange named {@code topicExchange}
     */
    @Bean
    TopicExchange topicExchange() {
        final TopicExchange exchange = new TopicExchange(TOPIC_EXCHANGE);
        return exchange;
    }

    // ---------------------------------------------------------------- bindings

    /**
     * Binds the {@code topicMessage} queue to the topic exchange with the key
     * {@code topic.Message}, so only that exact routing key reaches the queue.
     *
     * <p>NOTE(review): the parameter names equal the bean method names; with several
     * {@code Queue} beans present this is presumably what selects the right one - keep them in sync.
     *
     * @param topicMessage  the queue to bind
     * @param topicExchange the exchange to bind it to
     * @return the binding definition
     */
    @Bean
    Binding bindingExchangeMessage(Queue topicMessage, TopicExchange topicExchange) {
        return BindingBuilder
                .bind(topicMessage)
                .to(topicExchange)
                .with(EXACT_BINDING_KEY);
    }

    /**
     * Binds the {@code topicMessages} queue to the topic exchange with the key {@code topic.#},
     * so routing keys that begin with {@code topic} reach the queue.
     *
     * @param topicMessages the queue to bind
     * @param topicExchange the exchange to bind it to
     * @return the binding definition
     */
    @Bean
    Binding bindingExchangeMessages(Queue topicMessages, TopicExchange topicExchange) {
        return BindingBuilder
                .bind(topicMessages)
                .to(topicExchange)
                .with(WILDCARD_BINDING_KEY);
    }

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemoApplication.class, args);
    }
}

    生產者1:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 1: publishes text greetings and {@link User} objects with the routing key
 * {@code helloQueue}, and test messages to the topic exchange.
 */
@Component
public class Sender1 {

    /** Routing key handed to the template; equal to the queue name used by the demo. */
    private static final String HELLO_QUEUE = "helloQueue";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello1 " + <current date>}, echoes it to stdout and sends it.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello1 " + now;
        System.out.println("Sender1:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }

    /**
     * Echoes {@code userName/password} of the given user to stdout and sends the object itself.
     *
     * @param user the user to publish; must not be {@code null}
     */
    public void sendUser(User user) {
        final String name = user.getUserName();
        // NOTE(review): the password is printed in clear text; tolerable for a demo only.
        final String secret = user.getPassword();
        System.out.println("user Sender1:" + name + "/" + secret);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, user);
    }

    /**
     * Sends two messages to {@code topicExchange}, logging each one before it is sent.
     */
    public void testTopPicMessage() {
        // Routing key topic.Message satisfies both binding keys, topic.Message and topic.#.
        publishToTopic("sendTopPicMessage1:", "topic.Message", "sendTopPicMessage");
        // Routing key topic.Messages satisfies only the topic.# binding key.
        publishToTopic("sendTopPicMessages1:", "topic.Messages", "sendTopPicMessages");
    }

    /**
     * Logs {@code logPrefix + body} and publishes {@code body} to the topic exchange.
     * Arguments of the send call: exchange name, routing key, message content. The exchange
     * compares the routing key with the binding keys to choose the receiving queues.
     */
    private void publishToTopic(String logPrefix, String routingKey, String body) {
        System.out.println(logPrefix + body);
        rabbitTemplate.convertAndSend("topicExchange", routingKey, body);
    }
}

    生產者2:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 2: publishes text greetings and {@link User} objects with the routing key
 * {@code helloQueue}, and test messages to the topic exchange.
 */
@Component
public class Sender2 {

    /** Routing key handed to the template; equal to the queue name used by the demo. */
    private static final String HELLO_QUEUE = "helloQueue";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello2 " + <current date>}, echoes it to stdout and sends it.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello2 " + now;
        System.out.println("Sender2:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }

    /**
     * Echoes {@code userName/password} of the given user to stdout and sends the object itself.
     *
     * @param user the user to publish; must not be {@code null}
     */
    public void sendUser(User user) {
        final String name = user.getUserName();
        // NOTE(review): the password is printed in clear text; tolerable for a demo only.
        final String secret = user.getPassword();
        System.out.println("user Sender2:" + name + "/" + secret);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, user);
    }

    /**
     * Sends two messages to {@code topicExchange}, logging each one before it is sent.
     */
    public void testTopPicMessage() {
        // Routing key topic.Message satisfies both binding keys, topic.Message and topic.#.
        publishToTopic("sendTopPicMessage2:", "topic.Message", "sendTopPicMessage");
        // Routing key topic.Messages satisfies only the topic.# binding key.
        publishToTopic("sendTopPicMessages2:", "topic.Messages", "sendTopPicMessages");
    }

    /**
     * Logs {@code logPrefix + body} and publishes {@code body} to the topic exchange.
     * Arguments of the send call: exchange name, routing key, message content. The exchange
     * compares the routing key with the binding keys to choose the receiving queues.
     */
    private void publishToTopic(String logPrefix, String routingKey, String body) {
        System.out.println(logPrefix + body);
        rabbitTemplate.convertAndSend("topicExchange", routingKey, body);
    }
}

    topicMessage消費者:

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the {@code topicMessage} queue.
 */
@RabbitListener(queues = "topicMessage")
@Component
public class TopMessageReceiver {

    /**
     * Handles a text message by echoing it to stdout with the {@code topMessageReceiver:} prefix.
     *
     * @param msg the received message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "topMessageReceiver:" + msg;
        System.out.println(line);
    }
}

     topicMessages消費者:

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the {@code topicMessages} queue.
 */
@RabbitListener(queues = "topicMessages")
@Component
public class TopMessagesReceiver {

    /**
     * Handles a text message by echoing it to stdout with the {@code topMessagesReceiver:} prefix.
     *
     * @param msg the received message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "topMessagesReceiver:" + msg;
        System.out.println(line);
    }
}

    測試controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller exposing endpoints that trigger text, {@link User} and topic-exchange sends
 * from both producers.
 */
@RestController
public class RabbitController {

    /** Response body returned once the sends have been issued. */
    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    @Autowired
    private Sender2 helloSender2;

    /**
     * Handles {@code /hello}: producer 1 sends first, then producer 2.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        helloSender2.send();
        return OK;
    }

    /**
     * Handles {@code /user}: builds one sample user and has both producers send that same object.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/user")
    public String user() {
        final User sample = sampleUser();
        helloSender1.sendUser(sample);
        helloSender2.sendUser(sample);
        return OK;
    }

    /**
     * Handles {@code /topMessage}: both producers run their topic-exchange test in turn.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/topMessage")
    public String topMessage() {
        helloSender1.testTopPicMessage();
        helloSender2.testTopPicMessage();
        return OK;
    }

    /** Creates the fixed sample user (name "a", password "1", sex "m", level "1"). */
    private static User sampleUser() {
        final User sample = new User();
        sample.setUserName("a");
        sample.setPassword("1");
        sample.setSex("m");
        sample.setLevel("1");
        return sample;
    }
}

    運行項目,訪問 http://localhost:8080/topMessage :

    sendTopPicMessage1:sendTopPicMessage
    sendTopPicMessages1:sendTopPicMessages

    sendTopPicMessage2:sendTopPicMessage
    sendTopPicMessages2:sendTopPicMessages

    topMessageReceiver:sendTopPicMessage
    topMessagesReceiver:sendTopPicMessage
    topMessagesReceiver:sendTopPicMessages

    topMessageReceiver:sendTopPicMessage
    topMessagesReceiver:sendTopPicMessage
    topMessagesReceiver:sendTopPicMessages

    通過topicExchange發送的每條消息,會被路由到所有綁定鍵(binding key)與該消息路由鍵(routing key)相匹配的隊列,這些隊列的消費者都能收到。

需要注意:

    rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#兩個都符合,所以兩個消費者都收到消息
    rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合,只有topMessages符合接受消息的條件

6.FanoutExchange的使用

    啓動類新增聲明三個Queue,用於測試:

package com;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * Spring Boot entry point for the RabbitMQ demo. Besides bootstrapping the application it declares
 * the queues, the topic and fanout exchanges, and the bindings between them.
 */
@SpringBootApplication
public class RabbitmqDemoApplication {

    private static final String HELLO_QUEUE = "helloQueue";
    private static final String TOPIC_MESSAGE_QUEUE = "topicMessage";
    private static final String TOPIC_MESSAGES_QUEUE = "topicMessages";
    private static final String FANOUT_A_QUEUE = "fanoutA";
    private static final String FANOUT_B_QUEUE = "fanoutB";
    private static final String FANOUT_C_QUEUE = "fanoutC";

    private static final String TOPIC_EXCHANGE = "topicExchange";
    private static final String FANOUT_EXCHANGE = "fanoutExchange";

    /** Binding key that matches the routing key {@code topic.Message} only. */
    private static final String EXACT_BINDING_KEY = "topic.Message";

    /** Binding key with the {@code #} wildcard, matching routing keys that begin with {@code topic}. */
    private static final String WILDCARD_BINDING_KEY = "topic.#";

    // ------------------------------------------------------------------ queues

    /**
     * @return a queue named {@code helloQueue}
     */
    @Bean
    public Queue helloQueue() {
        final Queue queue = new Queue(HELLO_QUEUE);
        return queue;
    }

    /**
     * @return a queue named {@code topicMessage}
     */
    @Bean
    public Queue topicMessage() {
        final Queue queue = new Queue(TOPIC_MESSAGE_QUEUE);
        return queue;
    }

    /**
     * @return a queue named {@code topicMessages}
     */
    @Bean
    public Queue topicMessages() {
        final Queue queue = new Queue(TOPIC_MESSAGES_QUEUE);
        return queue;
    }

    /**
     * @return a queue named {@code fanoutA}
     */
    @Bean
    public Queue fanoutA() {
        final Queue queue = new Queue(FANOUT_A_QUEUE);
        return queue;
    }

    /**
     * @return a queue named {@code fanoutB}
     */
    @Bean
    public Queue fanoutB() {
        final Queue queue = new Queue(FANOUT_B_QUEUE);
        return queue;
    }

    /**
     * @return a queue named {@code fanoutC}
     */
    @Bean
    public Queue fanoutC() {
        final Queue queue = new Queue(FANOUT_C_QUEUE);
        return queue;
    }

    // --------------------------------------------------------------- exchanges

    /**
     * @return a topic exchange named {@code topicExchange}
     */
    @Bean
    TopicExchange topicExchange() {
        final TopicExchange exchange = new TopicExchange(TOPIC_EXCHANGE);
        return exchange;
    }

    /**
     * @return a fanout exchange named {@code fanoutExchange}
     */
    @Bean
    FanoutExchange fanoutExchange() {
        final FanoutExchange exchange = new FanoutExchange(FANOUT_EXCHANGE);
        return exchange;
    }

    // ---------------------------------------------------------------- bindings

    /**
     * Binds the {@code topicMessage} queue to the topic exchange with the key
     * {@code topic.Message}, so only that exact routing key reaches the queue.
     *
     * <p>NOTE(review): the parameter names equal the bean method names; with several
     * {@code Queue} beans present this is presumably what selects the right one - keep them in sync.
     *
     * @param topicMessage  the queue to bind
     * @param topicExchange the exchange to bind it to
     * @return the binding definition
     */
    @Bean
    Binding bindingExchangeMessage(Queue topicMessage, TopicExchange topicExchange) {
        return BindingBuilder
                .bind(topicMessage)
                .to(topicExchange)
                .with(EXACT_BINDING_KEY);
    }

    /**
     * Binds the {@code topicMessages} queue to the topic exchange with the key {@code topic.#},
     * so routing keys that begin with {@code topic} reach the queue.
     *
     * @param topicMessages the queue to bind
     * @param topicExchange the exchange to bind it to
     * @return the binding definition
     */
    @Bean
    Binding bindingExchangeMessages(Queue topicMessages, TopicExchange topicExchange) {
        return BindingBuilder
                .bind(topicMessages)
                .to(topicExchange)
                .with(WILDCARD_BINDING_KEY);
    }

    /**
     * Binds the {@code fanoutA} queue to the fanout exchange (no routing key is specified).
     *
     * @param fanoutA        the queue to bind
     * @param fanoutExchange the exchange to bind it to
     * @return the binding definition
     */
    @Bean
    Binding bindingExchangeA(Queue fanoutA, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutA)
                .to(fanoutExchange);
    }

    /**
     * Binds the {@code fanoutB} queue to the fanout exchange (no routing key is specified).
     *
     * @param fanoutB        the queue to bind
     * @param fanoutExchange the exchange to bind it to
     * @return the binding definition
     */
    @Bean
    Binding bindingExchangeB(Queue fanoutB, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutB)
                .to(fanoutExchange);
    }

    /**
     * Binds the {@code fanoutC} queue to the fanout exchange (no routing key is specified).
     *
     * @param fanoutC        the queue to bind
     * @param fanoutExchange the exchange to bind it to
     * @return the binding definition
     */
    @Bean
    Binding bindingExchangeC(Queue fanoutC, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutC)
                .to(fanoutExchange);
    }

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemoApplication.class, args);
    }
}

    生產者1:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 1: publishes text greetings and {@link User} objects with the routing key
 * {@code helloQueue}, and test messages to the topic and fanout exchanges.
 */
@Component
public class Sender1 {

    /** Routing key handed to the template; equal to the queue name used by the demo. */
    private static final String HELLO_QUEUE = "helloQueue";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello1 " + <current date>}, echoes it to stdout and sends it.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello1 " + now;
        System.out.println("Sender1:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }

    /**
     * Echoes {@code userName/password} of the given user to stdout and sends the object itself.
     *
     * @param user the user to publish; must not be {@code null}
     */
    public void sendUser(User user) {
        final String name = user.getUserName();
        // NOTE(review): the password is printed in clear text; tolerable for a demo only.
        final String secret = user.getPassword();
        System.out.println("user Sender1:" + name + "/" + secret);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, user);
    }

    /**
     * Sends two messages to {@code topicExchange}, logging each one before it is sent.
     */
    public void testTopPicMessage() {
        // Routing key topic.Message satisfies both binding keys, topic.Message and topic.#.
        publishToTopic("sendTopPicMessage1:", "topic.Message", "sendTopPicMessage");
        // Routing key topic.Messages satisfies only the topic.# binding key.
        publishToTopic("sendTopPicMessages1:", "topic.Messages", "sendTopPicMessages");
    }

    /**
     * Logs the fixed text {@code sendFanoutMessage} and publishes it to {@code fanoutExchange}.
     */
    public void testFanoutMessage() {
        final String payload = "sendFanoutMessage";
        System.out.println("fanout Sender1:" + payload);
        // The routing key is not used for filtering here, but the argument must still be
        // passed so that the first string is taken as the exchange name.
        rabbitTemplate.convertAndSend("fanoutExchange", "", payload);
    }

    /**
     * Logs {@code logPrefix + body} and publishes {@code body} to the topic exchange.
     * Arguments of the send call: exchange name, routing key, message content. The exchange
     * compares the routing key with the binding keys to choose the receiving queues.
     */
    private void publishToTopic(String logPrefix, String routingKey, String body) {
        System.out.println(logPrefix + body);
        rabbitTemplate.convertAndSend("topicExchange", routingKey, body);
    }
}

    生產者2:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 2: publishes text greetings and {@link User} objects with the routing key
 * {@code helloQueue}, and test messages to the topic and fanout exchanges.
 */
@Component
public class Sender2 {

    /** Routing key handed to the template; equal to the queue name used by the demo. */
    private static final String HELLO_QUEUE = "helloQueue";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds {@code "hello2 " + <current date>}, echoes it to stdout and sends it.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello2 " + now;
        System.out.println("Sender2:" + payload);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, payload);
    }

    /**
     * Echoes {@code userName/password} of the given user to stdout and sends the object itself.
     *
     * @param user the user to publish; must not be {@code null}
     */
    public void sendUser(User user) {
        final String name = user.getUserName();
        // NOTE(review): the password is printed in clear text; tolerable for a demo only.
        final String secret = user.getPassword();
        System.out.println("user Sender2:" + name + "/" + secret);
        rabbitTemplate.convertAndSend(HELLO_QUEUE, user);
    }

    /**
     * Sends two messages to {@code topicExchange}, logging each one before it is sent.
     */
    public void testTopPicMessage() {
        // Routing key topic.Message satisfies both binding keys, topic.Message and topic.#.
        publishToTopic("sendTopPicMessage2:", "topic.Message", "sendTopPicMessage");
        // Routing key topic.Messages satisfies only the topic.# binding key.
        publishToTopic("sendTopPicMessages2:", "topic.Messages", "sendTopPicMessages");
    }

    /**
     * Logs the fixed text {@code sendFanoutMessage} and publishes it to {@code fanoutExchange}.
     */
    public void testFanoutMessage() {
        final String payload = "sendFanoutMessage";
        System.out.println("fanout Sender2:" + payload);
        // The routing key is not used for filtering here, but the argument must still be
        // passed so that the first string is taken as the exchange name.
        rabbitTemplate.convertAndSend("fanoutExchange", "", payload);
    }

    /**
     * Logs {@code logPrefix + body} and publishes {@code body} to the topic exchange.
     * Arguments of the send call: exchange name, routing key, message content. The exchange
     * compares the routing key with the binding keys to choose the receiving queues.
     */
    private void publishToTopic(String logPrefix, String routingKey, String body) {
        System.out.println(logPrefix + body);
        rabbitTemplate.convertAndSend("topicExchange", routingKey, body);
    }
}

    fanoutA消費者

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the {@code fanoutA} queue.
 */
@RabbitListener(queues = "fanoutA")
@Component
public class FanoutReceiverA {

    /**
     * Handles a text message by echoing it to stdout with the {@code FanoutReceiverA:} prefix.
     *
     * @param msg the received message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "FanoutReceiverA:" + msg;
        System.out.println(line);
    }
}

    fanoutB消費者

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the {@code fanoutB} queue.
 */
@RabbitListener(queues = "fanoutB")
@Component
public class FanoutReceiverB {

    /**
     * Handles a text message by echoing it to stdout with the {@code FanoutReceiverB:} prefix.
     *
     * @param msg the received message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "FanoutReceiverB:" + msg;
        System.out.println(line);
    }
}

    fanoutC消費者

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the {@code fanoutC} queue.
 */
@RabbitListener(queues = "fanoutC")
@Component
public class FanoutReceiverC {

    /**
     * Handles a text message by echoing it to stdout with the {@code FanoutReceiverC:} prefix.
     *
     * @param msg the received message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "FanoutReceiverC:" + msg;
        System.out.println(line);
    }
}

  測試controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller exposing endpoints that trigger text, {@link User}, topic-exchange and
 * fanout-exchange sends from both producers.
 */
@RestController
public class RabbitController {

    /** Response body returned once the sends have been issued. */
    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    @Autowired
    private Sender2 helloSender2;

    /**
     * Handles {@code /hello}: producer 1 sends first, then producer 2.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        helloSender2.send();
        return OK;
    }

    /**
     * Handles {@code /user}: builds one sample user and has both producers send that same object.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/user")
    public String user() {
        final User sample = sampleUser();
        helloSender1.sendUser(sample);
        helloSender2.sendUser(sample);
        return OK;
    }

    /**
     * Handles {@code /topMessage}: both producers run their topic-exchange test in turn.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/topMessage")
    public String topMessage() {
        helloSender1.testTopPicMessage();
        helloSender2.testTopPicMessage();
        return OK;
    }

    /**
     * Handles {@code /fanoutMessage}: both producers run their fanout-exchange test in turn.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/fanoutMessage")
    public String fanoutMessage() {
        helloSender1.testFanoutMessage();
        helloSender2.testFanoutMessage();
        return OK;
    }

    /** Creates the fixed sample user (name "a", password "1", sex "m", level "1"). */
    private static User sampleUser() {
        final User sample = new User();
        sample.setUserName("a");
        sample.setPassword("1");
        sample.setSex("m");
        sample.setLevel("1");
        return sample;
    }
}

    運行項目,訪問 http://localhost:8080/fanoutMessage :

    fanout Sender1:sendFanoutMessage
    fanout Sender2:sendFanoutMessage
   
    FanoutReceiverA:sendFanoutMessage
    FanoutReceiverB:sendFanoutMessage
    FanoutReceiverC:sendFanoutMessage

    FanoutReceiverA:sendFanoutMessage
    FanoutReceiverB:sendFanoutMessage
    FanoutReceiverC:sendFanoutMessage

    通過fanoutExchange發送的每條消息,所有綁定到該exchange的隊列的消費者都能收到。

相關文章
相關標籤/搜索