SpringBoot整合RabbitMQ

1.pom.xml添加依賴html

<!--RabbitMq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
View Code

 

2.application.ymlspring

spring:
  application:
    admin: springboot-rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-returns: true
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
View Code

 

3.添加配置RabbitMqConfig springboot

@Configuration
public class RabbitMqConfig {
    @Autowired
    public ConnectionFactory connectionFactory;

    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        return admin;
    }
    @Bean
    public Queue kinsonQueue(){//建立隊列
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 40000);
        return new Queue("queue1",true,false,false,arguments);
    }
    @Bean
    public FanoutExchange defaultExchange(){//建立交換機
        return new FanoutExchange("exchange-1");
    }
    @Bean
    public Binding binQueueExchange(){//隊列綁定交換機
        return BindingBuilder.bind(kinsonQueue()).to(defaultExchange());
    }

}
View Code

 

4.添加生產者app

@Component
public class RabbitSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final Logger log = LoggerFactory.getLogger(this.getClass());
    //回調函數: confirm確認
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("correlationData: " + correlationData);
            log.info("ack: " + ack);
            if (!ack) {
                log.info("異常處理....");
            }
        }
    };

    //回調函數: return返回
    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode,
                                    String replyText, String exchange, String routingKey) {
            log.info("return exchange: {}, routingKey: {}, replyCode: {}, replyText: {}",
                    exchange, routingKey, replyCode, replyText);
        }
    };

    //發送消息方法調用: 構建Message消息
    public void send(String queue, Map<String, Object> properties) throws Exception {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message<Object> msg = MessageBuilder.createMessage("queue1", mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.convertAndSend(queue, msg);
    }
}
View Code

 

5.添加消費者ide

@Component
public class RabbitReceiver {
    @RabbitListener(queues = "queue1")
    @RabbitHandler
    public void onMessage(Message message, Channel channel,@Payload String string) throws Exception {
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        // 手工ACK
        channel.basicAck(deliveryTag, false);
        System.err.println("消費端: " +message);
    }
}
View Code

 

6.測試函數

@SpringBootTest
class DemoApplicationTests {

    @Autowired
    private RabbitSender rabbitSender;
    
    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    @Test
    public void testSender1() throws Exception {
        Map<String, Object> properties = new HashMap<>();
        properties.put("number", "12345");
        properties.put("send_time", simpleDateFormat.format(new Date()));
        rabbitSender.send("queue1", properties);
    }
}
View Code

 

 

RabbitMQ介紹:spring-boot

1.交換機:測試

Direct Exchange(直連交換機)

處理路由鍵ui

須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配。這是一個完整的匹配。若是一個隊列
綁定到該交換機上要求路由鍵 「 dog」,則只有被標記爲 「 的消息才被轉發,不會轉發 dog.puppy ,也不會轉發dog.guard ,只會轉發 dog 。this

 

Fanout Exchange(扇型交換機)

不處理路由鍵

你只須要將隊列綁定到交換機上, 發送消息 到交換機都會被轉發到與該交換機綁定的全部隊列上。

 

Topic Exchange(主題交換機)

將路由鍵和某模式進行匹配

此時隊列須要綁定要一個模式上。符號「#」匹配一個或多個詞,符號「*」匹配一個詞。所以「audit.#」可以匹配到「audit.irs.corporate」,可是「audit.*」 只會匹配到「audit.irs」。

 

Headers Exchanges(頭交換機)

不處理路由鍵

而是根據發送的消息內容中的headers屬性進行匹配。在綁定Queue與Exchange時指定一組鍵值對;當消息發送到RabbitMQ時會取到該消息的headers與Exchange綁定時指定的鍵值對進行匹配;若是徹底匹配則消息會路由到該隊列,不然不會路由到該隊列。headers屬性是一個鍵值對,能夠是Hashtable,鍵值對的值能夠是任何類型。而fanout,direct,topic 的路由鍵都須要要字符串形式的。

 

Default Exchange(默認交換機 )

默認交換機其實是一個由RabbitMQ預先聲明好的名字爲空字符串的直連交換機(direct exchange)。它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每一個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。

 

Dead Letter Exchange(死信交換機)

在默認狀況,若是消息在投遞到交換機時,交換機發現此消息沒有匹配的隊列,則這個消息將被丟棄。爲了解決這個問題,RabbitMQ中有一種交換機叫死信交換機。當消費者不能處理接收到的消息時,將這個消息從新發布到另一個隊列中,等待重試或者人工干預。這個過程當中的exchange和queue就是所謂的」Dead Letter Exchange 和 Queue」

 

交換機的屬性:

Name:交換機名稱
Durability:是否持久化。若是持久性,則RabbitMQ重啓後,交換機還存在
Auto-delete:當全部與之綁定的消息隊列都完成了對此交換機的使用後刪掉它
Arguments:擴展參數

 

 

2.六種消息類型

官網 http://www.rabbitmq.com/getstarted.html

 

3.RabbitMQ配置屬性

參考資料: https://docs.spring.io/spring-boot/docs/current/reference/html/common-application-properties.html

 基礎信息

spring.rabbitmq.host: 默認localhost
spring.rabbitmq.port: 默認5672
spring.rabbitmq.username: 用戶名
spring.rabbitmq.password: 密碼
spring.rabbitmq.virtual-host: 鏈接到代理時用的虛擬主機
spring.rabbitmq.addresses: 鏈接到server的地址列表(以逗號分隔),先addresses後host 
spring.rabbitmq.requested-heartbeat: 請求心跳超時時間,0爲不指定,若是不指定時間單位默認爲妙
spring.rabbitmq.publisher-confirms: 是否啓用【發佈確認】,默認false
spring.rabbitmq.publisher-returns: 是否啓用【發佈返回】,默認false
spring.rabbitmq.connection-timeout: 鏈接超時時間,單位毫秒,0表示永不超時 
相關文章
相關標籤/搜索