Spring Boot 監聽 Activemq 中的特定 topic ,並將數據經過 RabbitMq 發佈出去

一、Spring Boot 和 ActiveMQ 、RabbitMQ 簡介html

  最近由於公司的項目須要用到 Spring Boot , 因此自學了一下, 發現它與 Spring 相比,最大的優勢就是減小了配置, 看不到 xml 文件的配置, 而是用 appplication.yml 或者 application.propertites 文件來代替 , 不再用配置 tomcat 環境了, 由於 spring boot 已經將 tomcat 環境整合到裏面了。入門能夠去 http://spring.io 官網, 上面有一系列介紹 。java

  本次項目開發中還用到了 ActiveMQ 和 RabbitMQ , 這是兩個消息隊列,我直到完成模塊都不能真正理解消息隊列。 關於消息隊列的定義和使用場景這篇博客寫得十分清楚:git

https://blog.csdn.net/KingCat666/article/details/78660535,幾個不一樣的消息隊列之間的比較 : http://www.javashuo.com/article/p-fcxqowqg-kh.html。我負責的任務是 Spring Boot 監聽 ActiveMQ 中特定的 topic,並將消息使用 RabbitMq 發佈出去。github

 

二、配置環境web

  2.1 ·使用 maven 構建 Spring Boot 運行環境, 在 pom.xml 文件中加入以下依賴:spring

<properties>
<project.build.sourceEncoding>UTF8
</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Springboot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.0.7.RELEASE</version>
</dependency>
<!-- rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>2.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.0.7.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.0.7.RELEASE</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
<version>2.0.7.RELEASE</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-maven-plugin -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.3.7.RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

  2.2 下載並安裝配置 active mq 和 rabbitmq 的運行環境apache

    activemq下載地址以下 : http://activemq.apache.org/download-archives.htmltomcat

rabbitmq 是使用 erlang 寫的, 因此先安裝 erlang 環境, 再安裝 rabbitmq-server, 如今我將這三個文件整合到了一塊兒, 方便下載 : 服務器

連接: https://pan.baidu.com/s/1qdzMpqFwxR78rW7-ABpbCA  提取碼: 7aqf 。下載完成之後, 其中比較複雜的是安裝 erlang ,安裝完之後新建 ERLAGN_HOME 添加到環境變量。                                  將 %ERLANG_HOME%\bin 添加到 path,而後安裝 rabbit-server.exe, 安裝完之後在進入 rabbit-server\sbin 目錄下, 進入命令行,輸入 rabbitmq-plugins enable rabbitmq_management 完成安裝,
app

打開 sbin 目錄,雙擊rabbitmq-server.bat , 啓動成功以後訪問 http://localhost:15672,默認帳號密碼都屬 guest 。

將下載的 activemq 解壓到某個目錄下,進入該目錄輸入 cmd ,敲擊 bin\activemq start , 有可能會報錯,具體錯誤查看 data\activemq.log 文件。環境搭建成功之後, 開始幹!

三、構建項目

  三、1 新建配置文件:

    新建 application.yml 文件,輸入:

com:
  mqtt:
    inbound:
      url: tcp://127.0.0.1:1883
      clientId: familyServerIn
      topics: hello,topic
    outbound:
      urls: tcp://127.0.0.1:1883
      clientId: familyServerOut
      topic: topic1
      
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: root
    virtualHost: /
  listener:
    concurrency: 2
    max-concurrency: 2
  main:
    web-application-type: none
  mqtt:
    username: admin
#MQTT-密碼
    password: admin
#MQTT-服務器鏈接地址,若是有多個,用逗號隔開,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
    url: tcp://127.0.0.1:1883
#MQTT-鏈接服務器默認客戶端ID
    client:
      id: mqttId
#MQTT-默認的消息推送主題,實際可在調用接口時指定
    default:
      topic: topic
#鏈接超時
    completionTimeout: 3000

  3.2 新建配置類 MQttSenderConfig.java

