RabbitMQ is an implementation of AMQP 0-9-1 (Advanced Message Queuing Protocol). It is written in Erlang and builds on Erlang's distributed features.
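In the AMQP model, a producer creates a message and sends it to an exchange on the broker. The exchange routes the message to the matching queues according to its configured routing type, and each queue delivers the message to a consumer. Delivery from queue to consumer can be either push or pull. Using a message queue works roughly as follows:
Once the exchange receives a message, it routes it using the message's routing key and the bindings that have been set up, delivering it to one or more queues. Exchanges come in several types. One that routes purely on an exact key match is called a direct exchange: for example, if a queue is bound with routing key "abc", only messages published with key "abc" are delivered to that queue.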
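The routing rule described above can be sketched with a small in-memory model. This is only an illustration of exact-key matching, not the RabbitMQ client or Spring AMQP API; the class `DirectExchangeSketch` and the queue names `queue-a` and `queue-b` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Toy in-memory model of a direct exchange; hypothetical, not a RabbitMQ API. */
final class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();
    // queue name -> messages waiting in that queue
    private final Map<String, Queue<String>> queues = new HashMap<>();

    /** Creates the queue if needed and binds it to this exchange under the given key. */
    void bind(String queueName, String bindingKey) {
        queues.computeIfAbsent(queueName, name -> new ArrayDeque<>());
        bindings.computeIfAbsent(bindingKey, key -> new LinkedHashSet<>()).add(queueName);
    }

    /** Copies the message into every queue whose binding key equals the routing key. */
    int publish(String routingKey, String message) {
        Set<String> targets = bindings.getOrDefault(routingKey, Collections.<String>emptySet());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size(); // number of queues the message reached
    }

    /** Pull-style consumption: next message, or null if the queue is empty or unknown. */
    String poll(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("queue-a", "abc");
        exchange.bind("queue-b", "xyz");

        System.out.println(exchange.publish("abc", "hello"));    // prints 1
        System.out.println(exchange.publish("nope", "dropped")); // prints 0
        System.out.println(exchange.poll("queue-a"));            // prints hello
        System.out.println(exchange.poll("queue-b"));            // prints null
    }
}
```

In this model a message whose key matches no binding simply reaches zero queues, which mirrors the "only messages with key abc are delivered" rule in the text.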
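The exchange types available for routing messages:

| Name | Default pre-declared exchange names | Description |
| --- | --- | --- |
| Direct exchange | (Empty string) and amq.direct | Uses the routing key specified in the binding; messages whose key matches are sent to the bound queue. |
| Fanout exchange | amq.fanout | Sends the same message to every queue bound to the exchange. |
| Topic exchange | amq.topic | Uses the routing key specified in the binding as a pattern; the exchange pattern-matches the message key and routes to the matching queues. In a pattern, "#" matches zero or more words and "*" matches exactly one word. |
| Headers exchange | amq.match (and amq.headers in RabbitMQ) | Similar to a direct exchange, except that routing no longer uses the routing key; the message headers (message attributes) are matched to select the target queue. |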
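The topic matching rule in the table can be made concrete with a short sketch. It assumes the AMQP 0-9-1 convention that routing keys are dot-separated words, "*" stands for exactly one word, and "#" stands for zero or more words. `TopicMatcher` is a hypothetical helper written for illustration and is not how the broker implements matching.

```java
/** Hypothetical helper illustrating topic-pattern matching; not a RabbitMQ API. */
final class TopicMatcher {
    private TopicMatcher() {}

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(split(pattern), 0, split(routingKey), 0);
    }

    private static String[] split(String s) {
        return s.isEmpty() ? new String[0] : s.split("\\.", -1);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' either absorbs nothing, or absorbs one word and stays active
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or a literal word consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("stock.*", "stock.nyse"));     // prints true
        System.out.println(matches("stock.*", "stock.nyse.ibm")); // prints false
        System.out.println(matches("stock.#", "stock"));          // prints true
        System.out.println(matches("#.ibm", "stock.nyse.ibm"));   // prints true
    }
}
```

The recursive form is chosen for readability; it backtracks on "#", which is fine for short keys but is not meant as an efficient matcher.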
This article builds a basic framework on Spring AMQP (spring-rabbit). The Maven pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mq</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <!--
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>rabbitmq-client</artifactId>
            <version>1.3.0</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-amqp</artifactId>
            <version>1.4.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.1.6.RELEASE</version>
        </dependency>
        <!-- Junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
1. Connecting to RabbitMQ

<rabbit:connection-factory id="connectionFactory" host="127.0.0.1"
    username="admin" password="admin" port="5672" virtual-host="/" />

<!-- connection configuration -->
<rabbit:admin connection-factory="connectionFactory" />
2. Declaring the template

<!-- Spring template declaration -->
<rabbit:template exchange="test-mq-exchange" id="amqpTemplate"
    connection-factory="connectionFactory" message-converter="messageConverter" />

<bean id="messageConverter"
    class="org.springframework.amqp.support.converter.SimpleMessageConverter" />
3. Queues

<!-- standard arguments used when creating a queue -->
<rabbit:queue-arguments id="amqpQueueArguments">
    <!-- none for now -->
</rabbit:queue-arguments>

<!-- queue -->
<rabbit:queue id="test_queue_key" name="test_queue_key"
    queue-arguments="amqpQueueArguments"
    durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue id="test_queue_key2" name="test_queue_key2"
    queue-arguments="amqpQueueArguments"
    durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue id="test_queue_key3" name="test_queue_key3"
    queue-arguments="amqpQueueArguments"
    durable="true" auto-delete="false" exclusive="false" />

<!--
    durable: whether the queue is persisted
    exclusive: private queue usable only by the connection that declared it; deleted automatically when that connection closes
    auto-delete: whether the queue is deleted automatically once all consumers have disconnected
-->
4. Exchange

<!-- exchange -->
<rabbit:direct-exchange name="test-mq-exchange" durable="true"
    auto-delete="false" id="test-mq-exchange" auto-declare="true">
    <rabbit:bindings>
        <rabbit:binding queue="test_queue_key2" key="test_queue_key2" />
        <rabbit:binding queue="test_queue_key" key="test_queue_key" />
        <rabbit:binding queue="test_queue_key3" key="test_queue_key3" />
    </rabbit:bindings>
</rabbit:direct-exchange>
5. Listeners (consumers)

<bean name="Consummer" class="com.mq.rabbitmq.Consummer" />
<bean name="Consummer2" class="com.mq.rabbitmq.Consummer" />
<bean name="Consummer3" class="com.mq.rabbitmq.Consummer" />

<!-- listener configuration for the consumers -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
    <!-- queues: queues to listen on, comma-separated if more than one; ref: the listener bean -->
    <rabbit:listener queues="test_queue_key2" ref="Consummer2" />
    <rabbit:listener queues="test_queue_key" ref="Consummer" />
    <rabbit:listener queues="test_queue_key3" ref="Consummer3" />
</rabbit:listener-container>
6. Spring component scanning

<context:component-scan base-package="com.mq.rabbitmq"></context:component-scan>
The complete applicationcontext.xml

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<beans default-lazy-init="false"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.1.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <rabbit:connection-factory id="connectionFactory" host="127.0.0.1"
        username="admin" password="admin" port="5672" virtual-host="/" />

    <!-- connection configuration -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Spring template declaration -->
    <rabbit:template exchange="test-mq-exchange" id="amqpTemplate"
        connection-factory="connectionFactory" message-converter="messageConverter" />

    <bean id="messageConverter"
        class="org.springframework.amqp.support.converter.SimpleMessageConverter" />

    <!-- standard arguments used when creating a queue -->
    <rabbit:queue-arguments id="amqpQueueArguments">
        <!-- none for now -->
    </rabbit:queue-arguments>

    <!-- queue -->
    <rabbit:queue id="test_queue_key" name="test_queue_key"
        queue-arguments="amqpQueueArguments"
        durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue id="test_queue_key2" name="test_queue_key2"
        queue-arguments="amqpQueueArguments"
        durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue id="test_queue_key3" name="test_queue_key3"
        queue-arguments="amqpQueueArguments"
        durable="true" auto-delete="false" exclusive="false" />

    <!--
        durable: whether the queue is persisted
        exclusive: private queue usable only by the connection that declared it; deleted automatically when that connection closes
        auto-delete: whether the queue is deleted automatically once all consumers have disconnected
    -->

    <!-- exchange -->
    <rabbit:direct-exchange name="test-mq-exchange" durable="true"
        auto-delete="false" id="test-mq-exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_key2" key="test_queue_key2" />
            <rabbit:binding queue="test_queue_key" key="test_queue_key" />
            <rabbit:binding queue="test_queue_key3" key="test_queue_key3" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <bean name="Consummer" class="com.mq.rabbitmq.Consummer" />
    <bean name="Consummer2" class="com.mq.rabbitmq.Consummer" />
    <bean name="Consummer3" class="com.mq.rabbitmq.Consummer" />

    <!-- listener configuration for the consumers -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <!-- queues: queues to listen on, comma-separated if more than one; ref: the listener bean -->
        <rabbit:listener queues="test_queue_key2" ref="Consummer2" />
        <rabbit:listener queues="test_queue_key" ref="Consummer" />
        <rabbit:listener queues="test_queue_key3" ref="Consummer3" />
    </rabbit:listener-container>

    <context:component-scan base-package="com.mq.rabbitmq"></context:component-scan>
</beans>
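Message-sending interface

package com.mq.rabbitmq;

public interface MqInt {

    /**
     * Sends an object to the template's exchange.
     *
     * @param queueKey routing key; in this setup it equals the queue name
     * @param object   payload to convert and send
     */
    public void sendDataToQueue(String queueKey, Object object);
}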
Interface implementation

package com.mq.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MqImpl implements MqInt {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendDataToQueue(String queueKey, Object object) {
        // queueKey is used as the routing key on the template's exchange (test-mq-exchange)
        amqpTemplate.convertAndSend(queueKey, object);
    }
}
Consumer (message listener)

package com.mq.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;

public class Consummer implements MessageListener {

    @Autowired
    private AmqpTemplate amqpTemplate;

    // number of the next message handled by this listener instance
    public int count = 1;

    @Override
    public void onMessage(Message message) {
        System.out.println(count + "-----------");
        count++;
        System.out.println(message);
    }
}
Test class

package test;

import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.mq.rabbitmq.MqImpl;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationcontext.xml" })
public class TestQueue {

    @Autowired
    MqImpl mqImpl;

    final String queue_key = "test_queue_key";
    final String queue_key2 = "test_queue_key2";
    final String queue_key3 = "test_queue_key3";

    @Test
    public void send() {
        Map<String, String> msg = new HashMap<String, String>();
        msg.put("data", "hello,rabbitmq!");
        System.out.println("--+amqpTemplate");
        // publish 200 messages with routing key test_queue_key
        for (int i = 0; i < 200; i++) {
            mqImpl.sendDataToQueue(queue_key, msg);
        }
        System.out.println("------------------");
    }
}
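The test sends a HashMap through the template, which is configured with SimpleMessageConverter. For a Serializable payload that is neither a String nor a byte array, this converter sends the object as Java-serialized bytes. The round trip can be sketched with plain java.io; the class `SerializedPayloadSketch` and its method names are hypothetical and only mirror the idea, they are not Spring AMQP code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

/** Hypothetical sketch of a Java-serialization round trip for a message body. */
final class SerializedPayloadSketch {
    private SerializedPayloadSketch() {}

    /** Producer side: turn a Serializable payload into the bytes of a message body. */
    static byte[] toBytes(Serializable payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Consumer side: rebuild the object from the message body. */
    static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("payload class not on the classpath", e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> msg = new HashMap<>();
        msg.put("data", "hello,rabbitmq!");

        byte[] body = toBytes(msg);
        System.out.println(fromBytes(body)); // prints {data=hello,rabbitmq!}
    }
}
```

One consequence of this format is that producer and consumer both need the payload class on their classpath; a JSON-based converter is a common alternative when the two sides are not both Java.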
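To see the resulting exchange and queues, log in to the RabbitMQ management UI, which the management plugin serves by default at localhost:15672/ on RabbitMQ 3.x and later (5672 is the AMQP port used by the client connection above).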