RabbitMQ 與 SpringBoot2.X 整合

  • publisher-confirms ,實現一個監聽器用於監聽 broker 端給咱們返回的確認請求: RabbitTemplate.ConfirmCallbackjava

  • publisher-returns, 保證消息對 broker 端是可達的,若是出現路由鍵不可達的狀況,則使用監聽器對不可達的消息進行後續處理,保證消息的路由成功: RabbitTemplate.ReturnCallbackgit

  • 注意一點,在發送消息的時候對 template 進行配置 mandatory = true 保證監聽有效。在生產端還能夠配置其餘屬性,好比發送重試、超時時間、次數、間隔等github

代碼實現:

消費端代碼地址:https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-springboot/rabbitmq-springboot-consumer 項目下web

生產端代碼地址:https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-springboot/rabbitmq-springboot-product 項目下spring

通用依賴:

<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.4.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
複製代碼
<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.6</version>
        </dependency>
複製代碼

消費端:

核心配置在配置文件裏面springboot

  • 首先配置手工確認簽收模式,用於ACK 的手工處理,這樣咱們能夠保證消息的可靠性送達,或者在消費端消費失敗的時候能夠作到重回隊列(不建議重回隊列)、根據業務記錄日誌等處理。bash

  • 設置消費端的監聽個數和最大個數,用於控制消費端的併發狀況併發

spring.rabbitmq.addresses=192.168.0.7:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
複製代碼

主配置:dom

@Configuration
@ComponentScan({"com.hmily.*"})
public class MainConfig {

}
複製代碼

消費端的監聽 RabbitListener 這個註解很好用!!!ide

RabbitListener 是一個組合註解,裏面能夠註解配置 。 @QueueBinding @Queue @Exchange 直接經過這個組合註解一次性搞定消費端交換機、隊列、綁定、路由、而且配置監聽功能等。

Message 使用的是  org.springframework.messaging.Message
複製代碼
@Slf4j
@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1",
                    durable="true"),
            exchange = @Exchange(value = "exchange-1",
                    durable="true",
                    type= "topic",
                    ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        log.info("--------------------------------------");
        log.info("消費端Payload: " + message.getPayload());
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
}
複製代碼

生產端

生產端的核心配置

配置文件:

spring.rabbitmq.addresses=192.168.0.7:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
# return 的時候表明消息不可達,設置 broker 不自動刪除該消息,
# 而是返回到生產端,讓咱們進行一些後續的處理
spring.rabbitmq.template.mandatory=true
複製代碼

spring.rabbitmq.template.mandatory=true 的意思是: return 的時候表明消息不可達,設置 broker 不自動刪除該消息,而是返回到生產端,讓咱們進行一些後續的處理

主配置類:

@Configuration
@ComponentScan({"com.hmily.*"})
public class MainConfig {

}
複製代碼

消費端發送代碼

@Slf4j
@Component
public class RabbitSender {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	//回調函數: confirm確認
	final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
		@Override
		public void confirm(CorrelationData correlationData, boolean ack, String cause) {
			log.info("correlationData: " + correlationData);
			log.info("ack: " + ack);
			if(!ack){
				log.info("異常處理....");
			}
		}
	};
	
    //回調函數: return返回
	final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
		@Override
		public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode,
				String replyText, String exchange, String routingKey) {
			log.info("return exchange: {}, routingKey: {}, replyCode: {}, replyText: {}",
					exchange, routingKey, replyCode, replyText);
		}
	};
	
	//發送消息方法調用: 構建Message消息
	public void send(Object message, Map<String, Object> properties) throws Exception {
		MessageHeaders mhs = new MessageHeaders(properties);
		Message<Object> msg = MessageBuilder.createMessage(message, mhs);
		rabbitTemplate.setConfirmCallback(confirmCallback);
		rabbitTemplate.setReturnCallback(returnCallback);
		//id + 時間戳 全局惟一
        String id = UUID.randomUUID().toString();
        log.info("id: {}", id);
		CorrelationData correlationData = new CorrelationData(id);
		rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
	}
}
複製代碼

寫個單元測試用例

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqSpringbootProductApplicationTests {

	@Test
	public void contextLoads() {
	}

	@Autowired
	private RabbitSender rabbitSender;

	private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
	
	@Test
	public void testSender1() throws Exception {
		 Map<String, Object> properties = new HashMap<>();
		 properties.put("number", "12345");
		 properties.put("send_time", simpleDateFormat.format(new Date()));
		 rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
	}
}
複製代碼

運行單元測試

看消費端的日誌
修改一下發送時的 routingkey ,模擬發送失敗
就進入 returnCallback

發送一個 Java 實體

先在消費端 聲明一些隊列、交換機、routingKey 的配置

spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
複製代碼

消費端:

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                    durable="${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                    durable="${spring.rabbitmq.listener.order.exchange.durable}",
                    type= "${spring.rabbitmq.listener.order.exchange.type}",
                    ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.key}"
    )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload Order order,
                               Channel channel,
                               @Headers Map<String, Object> headers) throws Exception {
        log.info("--------------------------------------");
        log.info("消費端order: " + order.getId());
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
複製代碼
這裏面有個特別須要注意的地方,Payload 裏面的路徑要跟 生產端的實體路徑徹底一致,要否則會找到不到這個類的!!!
我這裏爲了簡便就不寫一個 common.jar 了,在實際開發裏面,這個 java bean 應該放在 common.jar  裏面
複製代碼

注意實體要實現 Serializable 序列化接口,要否則發送消息會失敗的!!

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Order implements Serializable {

    private String id;
    private String name;


}
複製代碼

生產端 照樣跟着寫一個發消息的方法

//發送消息方法調用: 構建自定義對象消息
	public void sendOrder(Order order) throws Exception {
		rabbitTemplate.setConfirmCallback(confirmCallback);
		rabbitTemplate.setReturnCallback(returnCallback);
		//id + 時間戳 全局惟一
        String id = UUID.randomUUID().toString();
        log.info("sendOrder id: {}", id);
		CorrelationData correlationData = new CorrelationData(id);
		rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
	}
複製代碼

寫單元測試

@Test
	public void testSender2() throws Exception {
		 Order order = new Order("001", "第一個訂單");
		 rabbitSender.sendOrder(order);
	}
複製代碼

運行單元測試

驗證 Java 實體消息發送成功

相關文章
相關標籤/搜索