在這裏主要配置了 connectionFactory 和 channelFactory , 值得注意的是在方法 handler() 裏面經過監聽信道 mqttOutboundChannel 得到了 topic 並將其轉發給 RabbitMQ 隊列中, topicSender.send(message.getPayload().toString()); 這一行代碼將消息發送到 RabbitMQ 隊列中 、/*

/**
 * 〈一句話功能簡述〉<br> 
 * 〈MQTT發送消息配置〉
 *
 * @author root
 * @create 2018/12/20
 * @since 1.0.0
 */
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {
 
    @Value("${spring.mqtt.username}")
    private String username;
 
    @Value("${spring.mqtt.password}")
    private String password;
 
    @Value("${spring.mqtt.url}")
    private String hostUrl;
 
    @Value("${spring.mqtt.client.id}")
    private String clientId;
 
    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;
 
    @Value("${spring.mqtt.completionTimeout}")
    private int completionTimeout ;   //鏈接超時
    
    @Autowired
    private TopicSender topicSender;
    
    @Bean
    public MqttConnectOptions getMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        mqttConnectOptions.setKeepAliveInterval(2);
        return mqttConnectOptions;
    }
    
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }
  
    //mqttOutboundChannel
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        return messageHandler;
    }
    
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
    
  //接收通道
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    
  //配置client,監聽的topic 
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),
                        "topic","hello");
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttOutboundChannel());
        return adapter;
    }
 
    //經過通道獲取數據
  
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
//                String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
                if("hello".equalsIgnoreCase(topic)){
                    System.out.println("hello,fuckXX," + message.getPayload().toString());
                    topicSender.send(message.getPayload().toString());
                }else if("topic".equalsIgnoreCase(topic)){
                    System.out.println("topic,fuckXX," + message.getPayload().toString());
                    topicSender.send(message.getPayload().toString());
                }
            }
        };
    }

}

  3.2 新建配置類 RabbitConfig.java

配置了兩個隊列 rabbittopic 和 rabbittopic.queue2 , 申明瞭消息交換器 topicExchange, 經過 key 來綁定, 關於 key 和 路由綁定參考這篇文章 : https://www.jianshu.com/p/04f443dcd8bd 。


@Configuration
public class RabbitConfig implements RabbitListenerConfigurer {
    
    //聲明隊列
    @Bean
    public Queue queue1() {
        return new Queue("rabbitopic", true); // true表示持久化該隊列
    }
    
    @Bean
    public Queue queue2() {
        return new Queue("rabbitopic.queue2", true);
    }
    
    //聲明交互器
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    //綁定
    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
    }
    
    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
    }
   
    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }

    //queue listener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // factory.setPrefetchCount(5);//指定一個請求能處理多少個消息,若是有事務的話,必須大於等於transaction數量.
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //MANUAL:將ACK修改成手動確認,避免消息在處理過程當中發生異常形成被誤認爲已經成功消費的假象。
        //factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }
}

  3.3 新建MqttGateway.java

  新建 MqttGateWay 接口,設置默認的信道 。

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    void sendToMqtt(String data,@Header(MqttHeaders.TOPIC) String topic);
}

餘下代碼就再也不一一往上貼了 : 具體 demo:https://github.com/blench/mqtt.git

 

四、遇到的錯誤及解決辦法

    一、發送數據後 rabbitmq  一直在接收數據,緣由是監聽 RabbitMQ 隊列消息的方法寫錯了, 例如:

 

 @RabbitListener(queues = "rabbitopic")
    public void processMessage1(String msg) {
//        Message message = rabbitTemplate.receive(10000);
        System.out.println(" 接收到來自rabbitopic隊列的消息:" + msg);
        return;
    }

 

接收監聽的方法不能有返回值, 只能爲 void .

  二、配置錯誤, 中途有一次啓動失敗,是因爲代碼的配置問題。

最後啓動項目, 在 active mq 中新建 topic 和 hello 主題 , 添加測試內容發送。 控制檯下可打印出相應的消息 。 

 

五、總結

  雖然此次匆匆忙忙寫完了代碼,可是對於 RabbitMQ 和 ActiveMQ 只是有了初步的瞭解, 將來的工做中還會繼續學習的 。

參考文檔:

https://www.jianshu.com/p/6ca34345b796

https://www.jianshu.com/p/db8391dc1f63

http://blog.sina.com.cn/s/blog_7479f7990100zwkp.html

相關文章
相關標籤/搜索