Spring AMQP 是基於 Spring 框架的AMQP消息解決方案,提供模板化的發送和接收消息的抽象層,提供基於消息驅動的 POJO的消息監聽等,很大方便咱們使用RabbitMQ程序的相關開發。java
Spring AMQP包含一些模塊,如:spring-amqp, spring-rabbit and spring-erlang等,每一個模塊分別由獨立的一些Jar包組成.spring
Spring AMQP模塊主要包含org.springframework.amqp.core這個包中。這個包定義的相關類主要是與前面講的AMQP模型相對應。Spring AMQP的目的是提供不依賴於任何特定的AMQP代理實現或客戶端庫通用的抽象。最終用戶代碼將很容易實現更易替換、添加和刪除AMQP,由於它能夠只針對抽象層來開發。這能夠很方便咱們選擇和使用哪個具體的broker實現,如sping-rabbit實現。express
Spring AMQP定義了Message類, 它定義了一個更通常的AMQP域模型,它是Spring AMQP的很重要的一部分,Message消息是當前模型中所操縱的基本單位,它由Producer產生,通過Broker被Consumer所消費。它是生產者和消費者發送和處理的對象。Message封裝了body及MessageProperties,很方便了API的調用,它的定義以下:緩存
public class Message { private final MessageProperties messageProperties; private final byte[] body; public Message(byte[] body, MessageProperties messageProperties) { this.body = body; this.messageProperties = messageProperties; } public byte[] getBody() { return this.body; } public MessageProperties getMessageProperties() { return this.messageProperties; } }
MessageProperties 定義了多種經常使用的屬性,如:messageId', 'timestamp', 'contentType'等等。app
Exchange接口表明了AMQP Exchange,它是消息發送的地方。在虛擬主機的消息協商器(Broker)中,每一個Exchange都有惟一的名字。框架
Exchange包含4種類型:Direct, Topic, Fanout, Headers。不一樣的類型,他們如何處理綁定到隊列方面的行爲會有所不一樣。異步
1)Direct類型: 容許一個隊列經過一個固定的Routing-key(一般是隊列的名字)進行綁定。 Direct交換器將消息根據其routing-key屬性投遞到包含對應key屬性的綁定器上。ide
2)Topic類型: 支持消息的Routing-key用*或#的模式,進行綁定。*匹配一個單詞,#匹配0個或者多個單詞。例如,binding key *.user.# 匹配routing key爲 usd.user和eur.user.db,可是不匹配user.hello。函數
3)Fanout類型:它只是將消息廣播到全部綁定到它的隊列中,而不考慮routing key的值。
4)Header類型: 它根據應用程序消息的特定屬性進行匹配,這些消息可能在binding key中標記爲可選或者必選
注意:AMQP規範要求,任何的消息協商器(broker)須要提供一個沒有名稱的"default" Direct Exchange。
它的相關定義以下:
public interface Exchange { String getName(); String getExchangeType(); boolean isDurable(); boolean isAutoDelete(); Map getArguments(); }
Queue,隊列,它表明了Message Consumer接收消息的地方,它用來保存消息直到發送給消費者。Queue有如下一些重要的屬性。
1)持久性:若是啓用,隊列將會在消息協商器(Broker)重啓前都有效。
2)自動刪除:若是啓用,那麼隊列將會在全部的消費者中止使用以後自動刪除掉自身。
3)惰性:若是沒有聲明隊列,那麼在執行到使用的時候會致使異常,並不會主動聲明。
4)排他性:若是啓用,隊列只能被聲明它的消費者使用。
public class Queue { private final String name; private volatile boolean durable; private volatile boolean exclusive; private volatile boolean autoDelete; private volatile Map arguments; public Queue(String name) { this(name, true, false, false); } // Getters and Setters omitted for brevity
生產者發送消息到Exchange,接收者從Queue接收消息,而綁定(Binging)是生產者和消費者消息傳遞的重要鏈接,它是鏈接生產者和消費者進行信息交流的關鍵。
Binging實例自己僅僅是表明持有鏈接的數據信息。不過它能夠被AmqpAdmin這個類用來實際觸發broker上的綁定操做。同時它能夠在程序啓動時,簡化 Queues, Exchanges, and Bindings的定義及一些操做。
一、下面咱們看一下綁定Queue到Exchange的一些基本選項及例子。
你能夠用一個固定的RoutingKey將Queue綁定到DirectExchange
例如:new Binding(someQueue, someDirectExchange, "foo.bar")
二、也能夠用*、#模式匹配的RoutingKey方式,綁定到TopicExchange。
例如: new Binding(someQueue, someTopicExchange, "foo.*")
三、綁定到一個廣播的FanoutExchange,它沒有RoutingKey
例如: new Binding(someQueue, someFanoutExchange)
四、固然你也能夠用fluent API的方式。
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
Spring AMQP中鏈接和資源的管理,尤爲是spring-rabbit這個模塊,由於spring-rabbit是 RabbitMQ 的惟一支持的實現。
在spring-rabbit中,管理消息協商器(broker)鏈接的核心組件是ConnectionFactory這個接口。 ConnectionFactory提供了
org.springframework.amqp.rabbit.connection.Connection(com.rabbitmq.client.Connection的包裝類)實例的鏈接與管理。而CachingConnectionFactory是ConnectionFactory的在Spring AMQP中惟一實現,它建立一個鏈接代理,使程序能夠共享的鏈接。
Connection 提供一個createChannel的方法。CachingConnectionFactory 的實現能支持channels的緩存,而且能根據區分是事務性或非事務性各自獨立。同時,CachingConnectionFactory也提供hostname的構造函數,而且能夠設置username、password、setChannelCacheSize等方法。CachingConnectionFactory 默認channel cache 大小爲1,若是想改變能夠用setChannelCacheSize設置channel cache size的大小。
@Bean public ConnectionFactory rabbitConnectionFactory() { CachingConnectionFactory factory = new CachingConnectionFactory( environment.getProperty("rabbitmq.host"), environment.getProperty("rabbitmq.port", Integer.class) ); factory.setUsername(environment.getProperty("rabbitmq.username")); factory.setPassword(environment.getProperty("rabbitmq.password")); return factory; } Connection connection = factory.createConnection();
從spring-rabbit 1.3版本開始,AbstractRoutingConnectionFactory 被引入進來,它提供了一個這樣的途徑來配置許多的不一樣的Connection Factory的映射,而且可以根據運行時的lookupKey(經過綁定線程上下文的方式) 來決定使用哪一個具體的Connection Factory。
爲了方便Spring AMQP提供了 AbstractRoutingConnectionFactory 的具體實現SimpleRoutingConnectionFactory。它是從SimpleResourceHolder中得到當前線程綁定的lookupKey。
Map factories = new HashMap<Object, org.springframework.amqp.rabbit.connection.ConnectionFactory>(2); factories.put("foo", connectionFactory1); factories.put("bar", connectionFactory2); AbstractRoutingConnectionFactory connectionFactory = new SimpleRoutingConnectionFactory(); connectionFactory.setTargetConnectionFactories(factories); final RabbitTemplate template = new RabbitTemplate(connectionFactory); Expression expression = new SpelExpressionParser() .parseExpression("T(org.springframework.amqp.rabbit.core.RabbitTemplateTests)" + ".LOOKUP_KEY_COUNT.getAndIncrement() % 2 == 0 ? 'foo' : 'bar'");
Spring AMQP提供了一個發送和接收消息的操做模板類AmqpTemplate。 AmqpTemplate它定義包含了發送和接收消息等的一些基本的操做功能。RabbitTemplate是AmqpTemplate的一個實現。
RabbitTemplate支持消息的確認與返回,爲了返回消息,RabbitTemplate 須要設置mandatory 屬性爲true,而且CachingConnectionFactory 的publisherReturns屬性也須要設置爲true。返回的消息會根據它註冊的RabbitTemplate.ReturnCallback setReturnCallback 回調發送到給客戶端,
一個RabbitTemplate僅能支持一個ReturnCallback 。
爲了確認Confirms消息, CachingConnectionFactory 的publisherConfirms 屬性也須要設置爲true,確認的消息會根據它註冊的RabbitTemplate.ConfirmCallback setConfirmCallback回調發送到給客戶端。一個RabbitTemplate也僅能支持一個ConfirmCallback.
利用AmqpTemplate模板來發送消息。
AmqpTemplate提供了以下的幾個方法來發送消息
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
發送時,須要指定Exchange和routingKey. 如:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO",new Message("12.34".getBytes(), someProperties));
從Spring-Rabbit版本1.3開始,它提供了MessageBuilder 和 MessagePropertiesBuilder兩個類,這能夠方便咱們以fluent流式的方式建立Message及MessageProperties對象。
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
接收消息能夠有兩種方式。比較簡單的一種方式是,直接去查詢獲取消息,即調用receive方法,若是該方法沒有得到消息,receive方法不阻塞,直接返回null。另外一種方式是註冊一個Listener監聽器,一旦有消息到時來,異步接收。
注意: 接收消息,都須要從queue中得到,接收消息時,RabbitTemplate須要指定queue,或者設置默認的queue。
API提供的直接得到消息方法,以下
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
AmqpTemplate也提供了方法來直接接收POJOs對象(代替Message對象),同時也提供了各類的MessageConverter用來處理返回的Object對象。
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
AmqpTemplate 從Spring-Rabbit 1.3版本開始也提供了 receiveAndReply 方法來異步接收、處理及回覆消息。
AmqpTemplate 要注意receive和reply階段,大數時狀況下,你須要提供僅僅一個ReceiveAndReplyCallback 的實現,用它來處理接收到消息和回覆(對象)消息的業務邏輯。同時也要注意,ReceiveAndReplyCallback 可能返回null,在這種狀況下,receiveAndReply 就至關於receive 方法。
boolean receiveAndReply(ReceiveAndReplyCallback callback) throws AmqpException;
boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback) throws AmqpException;
boolean receiveAndReply(ReceiveAndReplyCallback callback, String replyExchange, String replyRoutingKey) throws AmqpException;
boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback,String replyExchange, String replyRoutingKey) throws AmqpException;
爲了異步接收消息,Spring AMQP也提供了多種不一樣的實現方式。如:經過實現MessageListener的方式,或者經過註解@RabbitListener的方式等,來實現異步接受消息及處理。
public interface MessageListener {
void onMessage(Message message);
}
若是你的程序須要依賴Channel,你須要用 ChannelAwareMessageListener這個接口。
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
若是你想將你的程序及Message API嚴格分開的話,能夠用MessageListenerAdapter這個適配器類
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
看一下Container 容器。通常地,容器是處於「主動」的責任,以使偵聽器回調能夠保持被動觸發處理。容器是生命週期的組件,它提供了開始和中止容器的方法。在配置容器時,咱們須要配置AMQP Queue和MessageListener的對應鏈接,須要配置ConnectionFactory 的一個引用以及Listener、以及可以從該Queue中接收消息的Queue的引用,這樣的Container才能通知到對應的Listener。
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));
從RabbitMQ 的3.2版本開始, broker支持消費消息的priority 優先級。只須要在SimpleMessageListenerContainer 添加priority屬性的設置。
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
從spring-rabbit 1.4版本開始,可心利用註解的@RabbitListener來異步接收消息,它是更爲簡便的方式。
@Component
public class MyService {
@RabbitListener(queues = "myQueue")
public void processOrder(String data) { ... }
}
在容器中,須要配置@EnableRabbit,來支持@RabbitListener起做用。
@Configuration
@EnableRabbit
public class AppConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
}
經過上面的配置,就配置好Listener和Container了,就能夠異步接收消息了。
AmqpTemplate 定義提供了各類發送和接收委拖給MessageConverter轉化對象消息的方法。MessageConverter 自己比較簡單,它提供了消息對象的轉化,可將object轉化成Message 對象,或者將Message 對象轉化成Object對象。它提供了默認的SimpleMessageConverter實現,以及第三方的MessageConverter,如Jackson2JsonMessageConverter,MarshallingMessageConverter等,來處理消息與對象之間的轉換。
SimpleMessageConverter是Spring AMQP中MessageConverter的一個默認實現。
public interface MessageConverter {
Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException;
Object fromMessage(Message message) throws MessageConversionException;
}
爲了自定義轉化對象,你也能夠第三方的MessageConverter,如使用 Jackson2JsonMessageConverter 或者MarshallingMessageConverter,其中Jackson2JsonMessageConverter提供了Json對象的轉化,MarshallingMessageConverter對象則提供了對象的Marshaller和 Unmarshaller轉化。
從Spring-Rabbit 1.4.2開始,它提供了ContentTypeDelegatingMessageConverter,它能根據不一樣的MessageProperties屬性(contentType)決定來委託給具體的哪個MessageConverter。
import org.junit.Test; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; public class CodeTest { @Test public void testSendAndReceive() { //建立ConnectionFactory //注意: guest的用戶只可以在localhost 127.0.0.1進行測試 String hostname = "localhost"; String username = "mytest"; String password = "mytest"; String virtualHost = "/"; CachingConnectionFactory cf = new CachingConnectionFactory(hostname); cf.setUsername(username); cf.setPassword(password); cf.setVirtualHost(virtualHost); RabbitAdmin admin = new RabbitAdmin(cf); //建立Exchange String exchangeName = "direct.test.exchange"; DirectExchange exchange = new DirectExchange(exchangeName); admin.declareExchange(exchange); //建立Queue String queueName = "direct.test.queue"; Queue queue = new Queue(queueName, true, false, false, null); admin.declareQueue(queue); //建立Binding String routingKey = "direct.test.queue"; admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey)); //建立RabbitTemplate RabbitTemplate rabbitTemplate = new RabbitTemplate(cf); rabbitTemplate.setExchange(exchangeName); rabbitTemplate.setQueue(queueName); //建立Message String messageStr = "this is direct message"; Message message = MessageBuilder.withBody(messageStr.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build(); //根據routingKey發送消息 System.out.println("message=" + message); rabbitTemplate.send(routingKey, message); //接收消息 Message resultMessage = rabbitTemplate.receive(); System.out.println("resultMessage=" + resultMessage); if (resultMessage != null) { System.out.println("receive massage=" + new String(resultMessage.getBody())); } } }
異步接收消息實例
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="direct.test.exchange" queue="direct.test.queue"/>
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:direct-exchange name="direct.test.exchange" durable="true">
<rabbit:bindings>
<rabbit:binding queue="direct.test.queue" key="direct.test.queue">rabbit:binding>
<rabbit:bindings>
<rabbit:direct-exchange/>
<rabbit:queue name="direct.test.queue" durable="true" auto-delete="false" exclusive="false"/>
<rabbit:connection-factory id="connectionFactory" username="mytest" password="mytest" host="127.0.0.1" port="5672"/>
<bean id="myMessageListener" class="com.company.rabbitmqpro.service.MyMessageListener">bean>
<bean id="simpleMessageListenerContainerFactory" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="queueNames" >
<list>
<value>direct.test.queuevalue>
<list>
<property/>
<property name="messageListener" ref="myMessageListener">property>
<property name="concurrentConsumers" value="3"/>
<property name="maxConcurrentConsumers" value="10"/>
<bean/>
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class MyMessageListener implements MessageListener{ @Override public void onMessage(Message message) { System.out.println("received: " + message); } } import javax.annotation.Resource; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = { "classpath*:applicationContext.xml" }) public class ListenerTest { @Resource private RabbitTemplate rabbitTemplate; @Test public void testSendAsynListener() { String sendMsg = "this is direct message"; Message message = MessageBuilder.withBody(sendMsg.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .build(); String routingKey = "direct.test.queue"; rabbitTemplate.send(routingKey, message); System.out.println("send ok"); } }