spring使用@bean聲明exchange queue binding 例子以下:java
@Bean
public TopicExchange exchange001() {
return new TopicExchange("topic001", true, false);
}
@Bean
public Queue queue001() {
return new Queue("queue001", true); //隊列持久
}
@Bean
public Binding binding001() {
return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
}
複製代碼
如下是RabbitTemplate實例化的例子:spring
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(//);
rabbitTemplate.setReturnCallback(//);
return rabbitTemplate;
}
複製代碼
例子:springboot
@Test
public void testSendMessage() throws Exception {
//1 建立消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述..");
messageProperties.getHeaders().put("type", "自定義消息類型..");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.err.println("------添加額外的設置---------");
message.getMessageProperties().getHeaders().put("desc", "額外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "額外新加的屬性");
return message;
}
});
}
複製代碼
簡單消息容器 功能:架構
屬性併發
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
}
複製代碼
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate);
//設定queue使用哪一個adapter方法處理
Map<String,String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("queue001","method1");
queueOrTagToMethodName.put("queue002","method2");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
//設置默認處理方法,默認處理方法是handleMessage
adapter.setDefaultListenerMethod("handleMessage");
//設置消息轉換方式
adapter.setMessageConverter(new TextMessageConvert());
//消息監聽
container.setMessageListener(adapter);
複製代碼
自定義經常使用轉換器 都須要實現 MessageConverter接口app
Json convert: Jackson2JsonMessageConverter 能夠進行java對象的轉換功能 DefaultJackson2JavaTypeMapper映射器: 能夠進行java對象的映射關係 二進制 convert: 如Image、PDF、PPT、stream 框架
@EnableRabbit和@Configuration一塊兒使用,能夠加在類或者方法上,這個註解開啓了容器對註冊的bean的@RabbitListener檢查。dom
@RabbitListener用於註冊Listener時使用的信息:如queue,exchange,key、ListenerContainerFactory和RabbitAdmin的bean name。ide
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue}", durable = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
key = "${mq.config.key}"), admin = "rabbitAdmin")
複製代碼
掃描到bean帶有該註解後,首先會將註解的內容封裝到Endpoint對象中並和ListenerContainerFactory的實例一塊兒添加到上面的RabbitListenerEndpointRegistry實例中。添加的時候會建立相應的ListenerContainer實例並添加Listener對象。函數
@RabbitListener 和 @RabbitHandler結合使用,不一樣類型的消息使用不一樣的方法來處理。
public class CommandListener{
@RabbitHandler
public void handler1(ApiMessage msg){
System.out.println(msg);
}
@RabbitHandler
public void handler2(Map msg){
System.out.println(msg);
}
}
複製代碼
//回調函數: confirm確認 message ->send —> broker -> ack
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
System.err.println("ack: " + ack);
if(!ack){
System.err.println("異常處理....");
}
}
};
//回調函數: return返回
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
//發送消息方法調用: 構建Message消息
public void send(Object message, Map<String, Object> properties) throws Exception {
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 時間戳 全局惟一
CorrelationData correlationData = new CorrelationData("1234567890");
rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
}
複製代碼
/** * *spring.rabbitmq.listener.order.queue.name=queue-2 spring.rabbitmq.listener.order.queue.durable=true spring.rabbitmq.listener.order.exchange.name=exchange-1 spring.rabbitmq.listener.order.exchange.durable=true spring.rabbitmq.listener.order.exchange.type=topic spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true spring.rabbitmq.listener.order.key=springboot.* * @param order * @param channel * @param headers * @throws Exception */
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
durable="${spring.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
durable="${spring.rabbitmq.listener.order.exchange.durable}",
type= "${spring.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${spring.rabbitmq.listener.order.key}"
)
)
@RabbitHandler
public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, Channel channel, @Headers Map<String, Object> headers) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消費端order: " + order.getId());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
複製代碼
特色: Spring cloud stream 生產端和消費端可使用不一樣的中間件
總體架構核心概念圖:
Barista
Barista接口: Barista接口是定義來做爲後面類的參數. 這一接口定義了通道類型和通道名稱,通道名稱是做爲配置用,通道類型則決定了app會使用這一通道進行發送消息仍是從中接收消息。
使用Spring Cloud Stream很是簡單,使用好這3個註解便可,在實現高性能消息的生產和消費場景很是適合,可是使用SpringCloudStream框架有一個很是大的問題就是不能實現可靠性的投遞,也就是沒法保證消息的100%可靠性,會存在少許消息丟失的問題。緣由是爲了兼容Kafka。