Springboot 整合 MQTT

一、簡述

  MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發佈/訂閱(publish/subscribe)模式的「輕量級」通信協議,該協議構建於TCP/IP協議之上,由IBM在1999年發佈。MQTT最大的優勢在於,能夠以極少的代碼和有限的帶寬,爲連接遠程設備提供實時可靠的消息服務。作爲一種低開銷、低帶寬佔用的即時通信協議,它在物聯網、小型設備、移動應用等方面有較廣泛的應用。MQTT是一個基於客戶端-服務器的消息發佈/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它的適用範圍非常廣泛,在很多情況下都能使用,包括受限的環境,如:機器與機器(M2M)通信和物聯網(IoT)。它在通過衛星鏈路通信的傳感器、偶爾撥號的醫療設備、智能家居及一些小型化設備中已被廣泛使用。

二、建立springboot工程

三、添加依賴

<!-- Spring Boot starter that pulls in Spring Integration core. -->
<!-- No <version> is given on any entry; presumably it is managed by the Spring Boot parent/BOM (confirm). -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- NOTE(review): no class from this artifact is imported by the code shown in this article;
     confirm whether it is actually required. -->
<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-stream</artifactId>
</dependency>
<!-- Provides the org.springframework.integration.mqtt.* classes used by the inbound and
     outbound configuration classes below. -->
<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-mqtt</artifactId>
</dependency>

 

四、MqttInboundProperties

inBound配置屬性

package com.chen.config;

import lombok.Data;

/**
 * Settings for the MQTT inbound (subscribing) side.
 *
 * <p>An instance is held by {@code MqttProperties#inbound}, whose class is bound with the
 * prefix {@code com.mqtt}, so these fields correspond to {@code com.mqtt.inbound.*}.
 * Accessors are generated by Lombok's {@code @Data}.
 *
 * <p>NOTE(review): {@code @Data} also generates {@code toString()}, which presumably includes
 * {@link #password}; confirm instances are never logged.
 *
 * @author: ChenJie
 * @date 2018/8/21
 */
@Data
public class MqttInboundProperties {
    // Broker URL handed to the inbound channel adapter.
    private String url;
    // Not read by any configuration class shown alongside this one; the inbound adapter
    // receives a client factory that is configured from the outbound credentials.
    private String username;
    // See the note on username: not read by the configuration classes shown.
    private String password;
    // MQTT client id handed to the inbound channel adapter.
    private String clientId;
    // Comma-separated list of topics to subscribe to (the consumer splits it on ",").
    private String topics;
}

五、MqttOutboundProperties

outBound配置屬性

package com.chen.config;

import lombok.Getter;
import lombok.Setter;

/**
 * Settings for the MQTT outbound (publishing) side.
 *
 * <p>An instance is held by {@code MqttProperties#outbound}, whose class is bound with the
 * prefix {@code com.mqtt}, so these fields correspond to {@code com.mqtt.outbound.*}.
 * Getters and setters are generated by Lombok.
 *
 * @author: ChenJie
 * @date 2018/8/21
 */
@Setter
@Getter
public class MqttOutboundProperties {
    // Comma-separated list of broker URIs (the consumer splits it on ",").
    private String urls;
    // User name set on the MQTT client factory.
    private String username;
    // Password set on the MQTT client factory.
    private String password;
    // MQTT client id used by the outbound message handler.
    private String clientId;
    // Default topic used by the outbound message handler.
    private String topic;
}

六、MqttProperties

配置類

package com.chen.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Root holder for the MQTT settings, bound from configuration keys that start with
 * {@code com.mqtt}.
 *
 * <p>Neither nested object is initialised here, so both getters return {@code null}
 * until a value has been set.
 *
 * @author: ChenJie
 * @date 2018/8/21
 */
@ConfigurationProperties(prefix = "com.mqtt")
public class MqttProperties {

    /** Settings for the subscribing side ({@code com.mqtt.inbound.*}). */
    private MqttInboundProperties inbound;

    /** Settings for the publishing side ({@code com.mqtt.outbound.*}). */
    private MqttOutboundProperties outbound;

    // ---- getters ----

    /** @return the inbound settings, or {@code null} if none have been set */
    public MqttInboundProperties getInbound() {
        return this.inbound;
    }

    /** @return the outbound settings, or {@code null} if none have been set */
    public MqttOutboundProperties getOutbound() {
        return this.outbound;
    }

    // ---- setters ----

    /** @param inbound the inbound settings to store */
    public void setInbound(MqttInboundProperties inbound) {
        this.inbound = inbound;
    }

    /** @param outbound the outbound settings to store */
    public void setOutbound(MqttOutboundProperties outbound) {
        this.outbound = outbound;
    }
}

七、MqttInboundConfiguration

消息接收處理類

package com.chen.mqtt;

import com.chen.config.MqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * Spring Integration wiring for receiving MQTT messages: an input channel, a message-driven
 * channel adapter that subscribes to the configured topics, and a handler that logs every
 * message arriving on the channel.
 *
 * <p>NOTE(review): the adapter uses the injected {@link MqttPahoClientFactory}; the inbound
 * {@code username}/{@code password} properties are not read here, so the connection
 * credentials are whatever that factory was configured with.
 *
 * @author: ChenJie
 * @date 2018/8/21
 */
@Configuration
@Slf4j
public class MqttInboundConfiguration {
    @Autowired
    private MqttProperties mqttProperties;

