在訂閱模式消息通信中,須要配置交換機,交換機有三種模式:fanout廣播、direct定向、topic通配符。咱們這裏使用topic通配符模式,這種在企業應用中最爲普遍。java
1. 建立消息生產者項目:spring_rabbitmq_producerspring
2. 添加依賴 json
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.1.RELEASE</version> </dependency>
3.編寫spring整合rabbitmq配置文件:spring-rabbitmq-producer.xml app
1 <!-- 1. 配置鏈接 --> 2 <rabbit:connection-factory 3 id="connectionFactory" 4 host="127.0.0.1" 5 port="5672" 6 username="pomelo" 7 password="pomelo" 8 virtual-host="/pomelo" 9 /> 10 <!-- 2. 配置隊列 --> 11 <rabbit:queue name="myQueue"/> 12 <!-- 3.配置rabbitAdmin --> 13 <rabbit:admin connection-factory="connectionFactory"/> 14 <!-- 4. 配置topic類型exchange;隊列綁定到交換機 --> 15 <rabbit:topic-exchange name="myExchange"> 16 <rabbit:bindings> 17 <rabbit:binding queue="myQueue" pattern="msg.#"></rabbit:binding> 18 </rabbit:bindings> 19 </rabbit:topic-exchange> 20 <!-- 5. 配置消息對象json轉換類 --> 21 <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> 22 <!-- 6. 配置RabbitTemplate --> 23 <rabbit:template 24 id="rabbitTemplate" 25 connection-factory="connectionFactory" 26 exchange="myExchange" 27 message-converter="jsonMessageConverter" 28 />
4. 發消息測試ide
package com.pomelo.producer; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.ClassPathXmlApplicationContext; import java.util.HashMap; import java.util.Map; public class Send { public static void main(String[] args) { //建立spring容器 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring-rabbitmq-producer.xml"); //從容器中獲取對象 RabbitTemplate template = context.getBean(RabbitTemplate.class); // 發送消息 Map<String,String> map = new HashMap<>(); map.put("email","550731230@qq.com"); template.convertAndSend("msg.email",map); context.close(); } }
1.建立消費者項目:spring_rabbitmq_consumer測試
2.添加依賴spa
<dependencies> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.1.RELEASE</version> </dependency> </dependencies>
3.編寫消息監聽器code
package com.pomelo.listener; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import java.io.IOException; @Component public class EmailMessageListener implements MessageListener { private static final ObjectMapper MAPPER = new ObjectMapper(); @Override public void onMessage(Message message) { try { JsonNode jsonNode = MAPPER.readTree(message.getBody()); String email = jsonNode.get("email").asText(); System.out.println("獲取隊列中消息:" + email); } catch (IOException e) { e.printStackTrace(); } } }
4.編寫spring整合rabbitmq配置文件:spring-rabbitmq-consumer.xmlcomponent
<!-- 1. springIOC註解掃描--> <context:component-scan base-package="com.pomelo.listener"/> <!-- 2. 配置鏈接工廠 --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="pomelo" password="pomelo" virtual-host="/pomelo" /> <!-- 3. 配置隊列名 --> <rabbit:queue name="myQueue"/> <!-- 4.配置rabbitAdmin --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 5.配置監聽 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="emailMessageListener" queue-names="myQueue" /> </rabbit:listener-container>
5.運行項目xml
package com.pomelo.consumer; import org.springframework.context.support.ClassPathXmlApplicationContext; import java.io.IOException; public class Consumer { public static void main(String[] args) throws IOException { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring-rabbitmq-consumer.xml"); System.in.read(); } }