demo目錄java
貼代碼web
1.ProducerConfig.javaspring
package com.test.config; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Created by admin on 2017/6/1 13:23. */ @Configuration public class ProducerConfig { @Bean public RabbitMessagingTemplate msgMessageTemplate(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); //參數列表分別是:1.交換器名稱(default.topic 爲默認值),2.是否長期有效,3.若是服務器在再也不使用時自動刪除交換器
TopicExchange exchange = new TopicExchange("default.topic", true, false); rabbitAdmin.declareExchange(exchange); //1.隊列名稱,2.聲明一個持久隊列,3.聲明一個獨立隊列,4.若是服務器在再也不使用時自動刪除隊列
Queue queue = new Queue("test.demo.send", true, false, false); rabbitAdmin.declareQueue(queue); //1.queue:綁定的隊列,2.exchange:綁定到那個交換器,3.test2.send:綁定的路由名稱
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("test2.send")); return RabbitUtil.simpleMessageTemplate(connectionFactory); } }
2.RabbitMQConfig.java服務器
package com.test.config; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Created by admin on 2017/6/1 11:26. */ @Configuration public class RabbitMQConfig { /** * 注入配置文件屬性 */ @Value("${spring.rabbitmq.addresses}") String addresses;//MQ地址
@Value("${spring.rabbitmq.username}") String username;//MQ登陸名
@Value("${spring.rabbitmq.password}") String password;//MQ登陸密碼
@Value("${spring.rabbitmq.virtual-host}") String vHost;//MQ的虛擬主機名
/** * 建立 ConnectionFactory * * @return * @throws Exception */ @Bean public ConnectionFactory connectionFactory() throws Exception { return RabbitUtil.connectionFactory(addresses, username, password, vHost); } /** * 建立 RabbitAdmin * * @param connectionFactory * @return * @throws Exception */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) throws Exception { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); return rabbitAdmin; } }
3.RabbitUtil.javaapp
package com.test.config; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.messaging.converter.GenericMessageConverter; /** * RabbitMQ 公共類 * Created by admin on 2017/6/1 11:25. */
public class RabbitUtil { /** * 初始化 ConnectionFactory * * @param addresses * @param username * @param password * @param vHost * @return * @throws Exception */
public static ConnectionFactory connectionFactory(String addresses, String username, String password, String vHost) throws Exception { CachingConnectionFactory factoryBean = new CachingConnectionFactory(); factoryBean.setVirtualHost(vHost); factoryBean.setAddresses(addresses); factoryBean.setUsername(username); factoryBean.setPassword(password); return factoryBean; } /** * 初始化 RabbitMessagingTemplate * * @param connectionFactory * @return
*/
public static RabbitMessagingTemplate simpleMessageTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); RabbitMessagingTemplate rabbitMessagingTemplate = new RabbitMessagingTemplate(); rabbitMessagingTemplate.setMessageConverter(new GenericMessageConverter()); rabbitMessagingTemplate.setRabbitTemplate(template); return rabbitMessagingTemplate; } }
4.Student.javamaven
package com.test.model; import java.io.Serializable; /** * Created by admin on 2017/6/1 13:36. */
public class Student implements Serializable { private String name; private Integer age; private String address; public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } }
5.Consumers.javaspring-boot
package com.test.task; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; /** * Created by admin on 2017/6/1 13:29. */ @Service public class Consumers { @RabbitListener( //1.rabbitAdmin:RabbitAdmin名稱
admin = "rabbitAdmin", bindings = @QueueBinding( //1.test.demo.send:隊列名,2.true:是否長期有效,3.false:是否自動刪除
value = @Queue(value = "test.demo.send", durable = "true", autoDelete = "false"), //1.default.topic交換器名稱(默認值),2.true:是否長期有效,3.topic:類型是topic
exchange = @Exchange(value = "default.topic", durable = "true", type = "topic"), //test2.send:路由的名稱,ProducerConfig 裏面 綁定的路由名稱(xxxx.to(exchange).with("test2.send")))
key = "test2.send") ) public void test(Object obj) { System.out.println("receive...."); System.out.println("obj:" + obj.toString()); } }
6.Producers.javagradle
package com.test.task; import com.test.model.Student; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * Created by admin on 2017/6/1 13:35. */ @Service public class Producers { @Autowired RabbitMessagingTemplate rabbitSendTemplate; public void send(Student student) { System.out.println("send start....."); rabbitSendTemplate.convertAndSend("default.topic", "test2.send", student); } }
7.TestController.javaui
package com.test.test; import com.test.model.Student; import com.test.task.Producers; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; /** * Created by admin on 2017/6/1 13:38. */ @Controller @RequestMapping(value = "/test") public class TestController { @Autowired Producers producers; @RequestMapping(value = "/send", method = RequestMethod.GET) @ResponseBody public void test() { Student s = new Student(); s.setName("zhangsan"); s.setAddress("wuhan"); s.setAge(20); producers.send(s); } }
8.MainApplication.javathis
package com.test; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * Created by admin on 2017/6/1 11:19. */ @SpringBootApplication public class MainApplication { public static void main(String[] args) { System.getProperties().put("test", "test"); SpringApplication.run(MainApplication.class, args); } }
9.application.yml
server: address: 192.168.200.117 #本身主機的IP地址 port: 8000 #端口 spring: rabbitmq: addresses: 192.168.200.119:5672 #MQ IP 和 端口 username: admin #MQ登陸名 password: 123456 #MQ登陸密碼 virtual-host: test #MQ的虛擬主機名稱
10.build.gradle
group 'rabbitmqtest' version '1.0-SNAPSHOT' apply plugin: 'java' sourceCompatibility = 1.8 repositories { mavenCentral() } dependencies { testCompile group: 'junit', name: 'junit', version: '4.11' testCompile("org.springframework.boot:spring-boot-starter-test:1.3.5.RELEASE") compile("org.springframework.boot:spring-boot-starter-web:1.3.5.RELEASE") compile(group: 'org.springframework.amqp', name: 'spring-rabbit', version: "1.6.1.RELEASE") }
11.settings.gradle
rootProject.name = 'rabbitmqtest'
頁面訪問 192.168.200.117:8000/test/send 能夠看到控制檯有日誌輸出,發送的消息當即消費掉了
MQ的隊列裏面也是空的
若是把消費者的代碼注掉,再訪問剛纔的 url 地址 隊列裏面就會多一條