https://github.com/401Studio/WeekLearn/issues/2java
RabbitMQ 即一個消息隊列,_主要是用來實現應用程序的異步和解耦,同時也能起到消息緩衝,消息分發的做用。_RabbitMQ使用的是AMQP協議,它是一種二進制協議。默認啓動端口 5672。git
在 RabbitMQ 中,以下圖結構:github
那麼,_其中比較重要的概念有 4 個,分別爲:虛擬主機,交換機,隊列,和綁定。_spring
rabbitmq的message model實際上消息不直接發送到queue中,中間有一個exchange是作消息分發,producer甚至不知道消息發送到那個隊列中去。所以,當exchange收到message時,必須準確知道該如何分發。是append到必定規則的queue,仍是append到多個queue中,仍是被丟棄?這些規則都是經過exchagne的4種type去定義的。安全
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.springboot
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.服務器
exchange是一個消息的agent,每個虛擬的host中都有定義。它的職責是把message路由到不一樣的queue中。app
exchange和queue經過routing-key關聯,這二者之間的關係是就是binding。以下圖所示,X表示交換機,紅色表示隊列,交換機經過一個routing-key去binding一個queue,routing-key有什麼做用呢?看Direct exchange類型交換機。less
路由鍵exchange,該交換機收到消息後會把消息發送到指定routing-key的queue中。那消息交換機是怎麼知道的呢?其實,producer deliver消息的時候會把routing-key add到 message header中。routing-key只是一個messgae的attribute。異步
A direct exchange delivers messages to queues based on a message routing key. The routing key is a message attribute added into the message header by the producer. The routing key can be seen as an "address" that the exchange use to decide how to route the message. A message goes to the queue(s) whose binding key exactly matches the routing key of the message.
Default Exchange
這種是特殊的Direct Exchange,是rabbitmq內部默認的一個交換機。該交換機的name是空字符串,全部queue都默認binding 到該交換機上。全部binding到該交換機上的queue,routing-key都和queue的name同樣。
通配符交換機,exchange會把消息發送到一個或者多個知足通配符規則的routing-key
的queue。其中_表號匹配一個word,#匹配多個word和路徑,路徑之間經過.隔開。如知足a._.c的routing-key有a.hello.c;知足#.hello的routing-key有a.b.c.helo。
扇形交換機,該交換機會把消息發送到全部binding到該交換機上的queue。這種是publisher/subcribe模式。用來作廣播最好。
全部該exchagne上指定的routing-key都會被ignore掉。
The fanout copies and routes a received message to all queues that are bound to it regardless of routing keys or pattern matching as with direct and topic exchanges. Keys provided will simply be ignored.
設置header attribute參數類型的交換機。
安裝就不說了,建議按照官方文檔上作。先貼代碼,稍後解釋,代碼以下:
配置 交換機,隊列,交換機與隊列的綁定,消息監視容器:
@Configuration @Data public class RabbitMQConfig { final static String queueName = "spring-boot"; @Bean Queue queue() { return new Queue(queueName, false); } @Bean TopicExchange exchange() { return new TopicExchange("spring-boot-exchange"); } @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(queueName); } @Bean SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName); container.setMessageListener(listenerAdapter); return container; } @Bean Receiver receiver() { return new Receiver(); } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } }
配置接收信息者(即消費者):
public class Receiver { private CountDownLatch latch = new CountDownLatch(1); public void receiveMessage(String message) { System.out.println("Received <" + message + ">"); latch.countDown(); } public CountDownLatch getLatch() { return latch; } }
配置發送信息者(即生產者):
@RestController public class Test { @Autowired RabbitTemplate rabbitTemplate; @RequestMapping(value = "/test/{abc}",method = RequestMethod.GET) public String test(@PathVariable(value = "abc") String abc){ rabbitTemplate.convertAndSend("spring-boot", abc + " from RabbitMQ!"); return "abc"; } }
以上即可實現一個簡單的 RabbitMQ Demo,具體代碼在:點這裏
那麼,這裏,分爲三個部分分析:發消息,交換機隊列,收消息。
rabbitTemplate.convertAndSend("spring-boot", xxx);
便可發送信息。TopicExchange
,配置隊列 Queue
,而且配置他們之間的綁定 Binding
container.setMessageListener(listenerAdapter);
其中,MessageListenerAdapter 能夠看作是 咱們接收者的一個包裝類,new MessageListenerAdapter(receiver, "receiveMessage");
指明瞭若是有消息來,那麼調用接收者哪一個方法進行處理。spring xml方式實現RabbitMQ簡單,可讀性較好,配置簡單,配置和實現以下所示。
上文已經講述了rabbitmq的配置,xml方式經過properites文件存放用戶配置信息:
mq.host=127.0.0.1 mq.username=guest mq.password=guest mq.port=5672
配置application-mq.xml配置文件,聲明鏈接、交換機、queue以及consumer監聽。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" > <description>rabbitmq 鏈接服務配置</description> <!-- 鏈接配置 --> <context:property-placeholder location="classpath:mq.properties" /> <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"/> <rabbit:admin connection-factory="connectionFactory"/> <!-- spring template聲明--> <rabbit:template exchange="amqpExchange" id="amqpTemplate" connection-factory="connectionFactory" /> <!--申明queue--> <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" /> <!--申明exchange交換機並綁定queue--> <rabbit:direct-exchange name="amqpExchange" durable="true" auto-delete="false" id="amqpExchange"> <rabbit:bindings> <rabbit:binding queue="test_queue_key" key="test_queue_key"/> </rabbit:bindings> </rabbit:direct-exchange> <!--consumer配置監聽--> <bean id="reveiver" class="com.demo.mq.receive.Reveiver" /> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queues="test_queue_key" ref="reveiver" method="receiveMessage"/> </rabbit:listener-container> </beans>
上述代碼中,引入properties文件就很少說了。
<rabbit:connection-factory>
標籤聲明建立connection的factory工廠。
<rabbit-template>
聲明spring template,和上文spring中使用template同樣。template可聲明exchange。
<rabbit:queue>
聲明一個queue並設置queue的配置項,直接看標籤屬性就能夠明白queue的配置項。
<rabbit:direct-exchange>
聲明交換機並綁定queue。
<rabbit:listener-container>
申明監聽container並配置consumer和監聽routing-key。
剩下就簡單了,application-context.xml中把rabbitmq配置import進去。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"> <context:component-scan base-package="com.demo.**" /> <import resource="application-mq.xml" /> </beans>
Producer實現,發送消息仍是使用template的convertAndSend() deliver消息。
@Service public class Producer { @Autowired private AmqpTemplate amqpTemplate; private final static Logger logger = LoggerFactory.getLogger(Producer.class); public void sendDataToQueue(String queueKey, Object object) { try { amqpTemplate.convertAndSend(queueKey, object); } catch (Exception e) { e.printStackTrace(); logger.error("exeception={}",e); } } }
配置consumer
package com.demo.mq.receive; import org.springframework.stereotype.Service; import java.util.concurrent.CountDownLatch; @Service public class Reveiver { private CountDownLatch latch = new CountDownLatch(1); public void receiveMessage(String message) { System.out.println("reveice msg=" + message.toString()); latch.countDown(); } }
測試deliver消息
Controller @RequestMapping("/demo/") public class TestController { private final static Logger logger = LoggerFactory.getLogger(TestController.class); @Resource private Producer producer; @RequestMapping("/test/{msg}") public String send(@PathVariable("msg") String msg){ logger.info("#TestController.send#abc={msg}", msg); System.out.println("msg="+msg); producer.sendDataToQueue("test_queue_key",msg); return "index"; } }
在生產環境中,因爲 Spring 對 RabbitMQ 提供了一些方便的註解,因此首先可使用這些註解。例如:
具體這些註解的使用,能夠參考這裏的代碼:點這裏
首先,生產環境下的 RabbitMQ 可能不會在生產者或者消費者本機上,因此須要從新定義 ConnectionFactory,即:
@Bean ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(userName); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); return connectionFactory; }
這裏,能夠從新設置須要鏈接的 RabbitMQ 的 ip,端口,虛擬主機,用戶名,密碼。
而後,能夠先從生產端考慮,生產端須要鏈接 RabbitMQ,那麼能夠經過 RabbitTemplate 進行鏈接。 Ps:(RabbitTemplate 用於生產端發送消息到交換機中),以下代碼:
@Bean(name="myTemplate") RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(integrationEventMessageConverter()); template.setExchange(exchangeName); return template; }
在該代碼中,new RabbitTemplate(connectionFactory);
設置了生產端鏈接到RabbitMQ,template.setMessageConverter(integrationEventMessageConverter());
設置了 生產端發送給交換機的消息是以什麼格式的,在 integrationEventMessageConverter()
代碼中:
public MessageConverter integrationEventMessageConverter() { Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(); return messageConverter; }
如上 Jackson2JsonMessageConverter
指明瞭 JSON。上述代碼的最後 template.setExchange(exchangeName);
指明瞭 要把生產者要把消息發送到哪一個交換機上。
有了上述,那麼,咱們便可使用 rabbitTemplate.convertAndSend("spring-boot", xxx);
發送消息,xxx 表示任意類型,由於上述的設置會幫咱們把這些類型轉化成 JSON 傳輸。
接着,生產端發送咱們說過了,那麼如今能夠看看消費端:
對於消費端,咱們能夠只建立 SimpleRabbitListenerContainerFactory
,它可以幫咱們生成 RabbitListenerContainer,而後咱們再使用 @rabbitlistener 指定接收者收到信息時處理的方法。
@Bean(name="myListenContainer") public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setMessageConverter(integrationEventMessageConverter()); factory.setConnectionFactory(connectionFactory()); return factory; }
這其中 factory.setMessageConverter(integrationEventMessageConverter());
指定了咱們接受消息的時候,以 JSON 傳輸的消息能夠轉換成對應的類型傳入到方法中。例如:
@Slf4j @Component @RabbitListener(containerFactory = "helloRabbitListenerContainer",queues = "spring-boot") public class Receiver { @RabbitHandler public void receiveTeacher(Teacher teacher) { log.info("##### = {}",teacher); } }
可能出現的問題:
在生產環境中,咱們須要考慮萬一輩子產者掛了,消費者掛了,或者 rabbitmq 掛了怎麼樣。通常來講,若是生產者掛了或者消費者掛了,實際上是沒有影響,由於消息就在隊列裏面。那麼萬一 rabbitmq 掛了,以前在隊列裏面的消息怎麼辦,其實能夠作消息持久化,RabbitMQ 會把信息保存在磁盤上。
作法是能夠先從 Connection 對象中拿到一個 Channel 信道對象,而後再能夠經過該對象設置 消息持久化。
這裏 Spring 有自動重連機制。
每一個Consumer可能須要一段時間才能處理完收到的數據。若是在這個過程當中,Consumer出錯了,異常退出了,而數據尚未處理完成,那麼 很是不幸,這段數據就丟失了。由於咱們採用no-ack的方式進行確認,也就是說,每次Consumer接到數據後,而不論是否處理完 成,RabbitMQ Server會當即把這個Message標記爲完成,而後從queue中刪除了。
若是一個Consumer異常退出了,它處理的數據可以被另外的Consumer處理,這樣數據在這種狀況下就不會丟失了(注意是這種狀況下)。
爲了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。爲了保證數據能被正確處理而不只僅是被Consumer收到,那麼咱們不能採用no-ack。而應該是在處理完數據後發送ack。
在處理數據後發送的ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ能夠去安全的刪除它了。
若是Consumer退出了可是沒有發送ack,那麼RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的狀況下數據也不會丟失。