    /**
     * Channel that carries messages from the inbound adapter to {@link #handler()}.
     *
     * @return a new direct (same-thread) channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the adapter that subscribes to the configured topics and forwards every received
     * message to {@link #mqttInputChannel()}.
     *
     * @param mqttPahoClientFactory factory used to create the underlying MQTT client
     * @return the configured message-driven adapter
     */
    @Bean
    public MessageProducer inbound(MqttPahoClientFactory mqttPahoClientFactory) {
        // Tolerate whitespace around the separators ("a, b") so a topic is never subscribed
        // with an accidental leading or trailing space.
        String[] inboundTopics = mqttProperties.getInbound().getTopics().trim().split("\\s*,\\s*");
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInbound().getUrl(), mqttProperties.getInbound().getClientId(),
                        mqttPahoClientFactory, inboundTopics);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Handler subscribed to {@code mqttInputChannel}; logs the payload of each message.
     *
     * @return the logging message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                // Parameterized logging without a cast: a non-String payload (for example
                // byte[]) no longer triggers a ClassCastException inside the handler.
                log.info("收到消息:{}", message.getPayload());
            }

        };
    }
}

八、MqttOutboundConfiguration

消息發送配置類

package com.chen.mqtt;

import com.chen.config.MqttProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Spring Integration wiring for publishing MQTT messages: the shared MQTT client factory, the
 * outbound channel, and the message handler that publishes whatever arrives on that channel.
 *
 * @author: ChenJie
 * @date 2018/8/21
 */
@Configuration
public class MqttOutboundConfiguration {
    @Autowired
    private MqttProperties mqttProperties;

    /**
     * Builds the MQTT client factory from the outbound settings.
     *
     * <p>The server list is a comma-separated value such as
     * {@code tcp://192.168.10.100:1883,tcp://host2:1883}.
     *
     * @return the configured client factory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        // Tolerate whitespace around the separators: a URI with a leading or trailing space
        // (from a value written as "tcp://a:1883, tcp://b:1883") is not a valid server URI.
        String[] serverURIs = mqttProperties.getOutbound().getUrls().trim().split("\\s*,\\s*");
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(serverURIs);
        factory.setCleanSession(false);
        factory.setUserName(mqttProperties.getOutbound().getUsername());
        factory.setPassword(mqttProperties.getOutbound().getPassword());
        return factory;
    }

    /**
     * Handler subscribed to {@code mqttOutboundChannel}; publishes each message over MQTT,
     * using the configured default topic when none is supplied with the message.
     *
     * @return the configured outbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(mqttProperties.getOutbound().getClientId(), mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getOutbound().getTopic());
        return messageHandler;
    }

    /**
     * Channel that carries messages to {@link #mqttOutbound()}.
     *
     * @return a new direct (same-thread) channel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}

九、MqttGateway

消息發送service,可以直接調用來發送消息

package com.chen.service;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * Messaging gateway for publishing to MQTT. Every method sends its payload to the
 * {@code mqttOutboundChannel} request channel; none of them returns a result.
 *
 * <p>NOTE(review): {@code @Component} on a {@code @MessagingGateway} interface looks
 * redundant; confirm before removing.
 *
 * @author: ChenJie
 * @date 2018/8/21
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Sends a payload without any topic or QoS header.
     *
     * @param data the message payload
     */
    void sendToMqtt(String data);
    /**
     * Sends a payload with the {@code MqttHeaders.TOPIC} header set.
     *
     * @param topic   value for the topic header
     * @param payload the message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
    /**
     * Sends a payload with both the {@code MqttHeaders.TOPIC} and {@code MqttHeaders.QOS}
     * headers set.
     *
     * @param topic   value for the topic header
     * @param qos     value for the QoS header
     * @param payload the message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

}

十、http接口觸發mqtt消息分發,  MessageController

package com.chen.controller;

import com.chen.service.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * HTTP entry point that forwards a request parameter to MQTT through {@link MqttGateway}.
 *
 * @author: ChenJie
 * @date 2018/8/21
 */
@RestController
public class MessageController {

    /** Gateway used to publish messages; injected by Spring. */
    @Autowired
    MqttGateway mqttGateway;

    /**
     * Publishes the given text, prefixed with a fixed marker, via the gateway's
     * single-argument send method.
     *
     * @param message text taken from the {@code message} request parameter
     * @return the literal string {@code "success"} once the gateway call has returned
     */
    @RequestMapping("/sendMsg")
    public String sendMsg(@RequestParam String message) {
        // NOTE(review): the prefix is spelled "defualt" in the original and is kept as-is,
        // because it is part of the published payload.
        final String payload = "defualt-topic:" + message;
        mqttGateway.sendToMqtt(payload);
        return "success";
    }
}

十一、springboot啓動類

/**
 * Application entry point. Registers {@code MqttProperties} for configuration-property
 * binding so it can be injected into the MQTT configuration classes.
 *
 * <p>{@code @SpringBootApplication} is already meta-annotated with {@code @Configuration},
 * so no separate {@code @Configuration} annotation is declared.
 */
@SpringBootApplication
@EnableConfigurationProperties(MqttProperties.class)
public class MqttSpringbootApplication {

	/**
	 * Starts the Spring Boot application.
	 *
	 * @param args command-line arguments passed through to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication.run(MqttSpringbootApplication.class, args);
	}
}
相關文章
相關標籤/搜索