RabbitMQ安裝請參照RabbitMQ應用html
不囉嗦直接上代碼java
目錄結構以下:node
pom.xml
web
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.test</groupId> <artifactId>RabbitMQ_MQTT</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>RabbitMQ_MQTT</name> <url>http://maven.apache.org</url> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.6.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- <dependency> <groupId>org.fusesource.mqtt-client</groupId> <artifactId>mqtt-client</artifactId> <version>1.12</version> </dependency> --> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <fork>true</fork> </configuration> </plugin> </plugins> </build> </project>
application.properties
spring
servier.port=8080 spring.rabbitmq.queues=topic.1,mqtt.test.*,mqtt.test.dd spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.publisher-confirms=true spring.rabbitmq.virtual-host=/
Application.javaapache
package com.gm; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.gm.rabbit.CallBackSender; @Configuration @RestController @EnableAutoConfiguration @ComponentScan @SpringBootApplication public class Application { @Autowired private CallBackSender sender; public static void main(String[] args) { SpringApplication.run(Application.class, args); } @RequestMapping("/callback") public void callbak() { sender.send("topic.baqgl.admin.1", "測試消息"); } }
RabbitConfig.javaapp
package com.gm.rabbit; import java.util.ArrayList; import java.util.List; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; @Configuration public class RabbitConfig { @Value("${spring.rabbitmq.host}") private String addresses; @Value("${spring.rabbitmq.port}") private String port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Value("${spring.rabbitmq.publisher-confirms}") private boolean publisherConfirms; @Value("${spring.rabbitmq.queues}") private String queues; final static String EXCHANGE_NAME = "amq.topic"; final static String QUEUE_NAME = "topic.baqgl.*.*"; final static String ROUTING_KEY = "topic.baqgl.#"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses + ":" + port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); /** 若是要進行消息回調,則這裏必需要設置爲true */ connectionFactory.setPublisherConfirms(publisherConfirms); return connectionFactory; } @Bean /** 由於要設置回調類,因此應是prototype類型,若是是singleton類型,則回調類爲最後一次設置 */ @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } @Bean TopicExchange exchange() { return new TopicExchange(EXCHANGE_NAME); } @Bean public Queue queue() { return new Queue(QUEUE_NAME, true); } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY); } @Bean public SimpleMessageListenerContainer messageContainer() { /*Queue[] q = new Queue[queues.split(",").length]; for (int i = 0; i < queues.split(",").length; i++) { q[i] = new Queue(queues.split(",")[i]); }*/ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { try { System.out.println( "消費端接收到消息:" + message.getMessageProperties() + ":" + new String(message.getBody())); System.out.println("topic:"+message.getMessageProperties().getReceivedRoutingKey()); // deliveryTag是消息傳送的次數,我這裏是爲了讓消息隊列的第一個消息到達的時候拋出異常,處理異常讓消息從新回到隊列,而後再次拋出異常,處理異常拒絕讓消息重回隊列 /*if (message.getMessageProperties().getDeliveryTag() == 1 || message.getMessageProperties().getDeliveryTag() == 2) { throw new Exception(); }*/ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // false只確認當前一個消息收到,true確認全部consumer得到的消息 } catch (Exception e) { e.printStackTrace(); if (message.getMessageProperties().getRedelivered()) { System.out.println("消息已重複處理失敗,拒絕再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕消息 } else { System.out.println("消息即將再次返回隊列處理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue爲是否從新回到隊列 } } } }); return container; } }
CallBackSender.javadom
package com.gm.rabbit; import java.util.UUID; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class CallBackSender implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; public void send(String topic, String message) { rabbitTemplate.setConfirmCallback(this); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); System.out.println("消息id:" + correlationData.getId()); //用RabbitMQ發送MQTT需將exchange配置爲amq.topic this.rabbitTemplate.convertAndSend("amq.topic", topic, message, correlationData); } public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("消息id:" + correlationData.getId()); if (ack) { System.out.println("消息發送確認成功"); } else { System.out.println("消息發送確認失敗:" + cause); } } }
ApplicationTests.javamaven
package com.gm; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests { @Test public void contextLoads() { System.out.println("hello world"); } }
TopicTest.javatcp
package com.gm.rabbit; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class TopicTest { @Autowired private CallBackSender sender; @Test public void topic() throws Exception { sender.send("topic.baqgl.admin.1", "測試消息"); } }
建立帳號 rabbitmqctl add_user admin 123456 設置用戶角色 rabbitmqctl set_user_tags admin administrator 設置用戶權限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" 設置完成後能夠查看當前用戶和角色(須要開啓服務) rabbitmqctl list_users
安裝插件:
rabbitmq-plugins enable rabbitmq_management rabbitmq-plugins enable rabbitmq_mqtt
默認配置。window下,rabbitmq的配置文件在C:\Users\Administrator\AppData\Roaming\RabbitMQ下。沒配置的狀況下,採用以下配置:
[{rabbit, [{tcp_listeners, [5672]}]}, {rabbitmq_mqtt, [{default_user, <<"admin">>}, {default_pass, <<"123456">>}, {allow_anonymous, true}, {vhost, <<"/">>}, {exchange, <<"amq.topic">>}, {subscription_ttl, 1800000}, {prefetch, 10}, {ssl_listeners, []}, %% Default MQTT with TLS port is 8883 %% {ssl_listeners, [8883]} {tcp_listeners, [1883]}, {tcp_listen_options, [{backlog, 128}, {nodelay, true}]}]} ].
更多說明請參照官方文檔:http://www.rabbitmq.com/mqtt.html