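RabbitMQ is a message queue. It is mainly used to make applications asynchronous and to decouple them from one another, and it also serves to buffer and distribute messages.
When we talk about a queue service there are usually three concepts: the message sender, the queue, and the message receiver. RabbitMQ adds one more layer of abstraction on top of this basic model by placing an exchange (Exchange) between the sender and the queue. The sender and the queue therefore have no direct connection: the sender hands the message to the exchange, and the exchange forwards it to a queue according to its routing policy.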
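To make the role of the exchange concrete, here is a minimal in-memory sketch of direct-style routing. It is a toy model, not the RabbitMQ client API: the class `DirectExchangeSketch` and its `bind` and `publish` methods are hypothetical names made up for illustration. The sender only ever talks to the exchange, and the exchange decides which queues receive the message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of exchange-based routing; NOT the RabbitMQ API. */
public class DirectExchangeSketch {

    // routing key -> queues bound to the exchange under that key
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    /** Binds a queue to this exchange under the given routing key. */
    public void bind(Queue<String> queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    /**
     * Copies the message into every queue bound with exactly this routing key.
     * Returns the number of queues that received it (0 means it was dropped).
     */
    public int publish(String routingKey, String message) {
        List<Queue<String>> targets =
                bindings.getOrDefault(routingKey, Collections.emptyList());
        for (Queue<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        Queue<String> hello = new ArrayDeque<>();
        Queue<String> audit = new ArrayDeque<>();
        exchange.bind(hello, "hello");
        exchange.bind(audit, "audit");

        exchange.publish("hello", "hello world"); // reaches only the "hello" queue
        exchange.publish("unknown", "dropped");   // no binding, so no queue gets it

        System.out.println(hello); // [hello world]
        System.out.println(audit); // []
    }
}
```

In RabbitMQ itself, the default exchange behaves much like this: every queue is implicitly bound to it with a routing key equal to the queue name, which is why sending with the routing key "hello" can reach a queue named "hello" without any explicit binding.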
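I. Download and install
Before installing RabbitMQ, download and install Erlang first.
Then download and install RabbitMQ, accepting the defaults at each step of the installer. Once it is installed, create a user and bind it.
Open http://localhost:15672 in a browser to reach the RabbitMQ management console (this requires the management plugin to be enabled), and log in with the account you just created.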
II. Integrating RabbitMQ with Spring Boot
1. Configure the pom
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
2. The application.properties configuration file
spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=xpc
spring.rabbitmq.password=123456
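As a rough illustration of how these settings describe the broker endpoint, the sketch below reads the same keys with plain `java.util.Properties` and prints them as an AMQP-style address. This is not how Spring Boot binds the properties internally; the class `RabbitPropsSketch`, its `describe` method, and the fallback values are assumptions made for the example. The password is deliberately left out of the printed string.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Illustrative only: reads spring.rabbitmq.* keys and formats an address. */
public class RabbitPropsSketch {

    /** Builds "amqp://user@host:port/" from the properties (password omitted). */
    public static String describe(Properties props) {
        // Fallback values below are assumptions for this sketch.
        String host = props.getProperty("spring.rabbitmq.host", "localhost");
        String port = props.getProperty("spring.rabbitmq.port", "5672");
        String user = props.getProperty("spring.rabbitmq.username", "guest");
        return "amqp://" + user + "@" + host + ":" + port + "/";
    }

    public static void main(String[] args) throws IOException {
        String text = String.join("\n",
                "spring.rabbitmq.host=127.0.0.1",
                "spring.rabbitmq.port=5672",
                "spring.rabbitmq.username=xpc",
                "spring.rabbitmq.password=123456");

        Properties props = new Properties();
        props.load(new StringReader(text));

        System.out.println(describe(props)); // amqp://xpc@127.0.0.1:5672/
    }
}
```

When the connection succeeds, Spring AMQP logs an address in this same form as the connection delegate, which is a quick way to confirm that the host, port, and user name were picked up as intended.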
3. Configure the queue
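import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    // Declares a queue named "hello" on the broker.
    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }
}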
4. Producer
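import java.util.Date;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        // "hello" is the routing key; it matches the name of the queue declared above.
        this.rabbitTemplate.convertAndSend("hello", context);
    }
}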
5. Consumer
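import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    // Invoked for each String message that arrives on the "hello" queue.
    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver : " + hello);
    }
}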
6. Test
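import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class HelloTest {

    @Autowired
    private HelloSender helloSender;

    // Sends one message; the receiver prints it from a listener thread.
    @Test
    public void hello() throws Exception {
        helloSender.send();
    }
}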
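The test above returns as soon as send() returns, while the message is handled on a separate listener thread. A test that needs to assert that the message actually arrived generally has to wait for the consumer explicitly. The sketch below shows that waiting pattern with a `CountDownLatch`, using a plain JDK `BlockingQueue` as a stand-in for the broker; the class `AsyncReceiveSketch` and the method `sendAndAwait` are hypothetical names, and nothing here uses Spring AMQP.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** JDK-only stand-in for "send, then wait until the consumer has the message". */
public class AsyncReceiveSketch {

    /**
     * Puts a message on an in-memory queue and waits until a consumer thread
     * has taken it. Returns the received message, or null on timeout.
     */
    public static String sendAndAwait(String message, long timeoutMillis)
            throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountDownLatch received = new CountDownLatch(1);
        String[] seen = new String[1];

        // Consumer: plays the role of the listener thread.
        Thread consumer = new Thread(() -> {
            try {
                seen[0] = queue.take();
                received.countDown(); // signal the waiting sender
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        // Producer: plays the role of HelloSender.send().
        queue.put(message);

        // Block until the consumer signals, or give up after the timeout.
        boolean arrived = received.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return arrived ? seen[0] : null;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Receiver : " + sendAndAwait("hello", 2000));
    }
}
```

In a real Spring test the same idea would mean having the receiver count down a latch that the test method awaits; that wiring is left out here because it depends on how the receiver bean is exposed to the test.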
Time to witness the miracle. At this point, however, a problem appeared, with the following error:
2019-03-20 21:08:44.976 INFO 12948 --- [ main] com.neo.rabbitmq.HelloTest : Starting HelloTest on DESKTOP-6P3OGVF with PID 12948 (started by 79235 in D:\workspace\spring-boot-rabbitmq)
2019-03-20 21:08:44.977 INFO 12948 --- [ main] com.neo.rabbitmq.HelloTest : No active profile set, falling back to default profiles: default
2019-03-20 21:08:45.306 INFO 12948 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type [org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration$$EnhancerBySpringCGLIB$$a50a910] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-03-20 21:08:45.781 INFO 12948 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-03-20 21:08:45.816 WARN 12948 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Connection reset)
2019-03-20 21:08:45.816 INFO 12948 --- [ main] o.s.a.r.l.SimpleMessageListenerContainer : Broker not available; cannot force queue declarations during start
2019-03-20 21:08:45.820 INFO 12948 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-03-20 21:08:45.827 ERROR 12948 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured
java.net.SocketException: socket closed
    at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_131]
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_131]
    at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_131]
    at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_131]
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[na:1.8.0_131]
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265) ~[na:1.8.0_131]
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) ~[na:1.8.0_131]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:596) ~[amqp-client-5.4.3.jar:5.4.3]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
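It later turned out that the cause was that the user created earlier had not been granted permissions. Opening the RabbitMQ web management UI and clicking Admin -> Set permission for that user fixed it.
Result of running the test again: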
:: Spring Boot :: (v2.1.0.RELEASE)
2019-03-20 21:12:44.699 INFO 11472 --- [ main] com.neo.rabbitmq.HelloTest : Starting HelloTest on DESKTOP-6P3OGVF with PID 11472 (started by 79235 in D:\workspace\spring-boot-rabbitmq)
2019-03-20 21:12:44.700 INFO 11472 --- [ main] com.neo.rabbitmq.HelloTest : No active profile set, falling back to default profiles: default
2019-03-20 21:12:45.024 INFO 11472 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type [org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration$$EnhancerBySpringCGLIB$$a50a910] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-03-20 21:12:45.479 INFO 11472 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-03-20 21:12:45.523 INFO 11472 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#24d4d7c9:0/SimpleConnection@302fec27 [delegate=amqp://xpc@127.0.0.1:5672/, localPort= 64890]
2019-03-20 21:12:45.615 INFO 11472 --- [ main] com.neo.rabbitmq.HelloTest : Started HelloTest in 1.062 seconds (JVM running for 1.634)
Sender : hello Wed Mar 20 21:12:45 CST 2019
2019-03-20 21:12:45.788 INFO 11472 --- [ Thread-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
Receiver : hello Wed Mar 20 21:12:45 CST 2019
2019-03-20 21:12:46.600 INFO 11472 --- [ Thread-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.